2.7日学习打卡----初学RabbitMQ(二)

2.7日学习打卡

目录:

  • 2.7日学习打卡
    • 一. RabbitMQ 简单模式![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/42009c68e078440797c3183ffda6955d.png)
      • 生产者代码实现
      • 消费者代码实现
    • 二. RabbitMQ 工作队列模式
      • 生产者代码实现
      • 消费者代码实现
    • 三. RabbitMQ 发布订阅模式
      • 生产者代码实现
      • 消费者代码实现
    • 四. RabbitMQ 路由模式
      • 生产者代码实现
      • 消费者代码实现
    • 五. RabbitMQ 通配符模式
      • 生产者代码实现
      • 消费者代码实现

在这里插入图片描述
JMS
由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则
——JMS,用于操作消息中间件。JMS即Java消息服务
(JavaMessage Service)应用程序接口,是一个Java平台中关于面
向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多
MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没
有实现JMS规范,但是开源社区有JMS的实现包。

创建项目

# 开启管控台插件
rabbitmq-plugins enable
rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached

创建普通maven项目,添加RabbitMQ依赖:

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqpclient</artifactId><version>5.14.0</version></dependency>
</dependencies>

一. RabbitMQ 简单模式在这里插入图片描述

P:生产者,也就是要发送消息的程序

C:消费者:消息的接收者,会一直等待消息到来

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

特点:

  1. 一个生产者对应一个消费者,通过队列进行消息传递。
  2. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机

生产者代码实现

步骤:

  1. 创建连接工厂ConnectionFactory
  2. 设置工厂的参数
  3. 创建连接 Connection
  4. 创建管道 Channel
  5. 简单模式中没有交换机exchange,所以不用创建(RabbitMQ会使用默认的交换机!)
  6. 创建队列 queue
  7. 设置发送内容,使用channal.basicPublish()发送
  8. 释放资源

代码实现

package com.jjy.mq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//使用自己的服务器ip地址connectionFactory.setHost("192.168.66.100");//rabbitmq的默认端口5672connectionFactory.setPort(5672);//用户名connectionFactory.setUsername("jjy");//密码connectionFactory.setPassword("jjy");//虚拟机connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.创建队列,如果队列已存在,则使用该队列/**//     * 参数1:队列名//     * 参数2:是否持久化,true表示MQ重启后队列还在。//     * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问//     * 参数4:是否自动删除,true表示不再使用队列时自动删除队列//     * 参数5:其他额外参数//     */channel.queueDeclare("simple_queue",false,false,false,null);//5.发送消息String mesg="hello rabbitmq";/*** 参数1:交换机名,""表示默认交换机* 参数2:路由键,简单模式就是队列名* 参数3:其他额外参数* 参数4:要传递的消息字节数组*/channel.basicPublish("","simple_queue",null,mesg.getBytes());//6.关闭资源(信道和连接)channel.close();connection.close();System.out.println("发送成功");}
}

消费者代码实现

在这里插入图片描述

步骤:

1.创建连接工厂ConnectionFactory
2.设置工厂参数
3.创建连接
4.创建信道
前四步代码基本是一致的,需要注意的是生产者与消费者的Channel是不同Connection中的!不是同一个对象.
5. 最简单的模型没有交换机exchange,所以此处RabbitMQ会使用默认的交换机
6. 接收消息,有一个回调方法 channel.basicConsume()

代码实现

package com.jjy.mq.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.监听队列/*** 参数1:监听的队列名* 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息* 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费*/channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body,"UTF-8");System.out.println("接受消息,消息为:"+message);}});//}
}

二. RabbitMQ 工作队列模式

在这里插入图片描述
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该
模式也使用direct交换机,应用于处理消息较多的情况。特点如
下:

  1. 一个队列对应多个消费者。
  2. 一条消息只会被一个消费者消费。
  3. 消息队列默认采用轮询的方式将消息平均发送给消费者

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

生产者代码实现

代码实现

package com.jjy.mq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.创建队列,持久化队列channel.queueDeclare("work_queue",true,false,false,null);// 5.发送大量消息,参数3表示该消息为持久化消息,即除了保存到内存还会保存到磁盘中for(int i=0;i<100;i++){channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("你好,这是今天的第"+i+"条消息").getBytes());}// 6.关闭资源channel.close();connection.close();}
}

消费者代码实现

在这里插入图片描述

消费者1:

package com.jjy.mq.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();// 4.监听队列,处理消息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者1消费消息,消息为:" + message);}});}
}

消费者2

package com.jjy.mq.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer2 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();// 4.监听队列,处理消息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者2消费消息,消息为:" + message);}});}}

消费者3

package com.jjy.mq.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer3 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();// 4.监听队列,处理消息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者3消费消息,消息为:" + message);}});}}

三. RabbitMQ 发布订阅模式

在这里插入图片描述
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机

C:消费者,消息的接收者,会一直等待消息到来

Queue:消息队列,接收消息、缓存消息

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电
商网站的同一条促销信息需要短信发送、邮件发送、站内信发送
等。此时可以使用发布订阅模式(Publish/Subscribe)
特点:

  1. 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
  2. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。

Exchange:交换机(X)一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic(常用):通配符,把消息交给符合routing pattern(路由模式)的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失
在这里插入图片描述

生产者代码实现

与之前的步骤相比,多了创建交换机和绑定交换机与队列的操作

代码实现

package com.jjy.mq.publish;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class produce {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.创建交换机/*exchangeDeclare(String exchange,                  -- 交换机的名称String type,                      -- 交换机的类型,4种枚举(direct,fanout,topic,headers)boolean durable,                  -- 持久化boolean autoDelete,               -- 自动删除boolean internal,                 -- 内部使用,基本是falseMap<String, Object> arguments)    -- 参数*/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);//5.创建队列//短信队列channel.queueDeclare("SEND_MAIL",true,false,false,null);//消息队列channel.queueDeclare("SEND_MESSAGE",true,false,false,null);//站内信息channel.queueDeclare("SEND_STATION",true,false,false,null);//6.交换机绑定队列/*** 参数1:队列名* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""即可*/channel.queueBind("SEND_MAIL","exchange_fanout","");channel.queueBind("SEND_MESSAGE","exchange_fanout","");channel.queueBind("SEND_STATION","exchange_fanout","");//7.发送消息for (int i = 1; i <= 10 ; i++) {channel.basicPublish("exchange_fanout","",null,("你好,尊敬的用户,秒杀商品开抢了!"+i).getBytes(StandardCharsets.UTF_8));}//8.关闭资源channel.close();connection.close();}
}

消费者代码实现

接下来编写三个消费者,分别监听各自的队列。
//站内信消费者

package com.jjy.mq.publish;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 站内信消费者
public class CustomerStation {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_STATION", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送站内信:"+message);}});}
}

邮件消费者

 
package com.jjy.mq.publish;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMail {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MAIL", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送邮件:"+message);}});}
}

短信消费者

package com.jjy.mq.publish;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMessage {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送短信:"+message);}});}
}

也可以使用工作队列+发布订阅模式同时使用,两个消费者同时监听
一个队列:


// 短信消费者2
public class CustomerMessage2 {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.0.162");connectionFactory.setPort(5672);connectionFactory.setUsername("itbaizhan");connectionFactory.setPassword("itbaizhan");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送短信2:"+message);}});}
}

两个不一样的系统,对同一条消息做不一样的处理

发布订阅模式与工作队列模式的区别
(1)工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机

(2)发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用 默认交换机)

(3)发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机

四. RabbitMQ 路由模式

在这里插入图片描述
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多
时候,不是所有消息都无差别的发布到所有队列中。比如电商网站
的促销活动,双十一大促可能会发布到所有队列;而一些小的促销
活动为了节约成本,只发布到站内信队列。此时需要使用路由模式
(Routing)完成这一需求。
特点:

  1. 每个队列绑定路由关键字RoutingKey
  2. 生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模
    式使用direct交换机。

队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey

Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息
在这里插入图片描述

生产者代码实现

package com.jjy.mq.routing;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class produce {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);// 5.创建队列channel.queueDeclare("SEND_MAIL2",true,false,false,null);channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);channel.queueDeclare("SEND_STATION2",true,false,false,null);//6.交换机绑定队列/*** 参数1:队列名* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""即可*/channel.queueBind("SEND_MAIL2","exchange_routing","import");channel.queueBind("SEND_MESSAGE2","exchange_routing","import");channel.queueBind("SEND_STATION2","exchange_routing","import");channel.queueBind("SEND_STATION2","exchange_routing","normal");//7.发送消息channel.basicPublish("exchange_routing","import",null,"双十一大促活动".getBytes());channel.basicPublish("exchange_routing","normal",null,"小型促销活动".getBytes());//8.关闭资源channel.close();connection.close();}
}

消费者代码实现

站内信消费者

package com.jjy.mq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 站内信消费者
public class CustomerStation {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_STATION2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送站内信:"+message);}});}
}

短信消费者

package com.jjy.mq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMessage {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MESSAGE2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送短信:"+message);}});}
}

邮件消费者

package com.jjy.mq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMail {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MAIL2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送邮件:"+message);}});}
}

总的来说就一句话:

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

五. RabbitMQ 通配符模式

在这里插入图片描述
通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的
路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消
息转发到该队列。通配符模式比路由模式更灵活,使用topic交换
机.
通配符规则

  1. 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以 . 分割。
  2. 队列设置RoutingKey时, # 可以匹配任意多个单词, * 可以匹配任意一个单词。

生产者代码实现

在这里插入图片描述
代码实现

package com.jjy.mq.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class produce {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);// 5.创建队列channel.queueDeclare("SEND_MAIL3",true,false,false,null);channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);channel.queueDeclare("SEND_STATION3",true,false,false,null);//6.交换机绑定队列channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");//7.发送消息channel.basicPublish("exchange_topic","mail.message.station",null,"双十一大促活动".getBytes());channel.basicPublish("exchange_topic","station",null,"小型促销活动".getBytes());//8.关闭资源channel.close();connection.close();}
}

消费者代码实现

站内信消费者

package com.jjy.mq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 站内信消费者
public class CustomerStation {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_STATION3", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送站内信:"+message);}});}
}

短信消费者

package com.jjy.mq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMessage {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MESSAGE3", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送短信:"+message);}});}
}

邮件消费者

package com.jjy.mq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMail {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MAIL3", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送邮件:"+message);}});}
}

总述:topics模式比routing模式要更加灵活,笼统的说就是routing模式加上通配符

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/258122.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch

文章目录 01 Elasticsearch Sink 基础概念02 Elasticsearch Sink 工作原理03 Elasticsearch Sink 核心组件04 Elasticsearch Sink 配置参数05 Elasticsearch Sink 依赖管理06 Elasticsearch Sink 初阶实战07 Elasticsearch Sink 进阶实战7.1 包结构 & 项目配置项目配置appl…

《杨绛传:生活不易,保持优雅》读书摘录

目录 书简介 作者成就 书中内容摘录 良好的家世背景&#xff0c;书香门第为求学打基础 求学相关 念大学 清华研究生 自费英国留学 法国留学自学文学 战乱时期回国 当校长 当小学老师 创造话剧 支持钱锺书写《围城》 出任震旦女子文理学院的教授 接受清华大学的…

【AIGC】Stable Diffusion的ControlNet参数入门

Stable Diffusion 中的 ControlNet 是一种用于控制图像生成过程的技术&#xff0c;它可以指导模型生成特定风格、内容或属性的图像。下面是关于 ControlNet 的界面参数的详细解释&#xff1a; 低显存模式 是一种在深度学习任务中用于处理显存受限设备的技术。在这种模式下&am…

【AIGC】Stable Diffusion的模型入门

下载好相关模型文件后&#xff0c;直接放入Stable Diffusion相关目录即可使用&#xff0c;Stable Diffusion 模型就是我们日常所说的大模型&#xff0c;下载后放入**\webui\models\Stable-diffusion**目录&#xff0c;界面上就会展示相应的模型选项&#xff0c;如下图所示。作者…

【C++】 为什么多继承子类重写的父类的虚函数地址不同?『 多态调用汇编剖析』

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》《C》《Linux》《算法》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 前言 本篇文章主要是为了解答有…

Pytest测试技巧之Fixture:模块化管理测试数据

在 Pytest 测试中&#xff0c;有效管理测试数据是提高测试质量和可维护性的关键。本文将深入探讨 Pytest 中的 Fixture&#xff0c;特别是如何利用 Fixture 实现测试数据的模块化管理&#xff0c;以提高测试用例的清晰度和可复用性。 什么是Fixture&#xff1f; 在 Pytest 中&a…

华为问界M9:领跑未来智能交通的自动驾驶黑科技

华为问界M9是一款高端电动汽车&#xff0c;其自动驾驶技术是该车型的重要卖点之一。华为在问界M9上采用了多种传感器和高级算法&#xff0c;实现了在不同场景下的自动驾驶功能&#xff0c;包括自动泊车、自适应巡航、车道保持、自动变道等。 华为问界M9的自动驾驶技术惊艳之处…

Linux之多线程

目录 一、进程与线程 1.1 进程的概念 1.2 线程的概念 1.3 线程的优点 1.4 线程的缺点 1.5 线程异常 1.6 线程用途 二、线程控制 2.1 POSIX线程库 2.2 创建一个新的线程 2.3 线程ID及进程地址空间布局 2.4 线程终止 2.5 线程等待 2.6 线程分离 一、进程与线程 在…

构造题记录

思路&#xff1a;本题要求构造一个a和b数组相加为不递减序列&#xff0c;并且b数组的极差为最小的b数组。 可以通过遍历a数组并且每次更新最大值&#xff0c;并使得b数组为这个最大值和当前a值的差。 #include <bits/stdc.h> using namespace std; #define int long lon…

优化策略模式,提高账薄显示的灵活性和扩展性

接着上一篇文章&#xff0c;账薄显示出来之后&#xff0c;为了提高软件的可扩展性和灵活性&#xff0c;我们应用策略设计模式。这不仅仅是为了提高代码的维护性&#xff0c;而是因为明细分类账账薄显示的后面有金额分析这个功能&#xff0c;从数据库后台分析及结合Java语言特性…

小程序或者浏览器chrome访问的时候出现307 interval redicrect内部http自动跳转到https产生的原理分析及解决方案

#小李子9479# 出现的情况如下&#xff0c;即我们访问http的时候&#xff0c;它会自动307重定向到https,产生的原因是&#xff0c; 当你通过https访问过一个没有配置证书的http的网站之后&#xff0c;你再访问http的时候&#xff0c;它就会自动跳转到https&#xff0c;导致访问…

奔跑吧小恐龙(Java)

前言 Google浏览器内含了一个小彩蛋当没有网络连接时&#xff0c;浏览器会弹出一个小恐龙&#xff0c;当我们点击它时游戏就会开始进行&#xff0c;大家也可以玩一下试试&#xff0c;网址&#xff1a;恐龙快跑 - 霸王龙游戏. (ur1.fun) 今天我们也可以用Java来简单的实现一下这…

黄金交易策略(Nerve Nnife.mql4):移动止盈的设计

完整EA&#xff1a;Nerve Knife.ex4黄金交易策略_黄金趋势ea-CSDN博客 相较mt4的止盈止损&#xff0c;在ea上实现移动止盈&#xff0c;可以尽最大可能去获得更高收益。移动止盈的大体逻辑是&#xff1a;到达止盈点就开始追踪止盈&#xff0c;直到在最高盈利点回撤指定点数即平…

【Python】通过conda安装Python的IDE

背景 系统&#xff1a;win11 软件&#xff1a;anaconda Navigator 问题现象&#xff1a;①使用Navigator安装jupyter notebook以及Spyder IDE 一直转圈。②然后进入anaconda prompt执行conda install jupyter notebook一直卡在Solving environment/-\。 类似问题&#xff1a; …

Stable Diffusion 模型下载:DreamShaper XL(梦想塑造者 XL)

本文收录于《AI绘画从入门到精通》专栏&#xff0c;专栏总目录&#xff1a;点这里。 文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八案例九案例十 下载地址 模型介绍 DreamShaper 是一个分格多样的大模型&#xff0c;可以生成写实、原画、2.5D 等…

第80讲订单管理功能实现

后端 <?xml version"1.0" encoding"UTF-8" ?> <!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace"com.java1234.mapper.OrderM…

第9章 网络编程

9.1 网络通信协议 通过计算机网络可以实现多台计算机连接&#xff0c;但是不同计算机的操作系统和硬件体系结构不同&#xff0c;为了提供通信支持&#xff0c;位于同一个网络中的计算机在进行连接和通信时必须要遵守一定的规则&#xff0c;这就好比在道路中行驶的汽车一定要遵…

Redisson分布式锁 原理 + 运用 记录

Redisson 分布式锁 简单入门 pom <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>配置类 package com.hmdp.config;import org.redisson.Redisson;…

C语言第二十五弹---字符函数和字符串函数(上)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 目录 1、字符分类函数 2、字符转换函数 3、strlen的使用和模拟实现 4、strcpy 的模拟实现 5、strcat 的模拟实现 6、strcmp 的模拟实现 7、strncpy 函数的使用 总结…

2024-02-08 Unity 编辑器开发之编辑器拓展1 —— 自定义菜单栏

文章目录 1 特殊文件夹 Editor2 在 Unity 菜单栏中添加自定义页签3 在 Hierarchy 窗口中添加自定义页签4 在 Project 窗口中添加自定义页签5 在菜单栏的 Component 菜单添加脚本6 在 Inspector 为脚本右键添加菜单7 加入快捷键8 小结 1 特殊文件夹 Editor ​ Editor 文件夹是 …