1、pub端
//获取一个流
RStream rStream = redissonClient.getStream("testStream");
//创建一个map,添加数据
Map<String, Object> rr = new HashMap<>();
rr.put("xx", RandomUtil.randomString(5));
//添加到流
rStream.addAll(rr);
rStream支持的add函数如下:
2、sub端
sub端订阅的方法有三种:
1、调用read方法
2、调用range方法
3、调用readGroup方法
基于组的订阅如下:
RStream rStream = redissonClient.getStream("testStream");
//创建分组
rStream.createGroup("default",StreamMessageId.ALL);
new Thread(new Runnable() {@Overridepublic void run() {while (true) {try {
//读取default分组中,消费者名词为consumer_1,每次读取三个,30秒阻塞。Map<StreamMessageId, Map<String,Object>> ss = rStream.readGroup("default", "consumer_1", 3, 30, TimeUnit.SECONDS);Map<StreamMessageId, Map<String,Object>> ss = rStream.readGroup("default","1234",3,StreamMessageId.ALL);for (StreamMessageId streamMessageId : ss.keySet()) {
System.out.println(streamMessageId.toString()+"__"+ss.get(streamMessageId));rStream.remove(streamMessageId);}} catch (Exception e) {e.printStackTrace();}}}
}).start();
分组的好处是,假如有10个订阅者同时在一个分组,那么分组中的消息只会给其中某一个订阅者。