目录
一、什么是延时队列
二、延时队列的应用
三、举例说明
我的设计思想:
一、什么是延时队列
延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理或者是在某个时间进行处理。
二、延时队列的应用
- 12306 下单成功后,在半个小时内没有支付,自动取消订单。
- 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存。
- 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。
- 外卖平台发送订餐通知,下单成功后 60s 给用户推送短信。
- 规定某个时间执行某个任务
三、举例说明
以下是我做项目时遇到的例子: 需求就是用户设定时间,到时间之后,系统自动执行某个任务
我的设计思想:
采用轮询的策略监听redis的key的值,将用户输入的时间在后端转换为一个时间戳,利用redis Zset的数据结构来存储,主要用来判断的就是时间戳,Zset是一个有序的集合,所有时间戳在前面的就是先要执行的事件,当然用时间戳来比较的话,就是从0到现在时间的时间戳来比较大小,如何redis存入的时间戳大于0且 小于当前时间戳就代表执行任务,否则就代表待执行
首先,封装了一个实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DelayMessageVo implements Serializable {/*** 切记实例化*/private static final long serialVersionUID = -7671756385477179547L;/*** 消息 id*/private Integer id;/*** 消息内容*/private String content;/*** 消息到期时间*/private long expireTime;}
对Redis进行操作
@Component
public class DelayQueueService {/*** key后面拼接当前机器的内网ip : 用于集群区分,解决集群出现的并发问题*/private static final String KEY = "delay_queue:" + getHostAddress();@Autowiredprivate RedisTemplate redisTemplate;/*** 添加消息到延时队列中*/public void put(DelayMessageVo message ) {redisTemplate.opsForZSet().add(KEY, message, message.getExpireTime());}/*** 从延时队列中删除消息*/public Long remove(DelayMessageVo message) {Long remove = redisTemplate.opsForZSet().remove(KEY, message);return remove;}/*** 获取延时队列中已到期的消息*/public List<DelayMessageVo> getExpiredMessages() {
// 1 : 获取到开始时间long minScore = 0;
// 2 : 获取当前时间long maxScore = System.currentTimeMillis();
// 3 : 获取到指定范围区间的数据列表Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(KEY, minScore, maxScore);if (messages == null || messages.isEmpty()) {return Collections.emptyList();}
// 4 : 把对象进行封装,返回List<DelayMessageVo> result = new ArrayList<>();for (Object message : messages) {// 将 DelayMessageVo 对象转换为 JSON 字符串String jsonMessage = JSON.toJSONString(message);DelayMessageVo delayMessage = JSONObject.parseObject(jsonMessage, DelayMessageVo.class);result.add(delayMessage);}return result;}/*** 获取地址(服务器的内网地址)(内网ip)** @return*/public static String getHostAddress() {InetAddress localHost = null;try {localHost = InetAddress.getLocalHost();} catch (UnknownHostException e) {e.printStackTrace();}return localHost.getHostAddress();}
}
轮询策略
@Componentpublic class DelayMessageHandler {public static SimpleDateFormat dateTimeFormater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Autowiredprivate DelayQueueService delayQueue;@Autowiredprivate ExamineMapper examineMapper;/*** 处理已到期的消息(轮询)*/@Scheduled(fixedDelay = 60000)public void handleExpiredMessages() {String currentTime = getCurrentTime();
// 1 : 扫描任务,并将需要执行的任务加入到任务队列中List<DelayMessageVo> messages = delayQueue.getExpiredMessages();System.out.println(currentTime + " 待处理消息数量:" + messages.size());
// 2 : 开始处理消息if (!messages.isEmpty()) {for (DelayMessageVo message : messages) {
// 2.1 : 处理消息:先删除消息,获取当前消息是否已经被其他人消费Long remove = delayQueue.remove(message);if (remove > 0) {
// 2.2 : 开启线程异步处理消息:不让处理消息的时间阻塞当前线程new Thread(() -> {System.out.println(currentTime + " :" + message.getId() + " --> 消息开始处理");Integer id = message.getId();String content = message.getContent();if (content.equals("任务开始时间")){examineMapper.updateBeginExamineStatus(id);}else if (content.equals("任务结束时间")){examineMapper.updateFinishExamineStatus(id);}else if (content.equals("公告结束时间")){examineMapper.updatePublicityExamineStatus(id);}try {
// 2.1.1 : 模拟睡眠3秒,任务的处理时间(实际可能会更长)Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}System.out.println(currentTime + " :" + message.getId() + " --> 消息处理结束");}).start();}}}}/*** 获取到的当前时分秒** @return*/public static String getCurrentTime() {String format = dateTimeFormater.format(new Date());return format;}}