【RabbitMQ上手——单实例安装5种简单模式实现通讯过程】

【RabbitMQ入门-单实例安装&5种简单模式实现通讯过程】

    • 一、环境说明
    • 二、安装RabbitMQ
    • 三、用户权限及Virtual Host设置
    • 四、5种简单模式实现通讯过程的实现
    • 五、小结

一、环境说明

  1. 安装环境:虚拟机VMWare + Centos7.6 + Maven3.6.3 + JDK1.8
  2. RabbitMQ版本:rabbitmq-server-3.8.8-1.el7.noarch.rpm

二、安装RabbitMQ

具体安装过程,可参考:CentOS7安装RabbitMQ(rpm包方式)

三、用户权限及Virtual Host设置

  1. 用户角色创建

    RabbitMQ在安装好后,可以访问http://localhost:15672 ;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,如下操作:
    在这里插入图片描述
    默认情况下,访问RabbitMQ服务的用户名和密码都是"guest",这个账户有限制,默认只能通过本地网络(如localhost)访问,远程网络访问受限,使用默认的用户 guest / guest (此也为管理员用户)登陆,会发现无法登陆,报错:User can only log in via localhost。那是因为默认是限制了guest用户只能在本机登陆,也就是只能登陆localhost:15672。所以在实现生产和消费消息之前,需要另外添加一个用户,并设置相应的访问权限。

    添加新用户,用户名为"sujiangming",密码为"openGauss@1234",该步骤需要在rabbitmq所在Linux服务器上执行,执行命令如下:

    rabbitmqctl add_user sujiangming openGauss@1234
    

    为root用户设置所有权限,且设置用户为管理员角色,执行如下命令:

    rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
    rabbitmqctl set_user_tags sujiangming administrator
    

    重新登陆,正常可以登录,如图:
    在这里插入图片描述
    补充说明:有关角色

    1. 超级管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
    2. 监控者(monitoring):可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
    3. 策略制定者(policymaker):可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
    4. 普通管理者(management):仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
    5. 其他角色:无法登陆管理控制台,通常就是普通的生产者和消费者。
  2. Virtual Host设置

    像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理。在RabbitMQ中有虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchangequeuemessage不能互通。 相当于mysql的db。Virtual Name一般以/开头。

    1. 创建Virtual Hosts
      在这里插入图片描述
      2. 设置Virtual Hosts权限,可以给其他用户授权,如root,如下图所示:
      在这里插入图片描述
      权限参数说明:
      1. user:用户名
      2. configure :一个正则表达式,用户对符合该正则表达式的所有资源拥有 configure 操作的权限
      3. write:一个正则表达式,用户对符合该正则表达式的所有资源拥有 write 操作的权限
      4. read:一个正则表达式,用户对符合该正则表达式的所有资源拥有 read 操作的权限

四、5种简单模式实现通讯过程的实现

  1. 在IDEA中创建maven工程,添加依赖到pom.xml中,项目结构如下图所示;
    在这里插入图片描述
    pom.xml中添加如下内容:

    <properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><amqp-version>5.6.0</amqp-version><slf4j.version>1.7.25</slf4j.version>
    </properties>
    <dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>${amqp-version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency>
    </dependencies>
    

    创建com.rabbitmq.utils.CommonUtils工具类,代码如下:

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;/*** 连接rabbitmq工具类*/
    public class ConnectionUtils {public static Connection getConnection() throws IOException, TimeoutException {//1、创建链接工厂对象ConnectionFactory factory = new ConnectionFactory();//2、设置RabbitMQ服务主机地址factory.setHost("192.168.36.132");//3、设置RabbitMQ服务端口factory.setPort(5672);//4、设置虚拟主机名字factory.setVirtualHost("/vhtest");//5、设置用户连接名factory.setUsername("sujiangming");//6、设置链接密码factory.setPassword("openGauss@1234");return factory.newConnection();}
    }
    
  2. 第一种:简单模式

    在这里插入图片描述
    P:生产者,也就是要发送消息的程序
    C:消费者:消息的接受者,会一直等待消息到来。
    Queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

    创建com.rabbitmq.simplest.SimpleProducer类,代码如下:

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.utils.ConnectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;/*** //1、创建链接工厂对象-factory=newConnectionFactory()* //2、设置RabbitMQ服务主机地址,默认localhost-factory.setHost("localhost")* //3、设置RabbitMQ服务端口,默认-1-factory.setPort(5672)* //4、设置虚拟主机名字,默认/-factory.setVirtualHost("szitheima")* //5、设置用户连接名,默认guest-factory.setUsername("admin")* //6、设置链接密码,默认guest-factory.setPassword("admin")* //7、创建链接-connection=factory.newConnection()* //8、创建频道-channel=connection.createChannel()* //9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)* //10、创建消息-Stringm=xxx* //11、消息发送-channel.basicPublish(交换机[默认DefaultExchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)* //12、关闭资源-channel.close();connection.close()*/
    public class SimpleProducer {private static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class);public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)AMQP.Queue.DeclareOk queueDeclare = channel.queueDeclare("simple_queue",true,false,false,null);System.out.println("=====>>>开始消息发送<<<=====");for (int i = 0; i < 10; i++) {//10、创建消息-Stringm=xxxString message = "我是第"+ i + "消息,我喜欢的数字是:" + i;System.out.println(">>>"+message+"<<<");//11、消息发送-channel.basicPublish(交换机[默认DefaultExchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)channel.basicPublish("","simple_queue",null,message.getBytes(StandardCharsets.UTF_8));}//12、关闭资源-channel.close();channel.close();connection.close();//13、打印提升信息System.out.println("=====>>>消息发送结束<<=====");}
    }
    

    创建com.rabbitmq.simplest.SimpleConsumer类,代码如下:

    import com.rabbitmq.client.*;
    import com.rabbitmq.utils.ConnectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;/*** 消费者*/
    public class SimpleConsumer {private static final Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare("simple_queue",true,false,false,null);//10、创建消费者Consumer callback = new DefaultConsumer(channel){/*** @param consumerTag 消费者标签,在channel.basicConsume时候可以指定* @param envelope 信封,消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties  属性信息(生产者的发送时指定)* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("=====>>>读取到的消息<<<=====");System.out.println("routingKey:" + routingKey +",exchange:" + exchange +",deliveryTag:" + deliveryTag +",message:" + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息/*** 消息消费* 参数1:队列名称* 参数2:是否自动应答,true为自动应答[mq接收到回复会删除消息],设置为false则需要手动应答* 参数3:消息接收到后回调*/channel.basicConsume("simple_queue",true,callback);}
    }
    

    运行测试:

    • 先启动消费者SimpleConsumer类,让消费者等待接收消费
    • 再启动SimpleProducer类,让生产者发送10条消息,运行结果如下
      在这里插入图片描述
  3. 第二种:Work queues工作队列模式

    在这里插入图片描述
    Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
    应用场景:对于任务过重或任务较多情况,使用工作队列模式使用多个消费者可以提高任务处理的速度。
    Work Queues特点:在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

    创建生产者com.rabbitmq.workqueues.WorkQueuesProducer,代码如下:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;public class WorkQueuesProducer {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();System.out.println("=====>>>开始消息发送<<<=====");for (int i = 0; i < 100; i++) {//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare("work_queue",true,false,false,null);//10、创建消息-Stringm=xxxString message = "我是第"+ i + "WorkQueues消息,我喜欢的数字是:" + i;System.out.println(">>>"+message+"<<<");//11、消息发送-channel.basicPublish(交换机[默认DefaultExchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)channel.basicPublish("","work_queue",null,message.getBytes(StandardCharsets.UTF_8));}//12、关闭资源-channel.close();channel.close();connection.close();//13、打印提升信息System.out.println("=====>>>消息发送结束<<=====");}
    }
    

    创建第一个生产者com.rabbitmq.workqueues.WorkQueuesConsumer,代码如下:

    import com.rabbitmq.client.*;
    import com.rabbitmq.utils.ConnectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;public class WorkQueuesConsumer {private static final Logger logger = LoggerFactory.getLogger(WorkQueuesConsumer.class);public static final String WORK_QUEUES_NAME="work_queue";public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(WORK_QUEUES_NAME,true,false,false,null);//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("=====>>>读取到的消息<<<=====");System.out.println("routingKey:" + routingKey +",exchange:" + exchange +",deliveryTag:" + deliveryTag +",message:" + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(WORK_QUEUES_NAME,true,callback);}
    }
    

    创建第二个生产者com.rabbitmq.workqueues.WorkQueuesConsumer2,代码如下:

    import com.rabbitmq.client.*;
    import com.rabbitmq.utils.ConnectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;public class WorkQueuesConsumer2 {private static final Logger logger = LoggerFactory.getLogger(WorkQueuesConsumer2.class);public static final String WORK_QUEUES_NAME="work_queue";public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(WORK_QUEUES_NAME,true,false,false,null);//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("=====>>>读取到的消息<<<=====");System.out.println("routingKey:" + routingKey +",exchange:" + exchange +",deliveryTag:" + deliveryTag +",message:" + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(WORK_QUEUES_NAME,true,callback);}
    }
    

    运行测试:启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。
    在这里插入图片描述

  4. 第三种:Publish&Subscribe发布订阅模式

    在这里插入图片描述
    在发布订阅模型中,多了一个x(exchange)角色,而且过程略有变化。
    P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    C:消费者,消息的接受者,会一直等待消息到来。
    Queue:消息队列,接收消息、缓存消息。
    Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    Fanout:广播,将消息交给所有绑定到交换机的队列
    Direct:定向,把消息交给符合指定routing key 的队列
    Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

    创建生产者com.rabbitmq.publishsubscribe.FanoutProducer,代码如下:

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;public class FanoutProducer {public static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();//9、声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);System.out.println("=====>>>开始消息发送<<<=====");for (int i = 10; i < 100; i++) {//10、创建消息-Stringm=xxxString message = "我是第 【"+ i + "】 fanout_exchange 消息,我喜欢的数字是: " + i;System.out.println(">>>"+message+"<<<");//11、消息发送-channel.basicPublish(交换机[默认DefaultExchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));}//12、关闭资源-channel.close();channel.close();connection.close();//13、打印提升信息System.out.println("=====>>>消息发送结束<<=====");}
    }
    

    创建第一个消费者com.rabbitmq.publishsubscribe.FanoutConsumer01,代码如下:

    import com.rabbitmq.client.*;
    import com.rabbitmq.utils.ConnectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;public class FanoutConsumer01 {private static final Logger logger = LoggerFactory.getLogger(FanoutConsumer01.class);public static final String QUEUES_NAME="fanout_queue1";public static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(QUEUES_NAME,true,false,false,null);//10、绑定队列到交换机: channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,"");//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("routingKey:" + routingKey +",exchange:" + exchange +",deliveryTag:" + deliveryTag +",message:" + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(QUEUES_NAME,true,callback);}
    }
    

    创建第二个消费者com.rabbitmq.publishsubscribe.FanoutConsumer02,代码如下:

    import com.rabbitmq.client.*;
    import com.rabbitmq.utils.ConnectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;public class FanoutConsumer02 {private static final Logger logger = LoggerFactory.getLogger(FanoutConsumer02.class);public static final String QUEUES_NAME="fanout_queue2";public static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(QUEUES_NAME,true,false,false,null);//10、绑定队列到交换机: channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,"");//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("routingKey:" + routingKey +",exchange:" + exchange +",deliveryTag:" + deliveryTag +",message:" + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(QUEUES_NAME,true,callback);}
    }
    

    注意
    绑定交换机的前提是得先有这个交换机,所以得先执行一次生产者,如果没有这个交换机就执行消费者绑定交换机的话会报错.执行完两个消费者再执行生产者后,就会看到两个消费者都消费这一条消息了。

    运行测试:启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。
    在这里插入图片描述

    测试结论

    交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
    发布订阅模式与work队列模式的区别
    1、work队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
    2、发布/订阅模式的生产方是面向交换机发送消息,work队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
    3、发布/订阅模式的消费者需要设置队列和交换机的绑定,work队列模式不需要设置,实际上work队列模式会将队列绑 定到默认的交换机 。

  5. 第四种:Routing路由模式

    在这里插入图片描述
    P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
    X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
    C1:消费者,其所在队列指定了需要routing key 为 error 的消息
    C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
    路由模式特点
    1.队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
    2.消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
    3.Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

    创建生产者com.rabbitmq.routing.RoutingProducer,代码如下:

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;public class RoutingProducer {public static final String EXCHANGE_NAME = "routing_exchange";public static final String ROUTING_LOG_ERROR = "log.error";public static final String ROUTING_LOG_INFO = "log.info";public static final String ROUTING_LOG_WARNING= "log.warning";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();//9、声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);System.out.println("=====>>>开始消息发送<<<=====");for (int i = 100; i < 1000; i++) {String routingKey = "";//发送消息的时候根据相关逻辑指定相应的routing key。switch (i%3){case 0:  //假设i=0,为error消息routingKey = ROUTING_LOG_ERROR;break;case 1: //假设i=1,为info消息routingKey = ROUTING_LOG_INFO;break;case 2: //假设i=2,为warning消息routingKey = ROUTING_LOG_WARNING;break;}//10、创建消息-Stringm=xxxString message = "我是第 【"+ i + "】 "+ EXCHANGE_NAME +" 消息,我喜欢的数字是: " + i;System.out.println(">>>"+message+"<<<");//11、消息发送-channel.basicPublish(交换机[默认DefaultExchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(StandardCharsets.UTF_8));}//12、关闭资源-channel.close();channel.close();connection.close();//13、打印提升信息System.out.println("=====>>>消息发送结束<<=====");}
    }
    

    创建第一个消费者com.rabbitmq.routing.RoutingConsumer01,代码如下:

    import com.rabbitmq.client.*;
    import com.rabbitmq.utils.ConnectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;public class RoutingConsumer01 {private static final Logger logger = LoggerFactory.getLogger(RoutingConsumer01.class);public static final String QUEUES_NAME="routing_queue1";public static final String EXCHANGE_NAME = RoutingProducer.EXCHANGE_NAME;public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(QUEUES_NAME,true,false,false,null);//10、绑定队列到交换机: channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,RoutingProducer.ROUTING_LOG_ERROR);//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("routingKey: " + routingKey +" ,exchange: " + exchange +" ,deliveryTag: " + deliveryTag +" ,message: " + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(QUEUES_NAME,true,callback);}
    }
    

    创建第二个消费者com.rabbitmq.routing.RoutingConsumer02,代码如下:

    import com.rabbitmq.client.*;
    import com.rabbitmq.utils.ConnectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;public class RoutingConsumer02 {private static final Logger logger = LoggerFactory.getLogger(RoutingConsumer02.class);public static final String QUEUES_NAME="routing_queue2";public static final String EXCHANGE_NAME = RoutingProducer.EXCHANGE_NAME;public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(QUEUES_NAME,true,false,false,null);//10、绑定队列到交换机: channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,RoutingProducer.ROUTING_LOG_ERROR);channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,RoutingProducer.ROUTING_LOG_INFO);channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,RoutingProducer.ROUTING_LOG_WARNING);//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("routingKey: " + routingKey +" ,exchange: " + exchange +" ,deliveryTag: " + deliveryTag +" ,message: " + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(QUEUES_NAME,true,callback);}
    }
    

    运行测试:

    启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key,对应队列的消息;到达按照需要接收的效果。
    在这里插入图片描述
    在这里插入图片描述

    测试结论

    Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。

  6. 第五种:Topics主题模式

    在这里插入图片描述
    Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符!
    Routingkey 一般都是有一个或多个单词组成,多个单词之间以“ . ”分割,例如: item.insert
    通配符规则
    #:匹配一个或多个词
    *:匹配不多不少恰好1个词
    在这里插入图片描述

    创建生产者com.rabbitmq.topics.TopicProducer,代码如下:

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;public class TopicProducer {public static final String EXCHANGE_NAME = "topic_exchange";public static final String ROUTING_LOG_ERROR = "log.error";public static final String ROUTING_LOG_INFO = "log.info";public static final String ROUTING_LOG_INFO_ADD = "log.info.add";public static final String ROUTING_LOG_INFO_UPDATE = "log.info.update";public static final String ROUTING_LOG_WARNING= "log.warning";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();//9、声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);System.out.println("=====>>>开始消息发送<<<=====");for (int i = 0; i < 100; i++) {String routingKey = "";//发送消息的时候根据相关逻辑指定相应的routing key。switch (i%5){case 0:  //假设i=0,为error消息routingKey = ROUTING_LOG_ERROR;break;case 1: //假设i=1,为info消息routingKey = ROUTING_LOG_INFO;break;case 2: //假设i=2,为warning消息routingKey = ROUTING_LOG_WARNING;break;case 3: //假设i=3,为log.info.add消息routingKey = ROUTING_LOG_INFO_ADD;break;case 4: //假设i=4,为log.info.update消息routingKey = ROUTING_LOG_INFO_UPDATE;break;}//10、创建消息-Stringm=xxxString message = "我是第 【"+ i + "】 "+ EXCHANGE_NAME +" 消息,我喜欢的数字是: " + i;System.out.println(">>>"+message+"<<<");//11、消息发送-channel.basicPublish(交换机[默认DefaultExchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(StandardCharsets.UTF_8));}//12、关闭资源-channel.close();channel.close();connection.close();//13、打印提升信息System.out.println("=====>>>消息发送结束<<=====");}
    }
    

    创建第一个消费者com.rabbitmq.topics.TopicConsumer01,代码如下:

    import com.rabbitmq.client.*;
    import com.rabbitmq.utils.ConnectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;public class TopicConsumer01 {private static final Logger logger = LoggerFactory.getLogger(TopicConsumer01.class);public static final String QUEUES_NAME="topic_queue1";public static final String EXCHANGE_NAME = TopicProducer.EXCHANGE_NAME;public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(QUEUES_NAME,true,false,false,null);//10、绑定队列到交换机: channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串]) topic支持通配符方式// log.* 表示匹配log.后面一个,如log.error,log.info等channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,"log.*");//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("routingKey: " + routingKey +" ,exchange: " + exchange +" ,deliveryTag: " + deliveryTag +" ,message: " + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(QUEUES_NAME,true,callback);}
    }

    创建第二个消费者com.rabbitmq.topics.TopicConsumer01,代码如下:

    import com.rabbitmq.client.*;
    import com.rabbitmq.utils.ConnectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;public class TopicConsumer02 {private static final Logger logger = LoggerFactory.getLogger(TopicConsumer02.class);public static final String QUEUES_NAME="topic_queue2";public static final String EXCHANGE_NAME = TopicProducer.EXCHANGE_NAME;public static void main(String[] args) throws IOException, TimeoutException {logger.debug("info");// 获取链接Connection connection = ConnectionUtils.getConnection();//8、创建频道-channel=connection.createChannel()Channel channel = connection.createChannel();//9、声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)channel.queueDeclare(QUEUES_NAME,true,false,false,null);//10、绑定队列到交换机: channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串]) topic支持通配符方式// log.# 表示匹配log.后面一个或者多个词,如log.info,log.info.add等channel.queueBind(QUEUES_NAME,EXCHANGE_NAME,"log.#");//10、创建消费者Consumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由的keyString routingKey = envelope.getRoutingKey();//获取交换机信息String exchange = envelope.getExchange();//获取消息IDlong deliveryTag = envelope.getDeliveryTag();//获取消息信息String message = new String(body, StandardCharsets.UTF_8);System.out.println("routingKey: " + routingKey +" ,exchange: " + exchange +" ,deliveryTag: " + deliveryTag +" ,message: " + message);}};//11、消息消费,注意,此处不建议关闭资源,让程序一直处于读取消息channel.basicConsume(QUEUES_NAME,true,callback);}
    }

    运行测试:

    启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。
    在这里插入图片描述
    在这里插入图片描述

    测试小结:

    Topic主题模式可以实现 Publish/Subscribe发布订阅模式 和 Routing路由模式 的双重功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。

五、小结

  1. RabbitMQ五种工作模式小结:

    1、简单模式 HelloWorld
    一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
    2、工作队列模式 Work Queue
    一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
    3、发布订阅模式 Publish/subscribe
    需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
    4、路由模式 Routing
    需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
    5、通配符模式 Topic
    需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

  2. 学习资料总结

    本文参考了来自网络上的资料,如有侵权,请及时联系博主进行删除。本文仅是博主本人在学习过程中作为学习笔记使用,常言道:好记性不如烂笔头。如本文对您有所帮助,请您动动发财的手指给博主点个赞,谢谢您的阅读~~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/89073.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows CMD 关闭,启动程序

Windows CMD 关闭&#xff0c;启动程序 1. Windows 通过 CMD 命令行关闭程序 示例&#xff1a;通过 taskkill 命令关闭 QQ 管家&#xff0c;但是这里有个问题&#xff0c;使用命令行关闭 QQ 管家时&#xff0c;会提示“错误: 无法终止 PID 1400 (属于 PID 22116 子进程)的进程…

ABeam×Startup丨德硕管理咨询(深圳)创新研究团队前往灵境至维·既明科技进行拜访交流

近日&#xff0c;德硕管理咨询&#xff08;深圳&#xff09;&#xff08;以下简称“ABeam-SZ”&#xff09;创新研究团队一行前往灵境至维既明科技有限公司&#xff08;以下简称“灵境至维”&#xff09;进行拜访交流&#xff0c;探讨线上虚拟空间的商业模式。 现场合影 &…

基于灰色神经网络的订单需求预测代码

目录 1 概述 2 代码 3 结果 ​ 1 概述 BP(Back Propagation)神经网络模型是一种信息前向传播,误差反向传播的神经网络模型0,能够通过训练样本反向传播调节网络的阈值和权值,使误差平方最小。 BP神经网络是目前应用最广泛的神经网络模型之一。 灰色人工神经网络模型建模过…

了解华为(H3C)网络设备和OSI模型基本概念

目录 一&#xff0c;认识华为 1.华为发展史 2.华为网络设备介绍 3.VRP概述 二&#xff0c;OSI七层模型 1.七层模型详细表格 2.各层的作用 3.数据在各层之间的传递过程 4.OSI四层网络模型 一&#xff0c;认识华为 官网&#xff1a;https://www.huawei.com/cn/ 1.华为发…

MyBatis快速入门

一、Mybatis介绍 MyBatis是一个支持普通SQL查询&#xff0c;存储过程和高级映射的优秀持久层框架。MyBatis消除了几乎所有的JDBC代码和参数的手工设置以及对结果集的检索封装。MyBatis可以使用简单的XML或注解用于配置和原始映射&#xff0c;将接口和Java的POJO&#xff08;Pla…

<Vite>HMR实现原理

什么是HMR&#xff1f; HMR&#xff08;Hot Module Replacement&#xff09;是一种开发工具&#xff0c;也就是热更新。用于在应用程序运行时替换、添加或删除模块&#xff0c;而无需完全重新加载整个页面或重新启动应用程序。这可以极大地提高开发效率和调试体验。 HMR的优势 …

vue table动态合并, 自定义合并,参照合并,组合合并

<template><div><el-table:data"tableData":span-method"objectSpanMethod"border:header-cell-style"{ textAlign: center }"><el-table-column prop"area" label"区域" align"center">…

ios 知识

IOS 类文件.h和.m中interface的区别 大家都知道我们在创建类文件时会发现&#xff1a; #import <UIKit/UIKit.h>interface ViewController : UIViewControllerend和 #import "ViewController.h"interface ViewController ()end那么他们之间有何区别呢&#x…

MySql存储过程详解

文章目录 存储过程1 介绍 基本语法创建:调用查看删除演示: 变量相关系统变量演示: 用户自定义变量局部变量 if语法参数介绍casewhilerepeatloop游标条件处理程序存储函数 存储过程 1 介绍 存储过程是事先经过编译并存储在数据库中的一段 SQL 语句的集合&#xff0c;调用存储过…

kafka 02——三个重要的kafka客户端

kafka 02——三个重要的kafka客户端 1. 前言1.1 关于 Kafka 的安装1.2 常用客户端简介1.3 依赖 2. AdminClient2.1 Admin Configs2.2 AdminClient API2.2.1 设置 AdminClient 对象2.2.2 创建 topic 获取 topic 列表2.2.3 删除topic2.2.4 查看 topic 的描述信息2.2.5 查看 topi…

Python系统学习1-7-字典

一、字典 1、概念及内存图 列表&#xff1a;由一系列变量组成的可变序列容器字典&#xff1a;由一系列键值对组成的可变散列容器字典优势&#xff1a;利用&#xff08;内存&#xff09;空间&#xff0c;换取&#xff08;CPU查找&#xff09;时间 键key 必须唯一且为不…

学无止境·运维高阶⑤(LVS-DR 群集 配置Nginx负载均衡)

LVS-DR 群集 && 配置Nginx负载均衡 一、LVS-DR 群集1、相关配置环境2、在RS上配置并启动脚本2.1相关脚本2.2 启动脚本&#xff0c;另一台RS同样步骤 3、LVS-DR模式配置脚本4、测试 二、Nginx负载均衡1、安装Nginx并关闭相应设置2、向主机 node2&#xff0c;node3 写入内…

JDK 17 营销初体验 —— 亚毫秒停顿 ZGC 落地实践 | 京东云技术团队

前言 自 2014 年发布以来&#xff0c; JDK 8 一直都是相当热门的 JDK 版本。其原因就是对底层数据结构、JVM 性能以及开发体验做了重大升级&#xff0c;得到了开发人员的认可。但距离 JDK 8 发布已经过去了 9 年&#xff0c;那么这 9 年的时间&#xff0c;JDK 做了哪些升级&am…

《golang设计模式》第二部分·结构型模式-01-适配器模式(Adapter)

文章目录 1. 概念1.1 角色1.2 应用场景1.2 类图 2. 代码示例2.1 设计2.2 代码2.3 示例类图 1. 概念 定义一个适配器&#xff0c;帮助原本不能实现接口的类“实现”该接口 1.1 角色 目标&#xff08;Target&#xff09;&#xff1a;客户端调用的目标接口 被适配者&#xff08…

OSI七层模型及TCP/IP四层模型

目录 OSI七层模型 TCP/IP四层模型 OIS七层模型和TCP/IP模型图 七层详解 两种模型比较 为什么OSI七层体系结构不常用 四层详解 网络为什么要分层&#xff1f; 说说 OSI 七层模型和 TCP/IP 四层模型的关系和区别 OSI七层模型 OSI&#xff08;Open System Interconnect&a…

ModaHub魔搭社区:Milvus Cloud素材集合帖,等你查收

Hi~Milvus Cloud 的各位朋友,这是一期 Milvus Cloud 素材弹药库的集中汇总帖。随着向量数据库的火爆,越来越多的伙伴开始关注到向量数据库并开始使用 Milvus Cloud 。 考虑到目前信息获取的渠道多且分散,我们专门为大家整理了一期 Milvus Cloud 信息集合帖,让大家可以在快…

ppt怎么压缩?试试这样压缩文件

当PPT文件体积过大时&#xff0c;打开的速度就会很慢&#xff0c;演示的时候刘程度也会受到影响&#xff0c;其次&#xff0c;现在很多平台对于上传的文件是有大小限制的&#xff0c;比如超过100M的文件就无法上传、发送等等&#xff0c;那么&#xff0c;怎么才能压缩PPT文件呢…

CentOS安装Postgresql

PG基本安装步骤 安装postgresql&#xff1a; sudo yum install postgresql-server初始化数据库&#xff1a;安装完毕后&#xff0c;需要初始化数据库并创建初始用户&#xff1a; sudo postgresql-setup initdb启动和停止服务&#xff1a; sudo systemctl start postgresql sudo…

Android之版本号、版本别名、API等级对应关系(全)(一百六十二)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

微服务07-分布式缓存

前提: 单机的Redis存在四大问题: 解决办法:基于Redis集群解决单机Redis存在的问题 1、Redis持久化 Redis 具有持久化功能,其会按照设置以 快照 或 操作日志 的形式将数据持久化到磁盘。 Redis有两种持久化方案: RDB持久化AOF持久化注意: RDB 是默认持久化方式,但 Red…