前言
本节内容是关于使用redis的过期key,通过开启其监听失效策略,模拟订单延迟任务的执行流程。其核心原理是通过使用redis订阅与发布的方式,将过期失效的key通过广播的方式,发布给客户端,客户端可以监听此消息进而消费消息。需要注意的是官方并不推荐此方式,因为其容易造成数据丢失,例如没有客户端消费消息,消息也会丢失。对于一些安全性要求比较低的场景,可以使用此方式实现延迟队列。
正文
- 引入redis的pom依赖
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
- application.yml中配置redis连接
spring:data:redis:host: 127.0.0.1port: 6379database: 0connect-timeout: 30000timeout: 30000lettuce:pool:enabled: truemax-active: 200max-idle: 50max-wait: -1min-idle: 10shutdown-timeout: 100
- 配置redis的缓冲池,并注入redis的消息监听容器
package com.yundi.tps.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.cache.CacheManager;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.cache.support.CompositeCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;import java.util.concurrent.TimeUnit;@Configuration
public class RedisConfig {@Beanpublic CacheManager cacheManager(RedisConnectionFactory connectionFactory) {// redis缓存管理器RedisCacheConfiguration defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig().serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));RedisCacheManager redisCacheManager = RedisCacheManager.builder(connectionFactory).cacheDefaults(defaultCacheConfig).transactionAware().build();return new CompositeCacheManager(redisCacheManager);}@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);serializer.serialize(objectMapper);template.setValueSerializer(serializer);template.setKeySerializer(new StringRedisSerializer());template.afterPropertiesSet();return template;}@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);return container;}
}
- 实现一个KeyExpirationEventMessageListener过期的监听器RedisKeyExpirationListener
package com.yundi.tps.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/*** Redis失效事件 key** @param message* @param pattern*/@Overridepublic void onMessage(Message message, byte[] pattern) {//notify-keyspace-events Ex// 匹配规则String patternRule = new String(pattern);log.info("patternRule:{}", patternRule);// 监听的通道byte[] channel = message.getChannel();log.info("channel:{}", new String(channel));// 过期的keyString expireKey = message.toString();log.info("expireKey:{}", expireKey);//TODO 处理订单的后续业务逻辑}}
- 实现一个创建订单的延时任务接口,模拟订单超时
package com.yundi.tps.controller;import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.yundi.xyxc.tps.common.ApiResponse;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.TimeUnit;@Tag(name = "订单管理")
@RestController
@RequestMapping("/api/tps/order")
public class OrderController {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Operation(summary = "创建订单")@PostMapping("save")public ApiResponse save() {String orderId = String.valueOf(IdWorker.getId());stringRedisTemplate.opsForValue().setIfAbsent(orderId, orderId, 60, TimeUnit.SECONDS);return ApiResponse.ok();}
}
- 开启redis key的失效监听,在redis配置中添加以下配置
notify-keyspace-events Ex
- 启动redis服务和客户端项目,发送延时订单任务,看客户端是否能够消费到此延迟任务
结语
需要注意的是,该方式实现的延迟任务安全性较低,对于安全性高的场景,并不推荐此种方式。关于使用redis的key监听,实现延迟任务实战内容到这里就结束了,下期见。。。。。。