____ _ ___
| _ \ ___| | __ _ _ _ / _ \ _ _ ___ _ _ ___
| | | |/ _ \ |/ _` | | | | | | | | | | |/ _ \ | | |/ _ \
| |_| | __/ | (_| | |_| | | |_| | |_| | __/ |_| | __/
|____/ \___|_|\__,_|\__, | \__\_\\__,_|\___|\__,_|\___|
|___/
这个项目是仿照有赞的延迟队列进行设计的。现在这个队列是通过Redis Cluster来进行存储和实现分布式高可用的,
# clone project
git clone https://github.com/changsongl/delay-queue.git
# build the project
make
# run the project
bin/delayqueue
# flags
bin/delayqueue -help
-config.file string
config file (default "../../config/config.yaml")
-config.type string
config type: yaml, json
-env string
delay queue env: debug, release (default "release")
-version
display build info
-
SDK 链接
// Push job
POST 127.0.0.1:8000/topic/mytopic/job
body: {"id": "myid1","delay":10, "ttr":4, "body":"body"}
// response
{
"message": "ok",
"success": true
}
// Pop job (timeout: 秒)
GET 127.0.0.1:8000/topic/mytopic/job?timeout=5
// response
{
"message": "ok",
"success": true,
"data": {
"body": "body",
"delay": 10,
"id": "myid1",
"topic": "mytopic",
"ttr": 4
}
}
// Delete job
DELETE 127.0.0.1:8000/topic/mytopic/job/myid1
// response
{
"message": "ok",
"success": true
}
// Delete job
PUT 127.0.0.1:8000/topic/mytopic/job/myid1
// response
{
"message": "ok",
"success": true
}
- Job:需要异步处理的任务,是延迟队列里的基本单元。与具体的Topic关联在一起。
- Topic:一组相同类型Job的集合(队列)。供消费者来订阅。
- Topic:Job类型。可以理解成具体的业务名称。
- Id:Job的唯一标识。用来检索和删除指定的Job信息。Topic和Id的组合应该是业务中唯一的。
- Delay:Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
- TTR(time-to-run):Job执行超时时间,超过此事件后,会将此Job再次发给消费者消费。单位:秒。
- Body:Job的内容,供消费者做具体的业务处理,可以为json格式。
有四个组件
- Job Pool: 用来存放所有Job的元信息。
- Delay Bucket: 是一组以时间为维度的有序队列,用来存放所有需要延迟的/已经被reserve的Job(这里只存放Job Id)。
- Timer: 负责实时扫描各个Bucket,并将delay时间大于等于当前时间的Job放入到对应的Ready Queue。
- Ready Queue: 存放处于Ready状态的Job(这里只存放Job Id),以供消费程序消费。
Job的状态一共有4种,同一时间下只能有一种状态。
- ready:可执行状态,等待消费。
- delay:不可执行状态,等待时钟周期。
- reserved:已被消费者读取,但还未得到消费者的响应(delete、finish)。
- deleted:已被消费完成或者已被删除。
项目使用普罗米修斯作为监控手段,暴露了metrics接口给普罗米修斯进行数据拉取。 你可以使用普罗米修斯和Grafana的作为自己的监控手段。
# HELP delay_queue_in_flight_jobs_numbers_in_bucket Gauge of the number of inflight jobs in each bucket
# TYPE delay_queue_in_flight_jobs_numbers_in_bucket gauge
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_0"} 0
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_1"} 3
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_2"} 0
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_3"} 0
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_4"} 0
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_5"} 0
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_6"} 0
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_7"} 0
.
.
.
# HELP delay_queue_in_flight_jobs_numbers_in_queue Gauge of the number of inflight jobs in each queue
# TYPE delay_queue_in_flight_jobs_numbers_in_queue gauge
delay_queue_in_flight_jobs_numbers_in_queue{queue="dq_queue_mytopic"} 1
我将持续打磨这个项目,并且加入更多的功能和修复问题。我将会让这个项目可以投入到生产环境使用。 如果喜欢的话,欢迎给个星或者Fork参与进来,这里欢迎你的贡献!
- 在Issue里发布自己的问题或评论。
- 我们会在问题中进行讨论,并进行设计如何开发。
- Fork项目进行开发,并以develop分支,创建自己的分支进行开发。如fix-xxx, feature-xxx等等。
- 开发完成后,发起PR合入develop。
- Code Review后将会把你的代码合进分支。
Youzan Design Concept Youzan Link