目录
1、hello world体验
2、Work queues 工作序列
3、Publish/Subscribe订阅与发布
4、Routing 基于内容的路由
5、Topics 基于话题的路由
6、Headers 头部路由机制
7、Publisher Confirms 发送者消息确认
1、发布单条消息
2、发送批量消息
3、异步确认消息
1、hello world体验
最直接的方式,P端发送一个消息到一个指定的queue,中间不需要任何exchange规则。C端按queue方式进行消费。
关键代码:(其实关键的区别也就是几个声明上的不同。)
producer:
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
consumer:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
这个模式就是之前演示的案例。
2、Work queues 工作序列
这是RabbitMQ最基础也是最常用的一种工作机制。
Producer消息发送给queue,多个Consumer同时往队列上消费消息。一般来讲一条消息对应一个消费者消费(消费者消费消息的数量可以设置)。
producer: 将消息直接到Queue上
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); //任务一般是不能因为消息中间件的服务而被耽误的,所以durable设置成了true,这样,即使rabbitMQ服务断了,这个消息也不会消失
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
Consumer: 每次拉取一条消息(通过调整QOS可以指定一次拉取多少消息)
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel.basicQos(1);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
首先,Consumer端的autoAck字段设置的是false,这表示consumer在接收到消息后不会自动反馈服务器已消费了message,而要改在对message处理完成了之后(也就是consumer中的处理逻辑中),再调用channel.basicAck来通知服务器已经消费了该message.这样即使Consumer在执行message过程中出问题了,也不会造成message被忽略,因为没有ack的message会被服务器重新进行投递。
但是,这其中也要注意一个很常见的BUG,就是如果所有的consumer都忘记调用basicAck()了,就会造成message被不停的分发,也就造成不断的消耗系统资源。这也就是 Poison Message(毒消息)
RabbitMQ默认是采用的fair dispatch,也叫round-robin模式,就是把消息轮询,在所有consumer中轮流发送。这种方式,没有考虑消息处理的复杂度以及consumer的处理能力。而改进后的方案,是consumer可以向服务器声明一个prefetchCount,我把他叫做预处理能力值。channel.basicQos(prefetchCount);表示当前这个consumer可以同时处理几个message。这样服务器在进行消息发送前,会检查这个consumer当前正在处理中的message(message已经发送,但是未收到consumer的basicAck)有几个,如果超过了这个consumer节点的能力值,就不再往这个consumer发布。
这种模式,消息有可能全部阻塞,所有consumer节点都超过了能力值,那消息就阻塞在服务器上,这时需要自己及时发现这个问题,采取措施,比如增加consumer节点或者其他策略。
3、Publish/Subscribe订阅与发布
type为fanout(广播) 的exchange
这种机制是对上面的一种补充。也就是把preducer与Consumer进行进一步的解耦。producer只负责发送消息,至于消息进入哪个queue,由exchange来分配。如上图,就是把producer发送的消息,交由exchange同时发送到两个queue里,然后由不同的Consumer去进行消费。
producer: 只负责往exchange里发消息,后面的事情不管
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
receiver: 将消费的目标队列绑定到exchange上
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
发布与订阅关键处就是type为”fanout” 的exchange,这种类型的exchange只负责往所有已绑定的队列上发送消息。
4、Routing 基于内容的路由
type为direct(直连)的exchange
在上一种exchange往所有队列发送消息的基础上,增加一个路由配置,指定exchange如何将不同类别的消息分发到不同的queue上。
Producer
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
Receiver
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
channel.basicConsume(queueName, true, consumer);
交换机根据routinkey(路由键)将消息路由到指定的队列中,这样消费者只关心队列中是否有消息即可。
5、Topics 基于话题的路由
type为topic(主题)的exchange
这个模式也就在上一个模式的基础上,对routingKey进行了模糊匹配单词之间用,隔开,* 代表一个具体的单词。# 代表0个或多个单词。
Producer
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
Receiver
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
channel.basicConsume(queueName, true, consumer);
核心就是通过routingkey来进行模糊匹配。
6、Headers 头部路由机制
在官网中还有一种路由策略并没有提及,那就是Headers路由。其实官网之所以没有过多介绍,就是因为这种策略在实际中用得比较少(官网提倡用Routingkey方式)。
Producer:发送消息时,带上消息的headers相关属性
public class EmitLogHeader {private static final String EXCHANGE_NAME = "logs";/*** exchange有四种类型, fanout topic headers direct* headers用得比较少,他是根据头信息来判断转发路由规则。头信息可以理解为一个Map* @param args* @throws Exception*/public static void main(String[] args) throws Exception{// header模式不需要routingKey来转发,他是根据header里的信息来转发的。比如消费者可以只订阅logLevel=info的消息。// 然而,消息发送的API还是需要一个routingKey。 // 如果使用header模式来转发消息,routingKey可以用来存放其他的业务消息,客户端接收时依然能接收到这个routingKey消息。String routingKey = "ourTestRoutingKey";// The map for the headers.Map<String, Object> headers = new HashMap<>();headers.put("loglevel", "error");headers.put("buslevel", "product");headers.put("syslevel", "admin");String message = "LOG INFO asdfasdf";Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());builder.headers(headers);channel.basicPublish(EXCHANGE_NAME, routingKey, builder.build(), message.getBytes("UTF-8"));channel.close();connection.close();}
}
Consumer:声明Queue与Exchange绑定关系时,可以增加声明headers,表明自己对哪些信息感兴趣
Map<String, Object> headers = new HashMap<String, Object>();headers.put("loglevel", "info");headers.put("buslevel", "product");headers.put("syslevel", "admin");Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);String queueName = channel.queueDeclare("ReceiverHeader",true,false,false,null).getQueue();channel.queueBind(queueName, EXCHANGE_NAME,routingKey,headers);
Headers交换机的性能相对比较低,因此官方并不建议大规模使用这种交换机,也没有把他列入基础的示例当中。
7、Publisher Confirms 发送者消息确认
RabbitMQ的消息可靠性是非常高的,但是他以往的机制都是保证消息发送到了MQ之后,可以推送到消费者消费,不会丢失消息。但是发送者发送消息是否成功是没有保证的。发送者发送消息的基础API:Producer.basicPublish方法是没有返回值的,也就是说,一次发送消息是否成功,应用是不知道的,这在业务上就容易造成消息丢失。而这个模块就是通过给发送者提供一些确认机制,来保证这个消息发送的过程是成功的。
发送者确认模式默认是不开启的,所以如果需要开启发送者确认模式,需要手动在channel中进行声明。
channel.confirmSelect();
1、发布单条消息
发布一条消息就确认一条消息
for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);channel.basicPublish("", queue, null, body.getBytes());channel.waitForConfirmsOrDie(5_000);
}
channel.waitForConfirmsOrDie(5_000);这个方法就会在channel端等待RabbitMQ给出一个响应,用来表明这个消息已经正确发送到了RabbitMQ服务端。但是要注意,这个方法会同步阻塞channel,在等待确认期间,channel将不能再继续发送消息,也就是说会明显降低集群的发送速度即吞吐量。然后如果到了超时时间,还没有收到服务端的确认机制,那就会抛出异常。然后通常处理这个异常的方式是记录错误日志或者尝试重发消息,但是尝试重发时一定要注意不要使程序陷入死循环。
2、发送批量消息
之前单条确认的机制会对系统的吞吐量造成很大的影响,所以稍微中和一点的方式就是发送一批消息后,再一起确认。
int batchSize = 100;int outstandingMessageCount = 0;long start = System.nanoTime();for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);ch.basicPublish("", queue, null, body.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}}if (outstandingMessageCount > 0) {ch.waitForConfirmsOrDie(5_000);}
这种方式可以稍微缓解下发送者确认模式对吞吐量的影响。但是也有个固有的问题就是,当确认出现异常时,发送者只能知道是这一批消息出问题了, 而无法确认具体是哪一条消息出了问题。所以接下来就需要增加一个机制能够具体对每一条发送出错的消息进行处理。
3、异步确认消息
实现方式,Producer在channel中添加一个注册监听器来对消息进行确认。
channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);@FunctionalInterface
public interface ConfirmCallback {void handle(long sequenceNumber, boolean multiple) throws IOException;
}
参数中的两个ConfirmCallback一个表示成功逻辑、一个表示失败逻辑。
ConfirmCallback,这个监听器接口,里面只有一个方法: void handle(long sequenceNumber, boolean multiple) throws IOException; 这方法中的两个参数如下
sequenceNumer(long)
这个是一个唯一的序列号,代表一个唯一的消息。在RabbitMQ中,他的消息体只是一个二进制数组,默认消息是没有序列号的。那么在回调的时候,RabbitMQ提供了一个方法int sequenceNumber = channel.getNextPublishSeqNo();来生成一个全局递增的序列号,这个序列号将会分配给新发送的那一条消息。然后应用程序需要自己来将这个序列号与消息对应起来。需要客户端自己去做对应!!!
multiple(boolean)
这个是一个Boolean型的参数。如果是false,就表示这一次只确认了当前一条消息。如果是true,就表示RabbitMQ这一次确认了一批消息,在sequenceNumber之前的所有消息都已经确认完成了。
发送者消息确认模式不光可以确认消息是否到了Exchange,还可以确认消息是否从Exchange成功路由到了Queue。在Channel中可以添加一个ReturnListener。这个ReturnListener就会监控到这一部分发送成功了,但是无法被Consumer消费到的消息。