diff --git a/README.md b/README.md index dca9303..3cb6d8b 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,7 @@ # delay-queue -[![Go Report Card](https://goreportcard.com/badge/github.com/ouqiang/delay-queue)](https://goreportcard.com/report/github.com/ouqiang/delay-queue) -[![Downloads](https://img.shields.io/github/downloads/ouqiang/delay-queue/total.svg)](https://github.com/ouqiang/delay-queue/releases) -[![license](https://img.shields.io/github/license/mashape/apistatus.svg?maxAge=2592000)](https://github.com/ouqiang/delay-queue/blob/master/LICENSE) -[![Release](https://img.shields.io/github/release/ouqiang/delay-queue.svg?label=Release)](https://github.com/ouqiang/delay-queue/releases) + +基于[ouqiang/delay-queue](https://github.com/ouqiang/delay-queue) [suhuaguo/delay-queue](https://github.com/suhuaguo/delay-queue) [moshuipan/delay-queue](https://github.com/moshuipan/delay-queue/) 代码并加入编译优化脚本 + 基于Redis实现的延迟队列, 参考[有赞延迟队列设计](http://tech.youzan.com/queuing_delay)实现 diff --git a/build.sh b/build.sh new file mode 100644 index 0000000..e98037e --- /dev/null +++ b/build.sh @@ -0,0 +1,15 @@ +mkdir -p dist +if [[ `uname -s` == "Darwin" ]]; then + echo "system is macos" + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags '-w' -o dist/delay-queue-linux-64 + CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -ldflags '-w' -o dist/delay-queue-win-64 + go build -ldflags '-w' -o dist/delay-queue-macos-64 +else + echo "system is linux" + CGO_ENABLED=0 GOOS=macos GOARCH=amd64 go build -ldflags '-w' -o dist/delay-queue-linux-64 + CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -ldflags '-w' -o dist/delay-queue-win-64 + go build -ldflags '-w' -o dist/delay-queue-macos-64 +fi +upx -9 dist/delay-queue-linux-64 +upx -9 dist/delay-queue-win-64 +upx -9 dist/delay-queue-macos-64 diff --git a/cmd/cmd.go b/cmd/cmd.go index 6a0f1dc..fc8e783 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -7,9 +7,9 @@ import ( "net/http" "os" - "github.com/ouqiang/delay-queue/config" - "github.com/ouqiang/delay-queue/delayqueue" - "github.com/ouqiang/delay-queue/routers" + "github.com/php-cpm/delay-queue/config" + "github.com/php-cpm/delay-queue/delayqueue" + "github.com/php-cpm/delay-queue/routers" ) // Cmd 应用入口Command @@ -54,6 +54,7 @@ func (cmd *Cmd) parseCommandArgs() { // 运行Web Server func (cmd *Cmd) runWeb() { + // 有详细的说明:https://studygolang.com/resources/4657 讲的非常详细 http.HandleFunc("/push", routers.Push) http.HandleFunc("/pop", routers.Pop) http.HandleFunc("/finish", routers.Delete) diff --git a/config/config.go b/config/config.go index 5917009..cdba6a2 100644 --- a/config/config.go +++ b/config/config.go @@ -40,12 +40,14 @@ const ( DefaultRedisReadTimeout = 180000 // DefaultRedisWriteTimeout Redis写入超时时间, 单位毫秒 DefaultRedisWriteTimeout = 3000 + // DefaultKeyName key名称 + DefaultKeyName = "dq_key_" ) // Config 应用配置 type Config struct { BindAddress string // http server 监听地址 - BucketSize int // bucket数量 + BucketSize int // bucket数量,timer 的数量 BucketName string // bucket在redis中的键名, QueueName string // ready queue在redis中的键名 QueueBlockTimeout int // 调用blpop阻塞超时时间, 单位秒, 修改此项, redis.read_timeout必须做相应调整 @@ -66,22 +68,28 @@ type RedisConfig struct { // Init 初始化配置 func Init(path string) { + // &Config{} :结构体初始化 + // http://blog.csdn.net/xxx9001/article/details/52574501 结构体初始化 Setting = &Config{} if path == "" { + // 加载默认的配置文件 Setting.initDefaultConfig() return } + // 转换配置文件 Setting.parse(path) } // 解析配置文件 func (config *Config) parse(path string) { + // 加载配置文件 file, err := ini.Load(path) if err != nil { log.Fatalf("无法解析配置文件#%s", err.Error()) } + // bucket 参数 section := file.Section("") config.BindAddress = section.Key("bind_address").MustString(DefaultBindAddress) config.BucketSize = section.Key("bucket_size").MustInt(DefaultBucketSize) @@ -89,6 +97,7 @@ func (config *Config) parse(path string) { config.QueueName = section.Key("queue_name").MustString(DefaultQueueName) config.QueueBlockTimeout = section.Key("queue_block_timeout").MustInt(DefaultQueueBlockTimeout) + // redis 相关的参数 config.Redis.Host = section.Key("redis.host").MustString(DefaultRedisHost) config.Redis.Db = section.Key("redis.db").MustInt(DefaultRedisDb) config.Redis.Password = section.Key("redis.password").MustString(DefaultRedisPassword) @@ -104,7 +113,7 @@ func (config *Config) initDefaultConfig() { config.BindAddress = DefaultBindAddress config.BucketSize = DefaultBucketSize config.BucketName = DefaultBucketName - config.QueueName = DefaultQueueName + config.QueueName = DefaultQueueName // ready queue config.QueueBlockTimeout = DefaultQueueBlockTimeout config.Redis.Host = DefaultRedisHost diff --git a/delayqueue/bucket.go b/delayqueue/bucket.go index e084f5e..53500bd 100644 --- a/delayqueue/bucket.go +++ b/delayqueue/bucket.go @@ -6,8 +6,8 @@ import ( // BucketItem bucket中的元素 type BucketItem struct { - timestamp int64 - jobId string + timestamp int64 // 时间戳 + jobId string // jobId } // 添加JobId到bucket中 @@ -19,6 +19,7 @@ func pushToBucket(key string, timestamp int64, jobId string) error { // 从bucket中获取延迟时间最小的JobId func getFromBucket(key string) (*BucketItem, error) { + // 返回有序集 key 中,指定区间内的成员。其中成员的位置按 score 值递增(从小到大)来排序。 value, err := execRedisCommand("ZRANGE", key, 0, 0, "WITHSCORES") if err != nil { return nil, err diff --git a/delayqueue/delay_queue.go b/delayqueue/delay_queue.go index 2b0c8fe..4f53382 100644 --- a/delayqueue/delay_queue.go +++ b/delayqueue/delay_queue.go @@ -6,7 +6,7 @@ import ( "log" "time" - "github.com/ouqiang/delay-queue/config" + "github.com/php-cpm/delay-queue/config" ) var ( @@ -18,8 +18,14 @@ var ( // Init 初始化延时队列 func Init() { + // 初始化 redis 连接池 RedisPool = initRedisPool() + + // 初始化一些列 Timer initTimers() + + // golang 函数:http://blog.csdn.net/mungo/article/details/52481285 + // 进行初始化,产生一个 go routine bucketNameChan = generateBucketName() } @@ -34,6 +40,8 @@ func Push(job Job) error { log.Printf("添加job到job pool失败#job-%+v#%s", job, err.Error()) return err } + + // 轮询的方式存放。ZADD 命令 。存放在有序集合中。将会从这里获取 要运行的 job err = pushToBucket(<-bucketNameChan, job.Delay, job.Id) if err != nil { log.Printf("添加job到bucket失败#job-%+v#%s", job, err.Error()) @@ -45,7 +53,9 @@ func Push(job Job) error { // Pop 轮询获取Job func Pop(topics []string) (*Job, error) { - jobId, err := blockPopFromReadyQueue(topics, config.Setting.QueueBlockTimeout) + // ready queue 里面只有 topic 作为 key + // jobId, err := blockPopFromReadyQueue(topics, config.Setting.QueueBlockTimeout) + jobId, err := blPopFromReadyQueue(topics, config.Setting.QueueBlockTimeout) if err != nil { return nil, err } @@ -66,8 +76,10 @@ func Pop(topics []string) (*Job, error) { return nil, nil } + // 重新放到 Bucket 中,等待重新消费。实现至少一次的逻辑。如果客户端删除了 job ,那么。调度到此 jobId 的时候,发现 job 不存在,直接在 bucket 中删除 timestamp := time.Now().Unix() + job.TTR - err = pushToBucket(<-bucketNameChan, timestamp, job.Id) + // 表示从 <-bucketNameChan 。这个 channel 接收一个值 + err = pushToBucket(<-bucketNameChan, timestamp, job.Id) //待确认的消息放入bucket1 return job, err } @@ -93,10 +105,22 @@ func Get(jobId string) (*Job, error) { // 轮询获取bucket名称, 使job分布到不同bucket中, 提高扫描速度 func generateBucketName() <-chan string { + // 阻塞 channel c := make(chan string) + // 1、为什么这么写呢?为什么不直接写个 for 死循环呢? + // 如果直接写 for 循环,那在初始化的时候,会阻塞其他 init 函数。如果另起一个 go routine 的话,就不会阻塞其他的 + + // 2、每次都 产生一个 go routine 。是怎么销毁的呀? + // 因为把这个函数 赋给某个 变量了。在 init 中初始化了。只有一个 go routine 。 + + // 3、我感觉到 里面的 i 变量好像没有作用的呀,因为都没有和其他 go routine 交换。 + // 因为在 初始化 init 一下。每次都从 bucketNameChan 这个 channel 读取信息。 go func() { i := 1 + + // 死循环 for { + // chan <- 发送消息 c <- fmt.Sprintf(config.Setting.BucketName, i) if i >= config.Setting.BucketSize { i = 1 @@ -109,13 +133,19 @@ func generateBucketName() <-chan string { return c } -// 初始化定时器 +// 初始化定时器 https://yq.aliyun.com/articles/69303 func initTimers() { timers = make([]*time.Ticker, config.Setting.BucketSize) var bucketName string for i := 0; i < config.Setting.BucketSize; i++ { + + // 每 1s 执行一次 timers[i] = time.NewTicker(1 * time.Second) + + // 如果这里部署多实例的话,就会产生竞争 bucketName = fmt.Sprintf(config.Setting.BucketName, i+1) + + // 并发执行 go waitTicker(timers[i], bucketName) } } @@ -123,7 +153,7 @@ func initTimers() { func waitTicker(timer *time.Ticker, bucketName string) { for { select { - case t := <-timer.C: + case t := <-timer.C: // 我们启动一个新的goroutine,来以阻塞的方式从Timer的C这个channel中,等待接收一个值,这个值是到期的时间。 tickHandler(t, bucketName) } } @@ -132,6 +162,7 @@ func waitTicker(timer *time.Ticker, bucketName string) { // 扫描bucket, 取出延迟时间小于当前时间的Job func tickHandler(t time.Time, bucketName string) { for { + // 拿到第一个元素。bucket 存放 jobid 和时间戳 bucketItem, err := getFromBucket(bucketName) if err != nil { log.Printf("扫描bucket错误#bucket-%s#%s", bucketName, err.Error()) @@ -165,11 +196,12 @@ func tickHandler(t time.Time, bucketName string) { if job.Delay > t.Unix() { // 从bucket中删除旧的jobId removeFromBucket(bucketName, bucketItem.jobId) - // 重新计算delay时间并放入bucket中 + // 重新计算delay时间并放入其他的 bucket 中 pushToBucket(<-bucketNameChan, job.Delay, bucketItem.jobId) continue } + // 放到 Ready 队列中,普通的 redis list 即可。RPUSH 方式 err = pushToReadyQueue(job.Topic, bucketItem.jobId) if err != nil { log.Printf("JobId放入ready queue失败#bucket-%s#job-%+v#%s", diff --git a/delayqueue/job.go b/delayqueue/job.go index b108411..0d6f4d8 100644 --- a/delayqueue/job.go +++ b/delayqueue/job.go @@ -1,21 +1,22 @@ package delayqueue import ( + "github.com/php-cpm/delay-queue/config" "github.com/vmihailenco/msgpack" ) // Job 使用msgpack序列化后保存到Redis,减少内存占用 type Job struct { - Topic string `json:"topic" msgpack:"1"` - Id string `json:"id" msgpack:"2"` // job唯一标识ID + Topic string `json:"topic" msgpack:"1"` // topic ,唯一 + Id string `json:"id" msgpack:"2"` // job唯一标识ID 。客户端需要保证唯一性。有关联关系的 Delay int64 `json:"delay" msgpack:"3"` // 延迟时间, unix时间戳 - TTR int64 `json:"ttr" msgpack:"4"` - Body string `json:"body" msgpack:"5"` + TTR int64 `json:"ttr" msgpack:"4"` // 超时时间,TTR的设计目的是为了保证消息传输的可靠性。 + Body string `json:"body" msgpack:"5"` // body } // 获取Job func getJob(key string) (*Job, error) { - value, err := execRedisCommand("GET", key) + value, err := execRedisCommand("GET", config.DefaultKeyName+key) if err != nil { return nil, err } @@ -39,14 +40,14 @@ func putJob(key string, job Job) error { if err != nil { return err } - _, err = execRedisCommand("SET", key, value) + _, err = execRedisCommand("SET", config.DefaultKeyName+key, value) return err } // 删除Job func removeJob(key string) error { - _, err := execRedisCommand("DEL", key) + _, err := execRedisCommand("DEL", config.DefaultKeyName+key) return err } diff --git a/delayqueue/ready_queue.go b/delayqueue/ready_queue.go index e93e9df..ef9f118 100644 --- a/delayqueue/ready_queue.go +++ b/delayqueue/ready_queue.go @@ -2,11 +2,16 @@ package delayqueue import ( "fmt" + "math/rand" + "time" - "github.com/ouqiang/delay-queue/config" + "github.com/php-cpm/delay-queue/config" ) // 添加JobId到队列中 +// 将消息放到 redis +// http://www.redis.cn/commands/rpush.html +// queueName:Topic func pushToReadyQueue(queueName string, jobId string) error { queueName = fmt.Sprintf(config.Setting.QueueName, queueName) _, err := execRedisCommand("RPUSH", queueName, jobId) @@ -16,12 +21,49 @@ func pushToReadyQueue(queueName string, jobId string) error { // 从队列中阻塞获取JobId func blockPopFromReadyQueue(queues []string, timeout int) (string, error) { + // var args []interface{} + // args = append(args, queue) + // } + // args = append(args, timeout) + var value interface{} + var err error + t := time.Now().Unix() + int64(timeout) +A: + for time.Now().Unix() < t { + for _, queue := range queues { + queue = fmt.Sprintf(config.Setting.QueueName, queue) + value, err = execRedisCommand("LPOP", queue) //使用codis,去掉blpop命令 + if err != nil { + return "", err + } + if value != nil { + break A + } + } + sleepTimeInterval() + } + if value == nil { + return "", nil + } + // var valueBytes []interface{} + // valueBytes = value.([]interface{}) + // if len(valueBytes) == 0 { + // return "", nil + // } + element := string(value.([]byte)) + + return element, nil +} + +// 从队列中阻塞获取JobId,blpop +func blPopFromReadyQueue(queues []string, timeout int) (string, error) { var args []interface{} for _, queue := range queues { queue = fmt.Sprintf(config.Setting.QueueName, queue) args = append(args, queue) } args = append(args, timeout) + // http://www.redis.cn/commands/blpop.html value, err := execRedisCommand("BLPOP", args...) if err != nil { return "", err @@ -38,3 +80,18 @@ func blockPopFromReadyQueue(queues []string, timeout int) (string, error) { return element, nil } + +// 请求的最小时间间隔(毫秒) +var RetryMinTimeInterval int64 = 5 + +// 请求的最大时间间隔(毫秒) +var RetryMaxTimeInterval int64 = 30 + +// sleepTimeInterval 随机休眠一段时间 +// 随机时间范围[RetryMinTimeInterval,RetryMaxTimeInterval) +func sleepTimeInterval() { + var unixNano = time.Now().UnixNano() + var r = rand.New(rand.NewSource(unixNano)) + var randValue = RetryMinTimeInterval + r.Int63n(RetryMaxTimeInterval-RetryMinTimeInterval) + time.Sleep(time.Duration(randValue) * time.Millisecond) +} diff --git a/delayqueue/redis.go b/delayqueue/redis.go index 48fdd78..652a3a4 100644 --- a/delayqueue/redis.go +++ b/delayqueue/redis.go @@ -5,7 +5,7 @@ import ( "time" "github.com/garyburd/redigo/redis" - "github.com/ouqiang/delay-queue/config" + "github.com/php-cpm/delay-queue/config" ) var ( diff --git a/dist/delay-queue-linux-64 b/dist/delay-queue-linux-64 new file mode 100755 index 0000000..dda9d00 Binary files /dev/null and b/dist/delay-queue-linux-64 differ diff --git a/dist/delay-queue-macos-64 b/dist/delay-queue-macos-64 new file mode 100755 index 0000000..a7a282e Binary files /dev/null and b/dist/delay-queue-macos-64 differ diff --git a/dist/delay-queue-win-64 b/dist/delay-queue-win-64 new file mode 100755 index 0000000..9aa0edf Binary files /dev/null and b/dist/delay-queue-win-64 differ diff --git a/main.go b/main.go index bd48471..bbd495b 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,6 @@ package main -import "github.com/ouqiang/delay-queue/cmd" +import "github.com/php-cpm/delay-queue/cmd" func main() { cmd := new(cmd.Cmd) diff --git a/routers/routers.go b/routers/routers.go index 6e81577..cde8585 100644 --- a/routers/routers.go +++ b/routers/routers.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/ouqiang/delay-queue/delayqueue" + "github.com/php-cpm/delay-queue/delayqueue" ) // TopicRequest Job类型请求json @@ -41,13 +41,13 @@ func Push(resp http.ResponseWriter, req *http.Request) { return } - if job.Delay <= 0 || job.Delay > (1<<31) { - resp.Write(generateFailureBody("delay 取值范围1 - (2^31 - 1)")) + if job.Delay < 0 || job.Delay > (1<<31) { + resp.Write(generateFailureBody("delay 取值范围0 - (2^31 - 1)")) return } - if job.TTR <= 0 || job.TTR > 86400 { - resp.Write(generateFailureBody("ttr 取值范围1 - 86400")) + if job.TTR < 0 || job.TTR > 86400 { + resp.Write(generateFailureBody("ttr 取值范围0 - 86400")) return } @@ -90,12 +90,14 @@ func Pop(resp http.ResponseWriter, req *http.Request) { } type Data struct { + Topic string `json:"topic"` Id string `json:"id"` Body string `json:"body"` } data := Data{ Id: job.Id, + Topic: job.Topic, Body: job.Body, } @@ -163,6 +165,7 @@ type ResponseBody struct { } func readBody(resp http.ResponseWriter, req *http.Request, v interface{}) error { + // 一定要等到有error或EOF的时候才会返回结果,因此只能等到客户端退出时才会返回结果。 body, err := ioutil.ReadAll(req.Body) if err != nil { log.Printf("读取body错误#%s", err.Error()) @@ -179,10 +182,12 @@ func readBody(resp http.ResponseWriter, req *http.Request, v interface{}) error return nil } +// 成功:0 func generateSuccessBody(msg string, data interface{}) []byte { return generateResponseBody(0, msg, data) } +// 失败:1 func generateFailureBody(msg string) []byte { return generateResponseBody(1, msg, nil) } diff --git a/vendor/vendor.json b/vendor/vendor.json index 2a9012d..4cfd304 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -37,5 +37,5 @@ "revisionTime": "2017-06-02T20:46:24Z" } ], - "rootPath": "github.com/ouqiang/delay-queue" + "rootPath": "github.com/php-cpm/delay-queue" }