Java操作RabbitMQ

文章目录

  • Spring集成RabbitMQ
    • 1. AMQP&SpringAMQP
    • 2. SpringBoot集成RabbitMQ
    • 3. 模型
      • work模型
    • 4.交换机
      • Fanout交换机
      • Direct交换机
      • Topic交换机
    • 5.声明式队列和交换机
      • 基于API声明
      • 基于注解声明
    • 6.消息转换器


Spring集成RabbitMQ

1. AMQP&SpringAMQP

  • AMQP(高级消息队列协议):Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。是一种面向消息通信的协议,就像HTTP协议是一种浏览器向服务器发消息的协议。
  • SpringAMQP:Spring AMOP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象spring-rabbit是底层的默认实现。也就是说SpringAMQP只是一种思想,而spring-rabbit是其具体实现

2. SpringBoot集成RabbitMQ

在Maven依赖中引入amqp的起步依赖即可

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在Spring配置文件中配置

spring:rabbitmq:host: 127.0.0.1port: 5672# 虚拟主机virtual-host: /hhyusername: hhypassword: hhy

RabbitTemplate是Spring封装好的操作RabbitMQ的工具类

生产者

@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessage2Queue() {String queueName = "hhy.q1";String msg = "hello, mq!666";rabbitTemplate.convertAndSend(queueName, msg);}

消费者

@Component
public class MqListener {@RabbitListener(queues = "hhy.q1")public void listenSimpleQueue(String msg){System.out.println("hhy.q1的消息:【" + msg +"】");}}

3. 模型

work模型

假设消息生产者生产消息的速度非常的快,消息消费者消费消息的速度赶不上生产的速度,就会导致MQ队列中的消息越来越多,从而导致消息堆积问题,如何处理消息堆积问题?

  1. 让多个消费者绑定一个队列,加快消息处理速度
  2. 还可以在代码层面使用异步操作,比说线程池

绑定多个消费者,每个消费者的处理能力也可能不一致,而Spring默认将消息以轮询的方式发送给多个消费者,处理能力慢的消费者还是会影响处理速度,此时就可以通过添加配置prefetch让消费者只获取一条消息处理完成后再获取,进一步避免消息堆积问题

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

在这里插入图片描述

work模型就是多个消费者绑定一个队列

@Component
public class MqListener {@RabbitListener(queues = "work.q")public void workListen1(String msg){System.out.println("消费者1:work.q的消息:【" + msg +"】");}@RabbitListener(queues = "work.q")public void workListen2(String msg){System.err.println("消费者2:work.q的消息:【" + msg +"】");}
}

4.交换机

上诉实例代码中并没有使用交换机,生产者是直接将消息发送到队列中,实际这种方式是不合理的,假设多个服务都需要订阅同一条消息这种方式就无法满足需求了,那么就要引入交换机。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

Fanout交换机

Fanout交换机其实就是广播,将生产者发布的消息广播给绑定的自身的所有消息队列。发送消息流程:

  • 可以有多个队列
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息

在这里插入图片描述

根据上诉图编写代码

// 消费者1消费队列1
@RabbitListener(queues = "fanout.q1")
public void fanoutListen1(String msg){System.out.println("消费者1:fanout.q1的消息:【" + msg +"】");
}
// 消费者2消费队列2
@RabbitListener(queues = "fanout.q2")
public void fanoutListen2(String msg){System.out.println("消费者1:fanout.q2的消息:【" + msg +"】");
}

生产者向Fanout类型交换机发送消息,前提需要创建Fanout类型的交换机

@Test
void testSendFanout() {// 交换机名称String exchangeName = "amq.fanout";String msg = "hello, fanout!";rabbitTemplate.convertAndSend(exchangeName, null, msg);
}

Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在这里插入图片描述

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

在这里插入图片描述

通过key进行绑定,如下图也就是说生产者发送消息时指定key为test两个消费者内的队列都能收到,key为java时只有dirct.q1队列能收到,key为cpp时只有dirct.q2队列能收到

在这里插入图片描述

消费者代码

@RabbitListener(queues = "direct.q1")
public void fanoutDirect1(String msg){System.out.println("消费者1:direct.q1的消息:【" + msg +"】");
}
@RabbitListener(queues = "direct.q2")
public void fanoutDirect2(String msg){System.out.println("消费者2:direct.q2的消息:【" + msg +"】");
}

生产者代码

生产者在指定消息时指定不同的key来发送消息

@Test
void testSendDirect() {String exchangeName = "hhy.direct";String msg = "所有队列都能收到该消息";rabbitTemplate.convertAndSend(exchangeName, "test", msg);
}
@Test
void testSendDirect() {String exchangeName = "hhy.direct";String msg = "只有队列direct.q1能收到消息";rabbitTemplate.convertAndSend(exchangeName, "java", msg);
}
@Test
void testSendDirect() {String exchangeName = "hhy.direct";String msg = "只有队列direct.q2能收到消息";rabbitTemplate.convertAndSend(exchangeName, "cpp", msg);
}

Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!也就是说Topic交换机是非常灵活的,Bindingkey支持模糊匹配。

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: china.hunan

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

假设有多个队列绑定的Bindingkey分别为:

  • china.hunan.chenzhou.weather:湖南郴州的天气
  • china.hunan.chenzhou.news:湖南郴州的新闻
  • china.zhejiang.hangzhou.weather:浙江杭州的天气
  • japan.tokyo.news:日本东京的新闻

那么使用通配符:

  • china.hunan.#:表示接受湖南的所有新闻和天气消息
  • #.news:表示接受所有新闻消息
  • china.hunan.*.news:表示接受湖南省各个市区的新闻

建立绑定关系:

在这里插入图片描述

在这里插入图片描述

代码实例

// 消费者
@RabbitListener(queues = "topic.q1")
public void topicListen1(String msg){System.out.println("消费者1:topic.q1的消息:【" + msg +"】");
}@RabbitListener(queues = "topic.q2")
public void topicListen2(String msg){System.out.println("消费者2:topic.q2的消息:【" + msg +"】");
}

生产者代码

这一条消息topic.q1topic.q2两个队列都能收到消息,因为它们和交换机绑定的关系的时候指定的KEY:

  • #.news:接受所有地方的新闻
  • china.hunan.#:接受湖南的新闻和天气

@Test
void testSendTopic() {// 交换机名称String exchangeName = "hhy.topic";String key = "china.hunan.chenzhou.news";String msg = "这是一条湖南郴州的新闻!";rabbitTemplate.convertAndSend(exchangeName, key, msg);
}

下面这条消息只有topic.q2能收到,因为topic.q2和交换机绑定时指定的KEY为china.hunan.#,接受湖南的所有天气和新闻消息

@Test
void testSendTopic() {// 交换机名称String exchangeName = "hhy.topic";String key = "china.hunan.chenzhou.weather";String msg = "郴州今天多云转晴";rabbitTemplate.convertAndSend(exchangeName, key, msg);
}

小结:

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

5.声明式队列和交换机

通过RabbitMQ提供的管理页面创建队列和交换机比较麻烦,SpringAMQP提供了对应API方便开发者来创建队列和交换机。

基于API声明

通过Spring提供的API创建fanout交换机和队列并建立绑定关系

@Configuration
public class FanoutConfiguration {/*** 声明式创建fanout交换机* @return*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hhy.fanout");}/*** 声明式创建队列* @return*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 声明式创建绑定关系* @param fanoutQueue1* @param fanoutExchange* @return*/@Beanpublic Binding fanoutBinding3(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}}

但如果使用这种方式创建Direct交换机就会非常麻烦,因为如果要绑定时要指定多个Key就会出现很多冗余代码,每绑定一个不同的Key就需要多写一份代码

@Configuration
public class DirectConfiguration {@Beanpublic DirectExchange directExchange(){return new DirectExchange("test.direct");}@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}@Beanpublic Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}@Beanpublic Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}@Beanpublic Binding directQueue2BindingRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}@Beanpublic Binding directQueue2BindingBlue(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}

基于注解声明

基于@Bean的方式声明队列和交换机的方式比价麻烦,代码有点冗余,Spring还为我们提供基于注解的方式来声明。

使用注解的方式声明Direct模式的交换机和队列,通过注解声明这种创建方式更简单清爽,一个注解直接创建交换机并且绑定队列。并且对应消费者直接就可以监听队列接收消息

@Component
public class MqListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenSimpleQueue1(String msg){System.out.println("消费者1:收到了simple.queue的消息:【" + msg +"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenSimpleQueue2(String msg){System.out.println("消费者2:收到了simple.queue的消息:【" + msg +"】");}
}

6.消息转换器

前面我们生产者发送的消息都是一些字符串,当我们发送的消息是一个对象的时候就会出现问题。

@Test
void testSendObject() {String exchangeName = "test.direct";Map<String, Object> msg = new HashMap<>(2);msg.put("name", "jack");msg.put("age", 21);rabbitTemplate.convertAndSend(exchangeName, "red", msg);
}

如下图RabbitMQ中的消息队列中存储的消息,数据类型是通过JDK自带的序列化后的数据

在这里插入图片描述

而JDK自带的序列化,存在以下问题:

  • 消息体积大
  • 毫无可读性
  • 有安全漏洞,利用Java字节码反序列化能被替换恶意代码

所以使用JDK自带的序列化方式并不合适,那么我可以使用JSON的序列化方式来解决这个问题。

使用jackson就行,引入jackson依赖

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>

将消息转换器交给Spring管理

@Bean
public MessageConverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();
}

在这里插入图片描述


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/42498.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL的多表查询

我们之前在讲解SQL语句的时候&#xff0c;讲解了DQL语句&#xff0c;也就是数据查询语句&#xff0c;但是之前讲解的查询都是单表查询&#xff0c;而本章节我们要学习的则是多表查询操作&#xff0c;主要从以下几个方面进行讲解。 5.1 多表关系 项目开发中&#xff0c;在进行…

微软Copilot与向量数据库:智能化办公的技术架构与实现路径

作为大禹智库的向量数据库高级研究员王帅旭,我在向量数据库和AI应用领域深耕30余年,亲历了向量数据库从学术概念到产业核心基础设施的演进历程。今天,我将从专业视角剖析微软Copilot背后的向量数据库技术支撑,并分享如何利用Mlivus Cloud等现代向量数据库构建类似的智能办公…

AI-人工智能-实现将静态图片和视频合成为类似iPhone的Live Photo(动态照片)效果

实现将静态图片和视频合成为类似iPhone的Live Photo&#xff08;动态照片&#xff09;效果 可以使用Python结合OpenCV和图像处理库来完成 技术说明 Live Photo原理&#xff1a;iPhone的Live Photo实际上是3秒的MOV视频一张高分辨率JPEG格式选择&#xff1a; .mov是最兼容的格…

数据结构之排序

目录 排序的概念及引用 排序的概念 常见的排序算法 常见排序算法的实现 插入排序 1.直接插入排序&#xff1a; 2.希尔排序( 缩小增量排序 ) 选择排序 直接选择排序 堆排序 交换排序 冒泡排序 快速排序 1&#xff09;Hoare版 2&#xff09;挖坑法 3&#xff09;…

从“泛读”到“精读”:合合信息文档解析如何让大模型更懂复杂文档?

从“泛读”到“精读”&#xff1a;合合信息文档解析如何让大模型更懂复杂文档&#xff1f; 一、引言&#xff1a;破解文档“理解力”瓶颈二、核心功能&#xff1a;合合信息的“破局”亮点功能亮点1&#xff1a;复杂图表的高精度解析图表解析&#xff1a;为大模型装上精准“标尺…

NoSQL 数据库的适用场景与局限性分析

NoSQL(Not Only SQL)数据库是一类非关系型数据库,通过灵活的数据模型和分布式架构解决传统关系型数据库在扩展性、性能和数据多样性上的瓶颈。以下从技术特性、适用场景、不适用场景及行业实践展开分析: 一、NoSQL数据库的核心技术特性 四大数据模型 文档型:以JSON/BSON格…

Pycharm(七):几个简单案例

一.剪刀石头布 需求&#xff1a;和电脑玩剪刀石头布游戏 考察点&#xff1a;1.随机数&#xff1b;2.判断语句 import random # numrandom.randint(1,3) # print(num) # print(**30) #1.录入玩家手势 playerint(input(请输入手势&#xff1a;&#xff08;1.剪刀 2.石头 3&…

Reactive编程:什么是Reactive编程?Reactive编程思想

文章目录 **1. Reactive编程概述****1.1 什么是Reactive编程&#xff1f;****1.1.1 Reactive编程的定义****1.1.2 Reactive编程的历史****1.1.3 Reactive编程的应用场景****1.1.4 Reactive编程的优势** **1.2 Reactive编程的核心思想****1.2.1 响应式&#xff08;Reactive&…

【数学建模】动态规划算法(Dynamic Programming,简称DP)详解与应用

动态规划算法详解与应用 文章目录 动态规划算法详解与应用引言动态规划的基本概念动态规划的设计步骤经典动态规划问题1. 斐波那契数列2. 背包问题3. 最长公共子序列(LCS) 动态规划的优化技巧动态规划的应用领域总结 引言 动态规划(Dynamic Programming&#xff0c;简称DP)是一…

Linux基础之软硬链接

参考链接&#xff1a;https://baijiahao.baidu.com/s?id1770724291436944734&wfrspider&forpc 一、定义 1.硬链接&#xff08;Hard Link&#xff09; 硬链接是指多个文件名指向同一个物理文件的链接关系。它们在文件系统中具有相同的inode号&#xff08;索引节点号…

python每日十题(13)

一般把计算机完成一条指令所花费的时间称为一个指令周期。指令周期越短&#xff0c;指令执行就越快。本题答案为D选项。 顺序程序具有顺序性、封闭性和可再现性的特点&#xff0c;使得程序设计者能够控制程序执行的过程(包括执行顺序、执行时间&#xff09;&#xff0c;对程序执…

0328-内存图2

是否正确待定&#xff1a; Perso类 package com.qc.内存图2;public class Perso {public int age;public String name;public static int flag;public void m1() {}public static void m2() {}Overridepublic String toString() {return "Perso [age" age "…

Java 开发中的 AI 黑科技:如何用 AI 工具自动生成 Spring Boot 项目脚手架?

在 Java 开发领域&#xff0c;搭建 Spring Boot 项目脚手架是一项耗时且繁琐的工作。传统方式下&#xff0c;开发者需要手动配置各种依赖、编写基础代码&#xff0c;过程中稍有疏忽就可能导致配置错误&#xff0c;影响开发进度。如今&#xff0c;随着 AI 技术的迅猛发展&#x…

一文详解k8s体系架构知识

0.云原生 1.k8s概念 1. k8s集群的两种管理角色 Master&#xff1a;集群控制节点&#xff0c;负责具体命令的执行过程。master节点通常会占用一股独立的服务器&#xff08;高可用部署建议用3台服务器&#xff09;&#xff0c;是整个集群的首脑。 Master节点一组关键进程&#xf…

ubuntu下docker 安装 graylog 6.1

下载docker compose相关仓库 https://github.com/Graylog2/docker-compose 按readme所述&#xff0c;拷贝.env.example并重命名 .env 按.env中的说明创建密码和密钥 创建GRAYLOG_PASSWORD_SECRET 用: pwgen -N 1 -s 96 创建GRAYLOG_ROOT_PASSWORD_SHA2 用: echo -n yourpa…

创新驱动 智领未来丨中威电子全景展示高速公路数字化创新成果

在数字经济与新型基础设施建设深度融合的背景下&#xff0c;中国智慧交通产业正迎来前所未有的发展机遇。3月27日&#xff0c;第27届中国高速公路信息化大会暨技术产品博览会在青岛市红岛国际会议展览中心盛大开幕。作为高速公路信息化领域的创新先锋&#xff0c;中威电子&…

计算机期刊征稿 | 计算机-网络系统:物联网系统架构、物联网使能技术、物联网通信和网络协议、物联网服务和应用以及物联网的社会影响

IEEE Internet of Things Journal 学科领域&#xff1a; 计算机-网络系统 期刊类型&#xff1a; SCI/SSCI/AHCI 收录数据库&#xff1a; SCI(SCIE),EI ISSN&#xff1a; 2327-4662 中科院&#xff1a; 1区 影响因子&#xff1a; 8.2 JCR&#xff1a; Q1 IEEE Internet…

springBoot统一响应类型3.3版本

前言&#xff1a; 通过实践而发现真理&#xff0c;又通过实践而证实真理和发展真理。从感性认识而能动地发展到理性认识&#xff0c;又从理性认识而能动地指导革命实践&#xff0c;改造主观世界和客观世界。实践、认识、再实践、再认识&#xff0c;这种形式&#xff0c;循环往…

mapbox基础,加载popup弹出窗

👨‍⚕️ 主页: gis分享者 👨‍⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍⚕️ 收录于专栏:mapbox 从入门到精通 文章目录 一、🍀前言1.1 ☘️mapboxgl.Map 地图对象1.2 ☘️mapboxgl.Map style属性1.3 ☘️popup 弹出窗 api1.3.1 ☘️构造函数1.…

MySQL基础语法1

目录 #1.创建和删除数据库 ​编辑#2.如果有lyt就删除,没有则创建一个新的lyt #3.切换到lyt数据库下 #4.创建数据表并设置列及其属性,name是关键词要用name包围 ​编辑 #5.删除数据表 #5.查看创建的student表 #6.向student表中添加数据,数据要与列名一一对应 #7.查询st…