RabbitMQ学习笔记(消息发布确认,死信队列,集群,交换机,持久化,生产者、消费者)

 MQ(message queue):本质上是个队列,遵循FIFO原则,队列中存放的是message,是一种跨进程的通信机制,用于上下游传递消息。MQ提供“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后消息发送上游只需要依赖MQ,不需要依赖其它服务。

功能1:流量消峰

功能2:应用解耦

功能3:异步处理

MQ的分类:

1.Kafka

2.RabbitMQ

RabbitMQ概念:

四大核心概念:

交换机:

队列: 

 六大核心模式:

1.简单模式。2.工作模式。3.发布订阅模式。4.路由模式。5.主题模式。6.发布确认模式。

RabbitMQ工作原理:

Channer:信道,发消息的通道。

下载:

1. 官网地址:https://www.rabbitmq.com/download.html。参考的下载地址如下:Linux下安装RabbitMQ_rabbitmq下载_零碎de記憶的博客-CSDN博客

2.安装Erlang环境

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz tcp_wrappers

3.下载Erlang,方式1:找到下面网址,在网址中下载rpm文件:

el/7/erlang-22.3.4.12-1.el7.x86_64.rpm - rabbitmq/erlang · packagecloud

或者直接输入下面指令下载rpm文件: 

wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm

然后输入下面的命令安装已下载的安装包:

yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm

4.下载RabbitMQ,输入下面的下载

wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm

 输入下面的命令进行本地安装:

yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm

5. 下载socat,检查是否已下载:

yum install socat -y

注意以下的操作都要在 /usr/local/software目录下查看: 

6.添加开机启动RabbitMQ服务:chkconfig rabbitmq-server on。启动rabbitmq /sbin/service rabbitmq-server start。

7.查看服务状态 /sbin/service rabbitmq-server status

8.停止服务 /sbin/service rabbitmq-server stop。重新查看服务状态。

10.开启web管理界面 sudo rabbitmq-plugins enable rabbitmq_management

11.查看防火墙状态:systemctl status firewalld。关闭防火墙:systemctl stop firewalld。关闭rabbit服务器输入:sudo rabbitmqctl stop。开启rabbit服务器输入:sudo rabbitmq-server -detached。

12.在浏览器中输入地址:Linux服务器ip地址:15672,可访问web管理界面。

13.用户名guest,密码默认,但无法登陆,无权限。

14.rabbitmqctl list_users查看用户。创建账号 rabbitmqctl add_user admin 123。设置用户角色为管理员 rabbitmqctl set_user_tags admin administrator。设置用户权限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"。

15.再经尝试可以重新登录:

创建Java开发环境

1.创建1个新项目,命名atguigu-rabbitmq,然后创建模块Module。GroupId可以填写:com.atguigu.rabbitmq,ArtifactId可以填rabbitmq-hello,选择quickstart:

导入依赖如下:

  <dependencies><!--rabbitmq依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency><dependency><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version> <!-- 根据你的需求指定版本号 --></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>

在下图中,P是生产者,C是消费者。中间的框是一个队列-RabbitMQ代表使用者保留的消息缓存区。

生产者代码

public class producer {//队列名称public static final String QUEUE_NAME = "hello";//发消息public static void main( String[] args ) throws IOException, TimeoutException {//第1步:创建一个连接工程ConnectionFactory factory = new ConnectionFactory();//第2步:输入工厂IP,用户名和密码——连接RabbitMQd队列factory.setHost("192.168.182.136");factory.setUsername("admin");factory.setPassword("123");//第3步:创建连接Connection connection = factory.newConnection();//第4步:获取信道Channel channel = connection.createChannel();//第5步:生成一个队列(队列名称,是否持久化,是否排他,自动删除,队列参数)//持久化:是否存储入磁盘,默认是将消息存储在内存中//排他:队列是否只供一个消费者消费,是否进行消息共享,true可以供多个消费者消费//自动删除:最后一个消费者断开连接后,该队列是否自动删除channel.queueDeclare(QUEUE_NAME,false,false,false,null);//第6步:发消息,(交换机,路由key本次是队列名,参数,发送的消息)String message = "hello world";channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送成功!!!");}
}

消费者代码

public class consumer {public static final String QUEUE_NAME = "hello";public static void main(String [] args) throws IOException, TimeoutException {//第1步:创建一个连接工程ConnectionFactory factory = new ConnectionFactory();//第2步:输入工厂IP,用户名和密码——连接RabbitMQd队列factory.setHost("192.168.182.136");factory.setUsername("admin");factory.setPassword("123");//第3步:创建连接Connection connection = factory.newConnection();//第4步:获取信道Channel channel = connection.createChannel();//第5步:声明,接收消息DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println(new String(message.getBody()));};//第6步:取消消息时的回调CancelCallback cancelCallback = consumerTag->{System.out.println("消息消费被中断");};//第7步:接收,(队列名,自动or手动,接收消息,回调)//1.消费哪个队列;2.消费成功后是否要自动应答true代表自动应答,false表示手动应答//3.消费者未成功消费的回调channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

注意几点:1.确保rabbitmq处于开启的状态(开启方式见前面)2.最好让防火墙处于关闭的状态 3.最好通过方法左侧的开关按钮进行启动,确保启动是选择Current File。

 

工作队列:

工作队列:又称任务队列,主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

情况:生产者大量分发消息给队列,工作线程接收队列的消息,工作线程不止一个,三者关系时竞争关系,你一条我一条他一条,但要注意一个消息只能被处理一次,不能被处理多次。

重复性的代码可以被抽取成为工具类。

在java — com — atguigu — rabbitmq下创建utils包,工具类起名RabbitMqUtils,放入如下代码:

public class RabbitMqUtils {public static Channel getChannel() throws Exception{//第1步:创建一个连接工程ConnectionFactory factory = new ConnectionFactory();//第2步:输入工厂IP,用户名和密码——连接RabbitMQd队列factory.setHost("192.168.182.137");factory.setUsername("admin");factory.setPassword("123");//第3步:创建连接Connection connection = factory.newConnection();//第4步:获取信道Channel channel =  connection.createChannel();return channel;}
}

工作线程的更新后,worker01的代码如下:

public static final String QUEUE_NAME = "hello";public static void main(String [] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//声明队列,没有会报错//消息接收DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("接收到的消息:" + new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag)->{System.out.println(consumerTag + "消息被取消消费接口回调逻辑");};System.out.println("c1等待接收消息......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}

重复利用one包下的consumer类,将其更改为c2工作线程:

Task01作为生产者用于生产数据,与前面不同的是,Task01支持从IDEA控制台输入数据:

public class Task01 {public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台当中接收信息Scanner scanner = new Scanner(System.in); //扫描控制台输入内容while(scanner.hasNext()){String message = scanner.next();channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送消息完成..");}}
}

  

消息应答:

概念:

自动应答:

手动应答:

手动应答好处,建议不批量应答,选择false:

 消息自动重新入队:

原本正常传输,C1突然失去连接,检测到C1断开连接,于是会让消息重新入队,原本的消息交由C2进行处理。

实验思路:写1个生产者,2个消费者,当关闭掉其中1个工作线程,消息不丢失,还被另一个工作线程接收。消费在手动应答时不丢失、放回队列中重新消费。

消息手动应答(生产者):

public class Task2 {
public static final String task_queue_name = "ack_queue";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(task_queue_name,false,false,false,null);Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish("",task_queue_name,null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:"+message);}
}
}

消息手动应答(消费者):

public class Work03 {public static final String task_queue_name = "ack_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1等待接收消息处理时间较短");DeliverCallback deliverCallback = (consumerTag,message)->{SleepUtils.sleep(1);System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));//1.消息的标记tag 2.是否批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck = false;channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag->{System.out.println(consumerTag + "消费者取消消费接口回调逻辑");}));}
}
public class Work04 {public static final String task_queue_name = "ack_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C2等待接收消息处理时间较短");DeliverCallback deliverCallback = (consumerTag,message)->{SleepUtils.sleep(30);System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));//1.消息的标记tag 2.是否批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck = false;channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag->{System.out.println(consumerTag + "消费者取消消费接口回调逻辑");}));}
}

实现效果:在生产者输入AA BB CC DD EE等消息,消费者1接收速度快,会立即打印AA CC EE等消息,消费者2接收速度慢,会在一段时间后接收到BB,此时如果关闭消费者2,则消费者1输出DD,表明消费在手动应答时不丢失、放回队列中重新消费。

持久化:

如果报错,说明原本的队列就是不持久化,此时无法设定持久化,只能先将队列删除然后再重新设定。

控制队列持久化,需要修改生产者声明函数的第2个参数:

消息持久化:

队列持久化和消息持久化不同,队列是MQ里的一个组件,消息是生产者发送的消息。

如果要让消息持久化,在发消息的时候就要通知队列。

更改的是生产者的信道的basicPublish的第3个参数,添加MessageProperties.PERSISTENT_TEXT_PLAIN

不公平分发: 

消费者处理任务的速度不一致,为了不让速度快的消费者长时间处于空闲状态,因此采用不公平分发。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

预取值:

前面N条数据分别交给谁处理,如下图就是前7条数据中,2条给C1,5条给C2

发布确认原理:

1.设置要求队列必须持久化:就算服务器宕机,队列也不至于消失。

2.设置要求队列中的消息也必须持久化。

3. 发布确认,消息保存到磁盘上之后,队列要告知生产者。

Channel channel = connection.createChannel();
channel.confirmSelect();

public static void main(String[] args){
}

单个发布确认:

是一种同步确认发布的方式,发布消息-确认消息-发布消息...必须要确认后才能继续发布,类似于一手交钱一手交货,缺点是发布速度很慢。

1. 创建com/atguigu/rabbitmq/four文件夹下的ConfirmMessage

public static void publishMessageIndividually() throws Exception{Channel channel = RabbitMqUtils.getChannel(); //获取信道String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认long begin = System.currentTimeMillis();for(int i=0;i<MESSAGE_COUNT;i++){String message = i +"";channel.basicPublish("",queueName,null,message.getBytes());boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发送成功");}}long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时:"+(end-begin)+"ms");}

批量发布确认:

public static void publishMessageBatch() throws Exception{Channel channel = RabbitMqUtils.getChannel(); //获取信道String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认long begin = System.currentTimeMillis();int batchSize = 100; //批量确认消息的大小//批量发送消息,批量发布确认for(int i=0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//判断达到100条消息的时候,批量确认一次if(i%batchSize==0) channel.waitForConfirms();}long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时:"+(end-begin)+"ms");
}

异步发布确认:

map序列,key是消息序号(deliveryTag是消息的标识,multiple是是否为批量),value是消息内容,将消息每一条都编号,broker会对消息进行应答,分为两种一种是确认应答,另一种是未确认应答。消息生产者不需要等待接收方的消息,只需要负责发送消息即可,消息是否应答最终会以异步的形式回传,也就是说确认的时间可以是稍后的。

addConfirmListener单参数的是只能监听成功的,多参数的是可以监听成功也可以监听失败的,都是接口需要自己来写。

public static void publishMessageAsync() throws Exception{Channel channel = RabbitMqUtils.getChannel(); //获取信道String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认long begin = System.currentTimeMillis();//消息确认成功,回调函数ConfirmCallback ackCallback = (deliveryTag, multiple)->{System.out.println("确认的消息:"+deliveryTag);};//消息确认失败回调函数ConfirmCallback nackCallback = (deliveryTag, multiple)->{System.out.println("未确认的消息:"+deliveryTag);};//准备消息的监听器,监听哪些消息成功了,哪些消息失败了channel.addConfirmListener(ackCallback,nackCallback);//批量发送消息for(int i=0;i<MESSAGE_COUNT;i++){String message="消息"+i;channel.basicPublish("",queueName,null,message.getBytes());//发布确认}long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时:"+(end-begin)+"ms");
}

 处理异步未确认消息:

最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。

public static void publishMessageAsync() throws Exception{Channel channel = RabbitMqUtils.getChannel(); //获取信道String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认/*线程安全有序的一个哈希表,适用于高并发的情况下1.轻松地将序号与消息进行关联2.轻松地批量删除条目只要给到序号3.支持高并发(多线程)*/ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();//消息确认成功,回调函数ConfirmCallback ackCallback = (deliveryTag, multiple)->{if(multiple){//2.删除掉已经确认的消息,剩下的就是未确认的消息ConcurrentNavigableMap<Long, String> confirmd =outstandingConfirms.headMap(deliveryTag);}else{outstandingConfirms.remove(deliveryTag);}System.out.println("确认的消息:"+deliveryTag);};//消息确认失败回调函数ConfirmCallback nackCallback = (deliveryTag, multiple)->{//3.打印一下未确认的消息都有哪些String message = outstandingConfirms.get(deliveryTag);System.out.println("未确认的消息是:"+message+"未确认的消息:"+deliveryTag);};//准备消息的监听器,监听哪些消息成功了,哪些消息失败了channel.addConfirmListener(ackCallback,nackCallback);long begin = System.currentTimeMillis();//批量发送消息for(int i=0;i<MESSAGE_COUNT;i++){String message="消息"+i;channel.basicPublish("",queueName,null,message.getBytes());//1.此处记录下所有发送的消息,消息的总和outstandingConfirms.put(channel.getNextPublishSeqNo(),message);}long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时:"+(end-begin)+"ms");}
}

三种方式对比:

交换机:

一个消息可以被消费多次,需要通过交换机,仍旧遵循队列中的消息只能被消费一次。

 生产者生产的消息从不会直接发送到队列。生产者将消息发送到交换机。交换机负责接收来自生产者的消息,将消息推入队列。

Exchanges的类型:直接(direct),主题(topic),标题(headers),扇出(fanout)

消息能路由发送到队列中其实是由routingKey(bindingkey)绑定key指定的。

创建临时队列:

String queueName = channel.queueDedare().getQueue();

绑定:

根据Routing key来确定消息要发给哪个队列,如果Routing Key相同消息就可以发送给多个队列。

先添加一个队列queue1,再添加一个交换机exchange1,最后点击exchange1交换机,进入绑定菜单,然后输入绑定的队列是queue1,然后Routing key随便设置为123。

广播Fanout:

Fanout(扇出)是将接收到的所有消息广播到它知道的所有队列中。如果Routing Key相同则发送给队列以相同消息。

生产者:

public class EmitLog {public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,"fanout");Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));System.out.println("生产者发出消息"+message);}}
}

消费者:

public class ReceiveLogs01 {public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//声明一个交换机//声明一个队列临时队列,队列的名称是随机的,当消费者断开与队列的连接时候,队列就删除了String queueName = channel.queueDeclare().getQueue();//绑定交换机与队列channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("等待接收消息,把接收到消息打印在屏幕上......");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("ReceiveLogs01控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});}
}

效果:实现广播的功能

Direct路由交换机:

消费者1:

public class ReceiveLogsDirect01 {public static final String EXCHANGE_NAME="direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);channel.queueDeclare("console",false,false,false,null);channel.queueBind("console",EXCHANGE_NAME,"info"); //队列名称,交换机名称,RoutingkeyDeliverCallback deliverCallback =(consumerTag,message)->{System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume("console",true,deliverCallback,consumerTag->{});}
}

消费者2:

public class ReceiveLogsDirect02 {public static final String EXCHANGE_NAME="direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);channel.queueDeclare("disk",false,false,false,null);channel.queueBind("disk",EXCHANGE_NAME,"error"); //队列名称,交换机名称,RoutingkeyDeliverCallback deliverCallback =(consumerTag,message)->{System.out.println("ReceiveLogsDirect02控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume("disk",true,deliverCallback,consumerTag->{});}
}

生产者: 

public class DirectLogs {public static final String EXCHANGE_NAME="direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));System.out.println("生产者发出消息"+message);}}
}

效果:

如果【channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));】的第2个参数填info就只会发送消息给消费者1,填写error就只会发送消息给消费者2。

Topics主题交换机:

发布(生产者)订阅(消费者)模式:

消费者1:

public class ReceiveLogsTopic01 {public static final String EXCHANGE_NAME="topic_logs";//交换机名称public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,"topic");String queueName="Q1";channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println(new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+queueName+" 绑定键:"+message.getEnvelope().getRoutingKey());};channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});}
}

消费者2:

public class ReceiveLogsTopic02 {public static final String EXCHANGE_NAME="topic_logs";//交换机名称public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,"topic");String queueName="Q2";channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println(new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+queueName+" 绑定键:"+message.getEnvelope().getRoutingKey());};channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});}
}

 生产者1:

public class EmitLogTopic {public static final String EXCHANGE_NAME="topic_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();Map<String,String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被队列Q1Q2接收到");bindingKeyMap.put("lazy.orange.elephant","被队列Q1Q2接收到");bindingKeyMap.put("quick.orange.fox","被队列Q1接收到");bindingKeyMap.put("lazy.brown.fox","被队列Q2接收到");bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次");bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {String routingKey = bindingKeyEntry.getKey();String message =  bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:"+message);}}
}

结果:

死信:

无法被消费消息被称为死信

死信的来源:

死信实战架构图:

1个生产者2个消费者。生产者原本走正常交换机,消息走正常队列,被C1消费。当满足消息被拒绝,消息TTL过期,队列达到最大长度三者其一时,消息成为死信,会进入dead_exchange交换机,进入dead_queue死信队列,死信队列的信息由C2消费。

消费者1:

public class Consumer01 {//普通交换机的名称public static final String NORMAL_EXCHANGE ="normal_exchange";//死信交换机的名称public static final String DEAD_EXCHANGE = "dead_exchange";//普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";//死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel1 = RabbitMqUtils.getChannel();//声明死信和普通交换机,类型为directchannel1.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel1.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);Map<String,Object> arguments = new HashMap<>(); //设置参数//正常队列设置死信交换机arguments.put("x—dead—letter—exchange",DEAD_EXCHANGE); //****相当于正常的C1不能消费掉就通过这个交换机进行转发//设置死信RoutingKeyarguments.put("x—dead—letter—routing—key","lisi"); //***//声明普通队列channel1.queueDeclare(NORMAL_QUEUE,false,false,false,arguments); //正常交换机不正常,需要将死信转发给死信队列//声明死信队列channel1.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通的交换机与队列channel1.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");//绑定死信的交换机与死信的队列channel1.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("Consumer01接收的消息是:" + new String(message.getBody(),"UTF-8"));};channel1.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag->{});}
}

消费者2:

public class Consumer02 {//死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel1 = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("Consumer02接收的消息是:" + new String(message.getBody(),"UTF-8"));};channel1.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag->{});}
}

生产者:

public class Producer {public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//死信消息,设置TTL时间,time to liveAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info"+i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/147763.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件设计模式系列之二十四——模板方法模式

在软件设计领域&#xff0c;设计模式是一组被反复使用、多次实践验证的经典问题解决方案。其中&#xff0c;模板方法模式是一种行为型设计模式&#xff0c;用于定义一个算法的骨架&#xff0c;将算法中的某些步骤延迟到子类中实现&#xff0c;从而使子类可以重新定义算法的某些…

lv7 嵌入式开发-网络编程开发 03 TCP/IP与五层体系结构

目录 1 TCP/IP协议族体系结构 1.1 OSI与TCP/IP 1.2 TCP/IP 的体系结构 1.3 TCP/IP 体系结构的另一种表示方法 1.4 沙漏计时器形状的 TCP/IP 协议族 2 五层协议的体系结构 2.1 各层的主要功能 2.2 互联网中客户-服务器工作方式 2.3 同时为多个客户进程提供服务 3 练…

react 网页/app复制分享链接到剪切板,分享到国外各大社交平台,通过WhatsApp方式分享以及SMS短信方式分享链接内容

1.需求 最近在做一个国际网站app,需要把app中某个页面的图文链接分享到国外各大社交平台上(facebook,whatapp,telegram,twitter等),以及通过WhatApp聊天方式分享&#xff0c;和SMS短信方式分享链接内容&#xff0c;该怎么做呢&#xff1f;图示如下: 分享到国外各大社交平台&am…

Kubernetes 学习总结(38)—— Kubernetes 与云原生的联系

一、什么是云原生&#xff1f; 伴随着云计算的浪潮&#xff0c;云原生概念也应运而生&#xff0c;而且火得一塌糊涂&#xff0c;大家经常说云原生&#xff0c;却很少有人告诉你到底什么是云原生&#xff0c;云原生可以理解为“云”“原生”&#xff0c;Cloud 可以理解为应用程…

选择排序算法:简单但有效的排序方法

在计算机科学中&#xff0c;排序算法是基础且重要的主题之一。选择排序&#xff08;Selection Sort&#xff09;是其中一个简单但非常有用的排序算法。本文将详细介绍选择排序的原理和步骤&#xff0c;并提供Java语言的实现示例。 选择排序的原理 选择排序的核心思想是不断地从…

Springboot+Vue+Mysql实现模拟汽车保养系统(附源码)

前言 本项目基于springbootvue搭建的汽车保养的系统&#xff0c;页面较为粗糙&#xff0c;前端好的小伙伴可自行优化。 项目环境 -环境框架后端JDK1.8SpringBootmybatisPlus前端NodeJS16.0Vue2.0ElementPlus数据库MySQL8.0- 数据库设计 数据表备注banner轮播图表car用户汽…

C++ 程序员入门之路——旅程的起点与挑战

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

【maven】idea中基于maven-webapp骨架创建的web.xml问题

IDEA中基于maven-webapp骨架创建的web工程&#xff0c;默认的web.xml是这样的。 <!DOCTYPE web-app PUBLIC"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN""http://java.sun.com/dtd/web-app_2_3.dtd" ><web-app><display-name…

本地部署 Qwen-Agent

本地部署 Qwen-Agent 1. Qwen-Agent 概述2. Github 地址3. 创建虚拟环境4. 安装 flash-attention5. 部署 Qwen 模型服务6. 部署 Qwen-Agent7. 浏览器访问 Qwen Agent8. 安装浏览器助手 1. Qwen-Agent 概述 Qwen-Agent 是一个代码框架&#xff0c;用于发掘开源通义千问模型&…

2120 -- 预警系统题解

Description OiersOiers 国的预警系统是一棵树&#xff0c;树中有 &#xfffd;n 个结点&#xff0c;编号 1∼&#xfffd;1∼n&#xff0c;树中每条边的长度均为 11。预警系统中只有一个预警信号发射站&#xff0c;就是树的根结点 11 号结点&#xff0c;其它 &#xfffd;−1…

面试题:Kafka 为什么会丢消息?

文章目录 1、如何知道有消息丢失&#xff1f;2、哪些环节可能丢消息&#xff1f;3、如何确保消息不丢失&#xff1f; 引入 MQ 消息中间件最直接的目的&#xff1a;系统解耦以及流量控制&#xff08;削峰填谷&#xff09; 系统解耦&#xff1a; 上下游系统之间的通信相互依赖&a…

华硕X555YI, Win11下无法调节屏幕亮度

翻出一个旧电脑华硕X555YI&#xff0c;装Win11玩&#xff0c;已经估计到会有一些问题。 果然&#xff0c;装完之后&#xff0c;发现屏幕无法调节亮度。试了网上的一些方法&#xff0c;比如修改注册表等&#xff0c;无效。 估计是显卡比较老&#xff0c;哪里没兼容。然后用驱动…

C++项目:仿mudou库one thread one loop式并发服务器实现

目录 1.实现目标 2.HTTP服务器 3.Reactor模型 3.1分类 4.功能模块划分: 4.1SERVER模块: 4.2HTTP协议模块: 5.简单的秒级定时任务实现 5.1Linux提供给我们的定时器 5.2时间轮思想&#xff1a; 6.正则库的简单使用 7.通用类型any类型的实现 8.日志宏的实现 9.缓冲区…

【初识Linux】:常见指令(1)

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关Linux的基础知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通 数…

PowerPoint如何设置密码?

PowerPoint&#xff0c;也就是PPT&#xff0c;是很多人工作中经常用的办公软件&#xff0c;而PPT和Word、Excel等一样可以设置密码保护。 PPT可以设置两种密码&#xff0c;一种是“打开密码”&#xff0c;也就是需要密码才能打开PPT&#xff1b;还有一种是设置成有密码的“只读…

22.app.js的全局数据共享

app.js中定义的全局变量适合 不修改且仅在js中使用的变量 目录 1 全局变量 2 修改全局变量 3 app.js中的变量不能直接在wxml中渲染 4 全局方法 1 全局变量 比如我现在想定义一个全局的变量something&#xff0c;直接在APP中写就行了 之后你可以在任何一个页面中&…

互联网Java工程师面试题·MyBatis 篇·第一弹

目录 1、什么是 Mybatis&#xff1f; 2、Mybaits 的优点 3、MyBatis 框架的缺点 4、MyBatis 框架适用场合 5、MyBatis 与 Hibernate 有哪些不同&#xff1f; 6、#{}和${}的区别是什么&#xff1f; 7、当实体类中的属性名和表中的字段名不一样 &#xff0c;怎么办 &#x…

HTML5 跨屏前端框架 Amaze UI

Amaze UI采用国际最前沿的“组件式开发”以及“移动优先”的设计理念&#xff0c;基于其丰富的组件&#xff0c;开发者可通过简单拼装即可快速构建出HTML5网页应用&#xff0c;上线仅半年&#xff0c;Amaze UI就成为了国内最流行的前端框架&#xff0c;目前在Github上收获Star数…

【C语言】【动态内存管理】malloc,free,calloc,realloc

1.malloc函数 void* malloc(size_t size)功能&#xff1a;向内存申请字节为 size大小的空间 使用时要包含头文件&#xff1a;<stdlib.h> 开辟成功&#xff1a;返回开辟好的空间初始地址的指针 开辟失败&#xff1a;返回空指针 NULL 使用举例&#xff1a; (malloc和free…

Java编程技巧:swagger2、knif4j集成SpringBoot或者SpringCloud项目

目录 1、springbootswagger2knif4j2、springbootswagger3knif4j3、springcloudswagger2knif4j 1、springbootswagger2knif4j 2、springbootswagger3knif4j 3、springcloudswagger2knif4j 注意点&#xff1a; Api注解&#xff1a;Controller类上的Api注解需要添加tags属性&a…