说明:本文介绍RocketMQ的消费模式&消息类型,RocketMQ的安装参考及简单使用,参考:http://t.csdn.cn/BKFPj
消费模式
RocketMQ与RabbitMQ最大的区别在于,RocketMQ是根据消息的Topic锁定消费者的,Topic属性设置为相同的消费者,可以看做是一个消费者集群。消息模式分为以下三种:
(1)一对一
最简单的一种方式,消息的Topic只被一个消费者消费,如下:
(生产者)
@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void simpleTest(){rocketMQTemplate.syncSend("simple","hello rocketmq!");}
(消费者)
@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("s = " + s);}
}
执行结果
(2)一对多
当存在多个Topic相同的消费者时,这些消费者共同消费消息,如下:
(开启两个消费者,Topic相同)
(生产者)
@Testpublic void oneToMany(){for (int i = 0; i < 10; i++) {rocketMQTemplate.syncSend("simple","one to many" + i);}}
执行结果,可以看到负载均衡策略是随机;
(3)多对多
参考一对多方式,发送多个Topic的消息,让多种Topic的消费者接收消息;
消息类型
根据消息的类型和对消息的处理,可以分为以下几种:
(1)同步消息
同步消息,消息发送到MQ,MQ保存成功后才会返回结果,在API中是以"sync"(synchronous,同步)开头的一些方法,可以看到这些方法都有返回值,可以通过返回结果判断是否发送成功;
(消费者)
@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("接收到同步消息 = " + s);}
}
(生产者,可以通过返回结果判断发送是否成功)
@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void simpleTest1(){SendResult sendResult = rocketMQTemplate.syncSend("simple", "这是一个异步消息");System.out.println("sendResult.getSendStatus() = " + sendResult.getSendStatus());}
(2)异步消息
异步消息,消息发送给MQ后代码就会立即向下执行,在API中是以“asyn”(asynchronous,异步),可以手动设置发送消息成功与否执行的方法;
(生产者)
@Testpublic void simpleTest2() throws InterruptedException {rocketMQTemplate.asyncSend("simple", "这是一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("成功信息" + sendResult.toString());}@Overridepublic void onException(Throwable throwable) {System.out.println("异常信息" + throwable.getMessage());}});TimeUnit.SECONDS.sleep(2);}
(发送消息成功,执行成功的方法)
需要注意,这里是指发送消息成功与否,与消费者是否成功消费无关;
(3)单向消息
单向消息,是指只管发送消息,不关系MQ是否成功接收,没有返回值;
@Testpublic void simpleTest3() {rocketMQTemplate.sendOneWay("simple", "这是一个单向消息");}
(4)延迟消息
延迟消息,指给消息设置一个延迟级别,达到指定时间后,消费者才能收到这个消息,延迟级别如下:
# 延迟级别,从1开始
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
(生产者)
@Testpublic void simpleTest4() {// 设置超时为1秒,延迟等级为3,即10秒rocketMQTemplate.syncSend("simple", MessageBuilder.withPayload("这是一个延迟消息").build(),1000,3);}
(消费者,10秒后才收到消息)
延迟消息相较于RabbitMQ,使用起来更方便,但是只能设置时间等级,不能设置准确时间,非常难受;
(5)批量消息
RocketMQ可以发送一个集合,如下:
(消费者)
@Testpublic void simpleTest5(){ArrayList<Message> list = new ArrayList<>();list.add(MessageBuilder.withPayload("aaa").build());list.add(MessageBuilder.withPayload("bbb").build());list.add(MessageBuilder.withPayload("ccc").build());rocketMQTemplate.syncSend("simple", list, 3000);}
(执行结果)
(6)消息过滤
消息过滤,是RocketMQ较与RabbitMQ独有的功能,指对发送的消息进行过滤,指接收限定条件的消息,对消息进行限制接收。有两种方式,如下:
a. 标签过滤
在发送消息时,指定topic的同时,加上一个标签,表示只发给有这个标签的消费者;
(生产者)
@Testpublic void simpleTest6(){rocketMQTemplate.syncSend("simple:tag", "Tag Message");}
(消费者)
@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple", selectorExpression = "tag1")
public class ConsumerListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("接收到标签过滤消息 = " + s);}
}
(执行结果)
b. SQL过滤
另一种是SQL过滤的方式,在消费者这边,写SQL语句对消息进行过滤消息;
(生产者,设置name = SQL)
@Testpublic void simpleTest6(){// 标签方式rocketMQTemplate.syncSend("simple:tag", "Tag Message");// SQL语句方式rocketMQTemplate.syncSend("simple",MessageBuilder.withPayload("SQL Message").setHeader("name","SQL").build());}
(消费者,只接受name = SQL的消息)
@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple",selectorType = SelectorType.SQL92, selectorExpression = "name = 'SQL'")
public class ConsumerListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("接收到SQL语句过滤消息 = " + s);}
}
(执行结果)
(7)对象消息
RocketMQ当然也可以发送对象作为消息,该对象应该要实现Serializable接口,如下:
import java.io.Serializable;public class User implements Serializable {private String username;private String password;public User() {}public User(String username, String password) {this.username = username;this.password = password;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}@Overridepublic String toString() {return "User{" +"username='" + username + '\'' +", password='" + password + '\'' +'}';}
}
(生产者)
@Testpublic void simpleTest7(){User user = new User();user.setUsername("zhangsan");user.setPassword("123456");rocketMQTemplate.syncSend("simple", user);}
(消费者)
@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<User> {@Overridepublic void onMessage(User user) {System.out.println("user = " + user);}
}
(执行结果)
(8)顺序消息
顺序消息,是指消息从发送到被消费,需要始终保持前后顺序。如下,发送15次消息,可以看到消费者那边的消费顺序并不是一直的;
@Testpublic void simpleTest1() {for (int i = 0; i < 15; i++) {rocketMQTemplate.syncSend("simple", "这是一个同步消息===>" + i);}}
顺序消息,需要保证以下两方面:
-
所有的消息存入到MQ中的同一个队列中,因为RocketMQ默认有四个队列,消息会被负载均衡存储在这些队列里;
-
该队列只能被一个线程消费,因为一个队列的消息在消费时会有多个线程同时进行消费;
前者可以通过,XxxOrderly()方法实现消息在队列中的顺序存储,如下:
(生产者:给对象设置一个ID,让它们按照ID顺序存储在MQ中)
@Testpublic void simpleTest8(){ArrayList<User> users = new ArrayList<>();User user1 = new User("1","zhangsan","zs");User user2 = new User("2","lisi","ls");User user3 = new User("3","wangwu","ww");users.add(user1);users.add(user2);users.add(user3);for (User user : users) {rocketMQTemplate.syncSendOrderly("simple",user,user.getId());}}
后者,可以通过在消费者这边添加这个配置,保证消息被顺序消费,如下:
(消费者,设置消费模式 consumeMode = ConsumeMode.ORDERLY
)
@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple",consumeMode = ConsumeMode.ORDERLY)
public class ConsumerListener implements RocketMQListener<User> {@Overridepublic void onMessage(User user) {System.out.println(user);}
}
执行结果,可以看到消息时顺序进行的
总结
RocketMQ的内容还有很多,可参考 http://t.csdn.cn/QXQNZ