1.绪论
本文主要讲解常见的几种延迟队列的实现方式,以及其原理。
2.延迟队列的使用场景
延迟队列主要用于解决每个被调度的任务开始执行的时间不一致的场景,主要包含如下场景:
1.比如订单超过15分钟后,关闭未关闭的订单。
2.比如用户可以下发任务,并且可以自定义任务的开始时间。
3.延迟队列的几大要素
延迟队列主要包含如下几个要素:
1.延迟队列里面存储的其实就是需要调度的任务,所以我们需要一个存储任务的容器;这个容器,可以是的数据库,redis或者内存中队列(包括链表,优先队列等);
2.一个线程,来轮询的存储任务的容器,判断任务是否已经到达执行时间;
4.延迟队列的实现方式
上面我们说了,定时任务其实就是由两个部分组成,分别是存储任务的容器和轮询线程,接下来我们根据这两个组件来分析各种延迟队列的实现。
4.1 定时任务扫表
4.1.1 组成组件
1.调度线程:一般采用分布式的定时任务,如果xxljob等。
2.存储任务的容器:数据库
4.1.2 实现方式
启动定时任务,每隔一段时间,轮询数据库,找出已经到达任务开始时间的任务,查询出后,执行业务逻辑。
4.1.3 优缺点
1.频繁的对数据库进行全表扫描,数据库压力大。
2.可能有时间延迟问题,延迟大小取决于轮询间隔。
3.定时轮询也会增加自身服务器开销。
4.2 基于内存队列的实现方式
4.2.1 实现原理
1.基本实现
如图所示,可以将需要调度的任务,存储到一个链表里面,然后开启一个线程,轮询该链表,如果如果某个任务的执行时间已到,便执行该任务。
但是上述场景存在一个问题,就是每个需要遍历整个链表,时间复杂度为o(n)。在这个定时任务重,过期时间小的任务一定会先被执行,所以我们可以考虑将时间最小的任务放到队首,这样就以o(1)的时间复杂度取出下一个需要执行的任务。
2.基于优先队列的优化
在jdk中可以采用优先队列来实现PriorityQueue,它其实就是一个小顶堆,每次插入元素的时候,可以以O(nlogn)的时间复杂度来维持堆首元素最小的特征。
4.2.2 jdk自带的延迟队列DelayQueue实现方式
我们可以看一下jdk自带的延迟队列DelayQueue的实现方式。
1.组成组件
1.存储任务的容器:数据库
2.调度线程:可能有同学好奇DelayQueue的调度线程是哪一个,其实是我们在使用DelayQqueue时间,会启动一个线程,循环轮询DelayQueue,这线程就是DelayQueue的调度线程。
new Thread(() -> {while(true) {delayQueue.offer();}
}).start();
2.添加元素
public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}
添加元素其实就是往优先队列里面写入一个任务,优先队列会自动的将过期时间最小的任务放在队首。
3.取出元素
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try { //一直循环for (;;) {//取出队首元素,即下一个过期的元素E first = q.peek();if (first == null)available.await();else {//获取随手元素时间long delay = first.getDelay(NANOSECONDS);//如果已经到期,返回队首元素if (delay <= 0)return q.poll();first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {//如果没到期,等阻塞线程到队首元素的开始时间available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}}
其实就是轮询整个优先队列,优先队列的队首元素就是下一个需要调度的任务,如果队首元素的直线时间小于当前时间,返回该任务,否者阻塞当前任务到下一个任务的执行时间,再返回当前任务。
4.2.3 优缺点
1.被调度任务存储在内存中,如果重启服务,需要调度的任务会丢失。
2.基于优先队列实现,插入调度任务时间复杂度为o(nlogn),如果是数据量庞大,插入性能可能会被影响,并且上一个任务的执行时间可能会影响到下一个任务的执行。
4.3 基于时间轮的实现
4.3.1 什么是时间轮
时间轮其实就是利用一个环形队列来表示时间,队列上的每个元素挂载了在这个时间刻度上需要执行的任务。
1.单层时间轮
如图所示,就是一个时间轮,分成了6个刻度,假设每个刻度代表1秒,假设当前时间为0秒,则第一秒执行的任务放在刻度1,第2秒执行的任务放在刻度2。如果任务的执行时间超过了刻度6,比如第8秒需要执行的任务放在哪儿呢。我们可以将其对6求余,放在刻度2的位置,然后用ticket来表示还差几轮才会轮到自己执行。
所以时间轮的执行步骤为,通过一个线程轮询环形队列,找到当前刻度,取出当前刻度上任务链表,如果任务链表中的任务的ticket为1,立刻执行该任务,如果大于1,便将ticket减1,说明是后面轮次的任务。
2.多层时间轮
单层时间轮,一旦时间跨度过大,就会导致时间轮的轮数过多,每个刻度上挂载的链表过长,所以引入多层时间轮。
多层时间轮,其实就是有多个不同刻度的单层时间轮组成的一种结构,以一天为例子,可以用一个3层时间轮来表示。其中,一个时间轮刻度为1秒,一个时间轮刻度为1分钟,一个时间轮刻度为1小时。如果秒时间轮已经转完60个刻度,即1分钟。则分时间轮需要向下转动一个刻度,将任务取出分散到秒时间轮上。这样便实现了任务的分散。
4.3.2 Netty中的时间轮
1.组成组件
1.存储任务的容器:任务数组
2.调度线程:netty启动的推动时间轮的线程。
2.添加元素
//向时间轮中添加定时任务的方法,但该方法实际上只会把定时任务存放到timeouts队列中public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {if (task == null) {throw new NullPointerException("task");}if (unit == null) {throw new NullPointerException("unit");}//启动工作线程,并且确保只启动一次,这里面会涉及线程的等待和唤醒start();//计算该定时任务的执行时间,startTime是worker线程的开始时间。以后所有添加进来的任务的执行时间,都是根据这个开始时间做的对比long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;HashedWheelTimeout timeout = new HashedWheelTimeout(task, deadline);//将定时任务和任务的执行时间添加到普通任务队列中timeouts.add(timeout);return timeout;}
注意:netty实现的方式是,业务线程会执行任务加入到HashedWheelTimeout 这个普通队列中,然后再由推动时间轮的线程,来将HashedWheelTimeout中的任务移到时间轮中。其实这一步,也可以省略,直接在业务线程添加调度的任务的时候,将执行任务写入到时间轮询中。增加HashedWheelTimeout的原因应该是为了减少并发读写。
3.执行定时任务
//时间轮线程一直执行的private final class Worker implements Runnable {//这个属性代表当前时间轮的指针移动了几个刻度private long tick;@Overridepublic void run() {//给starttime赋值,这里要等待该值复制成功后,另一个线程才能继续向下执行startTime = System.nanoTime();//这里是不是就串联起来了,通知之前的线程可以继续向下运行了startTimeInitialized.countDown();do {//返回的是时间轮线程从开始工作到现在执行了多少时间了final long deadline = waitForNextTick();if (deadline > 0) {//获取要执行的定时任务的那个数组下标。就是让指针当前的刻度和掩码做位运算int idx = (int) (tick & mask);//上面已经得到了要执行的定时任务的数组下标,这里就可以得到该bucket,而这个bucket就是定时任务的一个双向链表//链表中的每个节点都是一个定时任务HashedWheelBucket bucket = wheel[idx];//在真正执行定时任务之前,把即将被执行的任务从普通任务队列中放到时间轮的数组当中transferTimeoutsToBuckets();//执行定时任务bucket.expireTimeouts(deadline);//指针已经移动了,所以加1tick++;}//暂且让时间轮线程一直循环} while (true);}
}
这里可以看出会有一个线程,来一直推动时间轮向前。并执行任务。
4.4 基于redis的实现方式
redis实现延迟队列有两种方式,分别是监听key过期和通过zset来存储调度任务。
4.4.1 监听key过期
1.实现原理
即业务系统将调度任务数据存储到redis作为key,过期时间设置为任务执行时间。并监听这些key,当key过期被删除的时候,redis回给业务系统发送通知。
2.优缺点
1.redis采用定期删除+惰性删除的方式,所以一个key计算过期,也可能不会被立即删除掉,而是等待下一次访问该key或者被redis的定时任务扫到,才会删除key,导致任务执行时间不精准。
4.4.2 基于zset存储key和执行时间实现
1.实现原理
实现原理和jdk自带的延迟队列实现原理一样,只是存储任务的数据采用Redis中的zset实现,下次需要执行的任务放在zset的首部,只需要获取首部任务元素,然后获取到该元素的过期时间,redission启动一个定时任务,阻塞线程至首部元素的执行时间,才开始执行任务,并且将其加入到一个阻塞队列中。业务系统会启动一个线程,一直监听阻塞队列,如果有数据,证明有任务到达执行时间了,便取出数据,开始执行任务。
2.Redisson的实现
1.组成组件
1.存储任务的容器:
- 一个普通的list:主要是为了保存执行任务的插入顺序,方便执行增删改操作;
- 一个zset:key为执行任务,score为任务执行时间,利用zset的排序功能zrange,可以取出执行时间最小的任务;
- blist:阻塞队列,如果执行任务到期便会被转移到阻塞队列中,业务线程会轮询阻塞队列,取出里面执行任务,完成消费逻辑。
2.执行线程:其实是redisson客户端开启的一个线程。
2.源码分析
redisson的执行逻辑其实可以分成两个层面:
1.就是上面的普通逻辑,redisson客户端会启动一个线程,一直轮询zset,取出里面的过期任务,转移到阻塞队列中。但是这里轮询并不是定时扫描,而是每次取出到期任务过后,会返回最近的下一次任务的到达时间,然后启动一个定时器,等到下一个任务执行时间到期后,才再次从redis中拉取数据,大大的减少了io操作。这一操作其实和jdk的延迟队列是一样的。
2.还有就是处理特殊场景,一是在初始化的时候,如何判断下一个任务到达时间是多少;二是在redis中已经拉取到最新的一条任务的过期时间后,有新的任务添加到redis中,而且这个新的任务的过期时间是小于以前的最近的一条任务的过期时间的。针对这两种情况,redisson采用发布订阅的思想。redisson在构造延迟队列的时候,会订阅redisson_delay_queue_channel这个channel。
每次添加任务的时候,会判断被添加任务的过期时间是不是超过zset中所有任务的过期时间,如果是,便会向redisson_delay_queue_channel发布消息,消息体包含了最近的这条任务的时间。redisson收到消息过后,会更新定时器的执行时间为最新的一条执行任务的时间。
a)构造器
protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {super(codec, commandExecutor, name);//创建定时任务QueueTransferTask task = new QueueTransferTask(commandExecutor.getServiceManager()) { //这个逻辑是核心的转移逻辑,就是前面说的取出zset前面100条数据,并且返回下一次任务//的执行时间protected RFuture<Long> pushTaskAsync() {return commandExecutor.evalWriteAsync(RedissonDelayedQueue.this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('Bc0Lc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;", Arrays.asList(RedissonDelayedQueue.this.getRawName(), RedissonDelayedQueue.this.timeoutSetName, RedissonDelayedQueue.this.queueName), new Object[]{System.currentTimeMillis(), 100});}//创建redisson_delay_queue_channel这个channelprotected RTopic getTopic() {return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, RedissonDelayedQueue.this.channelName);}};//真正的定时任务调度转移zset任务至阻塞队列的逻辑queueTransferService.schedule(this.queueName, task);this.queueTransferService = queueTransferService;}
//可以看出,在启动这个线程的时候,会订阅redisson_delay_queue_channel这个topic
public void start() {RTopic schedulerTopic = this.getTopic();this.statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {public void onSubscribe(String channel) {QueueTransferTask.this.pushTask();}});this.messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {//当有消息到达的时候,证明此时有新的执行任务的过期时间小于zset中任务最小的过期时间public void onMessage(CharSequence channel, Long startTime) {//所以需要更新定时器中定时时间QueueTransferTask.this.scheduleTask(startTime);}});}
可以看出,在初始化的时候,redisson就订阅了redisson_delay_queue_channel这个channel,其他都是回调方法。
b)添加任务
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {if (delay < 0L) {throw new IllegalArgumentException("Delay can't be negative");} else {long delayInMs = timeUnit.toMillis(delay);long timeout = System.currentTimeMillis() + delayInMs;byte[] random = this.getServiceManager().generateIdArray(8);//这是添加任务的核心方法,其实就是将任务添加到zset中,如果当前任务的过期时间小于zset中所有任务的过期时间,便会执行发布一条消息。return this.commandExecutor.evalWriteNoRetryAsync(this.getRawName(), this.codec, RedisCommands.EVAL_VOID, "local value = struct.pack('Bc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]);redis.call('zadd', KEYS[2], ARGV[1], value);redis.call('rpush', KEYS[3], value);local v = redis.call('zrange', KEYS[2], 0, 0); if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]); end;", Arrays.asList(this.getRawName(), this.timeoutSetName, this.queueName, this.channelName), new Object[]{timeout, random, this.encode(e)});}}
可以看出,添加任务其实就是将任务添加到zset中,如果当前任务的过期时间小于zset中所有任务的过期时间,便会执行发布一条消息到redisson_delay_queue_channel中,触发上面回调方法QueueTransferTask.this.scheduleTask(startTime)。
c)转移任务至阻塞队列
private void scheduleTask(Long startTime) {QueueTransferTask.TimeoutTask oldTimeout = (QueueTransferTask.TimeoutTask)this.lastTimeout.get();if (startTime != null) {if (oldTimeout != null) {oldTimeout.getTask().cancel();}long delay = startTime - System.currentTimeMillis();if (delay > 10L) {//创建一个定时器,其实是由java中的timer实现的Timeout timeout = this.serviceManager.newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {QueueTransferTask.this.pushTask();//定时器时间为当前zset中的最小时间QueueTransferTask.TimeoutTask currentTimeout = (QueueTransferTask.TimeoutTask)QueueTransferTask.this.lastTimeout.get();if (currentTimeout.getTask() == timeout) {QueueTransferTask.this.lastTimeout.compareAndSet(currentTimeout, (Object)null);}}}, delay, TimeUnit.MILLISECONDS);if (!this.lastTimeout.compareAndSet(oldTimeout, new QueueTransferTask.TimeoutTask(startTime, timeout))) {timeout.cancel();}} else {this.pushTask();}}}
其实就是创建一个定时器,定时器为当前zset中的最小时间,当定时任务到达时,执行 QueueTransferTask.this.pushTask()方法。
最终执行的其实就是前面构造函数中的pushTaskAsync方法,里面其实就是一段lua脚本:
local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('Bc0Lc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;
这个逻辑是核心的转移逻辑,就是前面说的取出zset前面100条数据,如果任务到期,便转移到阻塞队列中,并且返回下一次任务的执行时间。
4.5 基于RocketMq的实现方式
RocketMQ 本身不直接支持延时消息队列,但是可以通过特定的设置来实现类似的功能。在 RocketMQ 中,消息的延时级别可以在发送消息时通过设置 delayLevel来实现,delayLevel
是一个整数,表示消息延时级别,级别越高,延时越大。RocketMQ 默认定义了 18 个延时级别,级别 1 表示 1s 延时,级别 2 表示 5s 延时,依此类推,级别 18 表示 18levels 延时(level 是自定义的延时系数,默认是 1000 毫秒。在rocketmq5.0中,也支持了自定义任务执行时间的延迟队列。它本质上还是通过时间轮来实现的。
5.总结
可以看出,延迟队列主要包括两个部分,分别是存储任务的数据结构(可以是内存队列,redis,数据库,mq等),还有就是需要线程,来推送扫描队列中的任务。万变不离其宗。