1、 基本消息模型
官方介绍:
RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。
在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。
• P(producer/ publisher):生产者,一个发送消息的用户应用程序。
• C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
• 队列(红色区域):rabbitmq内部类似于邮箱的一个概念。
• 虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。
• 队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。
• 许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
• 总之:
o 生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。
o 我们将用Java编写两个程序;发送单个消息的生产者,以及接收消息并将其打印出来的消费者。
o 我们将详细介绍Java API中的一些细节,这是一个消息传递的“Hello World”。
o 我们将调用我们的消息发布者(发送者)Send和我们的消息消费者(接收者)Recv。
o 发布者将连接到RabbitMQ,发送一条消息,然后退出。
1.1、 生产者发送消息
import com.atuigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道,使用通道才能完成消息相关的操作Channel channel = connection.createChannel();// 声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 向指定的队列中发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//关闭通道和连接channel.close();connection.close();}
}API:
/ *** 声明队列* @param queue队列名称* @param durable 如果我们声明一个持久队列,则为true(该队列将在服务器重启后继续存在)* @param Exclusive 如果我们声明一个排他队列,则为true(仅限此连接)* @param autoDelete 如果我们声明一个自动删除队列,则为true(服务器将在不再使用它时将其删除)* @param arguments 参数队列的其他属性(构造参数)* @return一个声明确认方法来指示队列已成功声明* /
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
控制台:
1.2、管理工具中查看消息
进入队列页面,可以看到新建了一个队列:simple_queue
点击队列名称,进入详情页,可以查看消息:
在控制台查看消息并不会将消息消费,所以消息还在。
1.3、消费者获取消息
import com.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/*** 消费者*/
public class Recv {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [x] received : " + msg + "!");}};// 监听队列,第二个参数:是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);}
}
控制台:
这个时候,队列中的消息就没了:
我们发现,消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印。
1.4、 消息确认机制(ACK)
• 通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。
• 那么问题来了:RabbitMQ怎么知道消息被接收了呢?
• 如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
• 因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。
• 不过这种回执ACK分两种情况:
o 自动ACK:消息一旦被接收,消费者自动发送ACK
o 手动ACK:消息接收后,不会发送ACK,需要手动调用
• 大家觉得哪种更好呢?
• 这需要看消息的重要性:
o 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
o 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
• 我们之前的测试都是自动ACK的,如果要手动ACK,需要改动我们的代码:
public class Recv2 {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 创建通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);//int i = 1/0;System.out.println(" [x] received : " + msg + "!");// 手动进行ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列,第二个参数false,手动进行ACKchannel.basicConsume(QUEUE_NAME, false, consumer);}
}
注意到最后一行代码:
channel.basicConsume(QUEUE_NAME, false, consumer);
如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。方法的声明:
1.5、自动ACK存在的问题
修改消费者,添加异常,如下:
生产者不做任何修改,直接运行,消息发送成功:
运行消费者,程序抛出异常。但是消息依然被消费:
管理界面:
1.6、演示手动ACK
修改消费者,把自动改成手动(去掉之前制造的异常)
生产者不变,再次运行:
运行消费者
但是,查看管理界面,发现:
停掉消费者的程序,发现:
这是因为虽然我们设置了手动ACK,但是代码中并没有进行消息确认!所以消息并未被真正消费掉。
当我们关掉这个消费者,消息的状态再次称为Ready
修改代码手动ACK:
执行:
消息消费成功!