RabbitMQ---订阅模型-Direct

1、 订阅模型-Direct

• 有选择性的接收消息
• 在订阅模式中,生产者发布消息,所有消费者都可以获取所有消息。
• 在路由模式中,我们将添加一个功能 - 我们将只能订阅一部分消息。
例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
• 但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
• 在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
• 消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。
在这里插入图片描述

• P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
• X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
• C1:消费者,其所在队列指定了需要routing key 为 error 的消息
• C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

1.1、生产者

此处我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete

public class Send {private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "商品新增了, id = 1001";// 发送消息,并且指定routing key 为:insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());System.out.println(" [商品服务:] Sent '" + message + "'");channel.close();connection.close();}
}

1.2、消费者1

我们此处假设消费者1只接收两种类型的消息:更新商品和删除商品。

public class Recv {private final static String QUEUE_NAME = "direct_exchange_queue_1";private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

1.3、 消费者2

我们此处假设消费者2接收所有类型的消息:新增商品,更新商品和删除商品。

public class Recv2 {private final static String QUEUE_NAME = "direct_exchange_queue_2";private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者2] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

1.4、测试

我们分别发送增、删、改的RoutingKey,发现结果:

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/107127.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构(Java实现)-优先级队列(堆)

队列是一种先进先出(FIFO)的数据结构,但有些情况下,操作的数据可能带有优先级,一般出队 列在这种情况下,数据结构应该提供两个最基本的操作,一个是返回最高优先级对象,一个是添加新的对象。 这种数据结构就…

微信小程序发布迭代版本后如何提示用户强制更新新版本

在点击小程序发布的时候选择,升级选项 之前用户使用过的再打开小程序页面就会弹出升级弹窗modal

常见的移动端布局

流式布局&#xff08;百分比布局&#xff09; 使用百分比、相对单位&#xff08;如 em、rem&#xff09;等来设置元素的宽度&#xff0c;使页面元素根据视口大小的变化进行调整。这种方法可以实现基本的自适应效果&#xff0c;但可能在不同设备上显示不一致。 <!DOCTYPE ht…

日志系统——性能测试

日志系统项目已经编写完成&#xff0c;在本节完成性能测试之后就正式结束了 测试代码如下 #include "../logs/mjwlog.h" #include <vector> #include <thread>//参数&#xff1a;日志器名称&#xff0c;线程数量&#xff0c;输出日志条数&#xff0c;单…

SpringBoot+mybatis+pgsql多个数据源配置

一、配置文件 jdk环境&#xff1a;1.8 配置了双数据源springbootdruidpgsql&#xff0c;application.properties配置修改如下&#xff1a; #当前入库主数据库 spring.primary.datasource.typecom.alibaba.druid.pool.DruidDataSource spring.primary.datasource.driver-class…

1.Flink源码编译

目录 1.环境版本 1.1 jdk 1.2.maven 1.3.node 1.4.scala 2.下载flink源码 3.编译源码 4.idea打开flink源码 5.运行wordcount 1.环境版本 软件地址 链接&#xff1a;https://pan.baidu.com/s/1ZxYydR8rBfpLCcIdaOzxVg 提取码&#xff1a;12xq 1.1 jdk 1.2 maven 1.…

基于Red Hat Enterprise Linux 7操作系统的PostgresSql15的备份恢复(实践笔记)

零、前言 本文是基于阿里云ECS服务器进行的实践操作&#xff0c;操作系统版本&#xff1a;Red Hat Enterprise Linux 7 PG数据库版本&#xff1a;PostgresSql 15 PG安装方式&#xff1a;yum 由于本人新接触pg数据&#xff0c;本次也是出于好奇&#xff0c;就对pg数据库的pg_du…

MTK6833_MT6833核心板_天玑700安卓5G核心板规格性能介绍

MTK6833安卓核心板采用台积电 7nm 制程的5G SoC&#xff0c;2*Cortex-A766*Cortex-A55架构&#xff0c;搭载Android12.0操作系统&#xff0c;主频最高达2.2GHz 。内置 5G 双载波聚合技术&#xff08;2CC&#xff09;及双 5G SIM 卡功能&#xff0c;实现优异的功耗表现及实时连网…

Elasticsearch(十四)搜索---搜索匹配功能⑤--全文搜索

一、前言 不同于之前的term。terms等结构化查询&#xff0c;全文搜索首先对查询词进行分析&#xff0c;然后根据查询词的分词结果构建查询。这里所说的全文指的是文本类型数据&#xff08;text类型&#xff09;,默认的数据形式是人类的自然语言&#xff0c;如对话内容、图书名…

VictoriaLogs:一款超低占用的 ElasticSearch 替代方案

image.png 背景 前段时间我们想实现 Pulsar 消息的追踪流程&#xff0c;追踪实现的效果图如下&#xff1a; 实现其实比较简单&#xff0c;其中最重要的就是如何存储消息。 消息的读取我们是通过 Pulsar 自带的 BrokerInterceptor 实现的&#xff0c;对这个感兴趣的朋友后面会单…

用大白话来讲讲多线程的知识架构

感觉多线程的知识又多又杂&#xff0c;自从接触java&#xff0c;就在一遍一遍捋脉络和深入学习。现在将这次的学习成果展示如下。 什么是多线程&#xff1f; 操作系统运行一个程序&#xff0c;就是一个线程。同时运行多个程序&#xff0c;就是多线程。即在同一时间&#xff0…

python爬虫的js逆向入门到进阶教程文章分享汇总~持续更新

目录 一、内容介绍二 、专栏内容-持续更新1、JS逆向入门2、Js逆向进阶3、爬虫基础知识4、工具与安装5、漫星内容分享 三、星球使用四、b站up主视频推荐 一、内容介绍 二 、专栏内容-持续更新 1、JS逆向入门 2023-08-25》11.常见加密>xx音乐RSA加密 https://articles.zsxq.c…

项目进度管理(4-1)关键链法

1 关键链法产生的背景 关键链法&#xff08;Critical Chain Method&#xff0c;CCM&#xff09;起源于20世纪80年代&#xff0c;是由Eliyahu M. Goldratt在他的著作《关键链》&#xff08;"Critical Chain"&#xff09;中首次提出和阐述的。Eliyahu M. Goldratt是以…

玩转git第7章节,本地git的用户名和密码的修改

一 本地git的用户名和密码 1.1 本地用户名和密码修改 1.本地用户名修改 2.凭据管理 3.进行修改密码 1.2 代码提交操作

ChatGPT在医疗系统的应用探索动态

注意&#xff1a;本信息仅供参考&#xff0c;发布该内容旨在传递更多信息的目的&#xff0c;并不意味着赞同其观点或证实其说法。 生成式人工智能&#xff0c;如OpenAI开发的ChatGPT&#xff0c;被认为是可以颠覆医疗行业的工具。尽管该技术刚刚起步&#xff0c;但已有许多医…

接口测试总结分享(http与rpc)

接口测试是测试系统组件间接口的一种测试。接口测试主要用于检测外部系统与系统之间以及内部各个子系统之间的交互点。测试的重点是要检查数据的交换&#xff0c;传递和控制管理过程&#xff0c;以及系统间的相互逻辑依赖关系等。 一、了解一下HTTP与RPC 1. HTTP&#xff08;H…

SpringCloud超详细教程

1.认识微服务 随着互联网行业的发展&#xff0c;对服务的要求也越来越高&#xff0c;服务架构也从单体架构逐渐演变为现在流行的微服务架构。这些架构之间有怎样的差别呢&#xff1f; 1.0.学习目标 了解微服务架构的优缺点 1.1.单体架构 单体架构&#xff1a;将业务的所有…

C语言(第三十二天)

1. 递归是什么&#xff1f; 递归是学习C语言函数绕不开的一个话题&#xff0c;那什么是递归呢&#xff1f; 递归其实是一种解决问题的方法&#xff0c;在C语言中&#xff0c;递归就是函数自己调用自己。 写一个史上最简单的C语言递归代码&#xff1a; #include <stdio.h>…

二、前端监控之方案调研

前端监控体系 一个完整的前端监控体系包括了日志采集、日志上报、日志存储、日志切分&计算、数据分析、告警等流程。 对于一名前端开发工程师来说&#xff0c;也就意味着工作不再局限于前端业务的开发工作&#xff0c;需要有Nginx服务运维能力、实时/离线分析能力、Node应…

常见的时序数据库

1.概念 时序数据库全称为时间序列数据库。时间序列数据库指主要用于处理带时间标签&#xff08;按照时间的顺序变化&#xff0c;即时间序列化&#xff09;的数据&#xff0c;带时间标签的数据也称为时间序列数据。 时间序列数据主要由电力行业、化工行业、气象行业、地理信息…