RabbitMQ简单使用

 

RabbitMq是一个消息中间件:它接收消息、转发消息。你可以把它理解为一个邮局:当你向邮箱里寄出一封信后,邮递员们就能最终将信送到收信人手中。 

RabbitMq、消息相关术语如下:

  生产者:生产者只发送消息,发送消息的程序即为生产者:

  

  消息队列:消息队列就相当于RabbitMq中的邮箱,消息存储在消息队列中。队列本质上是一个大的消息缓存,它能存多少消息,取决于主机的内存和磁盘限制。多个生产者可以往同一个消息队列中发送消息;多个消费者可以从同一个队列中获取数据。

  

  消费者:消费者是一个等待接收消息的程序:

  

  注意:生产者、消费者和RabbitMq可以在不同的机器上;在很多的应用中,一个生产者同时也可能是消费者。

在下面图形中,“P”是消息的生产者,“C”是消息的消费者,中间的红框是消息队列,保存了从生产者那里接收到的准备转发到消费方的消息。

  

 

Hello World

发送消息

生产者连接RabbitMq,发送一条简单的消息”Hello World!“后就退出。

在Send.java类中,需要引入以下依赖包:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

给队列起个名字:

public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {...}
}

创建连接到服务器的连接Collection:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {...
}

这个连接是套接字连接,为我们处理协议版本协商和身份验证等。这里我们连接一个本地的RabbitMq因此是localhost,如果想连接到一个远程的RabbitMq,只需要把localhst改成那台机器的IP地址即可。

创建完连接之后,要继续创建一个信道:Channel。使用try-with-resource表达式,因为Connection和Channel都实现了JAVA接口Closeable,属于资源,需要关闭。使用try-with-resource就不需要显示地在我们的代码中进行关闭了。(关于信道,请参考文章最顶部的RabbitMq原理图,它是TCP里面的虚拟链接,例如:电缆相当于一个TCP,信道就是里面的一个独立光纤,一条TCP上面创建多条信道是没有问题的;TCP一旦打开就会创建AMQP信道;无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的)。

为了发送消息,我们还必须定义一个消息发送到的消息队列:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

队列声明函数queueDeclare有多个参数,其分别是:

1. queue: 队列的名称 

2. durable: 是否持久化 

当durable = false时,队列非持久化。因为队列是存放在内存中的,所以当RabbitMQ重启或者服务器重启时该队列就会丢失 ;
当durable = true时,队列持久化。当RabbitMQ重启后队列不会丢失。RabbitMQ退出时它会将队列信息保存到 Erlang自带的Mnesia数据库 中,当RabbitMQ重启之后会读取该数据库
3. exclusive: 是否排外的 ;

当exclusive = true则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接(Connection)可见,是该Connection私有的,类似于加锁,并在连接断开connection.close()时自动删除 ;
当exclusive = false则设置队列为非排他的,此时不同连接(Connection)的管道Channel可以使用该队列 ;
注意2点:

排他队列是 基于连接(Connection) 可见的,同个连接(Connection)的不同管道 (Channel) 是可以同时访问同一连接创建的排他队列 。其他连接是访问不了的 ,强制访问将报错:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'hello-testExclusice' in vhost '/'.;

以下声明是没问题的:

	Channel channel = connection.createChannel();Channel channel2 = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, true, false, null);channel2.queueDeclare(QUEUE_NAME, false, true, false, null);=>如果是不同的 connection 创建的 channel 和 channel2,那么以上的=>channel2.queueDeclare()是会报错的!!!!!!

"首次" 是指如果某个连接(Connection)已经声明了排他队列,其他连接是不允许建立同名的排他队列的。这个与普通队列不同:即使该队列是持久化的(durable = true),一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
4. autoDelete: 是否自动删除 ;如果autoDelete = true,当所有消费者都与这个队列断开连接时,这个队列会自动删除。注意: 不是说该队列没有消费者连接时该队列就会自动删除,因为当生产者声明了该队列且没有消费者连接消费时,该队列是不会自动删除的。

5. arguments: 设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key 、 x-rnax-priority 等。


basicPublish() 方法是基础的发布消息方法,它有四个参数

  1. String exchange : 交换机名, 当不使用交换机时,传入“”空串。
  2. String routingKey :(路由地址) 发布消息的队列, 无论channel绑定那个队列,最终发布消息的队列都由该字串指定。
  3. AMQP.BasicProperties props :消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化。
  4. byte[] body :消息数据本体, 必须是byte数组

定义一个消息队列时,只有该队列不存在的时候才能被创建。

消息是二进制数组,因此你可以根据需要指定编码。

完整的Send.java如下: 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}

接收消息

消费者监听RabbitMq中的消息,因此与生产者发送一条消息就退出不同,消费者要保持运行状态来接收消息并打印出来。

与生产者相同,我们需要创建Connetcion和Channel、定义队列(需要监听并接收消息的队列):

public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");}
}

注意我们也在这里声明队列,因为我们可能在生产者之前启动消费者。我们想要确保在我们尝试消费消息的时候队列就已经存在了。

这里我们为什么不使用try-with-resource表达式自动关闭channel和connection?这样我们就可以使我们的程序一直保持运行状态,如果把channel、connection这些关了,程序也就停止了。这就尴尬了,因为我们需要保持消费者一直处于异步监听消息过来的状态。

RabbitMq会将队列中的消息异步地推送过来,我们需要提供一个回调函数来缓存消息直到我们需要用到这些消息:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

basicConsume方法会启动一个消费者,并返回服务端生成的消费者标识,它的几个参数是
1. queue:队列名
2. autoAck:true 接收到传递过来的消息后自动acknowledged(应答服务器),false 接收到消息后不自动应答服务器
3. deliverCallback: 当一个消息发送过来后的回调接口
4. cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用
方法的返回值是服务端生成的消费者标识

完整的接收端代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}

接下来创建一个工作队列,用于在多个工作者之间分配耗时的任务。

工作队列(即任务队列)的主要思想是避免立即执行那些需要等他们执行完成的资源密集型任务。相反,我们将任务安排在稍后完成。我们将任务封装为消息并将其发送到队列,后台运行的工作进程将取出任务并执行完成。如果你启动了多个工作者,这些任务将在多个工作者之间分享。

这个概念也即我们说的异步,在项目中,有时候一个简单的Web请求,后台要做一系列的操作。这时候,如果后台执行完成之后再给前台返回消息将会导致浏览器页面等待从而出现假死状态。因此,通常的做法是,在这个Http请求到后台,后台获取到正确的参数等信息后立即给前台返回一个成功标志,然后后台异步地进行后续的操作

准备

本章中,我们将发送字符串消息来模拟复杂的任务。这里因为没有一个真实的复杂任务,因此用Thread.sleep()方法来模拟复杂耗时的任务。我们用字符串中的含点(“.")的数量来表示任务的复杂程度,一个点表示一秒钟的耗时,例如:一个发送”Hello ...“字符串的任务将会耗时3秒钟。

修改前面的Send.java为NewTask.java,允许从命令行发送消息。修改后的程序会把任务发送到工作队列:

完整的NewTask.java代码为:

public class NewTask {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");try(Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message = String.join(" ", argv);channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}

Recv.java也要做一些修改:模拟字符串消息中的每个点耗时1秒钟,它将处理传送过来的消息并执行任务,修改后的程序名字是Work.java:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

doWork是模拟执行过程中耗时的伪任务:

private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);}
}

完整的Work.java为:

public class Worker {private final static String TASK_QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println(" [x] Done");}};boolean autoAck = true; // acknowledgment is covered belowchannel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });}private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);}}
}

使用工作队列的优点之一是能够轻松地进行并行化操作。假设我们在做一个后台日志收集系统,我们可以很容易地增加更多的Worker从而提高系统性能。

首先,我们同时启动两个Worker,接下来,我们先后启动5个Task,并分别通过main()参数传入五个字符串消息:

First message.
Second message..
Third message...
Fourth message....
Fifth message.....

看一下两个Worker都接收到了什么样的消息

 默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均每个消费者将得到相同数量的消息。这种消息的调度方式称之为循环调度,你可以开启更多的Worker来进行测试。

因为消费者执行一个任务会有时间耗时,假设一个消费者在执行一个任务执行一半的时候挂掉了将会怎样?消息会不会因此丢失?在我们目前的代码里,一旦RabbitMq将一条消息转发给了一个消费者后,将会立即将消息删除(注意Worker.java里的autoAck目前设置为true),因此,在我们上面例子里,如kill掉一个正在处理数据的Worker,那么该数据将会丢失。不仅如此,所有那些指派给该Worker的还未处理的消息也会丢失。

但在实际工作中,我们并不希望一个Worker挂掉之后就会丢失数据,我们希望的是:如果该Worker挂掉了,所有转发给该Worker的消息将会重新转发给其他Worker进行处理(包括处理了一半的消息)。为了确保一条消息永不丢失,RabbitMq支持消息回执。消费者在接收到一条消息,并且成功处理完成之后会给RabbitMq回发一条确认ack确认消息,RabbitMq此时才会删除该条消息。如果一个Worker正在处理一条消息时挂掉了(信道关闭、连接关闭、TCP连接丢失),它将没有机会发送ack回执,RabbitMq就认为该消息没有消费成功于是便会将该消息重新放到队列中,如果此时有其他消费者还是在线状态,RabbitMq会立即将该条消息再转发给其他在线的消费者。这种机制可以保证任何消息都不会丢失。

默认情况下,需要手动进行消息确认,在前面的例子里,我们通过autoAck=true显示地关闭了手动消息确认,因此,RabbitMq将采用自动消息确认的机制。现在,我们修改我们的程序,采用手动发送回执的方式,当我们完成对消息的处理后,再手动发送回执确认:

channel.basicQos(1); // accept only one unack-ed message at a time (see below)DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

可以看到将autoAck设置为了false。

ack发送信道必须和接收消息的信道(channel)是同一个,如果尝试通过一个不同的信道发送ack回执,将会抛出channel等级协议异常(官网说会抛出异常,但在实际测试中并没有抛异常,只是该条消息得不到回执,从而也无法删除)。另一个常见的错误是关闭了自动ack后忘了手动回执,虽然只是一个简单的错误,但是带来的后果却是严重的,它将导致已经消费掉的消费不会被删除,并且当消费该消息的消费者在退出之后,RabbitMq会将该条消息重新进行转发,内存将被慢慢耗尽。我们可以通过下面的命令来检查这种错误:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

 该命令有三列内容,第一列是在监听的队列名称,第二列是Ready状态的消息数量,第三列是Unacked的消息数量。

消息的持久化

前面讲了如何保证当消费者挂掉之后消息不被丢失,但是,如果RabbitMq服务或者部署RabbitMq的服务器挂掉了之后,消息仍然会丢失。当RabbitMq崩溃之后,它将会忘记所有的队列和消息,除非,有什么机制让RabbitMq将队列信息和消息保存下来。

要确保消息和队列不会丢失,我们必须要确保两件事情。

首先,我们要确保RabbitMq永远不丢失队列,要做到这点,我们在定义的时候就需要告诉RabbitMq它是需要持久化的,通过指定durable参数实现:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

虽然这个命令本身是正确的,但需要注意的是我们前面hello队列是一个非持久化队列,RabbitMq不允许重新定义一个已经存在的队列(用不同的参数),否则会抛出异常。

com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, 
reply-text=PRECONDITION_FAILED - parameters for queue 'hello' in vhost '/' not equivalent, class-id=50, method-id=10)

要么重启RabbitMq让该临时队列消失,要么在控制台将该队列删除,或重新创建一个新的队列:

1 boolean durable = true;
2 channel.queueDeclare("task_queue", durable, false, false, null);

生产者和消费者要做同步修改。

做完上面这一步就保证了队列(task_quee)的持久化,此时,即便RabbitMq崩溃了也不会丢失该队列,当RabbitMq重启后将自动重新加载该队列。

其次,要确保消息也被持久化。要做到这一点,在生产者发布消息的时候需要指定消息的属性为:PERSISTENT_TEXT_PLAIN。

import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

注意,即便设置了消息的持久化属性也不能保证消息会被100%地写入到磁盘中。因为RabbitMq在接收到消息和写入到磁盘不是同步的,消息可能只是被写入到缓存中而还没来得及写入磁盘,RabbitMq崩溃了,此时也会丢失消息。但无论如何,比前面简单的消息队列已经强大了很多。

公平调度

你可能已经注意到,此时任务调度仍然不能完全按照我们希望的方式工作。举个例子,在只有两个Worker的环境中,奇数的消息比较重,偶数的消息比较轻时,一个Worker将会一直处于忙碌状态,而另一个Worker将会一直处于空闲状态,但RabbitMq并不知道这种情况,它会依然均衡地向两个Worker传递消息。发生这种情况是因为,当一个消息进入队列之后,RabbitMq只是盲目地将该第n个消息转发给第n个消费者,它并不关注每个消费者发了多少个回执。

为了解决这个问题,我们可以通过调用basicQos方法,给它传入1。这将告诉RabbitMq不要同时给一个队列转发多于1条的消息,换句话说,在一个消费者没有完成并回执前一条消息时,不要再给它转发其他消息。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

 完整的代码

NewTask.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;public class NewTask {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);String message = String.join(" ", argv);channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}

Worker.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class Worker {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });}private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}}
}

 工作队列模式的设想是每一条消息只会被转发给一个消费者。接下来会讲解另一种完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式

为了阐述这个模式,我们将会搭建一个简单的日志系统,它包含两种程序:一种发送日志消息,另一种接收并打印日志消息。在这个日志系统里,生产者发布的消息将会以广播的形式让每一个运行的消费者都可以获取的到,我们让其中一个消费者接收消息并写入磁盘,另一个消费者接收消息并打印在电脑屏幕上。

交换器(Exchange)

回忆一下前面的内容:

  • 一个生产者用以发送消息;
  • 一个队列缓存消息;
  • 一个消费者用以消费队列中的消息。

RabbitMq消息模式的核心思想是:一个生产者并不会直接往一个队列中发送消息,事实上,生产者根本不知道它发送的消息将被转发到哪些队列。

实际上,生产者只能把消息发送给一个exchange,exchange只做一件简单的事情:一方面它们接收从生产者发送过来的消息,另一方面,把接收到的消息推送给队列。

一个exchage必须清楚地知道如何处理一条消息。

  

有四种类型的交换器,分别是:direct、topic、headers、fanout。主要讲解最后一种:fanous(广播模式)。下面创建一个fanout类型的交换器,将创建的交换机命名为logs,类型是fanout:

channel.exchangeDeclare("logs", "fanout");

广播模式交换器很简单,从字面意思也能理解,就是把接收到的消息推送给所有它知道的队列。在我们的日志系统中正好需要这种模式。

如果想查看当前系统中有多少个exchange,可以使用以下命令:

sudo rabbitmqctl list_exchanges

  

或者通过控制台查看(Exchanges标签下):

可以看到有很多以amq.*开头的交换器,以及(AMQP default)默认交换器,这些是默认创建的交换器。

在前面工作队列的指南中,我们并未显式的使用交换器(指定交换器为字符串""),但是依然可以将消息发送到队列中,其实并不是我们没有使用交换器,实际上是我们使用的是默认交换器,回顾一下我们之前是如何发送消息的:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数是交换器的名字,空字符串表示它是一个默认或无命名的交换器,消息将会由指定的路由键(第二个参数,routingKey,后面会讲)转发到队列。

既然exchange可以指定为空字符串(""),那么可否指定为null?答案是:不能!

在AMQImpl类中的Publish()方法中,不光是exchange不能为null,routingKey路由键也不能为null,否则会抛出异常:

现在,可以显式的使用刚创建的交换器:

channel.basicPublish( "logs", "", null, message.getBytes());

临时队列

前面我们使用的队列都是有具体的队列名(hello),创建命名队列是很必要的,因为我们需要将消费者指向同一名字的队列。因此,要想在生产者和消费者中间共享队列就必须要使用命名队列。

但是,现在讲解的日志系统也可以使用非命名队列(可以不手动命名),我们希望收到所有日志消息,并且我们希望总是接收到新的日志消息而不是旧的日志消息。为了解决这个问题,需要分两步走。

首先,无论何时当消费者连接到RabbitMq,都需要一个新的、空的队列来接收日志消息,因此,消费者在连接上RabbitMq之后需要创建一个任意名字的队列或者让RabbitMq生成一个任意的队列名字。

其次,一旦该消费者断开了与RabbitMq的连接,队列也被自动删除。

使用无参的queueDeclare(),就可以创建一个非持久化、专有的、自动删除且名字随机生成的队列。

String queueName = channel.queueDeclare().getQueue();

绑定(Binding)

  

当广播模式的交换器和队列已经创建好了,接下来就是要告诉交换器向队列里发送消息。交换器与队列之间的关系称之为绑定关系

channel.queueBind(queueName, "logs", "");

queueBind的第三个参数是routingkey。

至此,交换器已经可以往队列中发送消息了。

可以通过下列命令来查看队列的绑定关系:

rabbitmqctl list_bindings

完整的代码  

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel();) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);String message = "RabbitMq fanout......";channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("utf-8"));System.out.println(" [x] Sent '" + message + "'");}}
}

Connection创建完成之后,定义了exchange,这一步是必要的,因为如果没有交换器将无法发送消息。如果没有队列绑定到该交换器上,那么,交换器收到的消息将会丢失掉,但是对本章的日志系统来说没问题的,当没有消费者时,就完全的放弃掉数据,消费者连接上时,只接收最新的日志消息就好。

public class ReceiveLogs {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);final String queue = channel.queueDeclare().getQueue();channel.queueBind(queue,EXCHANGE_NAME,"");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTa,delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queue,true,deliverCallback,consumerTag -> {});}
}

basicConsume的autoAck设置为true,因为现在是广播模式,每个消费者都会收到一样的消息,并且这里给消费者生产的随机名称的队列相当于是独有的,所以在接收到消息之后立即发送确认回执是OK的。

现在已经可以把消息广播给很多的消费者。接下来我们将增加一个特性:订阅这些信息中的一些信息。例如,只将error级别的错误存储到硬盘中,同时可以将所有级别(error、info、warning等)的日志都打印在控制台上。

绑定(Bindings)

回顾一下创建绑定关系的代码:

channel.queueBind(queueName, EXCHANGE_NAME, "");

一个绑定是一个交换器与队列之间的关系。意思是指:这个队列对这个交换器的消息感兴趣。

该方法同时还有另一个routing Key参数(第三个参数),为了避免与basicPublish参数中的路由键(routing key)混淆,我们称之为绑定键(bingind key),下面展示了如何通过一个绑定key创建一个绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

绑定键(这里是"black")的含义依赖于交换器的类型。在我们的日志系统中,交换器类型是fanout,绑定键没有任何意义,会被忽略掉。

直连交换机(Direct Exchange)

我们正在用的广播模式的交换器并不够灵活,它只是不加思索地进行广播。现在使用direct exchange来代替。直连交换器的路由算法非常简单:将消息推送到binding key与该消息的routing key相同的队列。

请看下图:

  

在该图中,直连交换器X上绑定了两个队列。第一个队列的绑定键orange,第二个队列有两个绑定键:black和green。在这种场景下,一个消息在发布时(basicPublish)指定的路由键若为orange,则该消息将只被路由到队列Q1,若路由键为black或green的消息,将只被路由到队列Q2。其他的消息都将被丢失。

多重绑定

  

同一个绑定键可以绑定到不同的队列上去,在上图中,交换器X与队列Q2的绑定键也是black,在这种情况下,直连交换器将会和广播交换器有着相同的行为,将消息推送到所有匹配的队列。一个路由键为black的消息将会同时被推送到队列Q1和Q2。

发送日志的代码片段

//一如既往的先创建一个交换器:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//发送消息:"severity"参数是日志系统中“info”、“warning”和“error”中的一个。
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

接收消息的代码片段与之前的基本相同,只是需要在创建绑定关系时,指定severity的值(也就是绑定值):

String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

完整的代码

  EmitLogDirect.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");String severity = getSeverity(argv);String message = getMessage(argv);channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + severity + "':'" + message + "'");}}//..
}

发送者发送消息的routingKey和消息都来自于命令行传递过来的argv参数中。

ReceiveLogsDirect.java

import com.rabbitmq.client.*;public class ReceiveLogsDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");String queueName = channel.queueDeclare().getQueue();if (argv.length < 1) {System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");System.exit(1);}for (String severity : argv) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

消费者接收消息时,queueBind的bindingKey也要来自于命令行的argv参数。

用direct交换器替换了fanout交换器,使得我们可以有选择性地接收消息。尽管如此,仍然还有限制:不能基于多个标准进行路由。

在日志系统中,我们可能不仅希望根据日志等级订阅日志,还希望根据日志来源订阅日志。这个概念来自于unix工具syslog,它不仅可以根据日志等级(info/warn/crit...)来路由日志,同时还可以根据设备(auth/cron/kern...)来路由日志,这将更加灵活。我们可能希望只监听来自'cron'的error级别日志,同时又要接收来自'kern'的所有级别的日志。我们的日志系统如果要实现这个功能,就需要使用到另外一种交换器:主题交换器(Topic Exchange)。

主题交换器(Topic Exchange)

发送到主题交换器的消息的routing key必须是由点号分开的一串单词,这些单词可以是任意的,但通常是与消息相关的一些特征。比如以下是几个有效的routing key: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",routing key的单词可以有很多,最大限制是255 bytes。

binding key必须与routing key模式一样。Topic交换器的逻辑与direct交换器有点相似:使用特定路由键发送的消息将被发送到所有使用匹配的绑定键绑定的队列。然而,绑定键有两个特殊的情况,如下:

  • 表示匹配任意一个单词
  • # 表示匹配任意一个或多个单词

下图表示了这这两个通配符的用法:

  

在这个例子中,我们将发送所有跟动物有关的消息,这些消息将会发送到由三个单词,两个点号组成的routing key,第一个单词了表示的是速度,第二个单词表示颜色,第三个单词表示种类:

  "<speed>.<colour>.<species>"。

  我们创建三个绑定关系:队列Q1绑定到绑定键*.orange.* ,队列Q2绑定到*.*.rabbit和lazy.#。

  总结下来就是:

  • 队列Q1对橘黄色(orange)颜色的所有动物感兴趣;
  • 队列Q2对所有的兔子(rabbit)和所有慢吞吞(lazy)的动物感兴趣。

一个路由为 "quick.orange.rabbit"的消息,将会被转发到这两个队列,路由为"lazy.orange.elephant"的消息也被转发给这两个队列,路由为 "quick.orange.fox"的消息将只被转发到Q1队列,路由为 "lazy.brown.fox"的消息将只被转发到Q2队列。"lazy.pink.rabbit" 只被转发到Q2队列一次(虽然它匹配绑定键*.*.rabbit和lazy.#),路由为 "quick.brown.fox"的消息与任何一个绑定键都不匹配,因此将会被丢弃。

如果我们发送的消息的的路由是由一个单词“orangle"或4个单词”quick.orangle.male.rabbit“将会怎样?会因为与任何一个绑定键不匹配而被丢弃。若路由为 "lazy.orange.male.rabbit"的消息,因为匹配"lazy.#"绑定键,因而会被转发到Q2队列。

Topic交换器非常强大,可以像其他类型的交换器一样工作:当一个队列的绑定键是"#"是,它将会接收所有的消息,而不再考虑所接收消息的路由键,就像是fanout交换器一样;当一个队列的绑定键没有用到”#“和”*“时,它又像direct交换一样工作。

2、完整的代码

  下面是在我们日志系统中采用Topic交换器的完整代码,日志消息的路由由两个单词组成:"<facility>.<severity>"。

EmitLogTopic.java

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class EmitLogTopic {private final static String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost);try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String message = "A critical kernel error";String routingKey = "kern.critical";channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("utf-8"));System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");}}
}

ReceiveLogsTopic.java

import com.rabbitmq.client.*;public class ReceiveLogsTopic {private final static String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String queueName = channel.queueDeclare().getQueue();if (args.length < 1) {System.err.println("Usage: ReceiveLogsTopic [binding_key]...");System.exit(1);}for (String bindingKey : args) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/91188.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

React(5)

1.受控组件案例 1.1之前的影院案例改写 import React, { Component } from react import axios from axios import BetterScroll from better-scroll import ./css/02_tab.cssexport default class Cinema extends Component {constructor() {super();this.state {cinemaLis…

Jmeter进阶使用:BeanShell实现接口前置和后置操作

一、背景 我们使用Jmeter做压力测试或者接口测试时&#xff0c;除了最简单的直接对接口发起请求&#xff0c;很多时候需要对接口进行一些前置操作&#xff1a;比如提前生成测试数据&#xff0c;以及一些后置操作&#xff1a;比如提取接口响应内容中的某个字段的值。举个最常用…

只需要自动售货机,商业模式立马大变样!

随着互联网、大数据和人工智能的蓬勃发展&#xff0c;商业模式正以前所未有的方式融合&#xff0c;其中自动售货机作为新零售模式的一颗璀璨明珠&#xff0c;正引领着购物体验的革命。这个巧妙的结合将消费者的便利、数据的智能分析以及科技的创新融为一体&#xff0c;重新定义…

Mysql:Access denied for user ‘root‘@‘localhost‘ (using password:YES)解决方案

最近在配置Maven以及Mybatis时&#xff0c;连接localhost数据库时出现无法连接&#xff0c;用cmd测试时报错&#xff1a;Access denied for user ‘ODBC’‘localhost’ (using password: NO)&#xff0c;这个意思就是不允许远程访问&#xff0c;一开始笔者进入mysql试了一下是…

Untiy Json和Xml的序列化和反序列化

Json的序列化和反序列化 1.定义数据类 [Serializable] public class ZoomPoint {// 点名称, 将作为Key被字典存储public string name;// 轴心X坐标public Vector2 pivot Vector2.one / 2;// 放大倍率&#xff0c;小于1是为缩小倍率&#xff0c;小于0是取绝对值&#xff0c;不…

机器学习中基本的数据结构说明

数据维度或数据结构 当我们在机器学习或深度学习的领域内处理数据&#xff0c;我们通常会遇到四种主要的数据结构&#xff1a;标量&#xff0c;向量&#xff0c;矩阵和张量。理解这些基本数据结构是非常重要的&#xff0c;因为它们是机器学习算法和神经网络的核心。下面是对这…

NLPR、SenseTime 和 NTU 加速自动视频纵向编辑

视频人像编辑技术已经在电视、视频和电影制作中得到了应用&#xff0c;并有望在不断发展的网真场景中发挥关键作用。最先进的方法已经可以逼真地将同源音频合成为视频。现在&#xff0c;来自北京模式识别国家实验室&#xff08;NLPR&#xff09;、商汤科技研究和南洋理工大学的…

go.sum are different when using go mod vendor/download

本地Golang配置 今天本地编译一个项目&#xff0c;遇到以下错误 PS D:\Code\Golang\jiankunking\k8s-ext> go mod tidy go: downloading github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.4incompatible verifying github.com/gin-gonic/ginv1.7.3: checksum mismat…

JVM 内存结构快速入门

文章目录 一、简介二、JVM内存区域2.1 方法区2.3.2 永久代和元空间 2.2 堆2.1.2 对象的创建和销毁 2.2 栈内存2.2.1 栈帧的组成和作用2.2.2 栈的特点 2.4 程序计数器2.4.1 程序计数器的作用和使用场景 一、简介 Java 内存模型&#xff08;Java Memory Model&#xff0c;JMM&…

微信小程序云开发快速入门(2/4)

前言 我们对《微信小程序云开发快速入门&#xff08;1/4&#xff09;》的知识进行回顾一下。在上章节我们知道了云开发的优势以及能力&#xff0c;并且我们还完成了码仔备忘录的本地版到网络版的改造&#xff0c;主要学习了云数据库同时还通过在小程序使用云API直接操作了云数…

做海外游戏推广有哪些条件?

做海外游戏推广需要充分准备和一系列条件的支持。以下是一些关键条件&#xff1a; 市场调研和策略制定&#xff1a;了解目标市场的文化、玩家偏好、竞争格局等是必要的。根据调研结果制定适合的推广策略。 本地化&#xff1a;将游戏内容、界面、语言、货币等进行本地化&#…

智慧工地源码,PC+APP源码,项目管理端+企业监管端+数据大屏端

智慧工地概念 智慧工地是一种崭新的工程全生命周期管理理念&#xff0c;是指运用信息化手段&#xff0c;通过对工程项目进行精确设计和施工模拟&#xff0c;围绕施工过程管理&#xff0c;建立互联协同、智能生产、科学管理的施工项目信息化生态圈&#xff0c;并将此数据在虚拟…

DIP:《Deep Image Prior》经典文献阅读总结与实现

文章目录 Deep Image Prior1. 方法原理1.1 研究动机1.2 方法 2. 实验验证2.1 去噪2.2 超分辨率2.3 图像修复2.4 消融实验 3. 总结 Deep Image Prior 1. 方法原理 1.1 研究动机 动机 深度神经网络在图像复原和生成领域有非常好的表现一般归功于神经网络学习到了图像的先验信息…

解压版 MySQL 数据库的安装与配置

目录 1 下载2 安装3 配置3.1 添加环境变量3.2 新建配置文件3.3 初始化MySQL3.4 注册MySQL服务3.5 启动MySQL服务3.6 修改默认账户密码 4 登录5 卸载 安装环境:Win10 64位 软件版本:MySQL 5.7.24 解压版 1 下载 点击链接 进入如下界面 ❗️注意&#xff1a; 我们一般不会选择最新…

物联网智慧安防实训综合实训基地建设方案

一、系统概述 物联网智慧安防实训综合实训基地是一个为学生提供综合实践、培养技能的场所&#xff0c;专注于物联网技术与智慧安防应用的培训和实训。通过物联网智慧安防实训综合实训基地的建设和运营&#xff0c;学生可以在真实的环境中进行实践训练&#xff0c;提高其物联网技…

MongoDB增删改查操作

数据库操作&#xff1a; 在MongoDB中&#xff0c;文档集合存在数据库中。 要选择使用的数据库&#xff0c;请在mongo shell程序中发出 use <db> 语句 // 查看有哪些数据库 show dbs;// 如果数据库不存在&#xff0c;则创建并切换到该数据库&#xff0c;存在则直接切换到…

使用ntp服务器调整linux系统时间(附带代码示例)

前言 这是我在这个网站整理的笔记&#xff0c;关注我&#xff0c;接下来还会持续更新。 作者&#xff1a;RodmaChen 为了防止应用在系统上运行时候&#xff0c;系统时间与真实时间出现误差。可以调用ntp服务器获取正确的时间进行调整 NTP简介 网络时间协议&#xff08;NTP&…

Redis数据结构——链表list

链表是一种常用的数据结构&#xff0c;提供了顺序访问的方式&#xff0c;而且高效地增删操作。 Redis中广泛使用了链表&#xff0c;例如&#xff1a;列表的底层实现之一就是链表。 在Redis中&#xff0c;链表分为两部分&#xff1a;链表信息 链表节点。 链表节点用来表示链表…

CentOS系统环境搭建(五)——Centos7安装maven

centos系统环境搭建专栏&#x1f517;点击跳转 Centos7安装maven 下载压缩包 maven下载官网 解压 压缩包放置到/usr/local tar -xvf apache-maven-3.9.2-bin.tar.gz配置环境变量 vim /etc/profile在最下面追加 MAVEN_HOME/usr/local/apache-maven-3.9.2 export PATH${MAV…