SpringBoot整合RabbitMQ的快速使用教程

     

目录

一、引入依赖

二、配置rabbitmq的连接信息等

1、生产者配置

2、消费者配置 

三、设置消息转换器

四、生产者代码示例

 1、配置交换机和队列信息

2、生产消息代码

五、消费者代码示例

1、消费层代码

2、业务层代码 


        在分布式系统中,消息队列是一种重要的通信方式,它能够有效地将消息从一个应用程序传递到另一个应用程序。RabbitMQ是一款流行的开源消息队列系统,简单易用且功能强大。本文将介绍如何使用SpringBoot快速整合RabbitMQ,实现消息的发送和接收。

 

交换机: 主要负责接收生产者发送的消息,并根据特定的规则将这些消息路由到一个或多个队列中。交换机的类型有:

  •  Fanout Exchange(扇出交换机)

        Fanout交换机会将接收到的所有消息广播到它知道的所有队列中。这种类型的交换机不考虑路由键,只是简单地将消息复制到所有绑定的队列中。适用于不需要选择性地发送消息给特定队列的情况,例如,广播系统通知或有多个服务需要消费同一份数据的场景。

  • Direct Exchange(直连交换机)

       Direct交换机根据消息的路由键将消息发送到与之匹配的队列中。只有当路由键与绑定关键字完全匹配时,消息才会被路由到相应的队列。适合于精确控制消息投递的场景,如特定的服务或功能模块只关心特定类型的消息。

  • Topic Exchange(主题交换机)

       Topic交换机允许更复杂的匹配规则,通过模式匹配的方式将消息路由到一个或多个队列。路由键和绑定键都使用点分隔的字符串,可以包含特殊字符如“#”和“*”来实现模糊匹配。"*"用于匹配一个单词,而“#”则用于匹配零个或多个单词。适合于需要按内容分类消息的系统,如日志处理系统,可以根据日志等级或来源将日志消息分发到不同的队列。

  • Headers Exchange(头交换机)

        Headers交换机使用消息头的一组键值对来决定消息应该被路由到哪个队列。这种交换机允许更细粒度的路由控制,但配置和使用较为复杂。适合需要基于消息多个属性来动态决定路由的场景,例如某些高级的路由策略或复杂的事件处理系统。

队列:主要用于存储消息,实现先进先出(FIFO)的特性。

一、引入依赖

这里引入了两个依赖。一个是rabbitmq的依赖,另一个是配置json转换器所需要的依赖。生产者和消费者服务都需要引入这两个依赖。

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-xml</artifactId>
 </dependency>

二、配置rabbitmq的连接信息等

1、生产者配置

  rabbitmq:
    host: 170.40.20.16
    port: 5672
    username: zhuoye
    password: zy521
    virtual-host: /

2、消费者配置 

   rabbitmq:
    host: 170.40.20.16
  port: 5672
    username: zhuoye
    password: zy521
    virtual-host: /
    listener:
      simple:
        prefetch: 1 #每次只能处理一个,处理完成才能获取下一个消息

三、设置消息转换器

        默认情况下Spring采用的序列化方式是JDK序列化,而JDK的序列化存在可读性性差、占用内存大、存在安全漏洞等问题。所以,这里我们一般使用Jackson的序列化代替JDk的序列化。

在生产者和消费者的启动类上加上如下代码:  

@SpringBootApplication
@EnableRabbit //开启rabbitmq的使用
public class ConsumerApp {public static void main( String[] args ) {SpringApplication.run(ConsumerApp.class, args);}//使用的是Jackson库中的Jackson2JsonMessageConverter类,代替使用jdk自带的序列化@Beanpublic MessageConverter jacksonMessageConvertor(){Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);//开启消息id的自动生成功能return jackson2JsonMessageConverter;}
}

四、生产者代码示例

 1、配置交换机和队列信息
@Configuration
public class RabbitMqConfig {private static String EXCHANGE_NAME="amq.topic";private static String QUEUE_NAME="alarm.data.topic.queue";private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";/*** 声明交换机*/@Beanpublic TopicExchange exchange(){// durable:是否持久化,默认是false// autoDelete:是否自动删除,当没有生产者或者消费者使用此交换机,该交换机会自动删除。return new TopicExchange(EXCHANGE_NAME,true,false);}/*** 声明告警队列* @return*/@Bean("alarmQueue")public Queue alarmQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。return new Queue(QUEUE_NAME,true,false,false);}/*** 声明确认告警队列* @return*/@Bean("confirmAlarmQueue")public Queue confirmAlarmQueue(){return new Queue(CONFIRM_ALARM_QUEUE_NAME,true,false,false);}/*** 声明告警队列绑定关系* @param queue* @param topicExchange* @return*/@Beanpublic Binding alarmBinding(@Qualifier("alarmQueue") Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("server.event.#");}/*** 声明确认告警队列绑定关系* @param queue* @param topicExchange* @return*/@Beanpublic Binding confirmAlarmBinding(@Qualifier("confirmAlarmQueue") Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("server.event_confirm.#");}
2、生产消息代码
    @Autowiredprivate RabbitTemplate rabbitTemplate;private static String EXCHANGE_NAME="amq.topic";private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";@Testvoid producerAlarmMsg() {String msg = "发送一条告警消息";rabbitTemplate.convertAndSend(EXCHANGE_NAME, "server.event.#",msg);System.out.println("msg = " + msg);}@Testvoid producerConfirmAlarmMsg() {String msg = "发送一条确认告警消息";rabbitTemplate.convertAndSend(CONFIRM_ALARM_QUEUE_NAME, "server.event_confirm.#",msg);System.out.println("msg = " + msg);}

五、消费者代码示例

1、消费层代码
@Component
public class AlarmConsumer {@Autowiredprivate IAlarmService alarmService;@RabbitListener(queues ="alarm.data.topic.queue",concurrency = "5")public void getAlarmInfo(String data){alarmService.dealAlarmData(data);}@RabbitListener(queues ="alarm.confirm.data.topic.queue",concurrency = "5")public void getConfirmAlarmInfo(String data){alarmService.dealConfirmAlarmData(data);}
}
2、业务层代码 
@Service
public class IAlarmServiceImpl implements IAlarmService {@Overridepublic void dealAlarmData(String data) {EquipAlarmResp equipAlarmResp= JSON.parseObject(result,EquipAlarmResp.class);List<String> alarmIdsOld = dceEquipAlarmMapper.queryAllAlarmIds();DceEquipAlarmDto dceEquipAlarmDto = CopyBeanUtils.copyProperties(equipAlarmResp, DceEquipAlarmDto.class);dceEquipAlarmDto.setCreateTime(new Date());dceEquipAlarmDto.setAlarmTime(dceEquipAlarmDto.getAlarmTime()/1000);//查询出需要新增或者更新的数据Boolean flag=alarmIdsOld.stream().filter(a->a.equals(dceEquipAlarmDto.getAlarmId())).findFirst().isPresent();//开启事务,保证新增、更新、删除的原子性TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);List<DceEquipAlarmDto> list=new ArrayList<>();list.add(dceEquipAlarmDto);try {//新增if (!flag) {dceEquipAlarmMapper.insertBatch(list);}//更新if (flag) {dceEquipAlarmMapper.updateBatch(list);}//提交事务transactionManager.commit(transaction);} catch (Exception e) {//回滚transactionManager.rollback(transaction);log.error("DynamicEnvironmentServiceImpl.getAlarmInfoByRabbitMq 新华报业动环设备告警信息更新失败!", e);}}@Overridepublic void dealConfirmAlarmData(String data) {EquipConfirmAlarmResp alarmResp = JSON.parseObject(data,EquipConfirmAlarmResp.class);Integer confirmTime = Integer.parseInt(String.valueOf(System.currentTimeMillis() / 1000));alarmResp.setConfirmTime(confirmTime);dceEquipAlarmMapper.updateConfirmAlarmBatch(alarmResp,alarmResp.getAlarmIds());}}

注:以上代码为对接告警信息和对接告警确认消息的示例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/334548.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[SWPUCTF 2022 新生赛]奇妙的MD5... ...

目录 [SWPUCTF 2022 新生赛]奇妙的MD5 [GDOUCTF 2023]受不了一点 [LitCTF 2023]作业管理系统 注入点一&#xff1a;文件上传 注入点二&#xff1a;创建文件直接写一句话木马 注入点三&#xff1a;获取数据库备份文件 [LitCTF 2023]1zjs [SWPUCTF 2022 新生赛]奇妙的MD5 …

修改了vue3 <script setup>留言板

Лунная ночь <template><button class"edit_view_checkbox"><input type"checkbox" v-model"editshowInput" value"编辑" /></button><div class"editshowInput" v-if"editshowI…

Oracle实践|内置函数之日期与时间函数

&#x1f4eb; 作者简介&#xff1a;「六月暴雪飞梨花」&#xff0c;专注于研究Java&#xff0c;就职于科技型公司后端工程师 &#x1f3c6; 近期荣誉&#xff1a;华为云云享专家、阿里云专家博主、腾讯云优秀创作者、ACDU成员 &#x1f525; 三连支持&#xff1a;欢迎 ❤️关注…

《中国科技投资》是什么级别的期刊?是正规期刊吗?能评职称吗?

问题解答&#xff1a; 问&#xff1a;《中国科技投资》期刊什么级别&#xff1f; 答&#xff1a;国家级 问&#xff1a;《中国科技投资》期刊是核心期刊吗? 答&#xff1a;不是&#xff0c;是万方维普收录的正规期刊。 主管单位&#xff1a;中国信息协会 主办单位&#…

3.1 掌握RDD的创建

在Apache Spark中&#xff0c;RDD&#xff08;Resilient Distributed Dataset&#xff09;是一个基本的、不可变的、分布式的和可分区的数据集。它能够自动进行容错处理&#xff0c;并支持在大规模集群上的并行操作。RDD之间存在依赖关系&#xff0c;可以实现管道化&#xff0c…

【全开源】民宿酒店预订管理系统(ThinkPHP+uniapp+uView)

民宿酒店预订管理系统 特色功能&#xff1a; 客户管理&#xff1a;该功能可以帮助民宿管理者更加有效地管理客户信息&#xff0c;包括客户的姓名、电话、地址、身份证号码等&#xff0c;并可以在客户的订单中了解客户的消费情况&#xff0c;从而更好地满足客户的需求&#xff…

②单细胞学习-组间及样本细胞比例分析

目录 数据读入 每个样本各细胞比例 两个组间细胞比例 亚组间细胞比例差异分析&#xff08;循环&#xff09; 单个细胞类型亚新间比例差异 ①单细胞学习-数据读取、降维和分群-CSDN博客 比较各个样本间的各类细胞比例或者亚组之间的细胞比例差异 ①数据读入 #各样本细胞…

【介绍下如何在SQL中添加数据】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

rust语言初识

程序设计实践课上水一篇ing 来源&#xff1a;rust基础入门-1.初识rust-酷程网 (kucoding.com) rust作为一名新兴语言&#xff0c;与go又有些许不同&#xff0c;因为它的目标是对标系统级开发&#xff0c;也就是C、C这两位在编程界的位置。比如我们最常用的windows系统&#x…

SpringBoot实现邮箱验证码

自行创建一个SpringBoot项目 导入SpringBoot所需要的邮箱验证码的包 <!--邮件发送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId><version>2.6.1</version>…

【JAVA |图书管理系统】JAVA实现图书管理系(附完整代码)

✨✨谢谢大家捧场&#xff0c;祝屏幕前的小伙伴们每天都有好运相伴左右&#xff0c;一定要天天开心哦&#xff01;✨✨ &#x1f388;&#x1f388;作者主页&#xff1a; &#x1f388;丠丠64-CSDN博客&#x1f388; ✨✨ 帅哥美女们&#xff0c;我们共同加油&#xff01;一起…

Java入门基础学习笔记47——ArrayList

什么是集合呢&#xff1f; 集合是一种容器&#xff0c;用来装数据的&#xff0c;类似数组。 有数组&#xff0c;为什么还要学习集合呢&#xff1f; 数组定义完成并启动后&#xff0c;长度就固定了。 而集合是大小可变&#xff0c;开发中用的最多的。 集合的特点&#xff1a;大…

改进rust代码的35种具体方法-类型(十九)-避免使用反射

上一篇文章 从其他语言来到Rust的程序员通常习惯于将反思作为工具箱中的工具。他们可能会浪费很多时间试图在Rust中实现基于反射的设计&#xff0c;却发现他们所尝试的事情只能做得不好&#xff0c;如果有的话。这个项目希望通过描述Rust在反思方面做什么和不做什么&#xff0c…

【chagpt】广泛使用API之前:考虑成本和数据隐私

文章目录 一. 定价和标记限制二. 安全和隐私 在广泛使用API之前&#xff0c;应该考虑两个重要因素&#xff1a;成本和数据隐私。 一. 定价和标记限制 OpenAI在Pricing页面上列出了模型的定价。请注意&#xff0c;OpenAI不一定及时更新该页面上的定价信息&#xff0c;因此实际…

华为OD机试【计算最接近的数】(java)(100分)

1、题目描述 给定一个数组X和正整数K&#xff0c;请找出使表达式X[i] - X[i1] … - X[i K 1]&#xff0c;结果最接近于数组中位数的下标i&#xff0c;如果有多个i满足条件&#xff0c;请返回最大的i。 其中&#xff0c;数组中位数&#xff1a;长度为N的数组&#xff0c;按照元…

Pyinstaller打包exe文件解决指南

打包命令 打包 Python 文件 输入如下格式的命令即可 默认命令 Pyinstaller 文件名.py Pyinstaller -option1 -option2 -... 要打包的文件 Pyinstaller 文件名.pyPyinstaller -option1 -option2 -... 要打包的文件 参数选项比较多&#xff0c;这里我列一个表&#xff1a;…

element ui 的el-input输入一个字后失去焦点,需重新点击输入框才能再次输入!

解决方案&#xff1a; 我是form表单嵌套表格&#xff0c;里面的el-input输入框&#xff0c;输入第一个值的时候会突然失去焦点&#xff0c;需要再次点击输入框才能正常输入&#xff0c;原因是table的key值&#xff0c;需要改成正常的index即可&#xff0c;如果你是循环的&…

讯方·智汇云校助力多所高校斩获华为ICT大赛2023-2024全球总决赛佳绩

2024年5月26日&#xff0c;华为ICT大赛2023-2024全球总决赛闭幕式暨颁奖典礼在深圳举行。讯方技术14所合作院校的18支队伍在此次大赛中荣获系列大奖。 本届大赛为华为历届最大规模的线下比赛&#xff0c;共吸引了全球80多个国家和地区、2000多所院校、17万余名学生报名参赛&am…

【前端学习笔记】HTML基础

HTML 一、HTML介绍1.HTML概念2.文档声明3.字符编码4. HTML标签5. HTML属性 二、标签1.meta标签2.语义标签3.布局标签4.列表5.超链接6.图片7.字符实体8.内联格式9.HTML 表格10.HTML 表单 三、HTML5新特性1. 本地存储2. Cookie3. 语义化标签4.多媒体元素5.表单增强6.Canvas7.SVG …

PHP开发入门

PHP官网&#xff1a;PHP: Hypertext Preprocessor apache官网&#xff1a;https://httpd.apache.org/ 一、搭建PHP环境 下载apache 进入官网点击download 选择下载windows版本文件 点击进入下载界面 点击下载64位版本文件 下载后解压文件 解压文件后进入 D:\httpd-2.4.59-24…