使用Redis中的list实现消息队列
list是Redis的一种数据结构,可以把它理解成双向链表
可以从头部插入数据然后从尾部取出数据,从而实现消息队列的效果
利用命令 LPUSH和RPOP (从左边插入数据从右边取出数据)
lpush l1 e1 e2
rpop l1
或者 RPUSH和LPOP (从左边插入数据从右边取出数据)从而实现消息队列
rpush l1 e1 e2
lpop l1
但是使用list作为消息队列也有弊端 - 不能实现广播功能,只能单对单的进行消息队列
使用Redis中的pubsub实现消息队列
PubSub是Redis引入的一种消息传递的模型。消费者可以订阅一个或者多个Channel,从Channel中获取数据,当生产者向Channel发送数据的时候,所有的消费者都可以接收数据
(XXX是Channel xxx是消息)
发送消息
publish order.XXX xxx
接收消息
subscribe order.XXX
但是使用PubSub作为消息队列也有弊端 - 当消息堆积以后会造成消息的丢失
使用Redis中的stream实现消息队列
stream是Redis引入的新的消息队列,是功能比较完善的消息队列
使用XADD用于添加信息
具体做法可以参考下面这张图
编写一段命令
XADD l1 20 * name jack age 21
使用XREAD来接收命令
编写一段命令来接收刚刚发送的消息
XREAD Count 1 BLOCK 20 Stream l1 0
我们也可以$来表示获取最新的消息
XREAD Count 1 BLOCK 20 Stream l1 $
当然我们也可以创建一个消息组,利用消息组来处理消息
- 创建消息组
XGROUP CREATE l1 g1 $
- 利用消息组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group:消费组名称
- consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动ACK,获取到消息后自动确认
- STREAMS key:指定队列名称
- ID:获取消息的起始ID
当消息没有被确认的时候就会被放进PendingList里面等待处理
下面编写一段命令来创建两个消费者
XREADGROUP GROUP g1 c1 BLOCK 2000 STREAM l1XREADGROUP GROUP g1 c2 BLOCK 2000 STREAM l1
下面来演示一下怎么使用
public class voucherOrderHandler implements Runnable{@Orrvidepublic void run(){while(true){try{//接收信息List<MapRecord<String,Object,Object>> list =stringRedisTemplate.opsFotStream().read(Consumer.from("g1","c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders",ReadOffset.lastConsumed()));//省略一堆业务代码//确认机制stringRedisTemplate.opsForStream.acknowledge("s1","g1",record.getId(); }catch(Exception e){log.erroe("消息处理异常",e);handlePendingList();}}}//当消息没有被确认就会出现异常,那么我们就从PendingList里面尝试取出数据public void handlePendingList(){//几乎同样的逻辑再来一遍while(true){try{//接收信息 从PendingList中读取消息不需要阻塞List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsFotStream().read(Consumer.from("g1","c1"),StreamReadOptions.empty().count(1),StreamOffset.create("stream.orders",ReadOffset.from("0")))//省略一堆业务代码(当处理完消息的时候记得退出循环)//确认机制stringRedisTemplate.opsForStream.acknowledge("s1","g1",record.getId(); //当出现异常的时候由于我们已经设置类while(true)所以会自动循环 }catch(Exception e){log.erroe("消息处理异常",e);try{Thread.sleep(20);}catch(Exception e){e.printStackTrace();}}}}
}