场景描述
设备比较多,几十万甚至上百万,设备在时不时会上报消息。
用户可以设置设备60分钟、50分钟、40分钟、30分钟未上报数据,发送通知给用户,消息要及时可靠。
基本思路
思路:
由于设备在一直上报,如果直接存储到数据库对数据库的压力比较大,考虑使用缓存,每次上报都更新到缓存中; 由于是多久未上报发通知,考虑使用定时任务查找超过60/50/40/30min的设备;定时任务遍历时要尽可能少的查询设备缓存,因为绝大多数设备是不需要进行通知的,最好是只遍历需要发送通知的设备缓存,可以考虑使用类似于时间窗口机制,将设备缓存按时间进行分割,建立两个缓存,缓存1设备数据指向缓存2(主要用于实现设备数据在缓存2不用时间窗口转换),缓存2数据,用于定时任务数据扫描;考虑到消息通知的及时性,考虑使用延迟定时任务,来及时发送消息通知。由于设备比较大,考虑对缓存1按hash算法分割开来,来提升性能。
思路转化方案:
- 涉及的Redis缓存
- 缓存1(hash),用于找到缓存2
大Key:
device:one:0
,小Key:pk:8620241008283980
,Value:device:two:202410091900
, 即{"pk:8620241008283980":"device:two:202410091900"}
- 缓存2(hash), 通过缓存2达到过滤数据的目的
大Key:
device:two:202410091900
,小Key:pk:8620241008283980
,Value:1728473450149
, 即{"pk:8620241008283980":"1728473450149"}
- PK:DK按照hash算法,分成100份,设备上报时,存储到缓存1中
- 按照1分钟为跨度,设备上报时,将当前设备数据存储到缓存2中
- 设备上报时,判断该设备是否有延迟定时任务,如果存在删除该延迟定时任务,判断该设备是否存在缓存1与缓存2,如果存在先删除,再添加。(其过程实现了数据在缓存2不同集合的转化)
- 定时任务:根据当前时间,扫描对应60/50/40min前的缓存2数据,并添加到延迟定时任务(考虑到消息要及时发送)中
- 延迟定时任执行:删除缓存1该设备数据,删除缓存2该设备数据,下发通知
基本流程
方案示意图:
设备上报处理流程:
定时任务处理流程:
业务流程实现
设备上报处理逻辑
场景1: 缓存1中存在,缓存2中也存在,延迟定时任务中也存在
删除该设备延迟定时任务数据,删除缓存2数据,删除缓存1数据,新增缓存2,新增缓存1
场景2: 缓存1中存在,缓存2中也存在,延迟定时任务中不存在
删除缓存2数据,删除缓存1数据,新增缓存2,新增缓存1
场景3: 缓存1不存在,缓存2中不存在,延迟定时任务中不存在
新增缓存2,新增缓存1
相关代码:
package com.angel.ocean.service;import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.angel.ocean.redis.RedisCacheKey;
import com.angel.ocean.util.FutureTaskUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;@Slf4j
@Service
public class DataHandlerService {private static final String COMMA = ":";@Resourceprivate RedissonClient redissonClient;@Resourceprivate ScheduleTaskService scheduleTaskService;public void setCache(String productKey, String deviceKey, long ts, int expiredNoticeTime) {String childKey = productKey + COMMA + deviceKey;String oneKey = RedisCacheKey.getCacheOneHashKey(productKey, deviceKey);RMap<String, String> oneHash = redissonClient.getMap(oneKey);String oldTwoKey = oneHash.get(childKey);if(StrUtil.isNotEmpty(oldTwoKey)) {if(FutureTaskUtil.futureTasks.containsKey(childKey)) {log.info("移除通知延迟任务,{}", childKey);scheduleTaskService.stopTask(childKey);}RMap<String, String> oldTwoHash = redissonClient.getMap(oldTwoKey);log.info("该设备缓存已存在,先删除历史缓存,再更新,{}", childKey);// 删除缓存2oldTwoHash.remove(childKey);// 删除缓存1oneHash.remove(childKey);}String twoKey = RedisCacheKey.getCacheTwoHashKey(ts);RMap<String, String> twoHash = redissonClient.getMap(twoKey);long expiredTime = ts + expiredNoticeTime * 60 * 1000L;twoHash.put(childKey, Long.toString(expiredTime));oneHash.put(childKey, twoKey);}
}
缓存工具类:
package com.angel.ocean.redis;import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;/*** 缓存键*/
public class RedisCacheKey {public static final String COMMA = ":";private static final int n = 100;private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmm");/*** 获取缓存1 Key,依据pk和dk* @param productKey* @param deviceKey* @return*/public static String getCacheOneHashKey(String productKey, String deviceKey) {String data = productKey + COMMA + deviceKey;return "device:one:" + Math.abs(data.hashCode()) % n;}/*** 获取缓存2 Key,依据时间戳* @param ts* @return*/public static String getCacheTwoHashKey(long ts) {// 将时间戳转换为 InstantInstant instant = Instant.ofEpochMilli(ts);ZoneId zoneId = ZoneId.systemDefault();// 转换为 ZonedDateTimeZonedDateTime zdt = instant.atZone(zoneId);// 格式化 ZonedDateTimeString formattedDateTime = zdt.format(formatter);// 构建并返回缓存键return "device:two:" + formattedDateTime;}public static void main(String[] args) {System.out.println(getCacheTwoHashKey(System.currentTimeMillis()));}
}
定时任务逻辑(每分钟执行一次)
- 依据当前时间和多久未上报(60/50/40min),获取对应的缓存2数据
- 遍历该缓存2集合
- 判断该设备的通知时间,是否小于当前时间加上1分钟,如果小于就加入到延迟定时任务中
- 延迟定时任务执行时,删除该设备的缓存2数据,删除该设备的缓存1数据
相关代码:
package com.angel.ocean.task;import com.angel.ocean.service.DataHandlerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;@Slf4j
@Component
public class ScheduledTasks {@Resourceprivate DataHandlerService dataHandlerService;// 每1分钟执行一次// 遍历缓存2,放入延迟定时任务中@Scheduled(cron = "0 0/1 * * * ?")public void dataHandler() {log.info("dataHandler....");// 60分钟未上报通知dataHandlerService.delayTaskHandler(60);// 50分钟未上报通知dataHandlerService.delayTaskHandler(50);// 40分钟未上报通知dataHandlerService.delayTaskHandler(40);}
}
package com.angel.ocean.service;import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.angel.ocean.redis.RedisCacheKey;
import com.angel.ocean.util.FutureTaskUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;@Slf4j
@Service
public class DataHandlerService {private static final String COMMA = ":";@Resourceprivate RedissonClient redissonClient;@Resourceprivate ScheduleTaskService scheduleTaskService;// 将数据放入延迟定时任务public void delayTaskHandler(int delayTime) {long start = System.currentTimeMillis();log.info("delayTaskHandler() start..., time:{}", System.currentTimeMillis());long now = System.currentTimeMillis();long ts = now - delayTime * 60 * 1000L;String twoKey = RedisCacheKey.getCacheTwoHashKey(ts);RMap<String, String> hashMap = redissonClient.getMap(twoKey);if(CollUtil.isEmpty(hashMap)) {return;}Map<String, String> allEntries = hashMap.readAllMap();allEntries.forEach((key, value) -> {long tsLimit = now + 60000;log.info("tsLimit={}, ts={}", tsLimit, value);if(Long.parseLong(value) < tsLimit) {Runnable task = () -> {noticeHandler(key, twoKey);};if(Long.parseLong(value) <= System.currentTimeMillis()) {scheduleTaskService.singleTask(task);} else {scheduleTaskService.delayTask(key, task, Long.parseLong(value) - System.currentTimeMillis());}}});long end = System.currentTimeMillis();log.info("delayTaskHandler() end..., 耗时:{}毫秒", (end - start));}// 模拟通知逻辑private void noticeHandler(String childKey, String twoKey) {log.info("发送通知,设备:{}, ts={}", childKey, System.currentTimeMillis());String[] arr = childKey.split(RedisCacheKey.COMMA);String oneKey = RedisCacheKey.getCacheOneHashKey(arr[0], arr[1]);RMap<String, String> oneHash = redissonClient.getMap(oneKey);String currentTwoKey = oneHash.get(childKey);// 由于并发问题,会存在延迟定时任务(twoKey)的与缓存1中存储的值(currentTwoKey)不一致,因此,需要校验两个值是否相同。if(StrUtil.isNotEmpty(currentTwoKey) && currentTwoKey.equals(twoKey)) {// TODO 相同的话执行通知逻辑,删除缓存1// 删除缓存1oneHash.remove(childKey);}// 删除缓存2,无论twoKey与currentTwoKey相不相同都删除RMap<String, String> twoHash = redissonClient.getMap(twoKey);twoHash.remove(childKey);}
}
延迟定时任务实现
Springboot定时任务,线程池配置
package com.angel.ocean.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;@Configuration
public class SchedulerConfig {@Beanpublic ThreadPoolTaskScheduler taskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();scheduler.setPoolSize(20); // 设置线程池大小scheduler.setThreadNamePrefix("Thread-task-"); // 设置线程名称前缀scheduler.setDaemon(true); // 设置为守护线程// 你可以继续设置其他属性...return scheduler;}
}
定时任务工具类
package com.angel.ocean.util;import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;@Slf4j
public class FutureTaskUtil {private FutureTaskUtil() {}// FutureTask集合public static ConcurrentMap<String, ScheduledFuture<?>> futureTasks = new ConcurrentHashMap<String, ScheduledFuture<?>>();/*** 判断是否包含 futureTask* @param taskId* @return*/public static boolean isContains(String taskId) {boolean result = false;if(futureTasks.containsKey(taskId)) {result = true;}return result;}/*** 添加 futureTask* @param taskId* @param futureTask*/public static void addFutureTask(String taskId, ScheduledFuture<?> futureTask) {if(futureTasks.containsKey(taskId)) {log.error("FutureTaskUtil.addFutureTask(), key: {}已存在", taskId);return;}futureTasks.put(taskId, futureTask);}/*** 获取 futureTask* @param taskId* @return*/public static ScheduledFuture<?> getFutureTask(String taskId) {ScheduledFuture<?> futureTask = null;if(futureTasks.containsKey(taskId)) {log.info("FutureTaskUtil.getFutureTask(), taskId: {}", taskId);futureTask = futureTasks.get(taskId);}return futureTask;}/*** 移除 futureTask* @param taskId*/public static void removeFutureTask(String taskId) {if(futureTasks.containsKey(taskId)) {log.info("FutureTaskUtil.removeFutureTask(), taskId: {}", taskId);futureTasks.remove(taskId);}}
}
需要关注的问题
- 并发问题如何处理?
由于并发问题,会造成缓存1和缓存2的数据不一致,延迟任务执行时校验缓存1中存储的缓存2的Key于延迟定时任务的缓存Key是否一致,一致的话才下发通知。
- 服务重启,造成延迟定时任务数据丢失,如何补发通知?
由于延迟定时任务存在于内存中,服务重新启动,会导致其数据丢失,可以考虑从缓存2再拿一次数据,做个数据补偿。
package com.angel.ocean.runner;import com.angel.ocean.service.DataHandlerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;@Slf4j
@Component
public class StartupRunner implements CommandLineRunner {@Resourceprivate DataHandlerService dataHandlerService;@Overridepublic void run(String... args) throws Exception {// 只处理近2个小时的数据int i = 120;while (i > 50) {dataHandlerService.delayTaskHandler(i);i = i - 1;}}
}
模拟验证
package com.angel.ocean.test;import cn.hutool.core.util.RandomUtil;
import com.angel.ocean.domain.DeviceCacheInfo;
import java.util.ArrayList;
import java.util.List;public class DeviceDataUtil {private static int deviceNumber = 500000;private static List<String> dks = new ArrayList<>(deviceNumber + 5);private static boolean initFlag = false;private static void init() {int number = 1;while (number <= deviceNumber) {String formattedNumber = String.format("%06d", number);String dk = "8620241008" + formattedNumber;dks.add(dk);number++;}initFlag = true;}public static void setDeviceNumber(int number) {DeviceDataUtil.deviceNumber = number;}public static DeviceCacheInfo deviceReport() {if(!initFlag) {init();}DeviceCacheInfo deviceCacheInfo = new DeviceCacheInfo();deviceCacheInfo.setProductKey("pk");deviceCacheInfo.setTs(System.currentTimeMillis());deviceCacheInfo.setExpiredNoticeTime(60);String dk = dks.get(RandomUtil.randomInt(1, deviceNumber));deviceCacheInfo.setDeviceKey(dk);return deviceCacheInfo;}
}
package com.angel.ocean;import com.angel.ocean.domain.DeviceCacheInfo;
import com.angel.ocean.service.DataHandlerService;
import com.angel.ocean.test.DeviceDataUtil;
import com.angel.ocean.util.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;@Slf4j
@SpringBootTest
class ApplicationTests {@Resourceprivate DataHandlerService dataHandlerService;@Testvoid contextLoads() {for(int i = 0; i < 500000; i++) {DeviceCacheInfo deviceCacheInfo = DeviceDataUtil.deviceReport();Runnable task = () -> {dataHandlerService.setCache(deviceCacheInfo.getProductKey(), deviceCacheInfo.getDeviceKey(), deviceCacheInfo.getTs(), deviceCacheInfo.getExpiredNoticeTime());};ThreadPoolUtil.pools.submit(task);try {Thread.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}
package com.angel.ocean.util;import cn.hutool.core.thread.ThreadFactoryBuilder;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPoolUtil {private ThreadPoolUtil() {}public static final ThreadPoolExecutor pools = new ThreadPoolExecutor(16, 50, 60, TimeUnit.SECONDS,new LinkedBlockingDeque<>(10000),new ThreadFactoryBuilder().setNamePrefix("MyThread-").build(),new ThreadPoolExecutor.CallerRunsPolicy());
}
缓存截图:
缓存1:
缓存2:
运行日志截图:
执行延迟定时任务日志截图: