RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 页面分析

在这里插入图片描述

前言

RabbitMQ作为一款常用的消息中间件,在微服务项目中得到大量应用,其本身是微服务中的重点和难点,有不少概念我自己的也是一知半解,本系列博客尝试结合实际应用场景阐述RabbitMQ的应用,分析其为什么使用,并给出怎么用的案例。

本篇博客结合场景来阐述RabbitMQ的几种模式,描述了不同模式的应用场景,并给出相应的代码。(文末有惊喜~)

在这里插入图片描述
其他相关的rabbitmq博客文章列表如下:

RabbitMQ基础(1)——生产者消费者模型 & RabbitMQ简介 & Docker版本的安装配置 & RabbitMQ的helloworld + 分模块构建 & 解决大量注册案例

RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

RabbitMQ的Docker版本安装 + 延迟插件安装 & QQ邮箱和阿里云短信验证码的主题模式发送

目录

  • 前言
  • 引出
  • MQ场景:
    • 1异步处理
    • 2解耦
    • 3削峰填谷
    • 常见的MQ
  • 拆解控制台页面
    • 总览页面
    • 连接connection
    • 队列页面
  • 简单模式
    • 工具类:建立连接
    • 生产者:生产消息
    • 消费者:消费消息
    • 进行测试
      • Ack:项目中必须false
  • 工作模式
    • 一个生产者,两个消费者
    • QOS: 限流
      • Tips:队列参数怎么变?
    • 队列相关参数
      • x-max-length
      • x-overflow
      • x-max-length-bytes
  • 发布者订阅模式
    • 生产者:给fanout交换机发消息
    • 消费者1:队列q32
    • 消费者2:队列q321
  • 路由模式
    • 生产者:发给交换机,告诉路由键
    • 消费者1:根据路由键,接收3种消息
    • 消费者2:根据路由键,接收1种消息
  • 主题模式(Topic)
    • 生产者:主题交换机,带路由键
    • 消费者1:通配符的路由键
    • 消费者2:统配符
  • 总结

引出


1.MQ,消息队列的应用场景,几种MQ简单对比;
2.分析RabbitMQ的浏览器控制台页面;
3.结合场景来阐述RabbitMQ的几种模式,描述了不同模式的应用场景,并给出相应的代码;

在这里插入图片描述

MQ场景:

Message Queue 消息队列。

有比较大的负载,而且这些负载不用立刻马上给程序返回结构,可以有一些等待时间。可以用消息队列。

1异步处理

过去的项目,大部分都是同步解决问题。

在这里插入图片描述

UserinfoService{register(){//典型的同步//插入数据库//发短信//发邮件}
}
//如果你的操作基本没有任何延时操作,或者瓶颈,没有压力。那么你没有必要用MQ
UserinfoService{register(){//发起的是一个异步请求都不等待对方给我返回的结果//异步插入数据库//异步发短信//异步发邮件}
}

2解耦

在这里插入图片描述

库存有没有可能挂了。或者访问量巨大。因为库存慢,导致订单也慢。

在这里插入图片描述

解耦。

在这里插入图片描述

3削峰填谷

双12的时候,叫好早上的8点。秒杀的时候。

有些瞬间服务器压力是超大的,过了这个瞬间,几乎没有消耗量。

等服务器能够正常的时候,我慢慢执行就行了。

在这里插入图片描述

常见的MQ

MQ的前身,就是一个发布者订阅模式。

Kafka RabbitMQ(1W/s) RocketMQ ActiveMQ

Kafka : 10w/S 主要用于日志

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

在这里插入图片描述

拆解控制台页面

总览页面

包括刷新时间间隔,rabbitmq节点,连接端口的信息等

在这里插入图片描述

配置可以导出和导入

在这里插入图片描述

连接connection

在这里插入图片描述

队列页面

包括队列的名字,状态,准备好的消息数量,未确认的消息数量,总计消息数量

在这里插入图片描述

简单模式

在这里插入图片描述

使用的依赖

	<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency>

工具类:建立连接

在这里插入图片描述

package com.tianju.config;import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 建立连接的工具类*/
public class ConnectionFactory {public static Connection createConnection() throws IOException, TimeoutException {com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();connectionFactory.setHost("192.168.111.130"); // http://192.168.111.130/connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("123");connectionFactory.setVirtualHost("/demo");// amqp://admin@192.168.111.130:5672/return connectionFactory.newConnection();}
}

在这里插入图片描述

生产者:生产消息

在这里插入图片描述

package com.tianju.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.tianju.config.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者发送消息* 建立连接--> 创建频道 --> 创建队列 --> 发送消息*/
public class Provider {private static String QUEUE_ORDER = "queue_order";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列channel.queueDeclare(QUEUE_ORDER,false,false,false,null);// 发送消息,指定给哪个队列上发消息for (int i = 0; i < 100; i++) {String msg = "hello rabbitmq--"+i;channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());System.out.println("消息发布成功");}connection.close();}
}

消费者:消费消息

在这里插入图片描述

package com.tianju.simple;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {private static String QUEUE_ORDER = "queue_order";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列channel.queueDeclare(QUEUE_ORDER,false,false,false,null);// 队列必须声明,如果不存在,则自动创建// 声明一个消费者DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 回调函数,用来接收消息* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param envelope packaging data for the message* @param properties content header data for the message* @param body the message body (opaque, client-specific byte array),字节数组* @throws IOException*/public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body); // 收到的信息System.out.println("消费者接收到:"+msg);try {Thread.sleep(3000);// 模拟一个耗时操作} catch (InterruptedException e) {throw new RuntimeException(e);}}};// 表明自己是消费者,接收消息/*** autoAck:自动确认设置成 true*/channel.basicConsume(QUEUE_ORDER, true, defaultConsumer);}
}

进行测试

生产者发送100条消息
在这里插入图片描述

消费者进行消费,假设突然之间服务宕机了,此时消费了4条消息,理论上还应该有96条消息

在这里插入图片描述

打开控制台页面查看,消息全部消失了,出现了数据丢失的情况

在这里插入图片描述

全部用默认值的情况下,如果发生异常,则消息全部丢失。

消费者一次性拿到了所有的消息。

Ack:项目中必须false

生产者 ==投递消息=》队列

消费者=接受==》队列

ack: false 必须要手工确认。

消费者接到这个消息的时候, 这个消息进入 unack状态。

1:手工确认。消息删除。完成

2: 没手工确认,断开连接或者超时。

3:这个消息重新进入ready状态。等待其他消费者进行消费。

在这里插入图片描述

先设置自动ack为false,表示需要手工确认,然后在消费消息的方法中,进行消息的确认。

在这里插入图片描述

package com.tianju.simple;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {private static String QUEUE_ORDER = "queue_order";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列channel.queueDeclare(QUEUE_ORDER,false,false,false,null);// 队列必须声明,如果不存在,则自动创建// 声明一个消费者DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 回调函数,用来接收消息* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param envelope packaging data for the message* @param properties content header data for the message* @param body the message body (opaque, client-specific byte array),字节数组* @throws IOException*/public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {// no work to doString msg = new String(body); // 收到的信息System.out.println("消费者接收到:"+msg);try {Thread.sleep(3000);// 模拟一个耗时操作} catch (InterruptedException e) {throw new RuntimeException(e);}// 消费者代码可能有失败,消息拿到之后,可能还没有处理,就宕机了// 消息确认的代码一定在最后一行// long deliveryTag 消息的下标;// boolean multiple 是否批量确认;channel.basicAck(envelope.getDeliveryTag(),true); // 确认消息,批量确认}};// 表明自己是消费者,接收消息/*** autoAck:自动确认设置成 true* 是否自动确认,* false:不进行自动确认;true:自动确认* 消费过程中可能产生异常* 如果产生异常,则必须进行消费补偿*/channel.basicConsume(QUEUE_ORDER, false, defaultConsumer); // 接收消息}
}

再次模拟宕机的情况,一开始消息全部被放到Unack中,当宕机时,又把消息吐了出来,至少消息没有出现丢失的情况。

在这里插入图片描述

工作模式

在这里插入图片描述

一个生产者,两个消费者

在这里插入图片描述

两个消费者消费的是同一个队列中的消息。

两个消费者上来后,默认是按照平均分配。结果: 有人瞬间干完。有人很久都没干完。

QOS: 限流

可以限制你一次拉取几个。

这样两个消费者,也能够节省服务器CPU了。

        // 创建频道Channel channel = connection.createChannel();channel.basicQos(1); /** 1次只能拿1个 **/

在这里插入图片描述

当消费的速度太慢,觉得不够,加入其他的消费者。

核心,只有一个消息队列。

消息是轮询的方式,发送给不同的消费者。

在这里插入图片描述

Tips:队列参数怎么变?

队列的参数发生变化后,要删除再添加。

在这里插入图片描述

        // 创建频道Channel channel = connection.createChannel();channel.queueDelete(QUEUE_ORDER); // 先删除旧的队列boolean durable = true; // 当前队列中的消息持久化操作,重启之后,消息还在// 创建队列channel.queueDeclare(QUEUE_ORDER,durable,false,false,null);

在这里插入图片描述

队列相关参数

x-max-length

在这里插入图片描述

package com.tianju.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** 生产者发送消息* 建立连接--> 创建频道 --> 创建队列 --> 发送消息*/
public class Provider {private static String QUEUE_ORDER = "queue_order";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();channel.queueDelete(QUEUE_ORDER); // 先删除旧的队列boolean durable = true; // 当前队列中的消息持久化操作,重启之后,消息还在Map map = new HashMap();map.put("x-max-length", 10); // 设置最大的长度,只接受10个// 创建队列channel.queueDeclare(QUEUE_ORDER,durable,false,false,map);// 发送消息,指定给哪个队列上发消息for (int i = 0; i < 1000; i++) {String msg = "hello rabbitmq--"+i;channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());System.out.println("消息发布成功");}connection.close();}
}

How many (ready) messages a queue can contain before it starts to drop them from its head.

在这里插入图片描述

x-overflow

Sets the queue overflow behaviour. This determines what happens to messages when the maximum length of a queue is reached. Valid values are drop-head, reject-publish or reject-publish-dlx. The quorum queue type only supports drop-head and reject-publish.

在这里插入图片描述
此时就是前10个了

在这里插入图片描述

x-max-length-bytes

在这里插入图片描述

package com.tianju.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** 生产者发送消息* 建立连接--> 创建频道 --> 创建队列 --> 发送消息*/
public class Provider {private static String QUEUE_ORDER = "queue_order";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();channel.queueDelete(QUEUE_ORDER); // 先删除旧的队列boolean durable = true; // 当前队列中的消息持久化操作,重启之后,消息还在Map map = new HashMap();map.put("x-max-length", 10); // 设置最大的长度,只接受10个map.put("x-overflow", "reject-publish"); // 拒绝发布,变成前10gemap.put("x-max-length-bytes", 4); // 消息的最大字节长度// 创建队列channel.queueDeclare(QUEUE_ORDER,durable,false,false,map);// 发送消息,指定给哪个队列上发消息for (int i = 0; i < 20; i++) {String msg = "53"+i;channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());msg = "hello rabbitmq--"+i;channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());System.out.println("消息发布成功");}connection.close();}
}

在这里插入图片描述

参数

在这里插入图片描述

 		//创建队列boolean durable = true;//当前队列中的消息进行持久化操作  重启之后,消息还在Map map = new HashMap();map.put("x-max-length",10);//设置队列的最大长度map.put("x-overflow","reject-publish");//设置队列的最大长度  drop headmap.put("x-max-length-bytes",4);//设置消息的最大字节长度
//        map.put("x-expires",10000);//超时后,直接删除队列channel.queueDeclare(QUQUENAME,durable,false,false,map);

发布者订阅模式

在这里插入图片描述

工作模式中,只有一个队列。两个消费者轮询从同一个队列中取数据。

发布者订阅模式: 交换机后面会绑定多个消息队列。每个消息队列都有完整的信息。每个队列后的消费者都会有完整的消息。

交换机:就是一个无意识的广播。行为就是扇出.

在这里插入图片描述

场景:

下订单:

1:订单的数据入数据库
2:发送订单的短信
3: 物流

要每个队列都有完整的信息。

在这里插入图片描述

生产者:给fanout交换机发消息

package com.tianju.publish;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者发送消息* 建立连接--> 创建频道 --> 创建队列 --> 发送消息*/
public class Provider {private static String QUEUE_ORDER = "queue_order";private static  String EXCHANGE = "pet_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建交换机channel.exchangeDeclare(EXCHANGE, "fanout"); // 扇出,类型只能用这个for (int i = 0; i < 100; i++) {channel.basicPublish(EXCHANGE, "", MessageProperties.TEXT_PLAIN, ("hello fanout"+i).getBytes());}}
}

消费者1:队列q32

package com.tianju.publish;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {private static  String EXCHANGE = "pet_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建交换机channel.exchangeDeclare(EXCHANGE, "fanout"); // 扇出,类型只能用这个channel.queueDeclare("q32", false, false, false, null);channel.queueBind("q32", EXCHANGE, ""); // 路由键fanout模式,必须为空,即使写了是无效的channel.basicConsume("q32", new DefaultConsumer(channel){public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body);System.out.println("消费者1:"+msg);channel.basicAck(envelope.getDeliveryTag(), false);}});}
}

消费者2:队列q321

package com.tianju.publish;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {private static  String EXCHANGE = "pet_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建交换机channel.exchangeDeclare(EXCHANGE, "fanout"); // 扇出,类型只能用这个channel.queueDeclare("q321", false, false, false, null);channel.queueBind("q321", EXCHANGE, ""); // 路由键fanout模式,必须为空,即使写了是无效的channel.basicConsume("q321", new DefaultConsumer(channel){public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body);System.out.println("消费者2:"+msg);channel.basicAck(envelope.getDeliveryTag(), false);}});}
}

路由模式

允许用不同的路由键来接受不同的信息。

在这里插入图片描述

队列会接到完整的全部的信息。

日志系统:

日志级别:

OFF INFO DEBUG ERROR FATAL ALL

如果是致命错误: 立刻给运维发短信。

全部信息: 放到专门的日志系统
Error: 进行发邮件

在这里插入图片描述

生产者:发给交换机,告诉路由键

package com.tianju.routing;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者发送消息* 建立连接--> 创建频道 --> 创建队列 --> 发送消息*/
public class Provider {private static String EXCHANGE = "exchange_58";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建交换机channel.exchangeDeclare(EXCHANGE, "direct"); // 直接交换机String msg  = "this is fatal message";channel.basicPublish(EXCHANGE,"fatal", MessageProperties.TEXT_PLAIN,msg.getBytes());msg  = "this is error message";channel.basicPublish(EXCHANGE,"error", MessageProperties.TEXT_PLAIN,msg.getBytes());msg  = "this is debug message";channel.basicPublish(EXCHANGE,"debug", MessageProperties.TEXT_PLAIN,msg.getBytes());}
}

消费者1:根据路由键,接收3种消息

package com.tianju.routing;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.Calendar;
import java.util.concurrent.TimeoutException;public class Consumer {private static String EXCHANGE = "exchange_58";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列// 创建交换机channel.exchangeDeclare(EXCHANGE, "direct"); // 直接交换机// 队列必须声明,如果不存在,则自动创建channel.queueDeclare("q58", false, false, false, null);channel.queueBind("q58", EXCHANGE, "fatal");channel.queueBind("q58", EXCHANGE, "error");channel.queueBind("q58", EXCHANGE, "debug");channel.basicConsume("q58", new DefaultConsumer(channel){public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body);System.out.println("消费者1:"+msg);channel.basicAck(envelope.getDeliveryTag(), false);}});}
}

消费者2:根据路由键,接收1种消息

package com.tianju.routing;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {private static String EXCHANGE = "exchange_58";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列// 创建交换机channel.exchangeDeclare(EXCHANGE, "direct"); // 直接交换机// 队列必须声明,如果不存在,则自动创建channel.queueDeclare("q581", false, false, false, null);channel.queueBind("q581", EXCHANGE, "fatal");channel.basicConsume("q581", new DefaultConsumer(channel){public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body);System.out.println("消费者1:"+msg);channel.basicAck(envelope.getDeliveryTag(), false);}});}
}

主题模式(Topic)

在这里插入图片描述

可以采取通配符模式:

* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.

#`:匹配0个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert

场景:

京东: 无锡,河南下订单。退货

==========总部 收到这个订单 .

无锡分销商也有这个消息 *.wuxi

在这里插入图片描述

生产者:主题交换机,带路由键

package com.tianju.topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者发送消息* 建立连接--> 创建频道 --> 创建队列 --> 发送消息*/
public class Provider {private static String EXCHANGE = "exchange_58";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建交换机channel.exchangeDeclare(EXCHANGE, "topic"); // 主题模式交换机String msg  = "this is wuxi order";channel.basicPublish(EXCHANGE,"order.wuxi", MessageProperties.TEXT_PLAIN,msg.getBytes());msg  = "this is henan order";channel.basicPublish(EXCHANGE,"order.henan", MessageProperties.TEXT_PLAIN,msg.getBytes());msg  = "this is wuxi back";channel.basicPublish(EXCHANGE,"back.wuxi", MessageProperties.TEXT_PLAIN,msg.getBytes());msg  = "this is henan back";channel.basicPublish(EXCHANGE,"back.henan", MessageProperties.TEXT_PLAIN,msg.getBytes());}
}

消费者1:通配符的路由键

package com.tianju.topic;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {private static String EXCHANGE = "exchange_58";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列// 创建交换机channel.exchangeDeclare(EXCHANGE, "topic"); // 主题交换机// 队列必须声明,如果不存在,则自动创建channel.queueDeclare("q58", false, false, false, null);channel.queueBind("q58", EXCHANGE, "*.wuxi"); // 意味着无锡的订单和退货都能收到channel.basicConsume("q58", new DefaultConsumer(channel){public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body);System.out.println("无锡仓库:"+msg);channel.basicAck(envelope.getDeliveryTag(), false);}});}
}

消费者2:统配符

package com.tianju.topic;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {private static String EXCHANGE = "exchange_58";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列// 创建交换机channel.exchangeDeclare(EXCHANGE, "topic"); // 主题交换机// 队列必须声明,如果不存在,则自动创建channel.queueDeclare("q581", false, false, false, null);channel.queueBind("q581", EXCHANGE, "*.*"); // 意味着所有的订单和退货都能收到channel.basicConsume("q581", new DefaultConsumer(channel){public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body);System.out.println("总部仓库:"+msg);channel.basicAck(envelope.getDeliveryTag(), false);}});}
}

总结

1.MQ,消息队列的应用场景,几种MQ简单对比;
2.分析RabbitMQ的浏览器控制台页面;
3.结合场景来阐述RabbitMQ的几种模式,描述了不同模式的应用场景,并给出相应的代码;

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/159809.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

手机市场或迎复苏,芯片测试与封测供应链积极应对 | 百能云芯

低迷不振的手机供应链&#xff0c;终于迎来曙光&#xff1f;半导体封测供应链传出从10月开始&#xff0c;手机系统大厂终于开始有明显的库存回补动作&#xff0c;锁定如联发科等一线手机SoC从业者的「旧款芯片」备货。 测试厂如京元电、测试界面的雍智等接获备战指示&#xff0…

深圳寄包裹到德国

深圳&#xff0c;作为全球最发达的城市之一&#xff0c;以其高效的物流服务在全球范围内享有盛名。如果你正在寻找一种方式将包裹从深圳寄送到德国&#xff0c;那么本文将为你提供详细的步骤和建议。 第一步&#xff1a;了解国际邮寄的基本信息 首先&#xff0c;你需要了解包裹…

C++基本语法【恩培学习笔记(一)】

文章目录 1、C程序结构1.1 C程序的基本组成部分1.2 预处理指令1.3 注释1.4 main() 主函数1.5 命名空间 namespace 2、 C的变量和常量2.1 变量2.2 变量的声明2.3 变量的类型 3、C 数组和容器3.1 数组&#xff08;array&#xff09;3.2 容器&#xff08;vector&#xff09; 4、C …

多模态大模型升级:LLaVA→LLaVA-1.5,MiniGPT4→MiniGPT5

Overview LLaVA-1.5总览摘要1.引言2.背景3.LLaVA的改进4.讨论附录 LLaVA-1.5 总览 题目: Improved Baselines with Visual Instruction Tuning 机构&#xff1a;威斯康星大学麦迪逊分校&#xff0c;微软 论文: https://arxiv.org/pdf/2310.03744.pdf 代码: https://llava-vl.…

10.2手动推导linux中file, cdev, inode之间的关系

是时候可以手动推导一下linux里面基类父类和子类的关系了 代码放最后把 简单说明版 详细流程 第一步注册驱动 cdev结构体能看做是一个基类,那么链表里面都是字符设备驱动的cdev连载一起,啥串口,lcd的,通过cdev->list_head连接 那cdev结构体里有主次设备号 第一步 使用r…

探索未来:硬件架构之路

文章目录 &#x1f31f; 硬件架构&#x1f34a; 基本概念&#x1f34a; 设计原则&#x1f34a; 应用场景&#x1f34a; 结论 &#x1f4d5;我是廖志伟&#xff0c;一名Java开发工程师、Java领域优质创作者、CSDN博客专家、51CTO专家博主、阿里云专家博主、清华大学出版社签约作…

矿区井下智慧用电安全监测解决方案

一、背景 矿区井下作业具有复杂的环境和较高的危险性&#xff0c;对于用电安全的要求尤为严格。传统的管理模式和监测方法往往无法实时、准确地掌握井下用电情况&#xff0c;对安全隐患的排查与预防存在一定局限性。因此&#xff0c;引入智慧用电安全监测解决方案&#xff…

【LeetCode刷题(数组and排序)】:存在重复元素

给你一个整数数组 nums 。如果任一值在数组中出现 至少两次 &#xff0c;返回 true &#xff1b;如果数组中每个元素互不相同&#xff0c;返回 false 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3,1] 输出&#xff1a;true 示例 2&#xff1a; 输入&#xff1a;nums [1,2…

【ARM Coresight 系列文章 9.1 -- ITM 仪器化跟踪宏单元详细介绍】

文章目录 1.1 ITM 介绍1.1.1 ITM 功能介绍1.1.2 Cortex-M ITM 的地址范围1.2 ITM 使用1.2.1 ITM 寄存器介绍1.2.2 Cortex-M7 ITM 代码示例1.2.3 Cortex-M33 ITM 代码示例1.1 ITM 介绍 在debug 调试阶段通常都是使用 printf(printk) 来进行进行 log 输出,然后定位问题。那么如…

TikTok国际版 使用特网科技Bluestacks模拟器安装方法

特网科技Bluestacks模拟器主机 桌面自带Bluestacks模拟器 TikTok国际版Bluestacks模拟器搜索tiktot 登录google应用商店-安装TikTok 安装过程可能需要3-5分钟不等-配置过低可能会导致安装失败&#xff0c;建议升级更高内存。 安装完成-打开 安装成功APP-我的游戏查看 打开国际版…

普通螺纹基本牙型尺寸及拧紧力矩.exe

一、概要 本软件功能主要是通过输入螺纹原始三角形高度P,螺栓规格(公称直径)d,材料的屈服应力σs,计算出公称应力截面积As、外螺纹小径d1、外螺纹小径d2、拧紧力矩T等参数。 开发本软件的原因主要有以下几点: 提高设计效率:通过这款软件,工程师可以快速计算螺纹的基本牙…

【Java学习之道】网络编程的基本概念

引言 这一章我们将一同进入网络编程的世界。在开始学习网络编程之前&#xff0c;我们需要先了解一些基本概念。那么&#xff0c;我们就从“什么是网络编程”这个问题开始吧。 一、网络编程的基本概念 1.1 什么是网络编程 网络编程&#xff0c;顾名思义&#xff0c;就是利用…

05_51单片机led流水线的实现

1:step创建一个新的项目并将程序烧录进入51单片机 以下是51单片机流水线代码的具体实现 #include <REGX52.H>void Delay500ms() //11.0592MHz {unsigned char i, j, k;i 4;j 129;k 119;do{do{while (--k);} while (--j);} while (--i); }void main(){while(1){P1 0…

智慧水利:山海鲸数字孪生的革新之路

一、概念 什么是港口&#xff1f; "港口"通常指的是一个水域或岸边的设施&#xff0c;用于装载、卸载、储存和处理货物、以及提供与海上、河流或湖泊交通相关的服务。港口可以包括各种类型的码头、码头设备、仓库、货物运输设施、以及各种管理和物流设施。 什么是数…

LinkedHashMap与LRU缓存

序、慢慢来才是最快的方法。 背景 LinkedHashMap 是继承于 HashMap 实现的哈希链表&#xff0c;它同时具备双向链表和散列表的特点。事实上&#xff0c;LinkedHashMap 继承了 HashMap 的主要功能&#xff0c;并通过 HashMap 预留的 Hook 点维护双向链表的逻辑。 1.缓存淘汰算法…

车辆车型识别系统python+TensorFlow+Django网页界面+算法模型

一、介绍 车辆车型识别系统。本系统使用Python作为主要开发编程语言&#xff0c;通过TensorFlow搭建算法模型网络对收集到的多种车辆车型图片数据集进行训练&#xff0c;最后得到一个识别精度较高的模型文件。并基于该模型搭建Django框架的WEB网页端可视化操作界面。实现用户上…

【Unity基础】6.动画状态机

【Unity基础】6.动画状态机 大家好&#xff0c;我是Lampard~~ 欢迎来到Unity基础系列博客&#xff0c;所学知识来自B站阿发老师~感谢 &#xff08;一&#xff09;Animator Controller组件 &#xff08;1&#xff09;创建组件 Animator Controller组件是unity用于控制管…

机器学习-无监督算法之降维

降维&#xff1a;将训练数据中的样本从高维空间转换到低维空间&#xff0c;降维是对原始数据线性变换实现的。为什么要降维&#xff1f;高维计算难&#xff0c;泛化能力差&#xff0c;防止维数灾难优点&#xff1a;减少冗余特征&#xff0c;方便数据可视化&#xff0c;减少内存…

主动配电网故障恢复的重构与孤岛划分matlab程序

微❤关注“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 参考文档&#xff1a; A New Model for Resilient Distribution Systems by Microgrids Formation&#xff1b; 主动配电网故障恢复的重构与孤岛划分统一模型&#xff1b; 同时考虑孤岛与重构的配电网故障…

嵌入式开发学习之STM32F407串口(USART)收发数据(三)

嵌入式开发学习之STM32F407串口&#xff08;USART&#xff09;收发数据&#xff08;三&#xff09; 开发涉及工具一、选定所使用的串口二、配置串口1.配置串口的I/O2.配置串口参数属性3.配置串口中断4.串口中断在哪里处理5.串口如何发送字符串 三、封装串口配置库文件1.创建头文…