头条移动端项目Day05 —— 延迟队列精准发布文章

❤ 作者主页:欢迎来到我的技术博客😎
❀ 个人介绍:大家好,本人热衷于Java后端开发,欢迎来交流学习哦!( ̄▽ ̄)~*
🍊 如果文章对您有帮助,记得关注点赞收藏评论⭐️⭐️⭐️
📣 您的支持将是我创作的动力,让我们一起加油进步吧!!!🎉🎉

文章目录

  • 延迟任务精准发布文章
    • 1、文章定时发布
    • 2、延迟任务概述
      • 2.1 什么是延迟任务
      • 2.2 技术对比
        • 2.2.1 DelayQueue
        • 2.2.2 RabbitMQ实现延迟任务
        • 2.2.3 redis实现
    • 3、redis实现延迟任务
    • 4、延迟任务服务实现
      • 4.1 搭建heima-leadnews-schedule模块
      • 4.2 数据库准备
      • 4.3 安装redis
      • 4.4 项目集成redis
      • 4.5 添加任务
      • 4.6 取消任务
      • 4.7 消费任务
      • 4.8 未来数据定时刷新
        • 4.8.1 reids key值匹配
        • 4.8.2 redis管道
        • 4.8.3 未来数据定时刷新-功能完成
      • 4.9 分布式锁解决集群下的方法抢占执行
        • 4.9.1 问题描述
        • 4.9.2 分布式锁
        • 4.9.3 redis分布式锁
        • 4.9.4 在工具类CacheService中添加方法
      • 4.10 数据库同步到redis
    • 5、延迟队列解决精准时间发布文章
      • 5.1 延迟队列服务提供对外接口
      • 5.2 发布文章集成添加延迟队列接口
      • 5.3 消费任务进行审核文章

延迟任务精准发布文章

1、文章定时发布

2、延迟任务概述

2.1 什么是延迟任务

  • 定时任务:有固定周期的,有明确的触发时间
  • 延迟队列:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟
    在这里插入图片描述

应用场景:

场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消

场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止


2.2 技术对比

2.2.1 DelayQueue

JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
在这里插入图片描述
DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法

getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。

compareTo方法:用于排序,确定元素出队列的顺序。

实现:

1:在测试包jdk下创建延迟任务元素对象DelayedTask,实现compareTo和getDelay方法,

2:在main方法中创建DelayQueue并向延迟队列中添加三个延迟任务,

3:循环的从延迟队列中拉取任务

public class DelayedTask  implements Delayed{// 任务的执行时间private int executeTime = 0;public DelayedTask(int delay){Calendar calendar = Calendar.getInstance();calendar.add(Calendar.SECOND,delay);this.executeTime = (int)(calendar.getTimeInMillis() /1000 );}/*** 元素在队列中的剩余时间* @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {Calendar calendar = Calendar.getInstance();return executeTime - (calendar.getTimeInMillis()/1000);}/*** 元素排序* @param o* @return*/@Overridepublic int compareTo(Delayed o) {long val = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);return val == 0 ? 0 : ( val < 0 ? -1: 1 );}public static void main(String[] args) {DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();queue.add(new DelayedTask(5));queue.add(new DelayedTask(10));queue.add(new DelayedTask(15));System.out.println(System.currentTimeMillis()/1000+" start consume ");while(queue.size() != 0){DelayedTask delayedTask = queue.poll();if(delayedTask !=null ){System.out.println(System.currentTimeMillis()/1000+" cosume task");}//每隔一秒消费一次try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}     }
}

DelayQueue实现完成之后思考一个问题:

使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,如何保证数据不丢失,需要持久化(磁盘)。


2.2.2 RabbitMQ实现延迟任务

  • TTL:Time To Live (消息存活时间)

  • 死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)

在这里插入图片描述


2.2.3 redis实现

zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序
在这里插入图片描述


3、redis实现延迟任务

实现思路
在这里插入图片描述

问题思路

  1. 为什么任务需要存储在数据库中?

    延迟任务是一个通用的服务,任何需要延迟的任务都可以调用该服务,需要考虑数据持久化的问题,存储在数据库中是一种数据安全的考虑。

  2. 为什么redis中使用两种数据类型,list和zset?

    效率问题,算法的时间复杂度

  3. 在添加zset数据的时候,为什么不需要预加载?

    任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。


4、延迟任务服务实现

4.1 搭建heima-leadnews-schedule模块

leadnews-schedule是一个通用的服务,单独创建模块来管理任何类型的延迟任务

①:导入资料文件夹下的heima-leadnews-schedule模块到heima-leadnews-service下,如下图所示:
在这里插入图片描述

②:添加bootstrap.yml

server:port: 51701
spring:application:name: leadnews-schedulecloud:nacos:discovery:server-addr: 192.168.200.130:8848config:server-addr: 192.168.200.130:8848file-extension: yml

③:在nacos中添加对应配置,并添加数据库及mybatis-plus的配置

spring:datasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTCusername: rootpassword: root
# 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
mybatis-plus:mapper-locations: classpath*:mapper/*.xml# 设置别名包扫描路径,通过该属性可以给包中的类注册别名type-aliases-package: com.heima.model.schedule.pojos

4.2 数据库准备

导入leadnews_schedule数据库:

SET FOREIGN_KEY_CHECKS=0;-- ----------------------------
-- Table structure for taskinfo
-- ----------------------------
DROP TABLE IF EXISTS `taskinfo`;
CREATE TABLE `taskinfo` (`task_id` BIGINT(20) NOT NULL COMMENT '任务id',`execute_time` DATETIME(3) NOT NULL COMMENT '执行时间',`parameters` LONGBLOB COMMENT '参数',`priority` INT(11) NOT NULL COMMENT '优先级',`task_type` INT(11) NOT NULL COMMENT '任务类型',PRIMARY KEY (`task_id`),KEY `index_taskinfo_time` (`task_type`,`priority`,`execute_time`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;-- ----------------------------
-- Records of taskinfo
-- ------------------------------ ----------------------------
-- Table structure for taskinfo_logs
-- ----------------------------
DROP TABLE IF EXISTS `taskinfo_logs`;
CREATE TABLE `taskinfo_logs` (`task_id` BIGINT(20) NOT NULL COMMENT '任务id',`execute_time` DATETIME(3) NOT NULL COMMENT '执行时间',`parameters` LONGBLOB COMMENT '参数',`priority` INT(11) NOT NULL COMMENT '优先级',`task_type` INT(11) NOT NULL COMMENT '任务类型',`version` INT(11) NOT NULL COMMENT '版本号,用乐观锁',`status` INT(11) DEFAULT '0' COMMENT '状态 0=初始化状态 1=EXECUTED 2=CANCELLED',PRIMARY KEY (`task_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;-- ----------------------------
-- Records of taskinfo_logs
-- ----------------------------

taskinfo 任务表

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-a6dSSPmU-1685593677207)(延迟任务精准发布文章.assets\image-20210513151812858.png)]

实体类

package com.heima.model.schedule.pojos;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;import java.io.Serializable;
import java.util.Date;@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {private static final long serialVersionUID = 1L;/*** 任务id*/@TableId(type = IdType.ID_WORKER)private Long taskId;/*** 执行时间*/@TableField("execute_time")private Date executeTime;/*** 参数*/@TableField("parameters")private byte[] parameters;/*** 优先级*/@TableField("priority")private Integer priority;/*** 任务类型*/@TableField("task_type")private Integer taskType;}

taskinfo_logs 任务日志表

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DvbXp5ny-1685593677207)(延迟任务精准发布文章.assets\image-20210513151835752.png)]

实体类

package com.heima.model.schedule.pojos;import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;import java.io.Serializable;
import java.util.Date;@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {private static final long serialVersionUID = 1L;/*** 任务id*/@TableId(type = IdType.ID_WORKER)private Long taskId;/*** 执行时间*/@TableField("execute_time")private Date executeTime;/*** 参数*/@TableField("parameters")private byte[] parameters;/*** 优先级*/@TableField("priority")private Integer priority;/*** 任务类型*/@TableField("task_type")private Integer taskType;/*** 版本号,用乐观锁*/@Versionprivate Integer version;/*** 状态 0=int 1=EXECUTED 2=CANCELLED*/@TableField("status")private Integer status;}

乐观锁支持:

/*** mybatis-plus乐观锁支持* @return*/
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;
}

4.3 安装redis

①拉取镜像

docker pull redis

② 创建容器

docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"

③链接测试

打开资料中的Redis Desktop Manager,输入host、port、password链接测试

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uzpy1AWw-1685593731253)(延迟任务精准发布文章.assets\image-20210513152138388.png)]

能链接成功,即可。


4.4 项目集成redis

① 在项目导入redis相关依赖,已经完成

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency>

② 在heima-leadnews-schedule中集成redis,添加以下nacos配置,链接上redis

spring:redis:host: 192.168.200.130password: leadnewsport: 6379

③ 拷贝资料文件夹下的类:CacheService到heima-leadnews-common模块下,并添加自动配置

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8yORF6aS-1685593772940)(延迟任务精准发布文章.assets\image-20210514181214681.png)]

④:测试

package com.heima.schedule.test;import com.heima.common.redis.CacheService;
import com.heima.schedule.ScheduleApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.Set;@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {@Autowiredprivate CacheService cacheService;@Testpublic void testList(){//在list的左边添加元素
//        cacheService.lLeftPush("list_001","hello,redis");//在list的右边获取元素,并删除String list_001 = cacheService.lRightPop("list_001");System.out.println(list_001);}@Testpublic void testZset(){//添加数据到zset中  分值/*cacheService.zAdd("zset_key_001","hello zset 001",1000);cacheService.zAdd("zset_key_001","hello zset 002",8888);cacheService.zAdd("zset_key_001","hello zset 003",7777);cacheService.zAdd("zset_key_001","hello zset 004",999999);*///按照分值获取数据Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);System.out.println(zset_key_001);}
}

4.5 添加任务

①:拷贝mybatis-plus生成的文件,mapper

@Mapper
public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {}
@Mapper
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {public List<Taskinfo> queryFutureTime(@Param("taskType")int type, @Param("priority")int priority, @Param("future") Date future);
}

②:创建task类,用于接收添加任务的参数

package com.heima.model.schedule.dtos;import lombok.Data;import java.io.Serializable;@Data
public class Task implements Serializable {/*** 任务id*/private Long taskId;/*** 类型*/private Integer taskType;/*** 优先级*/private Integer priority;/*** 执行id*/private long executeTime;/*** task参数*/private byte[] parameters;}

③:创建TaskService

package com.heima.schedule.service;import com.heima.model.schedule.dtos.Task;/*** 对外访问接口*/
public interface TaskService {/*** 添加任务* @param task   任务对象* @return       任务id*/public long addTask(Task task) ;}

实现:

package com.heima.schedule.service.impl;import com.alibaba.fastjson.JSON;
import com.heima.common.constants.ScheduleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskinfoLogsMapper;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.schedule.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.util.Calendar;
import java.util.Date;@Service
@Transactional
@Slf4j
public class TaskServiceImpl implements TaskService {/*** 添加延迟任务** @param task* @return*/@Overridepublic long addTask(Task task) {//1.添加任务到数据库中boolean success = addTaskToDb(task);if (success) {//2.添加任务到redisaddTaskToCache(task);}return task.getTaskId();}@Autowiredprivate CacheService cacheService;/*** 把任务添加到redis中** @param task*/private void addTaskToCache(Task task) {String key = task.getTaskType() + "_" + task.getPriority();//获取5分钟之后的时间  毫秒值Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);long nextScheduleTime = calendar.getTimeInMillis();//2.1 如果任务的执行时间小于等于当前时间,存入listif (task.getExecuteTime() <= System.currentTimeMillis()) {cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));} else if (task.getExecuteTime() <= nextScheduleTime) {//2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());}}@Autowiredprivate TaskinfoMapper taskinfoMapper;@Autowiredprivate TaskinfoLogsMapper taskinfoLogsMapper;/*** 添加任务到数据库中** @param task* @return*/private boolean addTaskToDb(Task task) {boolean flag = false;try {//保存任务表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task, taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);//设置taskIDtask.setTaskId(taskinfo.getTaskId());//保存任务日志数据TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo, taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}
}

ScheduleConstants常量类

package com.heima.common.constants;public class ScheduleConstants {//task状态public static final int SCHEDULED=0;   //初始化状态public static final int EXECUTED=1;       //已执行状态public static final int CANCELLED=2;   //已取消状态public static String FUTURE="future_";   //未来数据key前缀public static String TOPIC="topic_";     //当前数据key前缀
}

④:测试


4.6 取消任务

在TaskService中添加方法

/*** 取消任务* @param taskId        任务id* @return              取消结果*/
public boolean cancelTask(long taskId);

实现

/*** 取消任务* @param taskId* @return*/
@Override
public boolean cancelTask(long taskId) {boolean flag = false;//删除任务,更新日志Task task = updateDb(taskId,ScheduleConstants.EXECUTED);//删除redis的数据if(task != null){removeTaskFromCache(task);flag = true;}return flag;
}/*** 删除redis中的任务数据* @param task*/
private void removeTaskFromCache(Task task) {String key = task.getTaskType()+"_"+task.getPriority();if(task.getExecuteTime()<=System.currentTimeMillis()){cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));}else {cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));}
}/*** 删除任务,更新任务日志状态* @param taskId* @param status* @return*/
private Task updateDb(long taskId, int status) {Task task = null;try {//删除任务taskinfoMapper.deleteById(taskId);TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);taskinfoLogs.setStatus(status);taskinfoLogsMapper.updateById(taskinfoLogs);task = new Task();BeanUtils.copyProperties(taskinfoLogs,task);task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());}catch (Exception e){log.error("task cancel exception taskid={}",taskId);}return task;}

测试


4.7 消费任务

在TaskService中添加方法

/*** 按照类型和优先级来拉取任务* @param type* @param priority* @return*/
public Task poll(int type,int priority);

实现

/*** 按照类型和优先级拉取任务* @return*/
@Override
public Task poll(int type,int priority) {Task task = null;try {String key = type+"_"+priority;String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);if(StringUtils.isNotBlank(task_json)){task = JSON.parseObject(task_json, Task.class);//更新数据库信息updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);}}catch (Exception e){e.printStackTrace();log.error("poll task exception");}return task;
}

4.8 未来数据定时刷新

4.8.1 reids key值匹配

方案1:keys 模糊匹配

keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞。
在这里插入图片描述

方案2:scan

SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。
在这里插入图片描述

代码案例:

@Test
public void testKeys(){Set<String> keys = cacheService.keys("future_*");System.out.println(keys);Set<String> scan = cacheService.scan("future_*");System.out.println(scan);
}

4.8.2 redis管道

普通redis客户端和服务器交互模式

在这里插入图片描述

Pipeline请求模型
在这里插入图片描述

官方测试结果数据对比
在这里插入图片描述

测试案例对比:

	//耗时6151@Testpublic  void testPiple1(){long start =System.currentTimeMillis();for (int i = 0; i <10000 ; i++) {Task task = new Task();task.setTaskType(1001);task.setPriority(1);task.setExecuteTime(new Date().getTime());cacheService.lLeftPush("1001_1", JSON.toJSONString(task));}System.out.println("耗时"+(System.currentTimeMillis()- start));}//耗时1782@Testpublic void testPiple2(){long start  = System.currentTimeMillis();//使用管道技术List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {@Nullable@Overridepublic Object doInRedis(RedisConnection redisConnection) throws DataAccessException {for (int i = 0; i <10000 ; i++) {Task task = new Task();task.setTaskType(1001);task.setPriority(1);task.setExecuteTime(new Date().getTime());redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());}return null;}});System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");}

4.8.3 未来数据定时刷新-功能完成

在TaskService中添加方法

@Scheduled(cron = "0 */1 * * * ?") //每隔一分钟执行一次
public void refresh() {System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");// 获取所有未来数据集合的key值Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");}}
}

在引导类中添加开启任务调度注解:@EnableScheduling

在这里插入图片描述


4.9 分布式锁解决集群下的方法抢占执行

4.9.1 问题描述

启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法

在这里插入图片描述


4.9.2 分布式锁

分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。

解决方案:
在这里插入图片描述


4.9.3 redis分布式锁

sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。
在这里插入图片描述

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功
  • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败
  • 客户端A执行代码完成,删除锁
  • 客户端B在等待一段时间后再去请求设置key的值,设置成功
  • 客户端B执行代码完成,删除锁

4.9.4 在工具类CacheService中添加方法

/*** 加锁** @param name* @param expire* @return*/
public String tryLock(String name, long expire) {name = name + "_lock";String token = UUID.randomUUID().toString();RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();RedisConnection conn = factory.getConnection();try {//参考redis命令://set key value [EX seconds] [PX milliseconds] [NX|XX]Boolean result = conn.set(name.getBytes(),token.getBytes(),Expiration.from(expire, TimeUnit.MILLISECONDS),RedisStringCommands.SetOption.SET_IF_ABSENT //NX);if (result != null && result)return token;} finally {RedisConnectionUtils.releaseConnection(conn, factory,false);}return null;
}

修改未来定时刷新的方法,如下:

/*** 未来数据定时刷新*/
@Scheduled(cron = "0 */1 * * * ?")
public void refresh(){String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);if(StringUtils.isNotBlank(token)){log.info("未来数据定时刷新---定时任务");//获取所有未来数据的集合keySet<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");for (String futureKey : futureKeys) {//future_100_50//获取当前数据的key  topicString topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];//按照key和分值查询符合条件的数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());//同步数据if(!tasks.isEmpty()){cacheService.refreshWithPipeline(futureKey,topicKey,tasks);log.info("成功的将"+futureKey+"刷新到了"+topicKey);}}}
}

4.10 数据库同步到redis

在这里插入图片描述

/*** 数据库任务定时同步到redis*/@Scheduled(cron = "0 */5 * * * ?")@PostConstructpublic void reloadData() {//清理缓存中的数据 list zsetclearCache();//查询符合条件的任务 小于未来5分钟的数据//获取5分钟之后的时间 毫秒值Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));//把任务添加到redisif (allTasks != null && allTasks.size() > 0) {for (Taskinfo taskinfo : allTasks) {Task task = new Task();BeanUtils.copyProperties(taskinfo, task);task.setExecuteTime(taskinfo.getExecuteTime().getTime());addTaskToCache(task);}}log.info("数据库的任务同步到了redis");}/*** 清理缓存中的数据*/private void clearCache(){// 删除缓存中未来数据集合和当前消费者队列的所有keySet<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_cacheService.delete(futurekeys);cacheService.delete(topickeys);}

5、延迟队列解决精准时间发布文章

5.1 延迟队列服务提供对外接口

提供远程的feign接口,在heima-leadnews-feign-api编写类如下:

@FeignClient("leadnews-schedule")
public interface IScheduleClient {/*** 添加任务* @param task   任务对象* @return       任务id*/@PostMapping("/api/v1/task/add")public ResponseResult addTask(@RequestBody Task task);/*** 取消任务* @param taskId        任务id* @return              取消结果*/@GetMapping("/api/v1/task/cancel/{taskId}")public ResponseResult cancelTask(@PathVariable("taskId") long taskId);/*** 按照类型和优先级来拉取任务* @param type* @param priority* @return*/@GetMapping("/api/v1/task/poll/{type}/{priority}")public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority")  int priority);
}

在heima-leadnews-schedule微服务下提供对应的实现

@RestController
public class ScheduleClient implements IScheduleClient {@Autowiredprivate TaskService taskService;/*** 添加任务* @param task 任务对象* @return 任务id*/@PostMapping("/api/v1/task/add")public ResponseResult addTask(@RequestBody Task task) {return ResponseResult.okResult(taskService.addTask(task));}/*** 取消任务* @param taskId 任务id* @return 取消结果*/@GetMapping("/api/v1/task/cancel/{taskId}")public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {return ResponseResult.okResult(taskService.cancelTask(taskId));}/*** 按照类型和优先级来拉取任务* @param type* @param priority* @return*/@GetMapping("/api/v1/task/poll/{type}/{priority}")public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {return ResponseResult.okResult(taskService.poll(type,priority));}
}

5.2 发布文章集成添加延迟队列接口

在创建WmNewsTaskService

package com.heima.wemedia.service;import com.heima.model.wemedia.pojos.WmNews;public interface WmNewsTaskService {/*** 添加任务到延迟队列中* @param id  文章的id* @param publishTime  发布的时间  可以做为任务的执行时间*/public void addNewsToTask(Integer id, Date publishTime);}

实现:

package com.heima.wemedia.service.impl;import com.heima.apis.schedule.IScheduleClient;
import com.heima.model.common.enums.TaskTypeEnum;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.utils.common.ProtostuffUtil;
import com.heima.wemedia.service.WmNewsTaskService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;@Service
@Slf4j
public class WmNewsTaskServiceImpl  implements WmNewsTaskService {@Autowiredprivate IScheduleClient scheduleClient;/*** 添加任务到延迟队列中* @param id          文章的id* @param publishTime 发布的时间  可以做为任务的执行时间*/@Override@Asyncpublic void addNewsToTask(Integer id, Date publishTime) {log.info("添加任务到延迟服务中----begin");Task task = new Task();task.setExecuteTime(publishTime.getTime());task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());WmNews wmNews = new WmNews();wmNews.setId(id);task.setParameters(ProtostuffUtil.serialize(wmNews));scheduleClient.addTask(task);log.info("添加任务到延迟服务中----end");}}

枚举类:

package com.heima.model.common.enums;import lombok.AllArgsConstructor;
import lombok.Getter;@Getter
@AllArgsConstructor
public enum TaskTypeEnum {NEWS_SCAN_TIME(1001, 1,"文章定时审核"),REMOTEERROR(1002, 2,"第三方接口调用失败,重试");private final int taskType; //对应具体业务private final int priority; //业务不同级别private final String desc; //描述信息
}

序列化工具对比

  • JdkSerialize:java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组
  • Protostuff:google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo类

拷贝资料中的两个类到heima-leadnews-utils下

Protostuff需要引导依赖:

<dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId><version>1.6.0</version>
</dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>1.6.0</version>
</dependency>

修改发布文章代码:

把之前的异步调用修改为调用延迟任务

@Autowired
private WmNewsTaskService wmNewsTaskService;/*** 发布修改文章或保存为草稿* @param dto* @return*/
@Override
public ResponseResult submitNews(WmNewsDto dto) {//0.条件判断if(dto == null || dto.getContent() == null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//1.保存或修改文章WmNews wmNews = new WmNews();//属性拷贝 属性名词和类型相同才能拷贝BeanUtils.copyProperties(dto,wmNews);//封面图片  list---> stringif(dto.getImages() != null && dto.getImages().size() > 0){//[1dddfsd.jpg,sdlfjldk.jpg]-->   1dddfsd.jpg,sdlfjldk.jpgString imageStr = StringUtils.join(dto.getImages(), ",");wmNews.setImages(imageStr);}//如果当前封面类型为自动 -1if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){wmNews.setType(null);}saveOrUpdateWmNews(wmNews);//2.判断是否为草稿  如果为草稿结束当前方法if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}//3.不是草稿,保存文章内容图片与素材的关系//获取到文章内容中的图片信息List<String> materials =  ectractUrlInfo(dto.getContent());saveRelativeInfoForContent(materials,wmNews.getId());//4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片saveRelativeInfoForCover(dto,wmNews,materials);//审核文章//        wmNewsAutoScanService.autoScanWmNews(wmNews.getId());wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}

5.3 消费任务进行审核文章

WmNewsTaskService中添加方法

/*** 消费延迟队列数据*/
public void scanNewsByTask();

实现

@Autowired
private WmNewsAutoScanServiceImpl wmNewsAutoScanService;/*** 消费延迟队列数据*/
@Scheduled(fixedRate = 1000)
@Override
@SneakyThrows
public void scanNewsByTask() {log.info("文章审核---消费任务执行---begin---");ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());if(responseResult.getCode().equals(200) && responseResult.getData() != null){String json_str = JSON.toJSONString(responseResult.getData());Task task = JSON.parseObject(json_str, Task.class);byte[] parameters = task.getParameters();WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);System.out.println(wmNews.getId()+"-----------");wmNewsAutoScanService.autoScanWmNews(wmNews.getId());}log.info("文章审核---消费任务执行---end---");
}

在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling

最后重新启动项目进行测试即可。


 
非常感谢您阅读到这里,如果这篇文章对您有帮助,希望能留下您的点赞👍 关注💖 分享👥 留言💬thanks!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/100920.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安卓图形显示系统

Android图形显示系统 Android图形显示系统是Android比较重要的一个子系统&#xff0c;和很多其他子系统的关联紧密。 Android图形系统比较复杂&#xff0c;这里我们从整体上理一遍&#xff0c;细节留待后期再去深入。Android图形系统主要包括以下几个方面&#xff1a; - 渲染…

司徒理财:8.21黄金空头呈阶梯下移!今日操作策略

黄金走势分析 盘面裸k分析&#xff1a;1小时周期的行情局部于1896附近即下行通道上轨附近录得一系列的K线呈震荡下行并筑圆顶&#xff0c;上轨压制有效&#xff0c;下行通道并未突破&#xff0c;后市建议延续看下行。4小时周期局部录得一系列的纺锤线呈震荡&#xff0c;但行情整…

组合总和-LeetCode

给你一个无重复元素的整数数组 candidates 和一个目标整数 target &#xff0c;找出 candidates 中可以使数字和为目标数 target 的所有不同组合 &#xff0c;并以列表形式返回。你可以按 任意顺序返回这些组合。 candidates 中的同一个数字可以无限制重复被选取 。如果至少一个…

回归预测 | MATLAB实现SOM-BP自组织映射结合BP神经网络多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现SOM-BP自组织映射结合BP神经网络多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现SOM-BP自组织映射结合BP神经网络多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09;效果一览基本介绍…

【python办公自动化】使用PysimpleGUI实现AHP指标的添加和删除及编号重新排序

使用PysimpleGUI实现AHP指标的添加和删除 1 运行界面2 添加指标3 删除指标4 编码重新排序5 全部代码1 运行界面 2 添加指标 输入框中输入内容,点击“添加指标”按钮 然后就会自动添加到上方列表中 3 删除指标 选中要删除的指标,点击“删除指标”按钮 此时就把第三个选…

Protocol Buffers [protobuf]

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

牛股预测器V1.0实战(工银瑞信金融科技挑战赛排名第二)

全代码和数据关注公众号《三个篱笆三个班》免费提供&#xff01;一键可跑&#xff0c;每日选股。 对AI炒股感兴趣的小伙伴可加WX群&#xff1a; 赛题概述&#xff1a; 基于人工智能的量化选股投资策略建模挑战 任务描述&#xff1a; 通过数学和计算机技术分析市场数据&…

Docker容器无法启动 Cannot find /usr/local/tomcat/bin/setclasspath.sh

报错信息如下 解决办法 权限不够 加上--privileged 获取最大权限 docker run --privileged --name lenglianerqi -p 9266:8080 -v /opt/docker/lenglianerqi/webapps:/usr/local/tomcat/webapps/ -v /opt/docker/lenglianerqi/webapps/userfile:/usr/local/tomcat/webapps/u…

视觉SLAM:一直在入门,如何能精通,CV领域的绝境长城,

目录 前言 福利&#xff1a;文末有chat-gpt纯分享&#xff0c;无魔法&#xff0c;无限制 1 什么是SLAM&#xff1f; 2 为什么用SLAM&#xff1f; 3 视觉SLAM怎么实现&#xff1f; 4 前端视觉里程计 5 后端优化 6 回环检测 7 地图构建 8 结语 前言 上周的组会上&…

【AGC】Publishing api怎么上传绿色认证审核材料

【问题描述】 华为应用市场会对绿色应用标上特有的绿色标识&#xff0c;代表其通过华为终端开放实验室DevEco云测平台的兼容性、稳定性、安全、功耗和性能的检测和认证&#xff0c;是应用高品质的象征。想要自己的应用认证为绿色应用就需要在发布应用时提供绿色认证审核材料&a…

uniapp条形码实现

条形码在实际应用场景是经常可见的。 这里教大家如何集成uniapp条形码。条形码依赖类库JsBarcode. 下载JsBarcode源码&#xff0c;对CanvasRenderer进行了改进兼容uniapp。 import merge from "../help/merge.js"; import {calculateEncodingAttributes, getTotal…

三分钟解决AE缓存预览渲染错误、暂停、卡顿问题

一、清除RAM缓存&#xff08;内存&#xff09; 你应该做的第一件事是清除你的RAM。这将清除当前存储在内存中的所有临时缓存文件。要执行此操作&#xff0c;请导航到编辑>清除>所有内存。这将从头开始重置RAM缓存 二、清空磁盘缓存 您也可以尝试清空磁盘缓存。执行此操作…

武汉凯迪正大—变比组别测试仪

一、概述 在电力变压器的半成品、成品生产过程中&#xff0c;新安装的变压器投入运行之前以及根据国家电力部的预防性试验规程中&#xff0c;要求变压器进行匝数比或电压比测试。传统的变比电桥操作繁琐&#xff0c;读数不直观&#xff0c;且要进行必要的换算&#xff0c;测试时…

案例-基于MVC和三层架构实现商品表的增删改查

文章目录 0. 项目介绍1. 环境准备2. 查看所有2.1 编写BrandMapper接口2.2 编写服务类&#xff0c;创建BrandService&#xff0c;用于调用该方法2.5 编写Servlet2.4 编写brand.jsp页面2.5 测试 3.添加3.1 编写BrandMapper接口 添加方法3.2 编写服务3.3 改写Brand.jsp页面&#x…

springboot数据库密码加密的配置方法_Java

前言 由于系统安全的考虑&#xff0c;配置文件中不能出现明文密码的问题&#xff0c;本文就给大家详细介绍下springboot配置数据库密码加密的方法&#xff0c;下面话不多说了&#xff0c;来一起看看详细的介绍吧 1.导入依赖 <!--数据库密码加密--> <dependency>&…

学习ts(五)类

定义 是面向对象程序设计&#xff08;OOP&#xff09;实现信息封装的基础 类是一种用户定义的引用数据类型&#xff0c;也称类类型 JavaScript的class,虽然本质是构造函数&#xff0c;但是使用起来已经方便了许多&#xff0c;js中没有加入修饰符和抽象类等特性 ts的class支持面…

爬虫逆向实战(十九)--某号站登录

一、数据接口分析 主页地址&#xff1a;某号站 1、抓包 通过抓包可以发现登录接口 2、判断是否有加密参数 请求参数是否加密&#xff1f; 通过查看“载荷”模块可以发现有一个jsondata_rsa的加密参数 请求头是否加密&#xff1f; 无响应是否加密&#xff1f; 无cookie是否…

C#和Java的大端位和小端位的问题

C#代码里就是小端序,Java代码里就是大端序&#xff0c; 大端位:big endian,是指数据的高字节保存在内存的低地址中&#xff0c;而数据的低字节保存在内存的高地址中&#xff0c;也叫高尾端 小端位:little endian,是指数据的高字节保存在内存的高地址中,而数据的低字节保存在内存…

【数字实验室】时钟切换

大部分开发者使用 BUFGCTRL 或 BUFGMUX进行时钟切换&#xff0c;它们在时钟切换上可以提供无毛刺输出。然而&#xff0c;了解所涉及的原理是有好处的。 当然&#xff0c;无论我们在同步逻辑中使用哪种技术&#xff0c;重要的是要确保在进行时钟切换时输出上没有毛刺。任何故障都…

【报错】yarn --version Unrecognized option: --version Error...

文章目录 问题分析解决问题 在使用 npm install -g yarn 全局安装 yarn 后,查看yarn 的版本号,报错如下 PS D:\global-data-display> yarn --version Unrecognized option: --version Error: Could