RabbitMQ工作模式-发布订阅模式

Publish/Subscribe(发布订阅模式)

官方文档: https://www.rabbitmq.com/tutorials/tutorial-three-python.html

使用fanout类型类型的交换器,routingKey忽略。每个消费者定义生成一个队列关绑定到同一个Exchange,每个消费者都可以消费完整的消息。

消息广播给所有订阅该消息的消费者。

在RabbitMQ中,生产者不是将消息直接发送给消息消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。

生产者将消息发送给交换器。交换器非常简单,从生成者接收消息,将消息推送给消息队列。交换器必须清楚的知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器的类型。

在这里插入图片描述

发布订阅使用fanout的交换器,创建交换器,名称为test

channel.exchangeDeclare("test","fanout");

fanout交换器很简单,从名称就可以看出来(用风扇吹出去),将所有的收到的消息发给它的知道的所有队列。

存在一个默认的交换器。

此样例使用的是临时队列,即消费都实现将自动创建此队列,当消费都退出后,此队列也将自动删除。

队列名称如

amq.gen-gjKBgQ9PSmoj2YQGMOdPfA

样例代码

消费者1的代码:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class OneConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明的临时队列,名称由rabbitMQ自动生成String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("one 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}

消费者2

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class TwoConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 生成的临时队列String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("two 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}

消费者3

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class ThirdConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 生成的临时队列String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("third 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class Product {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();try {// 声明fanout类型交换机channel.exchangeDeclare("ex.testfan", "fanout", true, false, false, null);for (int i = 0; i < 20; i++) {channel.basicPublish("ex.testfan",// 路由key"",// 属性null,// 信息("hello world fan " + i).getBytes(StandardCharsets.UTF_8));}} catch (IOException e) {throw new RuntimeException(e);} finally {channel.close();connection.close();}}
}

观察下队列的绑定的情况:

在未启动消费都队列之前:

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]# 

在未启动消费者之前,只有看到几个默认的生产者。绑定的队列为空。

启动三个消费者:

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ ex.testfan         │ fanout  │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬────────────────────────────────┬──────────────────┬────────────────────────────────┬───────────┐
│ source_name │ source_kind │ destination_name               │ destination_kind │ routing_key                    │ arguments │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│             │ exchange    │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue            │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│             │ exchange    │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue            │ amq.gen-UG67rAw03FGbBupHX6o18g │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│             │ exchange    │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue            │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan  │ exchange    │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue            │                                │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan  │ exchange    │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue            │                                │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan  │ exchange    │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue            │                                │           │
└─────────────┴─────────────┴────────────────────────────────┴──────────────────┴────────────────────────────────┴───────────┘
[root@nullnull-os ~]# 

当启动生产者后,可以发现已经产生了3个默认的交换机及队列的绑定关系。以及手动绑定的3个队列的关系。

启动生产者,查看消费情况:

消费者1

临时队列的名称:amq.gen-VbV63vwAn0IBzC7n6I--vQ
one 获取到的消息:hello world fan 0
one 获取到的消息:hello world fan 1
one 获取到的消息:hello world fan 2
one 获取到的消息:hello world fan 3
one 获取到的消息:hello world fan 4
one 获取到的消息:hello world fan 5
one 获取到的消息:hello world fan 6
one 获取到的消息:hello world fan 7
one 获取到的消息:hello world fan 8
one 获取到的消息:hello world fan 9
one 获取到的消息:hello world fan 10
one 获取到的消息:hello world fan 11
one 获取到的消息:hello world fan 12
one 获取到的消息:hello world fan 13
one 获取到的消息:hello world fan 14
one 获取到的消息:hello world fan 15
one 获取到的消息:hello world fan 16
one 获取到的消息:hello world fan 17
one 获取到的消息:hello world fan 18
one 获取到的消息:hello world fan 19

消费者2:

临时队列的名称:amq.gen-KadV2OsCRLb84p2k_ijuww
two 获取到的消息:hello world fan 0
two 获取到的消息:hello world fan 1
two 获取到的消息:hello world fan 2
two 获取到的消息:hello world fan 3
two 获取到的消息:hello world fan 4
two 获取到的消息:hello world fan 5
two 获取到的消息:hello world fan 6
two 获取到的消息:hello world fan 7
two 获取到的消息:hello world fan 8
two 获取到的消息:hello world fan 9
two 获取到的消息:hello world fan 10
two 获取到的消息:hello world fan 11
two 获取到的消息:hello world fan 12
two 获取到的消息:hello world fan 13
two 获取到的消息:hello world fan 14
two 获取到的消息:hello world fan 15
two 获取到的消息:hello world fan 16
two 获取到的消息:hello world fan 17
two 获取到的消息:hello world fan 18
two 获取到的消息:hello world fan 19

消息者3:

临时队列的名称:amq.gen-TcqXVnoS2mjOpfCw1o1CZw
third 获取到的消息:hello world fan 0
third 获取到的消息:hello world fan 1
third 获取到的消息:hello world fan 2
third 获取到的消息:hello world fan 3
third 获取到的消息:hello world fan 4
third 获取到的消息:hello world fan 5
third 获取到的消息:hello world fan 6
third 获取到的消息:hello world fan 7
third 获取到的消息:hello world fan 8
third 获取到的消息:hello world fan 9
third 获取到的消息:hello world fan 10
third 获取到的消息:hello world fan 11
third 获取到的消息:hello world fan 12
third 获取到的消息:hello world fan 13
third 获取到的消息:hello world fan 14
third 获取到的消息:hello world fan 15
third 获取到的消息:hello world fan 16
third 获取到的消息:hello world fan 17
third 获取到的消息:hello world fan 18
third 获取到的消息:hello world fan 19

再停止几个消费者查看队列信息

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ ex.testfan         │ fanout  │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]# 

可以看到,当客户端退出之后,临时队列也就消失了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/123345.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux安装wget命令_linux下载文件到本地命令

1、检查是否有安装wget rpm -qa|grep "wget" 复制 在这里插入图片描述 若存在则移除&#xff0c;以下为移除命令 # 移除wget yum remove wget 复制 2、登录wget官网下载地址&#xff0c;下载最新的wget的rpm安装包到本地 下载地址&#xff1a;http://mirrors.…

数据结构与算法(四):栈与队列

栈与队列 我们一般把栈与队列合在一块讨论&#xff0c;因为他们具有相似的性质。 栈&#xff1a;栈是限定仅在表尾进行插入和删除操作的线性表&#xff0c;所以栈又称为后进先出&#xff08;LastIn First Out&#xff09;的线性表&#xff0c;简称LIFO结构。 队列&#xff1…

Postgresql JSON对象和数组查询

文章目录 一. Postgresql 9.5以下版本1.1 简单查询(缺陷&#xff1a;数组必须指定下标&#xff0c;不推荐)1.1.1 模糊查询1.1.2 等值匹配1.1.3 时间搜索1.1.4 在列表1.1.5 包含 1.2 多层级JSONArray&#xff08;推荐&#xff09;1.2.1 模糊查询1.2.2 模糊查询 NOT1.2.3 等值匹配…

微软8月系统更新引发问题:虚拟内存分页文件出现错误

微软的八月系统更新引发了一系列问题&#xff0c;其中包括“UNSUPPORTED_PROCESSOR”蓝屏错误和文件管理器故障。尽管微软已经修复了前者&#xff0c;但据国外科技媒体Windows Latest报道&#xff0c;仍有用户反馈在非微星设备上出现“fault in nonpaged area”蓝屏错误。 如果…

如何用SSH克隆GitHub项目

诸神缄默不语-个人CSDN博文目录 使用场景&#xff1a;由于不可知的网络问题&#xff0c;无法用HTTPS克隆GitHub项目。 报错fatal: unable to access https://github.com/PolarisRisingWar/llm-throught-ages.git/: GnuTLS recv error (-110): The TLS connection was non-pro…

基于阻塞队列的生产消费模型

目录 一、线程同步 1.生产消费模型&#xff08;或生产者消费者模型&#xff09; 2.认识同步 &#xff08;1&#xff09;生产消费模型中的同步 &#xff08;2&#xff09;生产者消费者模型的特点 二、条件变量 1.认识条件变量 2.条件变量的使用 3.代码改造 三、基于阻…

同旺科技USB to I2C 适配器烧写 Arduino 模块

所需设备&#xff1a; 内附链接 1、同旺科技USB to I2C 适配器 2、Arduino 模块 硬件连接&#xff1a; 用同旺科技USB to I2C 适配器连接芯片的TX、RX、GND; 打开Arduino IDE编辑工具&#xff0c; 点击“上传”按钮&#xff0c;完成程序的编译和烧录&#xff1b;

【Day-31慢就是快】代码随想录-二叉树-中序和后序遍历构造二叉树

根据一棵树的中序遍历与后序遍历构造二叉树。 注意: 你可以假设树中没有重复的元素。 思路 首先知道怎么画&#xff0c;然后写代码流程。 以 后序数组的最后一个元素为切割点&#xff0c;先切中序数组&#xff0c;根据中序数组&#xff0c;反过来再切后序数组。一层一层切下去…

探索隧道ip如何助力爬虫应用

在数据驱动的世界中&#xff0c;网络爬虫已成为获取大量信息的重要工具。然而&#xff0c;爬虫在抓取数据时可能会遇到一些挑战&#xff0c;如IP封禁、访问限制等。隧道ip&#xff08;TunnelingProxy&#xff09;作为一种强大的解决方案&#xff0c;可以帮助爬虫应用更高效地获…

信息安全基础-技术体系-加密技术

系统安全 考点分析信息安全的基础知识&#xff08;重点&#xff09;信息安全系统的组成框架信息安全技术对称加密技术非对称加密对称密钥和非对称密钥对比 考点分析 一般不超纲 信息安全的基础知识&#xff08;重点&#xff09; 五个基本要素经常考察 机密性&#xff1a;加密报…

MAC修改python3命令为py

1, 找到python3安装路径 2, vi ~/.bash_profile 3, 增加内容: alias py“/usr/bin/python3” 4, 重载source ~/.bash_profile 5,执行py

数据挖掘的学习路径

⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️ &#x1f434;作者&#xff1a;秋无之地 &#x1f434;简介&#xff1a;CSDN爬虫、后端、大数据领域创作者。目前从事python爬虫、后端和大数据等相关工作&#xff0c;主要擅长领域有&#xff1a;爬虫、后端、大数据…

Redis简介

简单来说 redis 就是一个数据库&#xff0c;不过与传统数据库不同的是 redis 的数据是存在内存中的&#xff0c;所以读写速度非常快&#xff0c;因此 redis 被广泛应用于缓存方向。另外&#xff0c;redis 也经常用来做分布式锁。redis 提供了多种数据类型来支持不同的业务场景。…

LLVM 与代码混淆技术

项目源码 什么是 LLVM LLVM 计划启动于2000年&#xff0c;开始由美国 UIUC 大学的 Chris Lattner 博士主持开展&#xff0c;后来 Apple 也加入其中。最初的目的是开发一套提供中间代码和编译基础设施的虚拟系统。 LLVM 命名最早源自于底层虚拟机&#xff08;Low Level Virtu…

java 企业工程管理系统软件源码+Spring Cloud + Spring Boot +二次开发+ MybatisPlus + Redis

鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;公司对内部工程管…

【算法】选择排序

选择排序 选择排序代码实现代码优化 排序&#xff1a; 排序&#xff0c;就是使一串记录&#xff0c;按照其中的某个或某些关键字的大小&#xff0c;递增或递减的排列起来的操作。 稳定性&#xff1a; 假定在待排序的记录序列中&#xff0c;存在多个具有相同的关键字的记录&…

Linux修复损坏的文件系统

如何判断文件系统是否损坏 当文件系统受损时&#xff0c;将会出现一些明显的迹象。例如&#xff0c;文件或文件夹无法访问、文件大小异常、系统启动慢或无法启动等。此外&#xff0c;系统也可能发出一些错误信息&#xff0c;如"Input/output error"、"Filesyst…

正中优配:国内怎么买美股?

近年来&#xff0c;随着我国经济的发展和对全球金融市场的越来越深入的了解&#xff0c;越来越多的投资者开始重视美国股市。而想要在国内购买美国股票并不是一件简单的事情&#xff0c;本文将从多个视点进行剖析。 一、注册海外买卖账户 在国内购买美股的条件是需求注册海外买…

滚动菜单 flutter

想实现这个功能&#xff1a; 下面的代码可以实现&#xff1a; import package:flutter/material.dart;void main() > runApp(MyApp());class MyApp extends StatelessWidget {static const String _title Flutter Code Sample;overrideWidget build(BuildContext context)…

基于Python开发的飞机大战小游戏彩色版(源码+可执行程序exe文件+程序配置说明书+程序使用说明书)

一、项目简介 本项目是一套基于Python开发的飞机大战小游戏&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Python学习者。 包含&#xff1a;项目源码、项目文档等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&#xff0c;…