RabbitMQ及各种模式

目录

一、MQ的基本概念

1.1 MQ概述

1.2 MQ的优势和劣势

1.3 MQ的优势

1.应用解耦

2.异步提速 

3.削峰填谷

1.4 MQ的劣势

小结

 1.5 常见的 MQ 产品

1.6 RabbitMQ 简介

1.7 JMS

小结

二、RabbitMQ管控台 

 三、Hello World简单模式

​编辑

1、生产者

​编辑 2、消费者

​编辑 四、Work queues 工作队列模式

1、生产者

2、消费者

启动两个消费者 

 启动生产者

 小结

五、Pub/Sub订阅模式

 1、生产者

2、消费者1

消费者2 

小结

六、Routing 路由模式

1、生产者 

2、消费者1,2

七、Topics 通配符模式 

1、生产者

2、消费者

小结

八、工作模式总结  

1. 简单模式 HelloWorld

2. 工作队列模式 Work Queue

3. 发布订阅模式 Publish/subscribe

4. 路由模式 Routing

5. 通配符模式 Topic

一、MQ的基本概念

1.1 MQ概述

MQ全称 M essage Q ueue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。MQ,消息队列,存储消息的中间件。发送方称为生产者,接收方称为消费者
分布式系统通信两种方式:
  • 直接远程调用
  • 借助第三方完成间接通信

1.2 MQ的优势和劣势

  • 优势:                                                            劣势:
    • 应用解耦                                               系统可用性降低
    • 异步提速                                               系统复杂度提高
    • 削峰填谷                                               ⚫ 一致性问题

1.3 MQ的优势

1.应用解耦

用户点击下单,进入订单系统,订单系统通过远程调用去调用库存系统、支付系统、物流系统。这样这四个系统就会耦合在一起,可能出现第一个问题:当库存系统出现异常,订单系统链路走不通也会出问题,用户可能得到下单失败这个反馈,整个系统的容错率低;第二个问题:在下订单的过程中要增加一个X系统,就要修改订单系统然后再访问X系统,如果又要加Y系统不要X系统了,那么又要修改订单系统,整个系统的可维护性比较低。

系统的耦合性越高,容错性就越低,可维护性就越低。

用户点击下单,进入订单系统。订单系统只需要发送一条消息到MQ就可以了,可以个用户发送下单成功。库存系统、支付系统、物流系统只需要从MQ里取出消息消费就可以了。对于问题一:库存系统出现异常后,订单系统没有异常,因为订单系统和其他三个系统是隔离的,没有任何影响。库存系统异常是暂时的,修复之后再去MQ里取出消息消费,最终是正常的。系统容错性提高;对于问题二:增加X系统,与订单系统无关,不需要修改订单系统,直接增加新系统然后再去MQ里取出消息消费就可以了

使用 MQ 使得应用间解耦,提升容错性和可维护性。

2.异步提速 

远程调用是个同步的方式,订单系统先调用库存返回后再调用支付返回后再调用物流,需要同步的去完成订单的整个链路的调用,没有问题后就会返回给用户下单成功。

一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!

用户下订单,订单系统保存到自己的数据库花费20ms,向MQ发送消息花费5ms,这时订单系统可以直接告诉用户下单成功了。后边的操作不管成功与否,取出消息消费即可,这就是异步的方式。

用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。

3.削峰填谷

A系统每秒最大处理1000请求,现在10点秒杀活动,请求瞬间增多,每秒5000个请求,A系统承载不了这么大的并发,宕机系统不可用,用户的体验就太差了。

可以使用MQ削峰

5000个请求对接MQ,5000请求MQ完全可以承载,小意思,A系统再慢慢的从MQ每秒拉去1000个请求完成消费,A系统的稳定性就提高了很多

使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
使用MQ后,可以提高系统稳定性。
小结:
  • 应用解耦:提高系统容错性和可维护性
  • 异步提速:提升用户体验和系统吞吐量
  • 削峰填谷:提高系统稳定性

1.4 MQ的劣势

系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
系统复杂度提高
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
一致性问题
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

小结

既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢?
  • ① 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
  • ② 容许短暂的不一致性。
  • ③ 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

 1.5 常见的 MQ 产品

目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等, 也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。

1.6 RabbitMQ 简介

AMQP Advanced Message Queuing Protocol (高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。 类比HTTP

生产者发布消息到exchange,exchange通过不同的规则,把消息路由到不同的队列去存储,consumer监听从队列中拿走对应的消息消费

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
RabbitMQ 基础架构如下图:

RabbitMQ 中的相关概念:
  • Broker接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connectionpublisher/consumer 和 broker 之间的 TCP 连接
  • Channel如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchangemessage 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue消息最终被送到这里等待 consumer 取走
  • Bindingexchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
RabbitMQ 提供了 6 种工作模式 简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ)。

RabbitMQ Tutorials — RabbitMQ

1.7 JMS

  • JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API
  • JMS 是 JavaEE 规范中的一种,类比JDBC
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有

小结

  • 1. RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。
  • 2. RabbitMQ提供了6种工作模式,这边讲解5种。这是重点。
  • 3. AMQP 是协议,类比HTTP。
  • 4. JMS 是 API 规范接口,类比 JDBC。

二、RabbitMQ管控台 

 结果:


 三、Hello World简单模式

在上图的模型中,有以下概念:
  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接收者,会一直等待消息到来
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

需求:使用简单模式完成消息传递

步骤:     ① 创建工程(生成者、消费者)
                ② 分别添加依赖
                ③ 编写生产者发送消息
                ④ 编写消费者接收消息

添加依赖:rabbitmq客户端,编译版本插件

 <dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>

1、生产者

1.创建连接工厂---2. 设置参数---3. 创建连接 Connection---4. 创建Channel---5. 创建队列Queue---6. 发送消息---7.释放资源

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*** 发送消息* */
public class Producer_HelloWorld {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2. 设置参数factory.setHost("43.143.246.208");//ip  默认值 localhostfactory.setPort(5672); //端口  默认值 5672factory.setVirtualHost("/itcast");//虚拟机 默认值 /factory.setUsername("root");//用户名 默认 guestfactory.setPassword("root");//密码 默认值 guestfactory.setConnectionTimeout(5000);//针对连接超时,延长我们的连接时间//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();//5. 创建队列Queue/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)参数:1. queue:队列名称2. durable:是否持久化,当mq重启之后,还在3. exclusive:* 是否独占。只能有一个消费者监听这队列* 当Connection关闭时,是否删除队列* 一般设为false4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉5. arguments:参数。配置一些怎么删的参数*///如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建channel.queueDeclare("hello_world",true,false,false,null);/*basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)参数:1. exchange:交换机名称。简单模式下交换机会使用默认的空串 ""2. routingKey:路由名称,使用默认的交换机路由名称要和队列名称一致3. props:配置信息4. body:字节数组,真实发送的消息数据*/String body = "hello rabbitmq~~~";//6. 发送消息channel.basicPublish("","hello_world",null,body.getBytes());//7.释放资源// channel.close();// connection.close();}}

连接不关闭,不释放资源

 2、消费者

1.创建连接工厂---2. 设置参数---3. 创建连接 Connection---4. 创建Channel---5. 创建队列Queue---6. 接收消息

import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_HelloWorld {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2. 设置参数factory.setHost("43.143.246.208");//ip  默认值 localhostfactory.setPort(5672); //端口  默认值 5672factory.setVirtualHost("/itcast");//虚拟机 默认值/factory.setUsername("root");//用户名 默认 guestfactory.setPassword("root");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();//5. 创建队列Queue/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)参数:1. queue:队列名称2. durable:是否持久化,当mq重启之后,还在3. exclusive:* 是否独占。只能有一个消费者监听这队列* 当Connection关闭时,是否删除队列*4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉5. arguments:参数。*///如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建channel.queueDeclare("hello_world",true,false,false,null);/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue:队列名称2. autoAck:是否自动确认3. callback:回调对象*/// 接收消息Consumer consumer = new DefaultConsumer(channel){/*回调方法,当收到消息后,会自动执行该方法1. consumerTag:标识2. envelope:获取一些信息,交换机,路由key...3. properties:配置信息4. body:数据*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("Exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);System.out.println("body:"+new String(body));}};channel.basicConsume("hello_world",true,consumer);//关闭资源?不要}
}

 四、Work queues 工作队列模式

Work Queues: 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景 :对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。比如队列里有1000条消息,C1只能处理500条消息,增加队友C2一起处理,理论上可以处理1000条消息了
Work Queues 与入门程序的简单模式的代码几乎是一样的。可以完全复制,并多复制一个消费者进行多
个消费者同时对消费消息的测试。

1、生产者

队列修改为work_queues
//修改为循环输出10条语句
for (int i = 1; i <= 10; i++) {String body = i+"hello rabbitmq~~~";//6. 发送消息channel.basicPublish("","work_queues",null,body.getBytes());
}

2、消费者

增加为两个消费者
Consumer_WorkQueues1
Consumer_WorkQueues2
队列修改为work_queues

启动两个消费者 

 启动生产者

消费者1 消费13579  ,消费者2 消费246810  ,两个是循环交替消费的

 小结

1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
2. Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,
只需要有一个节点成功发送即可。

五、Pub/Sub订阅模式

 

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接收者,会一直等待消息到来
  • Queue:消息队列,接收消息、缓存消息
  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
                ➢ Fanout:广播,将消息交给所有绑定到交换机的队列
                ➢ Direct:定向,把消息交给符合指定routing key 的队列
                ➢ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合
路由规则的队列,那么消息会丢失!

 1、生产者

1.创建连接工厂---2. 设置参数---3. 创建连接 Connection---4. 创建Channel---5. 创建交换机---

6. 创建队列Queue---7. 绑定队列和交换机---8. 发送消息---9. 释放资源

public class Producer_PubSub {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2. 设置参数factory.setHost("43.143.246.208");//ip  默认值 localhostfactory.setPort(5672); //端口  默认值 5672factory.setVirtualHost("/itcast");//虚拟机 默认值 /factory.setUsername("root");//用户名 默认 guestfactory.setPassword("root");//密码 默认值 guestfactory.setConnectionTimeout(5000);//针对连接超时,延长我们的连接时间//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();/*exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)参数:1. exchange:交换机名称2. type:交换机类型DIRECT("direct"),:定向FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。TOPIC("topic"),通配符的方式HEADERS("headers");参数匹配,用的比较少3. durable:是否持久化4. autoDelete:自动删除5. internal:内部使用。 一般false6. arguments:参数*/String exchangeName = "test_fanout";//5. 创建交换机channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);//6. 创建队列String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);//7. 绑定队列和交换机/*queueBind(String queue, String exchange, String routingKey)参数:1. queue:队列名称2. exchange:交换机名称3. routingKey:路由键,绑定规则如果交换机的类型为fanout ,routingKey设置为""*/channel.queueBind(queue1Name,exchangeName,"");channel.queueBind(queue2Name,exchangeName,"");String body = "日志信息:张三调用了findAll方法...日志级别:info...";//8. 发送消息channel.basicPublish(exchangeName,"",null,body.getBytes());//9. 释放资源channel.close();connection.close();}}

2、消费者1

public class Consumer_PubSub1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2. 设置参数factory.setHost("43.143.246.208");//ip  默认值 localhostfactory.setPort(5672); //端口  默认值 5672factory.setVirtualHost("/itcast");//虚拟机 默认值/factory.setUsername("root");//用户名 默认 guestfactory.setPassword("root");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue:队列名称2. autoAck:是否自动确认3. callback:回调对象*/// 接收消息Consumer consumer = new DefaultConsumer(channel){/*回调方法,当收到消息后,会自动执行该方法1. consumerTag:标识2. envelope:获取一些信息,交换机,路由key...3. properties:配置信息4. body:数据*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));//字节数组转成字符串System.out.println("将日志信息打印到控制台.....");}};channel.basicConsume(queue1Name,true,consumer);//消费者关闭资源?不要!}
}

消费者2 

public class Consumer_PubSub2 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2. 设置参数factory.setHost("43.143.246.208");//ip  默认值 localhostfactory.setPort(5672); //端口  默认值 5672factory.setVirtualHost("/itcast");//虚拟机 默认值/factory.setUsername("root");//用户名 默认 guestfactory.setPassword("root");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue:队列名称2. autoAck:是否自动确认3. callback:回调对象*/// 接收消息Consumer consumer = new DefaultConsumer(channel){/*回调方法,当收到消息后,会自动执行该方法1. consumerTag:标识2. envelope:获取一些信息,交换机,路由key...3. properties:配置信息4. body:数据*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));//字节数组转成字符串System.out.println("将日志信息保存到数据库.....");}};channel.basicConsume(queue2Name,true,consumer);//消费者关闭资源?不要!}
}

小结

1. 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到,可以进行不同的处理
2. 发布订阅模式与工作队列模式的区别:
  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机

六、Routing 路由模式

 

模式说明:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
图解:
  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

1、生产者 

修改内容:exchangeName = "test_direct"
BuiltinExchangeType.DIRECT
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
//队列1绑定 error
channel.queueBind(queue1Name,exchangeName,"error");
//队列2绑定 info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
//8. 发送消息
channel.basicPublish(exchangeName,"info",null,body.getBytes());//2收到
//channel.basicPublish(exchangeName,"error",null,body.getBytes());//1,2都收到
//channel.basicPublish(exchangeName,"warning",null,body.getBytes());//2收到

2、消费者1,2

修改内容
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";

 

 Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

七、Topics 通配符模式 

Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用 通配符
  •  Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
  • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert 而 item.* 只能匹配 item.insert
图解:
  • 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
  • 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

1、生产者

修改内容:
exchangeName = "test_topic" 
BuiltinExchangeType.TOPIC 
String queue1Name = "test_topic_queue1"; 
String queue2Name = "test_topic_queue2";
//队列1绑定 error
channel.queueBind(queue1Name,exchangeName,"*.orange.*");
//队列2绑定 info error warning
channel.queueBind(queue2Name,exchangeName,"*.*.rabbite");
channel.queueBind(queue2Name,exchangeName,"lazy.#");
//8. 发送消息
channel.basicPublish(exchangeName,"lazy.orange.ra",null,body.getBytes());//1,2都有
//channel.basicPublish(exchangeName,"lazy.orange",null,body.getBytes());//1,2都没有

2、消费者

 修改内容:

String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";

 

 

小结

Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。

八、工作模式总结  

1. 简单模式 HelloWorld

一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。

2. 工作队列模式 Work Queue

一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。

3. 发布订阅模式 Publish/subscribe

需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。

4. 路由模式 Routing

需要设置类型为 direct 的交换机交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

5. 通配符模式 Topic

需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/133553.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mac 安装软件各种报错解决方案

Mac 安装软件各种报错解决方案 文章目录 Mac 安装软件各种报错解决方案一. 打开允许“允许任何来源”二. 无法打开"xxx"&#xff0c;因为它不是从App Store下载三. 无法打开"xxx"&#xff0c;因为 Apple无法检查其是否包含恶意软件。四. "xxx"已…

详解初阶数据结构之顺序表(SeqList)——单文件实现SeqList的增删查改

目录 一、线性表 二、顺序表 2.1概念及结构 2.2接口实现 2.3动态顺序表的创建 2.3动态顺序表的初始化 2.3.1传值初始化 2.3.2传址初始化 2.4动态顺序表的清空 2.5动态顺序表的扩容 2.6动态顺序表内容的打印 三、动态顺序表的使用 3.1尾插尾删 3.1.1尾插 3.1.2尾删…

Echarts 雷达图的详细配置过程

文章目录 雷达图 简介配置步骤简易示例 雷达图 简介 Echarts雷达图是一种常用的数据可视化图表类型&#xff0c;用于展示多个维度的数据在同一坐标系下的分布情况。雷达图通过不同的坐标轴表示不同的维度&#xff0c;数据点的位置表示了各个维度的数值大小。 Echarts雷达图的…

微信小程序中 vant weapp 使用外部的icon作为图标的步骤

微信小程序中 vant weapp 使用外部的icon作为图标的步骤 1. 在项目中创建静态资源文件夹2. 前往iconfont图标官网&#xff0c;添加图标并拷贝在线链接3. 下载iconfont代码&#xff0c;解压之后拷贝到小程序的目录中4. 修改iconfont.wxss 将本地链接替换为在线链接5. 在项目的ap…

【Transformer系列】深入浅出理解Tokenization分词技术

一、参考资料 NLP技术中的Tokenization是什么&#xff1f;核心任务是什么&#xff1f; 二、Tokenization相关介绍 1. Tokenization的概念 NLP技术中Tokenization被称作是“word segmentation”&#xff0c;直译为分词。具体来说&#xff0c;分词是NLP的基础任务&#xff0c…

机器学习(15)---代价函数、损失函数和目标函数详解

文章目录 一、各自定义二、各自详解三、代价函数和损失函数区别四、例题理解 一、各自定义 1. 代价函数&#xff1a;代价函数&#xff08;Cost Function&#xff09;是定义在整个训练集上的&#xff0c;是所有样本误差的平均&#xff0c;也就是损失函数的平均。它用于衡量模型在…

如何应对数字时代的网络安全新挑战?

随着数字时代的来临&#xff0c;我们迎来了无限的机遇&#xff0c;同时也伴随着网络安全领域新的挑战。网络攻击变得更加智能化和复杂化&#xff0c;威胁也在不断演化。为了应对这些新挑战&#xff0c;我们必须采取创新的网络安全策略和技术。本文将探讨数字时代网络安全的新挑…

JVM 篇

一、知识点汇总 其中内存模型&#xff0c;类加载机制&#xff0c;GC是重点方面。性能调优部分更偏向应用&#xff0c;重点突出实践能力。编译器优化和执行模式部分偏向于理论基础&#xff0c;重点掌握知识点。 内存模型&#xff1a;各部分作用&#xff0c;保存哪些数据。类加载…

go-GMP和Scheduler

GPM模型 G 待执行的goroutine&#xff0c;结构定义在runtime.g M 操作系统中的线程&#xff0c;它由操作系统的调度器 进行 调度和管理, 结构定义在runtime.m P 处理器&#xff0c;是GM的中间件&#xff0c;它通过一个队列绑定了GM&#xff0c;每个P都有一个局部queue&#x…

Vue.js新手指南:从零开始建立你的第一个应用

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

编程获取图像中的圆半径

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 即将推出EmguCV的教程&#xff0c;请大家还稍作等待。 之前网友咨询如何获得图像中圆形的半径&#xff0c;其中有两个十字作为标定…

Kotlin文件遍历FileTreeWalk filter

Kotlin文件遍历FileTreeWalk filter import java.io.Filefun main(args: Array<String>) {val filePath "."val file File(filePath)val fileTree: FileTreeWalk file.walk()fileTree//.maxDepth(1) //遍历层级1&#xff0c;不检查子目录.filter {it.isFile…

中小企业建设数字化工厂,选择集成还是重构

随着科技的飞速发展和市场竞争的日益激烈&#xff0c;数字化工厂管理系统已成为中小企业未来发展的必经之路。然而&#xff0c;对于许多中小企业来说&#xff0c;建设数字化工厂并非易事。在建设数字化工厂的过程中&#xff0c;企业需要面对许多问题&#xff0c;其中最关键的问…

如何使用 RunwayML 进行创意 AI 创作

标题&#xff1a;如何使用 RunwayML 进行创意 AI 创作 介绍 RunwayML 是一个基于浏览器的人工智能创作工具&#xff0c;可让用户使用各种 AI 功能来生成图像、视频、音乐、文字和其他创意内容。RunwayML 的功能包括&#xff1a; * 图像生成&#xff1a;使用生成式对抗网络 (…

laravel框架 - 开发实战(目录结构,路由,控制器,模型,视图)

一、laravel框架的目录结构 app:应用目录&#xff0c;保存项目中的控制器、模型等 bootstrap:保存框架启动的相关文件 config:配置文件目录 database:数据库迁移文件和数据填充文件 public:应用入口文件index.php和前端资源文件&#xff08;如CSS、JavaScript等&#xff09…

VEX —— Attribute type metadata

Houdini几何体属性有一些元数据metadata&#xff0c;用于指定属性中的数据是否表示某种变换transformation&#xff08;如位置或旋转&#xff09;&#xff0c;及几何体本身被变换时是否或如何被修改&#xff1b; Houdini理解以下信息类型值&#xff1a; “none”&#xff0c;无…

SQL 2008 R2 和vCenter 5.1安装步骤与AQ

准备情况&#xff1a; Windows 2008 r2 sp1 64bit操作系统 Sql 2008 完整版安装包&#xff08;名称&#xff1a;SQLFULL_CHS.iso 安装完成会安装managment&#xff09; vCenter完整版安装包&#xff08;名称&#xff1a;VMware-VIMSetupall-5.1.0-799735.iso&#xff09; …

Matlab图像处理-HSV

HSV HSV(色调、饱和度、数值)是人们从颜色轮或调色板中挑选颜色(即颜料或油墨)时所用的几种彩色系统之一。这种彩色系统与RGB系统相比&#xff0c;更加接近于人们的经验和描述彩色感觉时所用的方式。在艺术领域&#xff0c;色调、饱和度和数值分别称为色泽、明暗和调色。 HSV…

无涯教程-JavaScript - IFS函数

描述 IFS函数检查是否满足一个或多个条件,并返回与第一个TRUE条件相对应的值。此功能已在Excel 2016中添加。 语法 IFS (logical_test1, value_if_true1, [logical_test2, value_if_true2], [logical_test3, value_if_true3]…) 争论 Argument描述Required/Optionallogical…

短视频seo矩阵系统源码开发搭建--代用户发布视频能力

短视频SEO矩阵系统源码开发搭建的代用户发布视频能力&#xff0c;主要是指在系统平台上&#xff0c;允许用户将其创作的内容发布到指定的账号或平台&#xff0c;并设置好相关的标题、话题、锚点等信息。 一、搭建步骤及注意事项 确定使用场景。根据业务需求&#xff0c;确定该…