Spring Boot 整合RabbitMQ

系列文章目录

第一章 Java线程池技术应用
第二章 CountDownLatch和Semaphone的应用
第三章 Spring Cloud 简介
第四章 Spring Cloud Netflix 之 Eureka
第五章 Spring Cloud Netflix 之 Ribbon
第六章 Spring Cloud 之 OpenFeign
第七章 Spring Cloud 之 GateWay
第八章 Spring Cloud Netflix 之 Hystrix
第九章 代码管理gitlab 使用
第十章 SpringCloud Alibaba 之 Nacos discovery
第十一章 SpringCloud Alibaba 之 Nacos Config
第十二章 Spring Cloud Alibaba 之 Sentinel
第十三章 JWT
第十四章 RabbitMQ应用

在这里插入图片描述


文章目录

  • 系列文章目录
    • @[TOC](文章目录)
  • 前言
  • 1、RabbitMQ概念概念
    • 1.1、生产者和消费者
    • 1.2、队列
    • 1.3、交换机、路由键、绑定
      • 1.3.1、交换机类型
  • 2、RabbitMQ运转流程
    • 2.1、生产者发送消息流程
    • 2.2、消费者接收消息的过程
    • 2.3、AMQP协议
  • 3、RabbitMQ windows安装
    • 3.1、下载
    • 3.2、安装
  • 4、Spring Boot 整合RabbitMQ
    • 4.1、在user-service添加依赖
    • 4.2、配置文件添加
    • 4.3、增加RabbitMQ配置类
    • 4.4、新增消费监听类
    • 4.5、消息生产端

前言

一般MQ用于系统解耦、削峰使用,常见于微服务、业务活动等场景。 MQ(消息队列)在微服务、业务活动等场景中的应用主要表现为系统解耦和削峰。

系统解耦

场景描述:在微服务架构中,服务与服务之间需要通信。如果采用直接调用方式,服务间会存在强依赖关系,一个服务的改动可能引发连锁反应。
MQ作用:服务间可以通过消息队列进行通信,一个服务将消息放入队列,另一个服务从队列中取出消息进行处理。这种方式下,服务间实现了解耦,降低了相互的依赖。

削峰

场景描述:在业务活动期间,由于用户请求量短时间内剧增,可能导致系统压力过大甚至崩溃。
MQ作用:通过消息队列实现请求的缓冲。在高并发场景下,系统可以将请求放入消息队列,然后异步处理这些请求,从而平滑系统的处理负载,确保系统的稳定性。

综上所述,MQ因其独特的队列属性和消息传递模式,在分布式、微服务架构中发挥着重要的作用,提高了系统的可用性和稳定性。

1、RabbitMQ概念概念

RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。
在这里插入图片描述

1.1、生产者和消费者

  • Producer:生产者,就是投递消息的一方。消息一般可以包含2个部分:消息体和标签(Label)。消息的标签用来描述这条消息,比如一个交换器的名称和一个路由键。
  • Consumer:消费者,就是接受消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)
  • Broker:消息中间件的服务节点。一个RabbitMQ Broker看做一台RabbitMQ服务器
    在这里插入图片描述

1.2、队列

Queue:队列,是RabbitMQ的内部对象,用于存储消息
在这里插入图片描述
在这里插入图片描述

1.3、交换机、路由键、绑定

Exchange:交换器。生产者将消息发送到Exchange(交换器,通常也可以用大写的"X"来表示),有交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。
在这里插入图片描述
RoutingKey:路由键。生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
Binding:绑定。RabbitMQ中通过绑定将交换器与队列联合起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了。
在这里插入图片描述

1.3.1、交换机类型

  • Direct Exchange:直连交换机,根据Routing Key(路由键)进行投递到不同队列。
    在这里插入图片描述
    在这里插入图片描述
  • Fanout Exchange:扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。
    在这里插入图片描述
  • Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示一个或多个词,*表示一个词。
    在这里插入图片描述
  • Header Exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。
    在这里插入图片描述
    自学参考:https://blog.csdn.net/qq_38550836/article/details/95358353

2、RabbitMQ运转流程

2.1、生产者发送消息流程

  • 生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
  • 生产者声明一个交换器,并设置相关属性,比如交换器类型、是否持久化等
  • 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
  • 生产者通过路由键将交换器和队列绑定起来
  • 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
  • 相应的交换器根据接收到的路由键查找相匹配的队列。
  • 如果找到,则将从生产者发送过来的消息存入相应的队列。
  • 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  • 关闭信道
  • 关闭连接

2.2、消费者接收消息的过程

  • 消费者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
  • 消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作。
  • 等待RabbitMQ Broker回应并投递相应队列中队列的消息,消费者接收消息。
  • 消费者确认(ack)接收到的消息。
  • RabbitMQ从队列中删除相应已经被确认的消息。
  • 关闭信道
  • 关闭连接
    在这里插入图片描述无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。

2.3、AMQP协议

Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等
在这里插入图片描述

  • Broker:接收和分发消息的应用,RabbitMQ 就是 Message Broker
  • Virtual Host:虚拟 Broker,将多个单元隔离开
  • Connection:publisher / consumer 和 broker 之间的 tcp 连接
  • Channel:connection 内部建立的逻辑连接,通常每个线程创建单独的 channel
  • Routing key:路由键,用来指示消息的路由转发,相当于快递的地址
  • Exchange:交换机,相当于快递的分拨中心
  • Queue:消息队列,消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,用于 message 的分发依据

3、RabbitMQ windows安装

3.1、下载

https://github.com/erlang/otp/releases/download/OTP-25.2/otp_win64_25.2.exe
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.11.5/rabbitmq-server-3.11.5.exe

3.2、安装

配置环境变量
在这里插入图片描述
在这里插入图片描述

cd D:\Program Files\RabbitMQ Server\rabbitmq_server-3.11.5\sbin

开启rabbitmq-plugins插件

rabbitmq-plugins enable rabbitmq_management
在这里插入图片描述

打开地址
http://127.0.0.1:15672/
在这里插入图片描述
输入用户名/密码:guest/guest

4、Spring Boot 整合RabbitMQ

4.1、在user-service添加依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.2、配置文件添加

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest

4.3、增加RabbitMQ配置类

package com.xxxx.user.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {/******************direct**********************//*** 创建direct队列* @return*/@Beanpublic Queue directQueue(){return new Queue("directQueue");}/*** 创建direct交换机* @return*/@Beanpublic DirectExchange directExchange(){return new DirectExchange("directExchange");}/*** 把队列和交换机绑定在一起* @param queue* @param directExchange* @return*/@Beanpublic Binding bindingDirect(@Qualifier("directQueue") Queue queue, DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("routingKey");}/******************topic**********************/@Beanpublic Queue topicQuerue1(){return new Queue("topicQuerue1");}@Beanpublic Queue topicQuerue2(){return new Queue("topicQuerue2");}@Beanpublic TopicExchange topicExchange(){return new TopicExchange("topicExchange");}@Beanpublic Binding bindingTopic1(@Qualifier("topicQuerue1") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("topic.key1");}/*** 通配符:* 表示一个词,# 表示零个或多个词* @param queue* @param topicExchange* @return*/@Beanpublic Binding bindingTopic2(@Qualifier("topicQuerue2") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");}/******************fanout**********************/@Beanpublic Queue fanoutQueue1(){return new Queue("fanoutQueue1");}@Beanpublic Queue fanoutQueue2(){return new Queue("fanoutQueue2");}@Beanpublic Queue fanoutQueue3(){return new Queue("fanoutQueue3");}@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanoutExchange");}@Beanpublic Binding bindingFanout1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}@Beanpublic Binding bindingFanout2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}@Beanpublic Binding bindingFanout3(@Qualifier("fanoutQueue3") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}
}

4.4、新增消费监听类

package com.xxxx.user.consumer;import com.xxxx.drp.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
@RabbitListener(queues = "directQueue")
public class DataDirectReceiver {@RabbitHandlerpublic void process(String data){log.info("收到directQueue队列信息:" + data);}@RabbitHandlerpublic void process(UserInfo data){log.info("收到directQueue队列信息:" + data);}
}
package com.xxxx.user.consumer;import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
@RabbitListener(queues = {"topicQuerue1","topicQuerue2"})
public class DataFanoutReceiver {@RabbitHandlerpublic void process(String data){log.info("收到topicQuerue队列信息:" + data);}@RabbitHandlerpublic void process(UserInfo data){log.info("收到topicQuerue队列信息:" + data);}
}
package com.xxxx.user.consumer;import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
@RabbitListener(queues = {"fanoutQueue1","fanoutQueue2","fanoutQueue3"})
public class DataTopicReceiver {@RabbitHandlerpublic void process(String data){log.info("收到topicQuerue队列信息:" + data);}@RabbitHandlerpublic void process(UserInfo data){log.info("收到topicQuerue队列信息:" + data);}
}

4.5、消息生产端

package com.xxxx.user;import com.xxxx.common.entity.UserInfo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class DataSender {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void sendDirect(){UserInfo userInfo = new UserInfo();userInfo.setUserAccount("tiger");userInfo.setPassword("12345");this.rabbitTemplate.convertAndSend("directExchange","routingKey",userInfo);}@Testpublic void sendTopic(){this.rabbitTemplate.convertAndSend("topicExchange","topic.key2","Hello world topic");}@Testpublic void sendFanout(){this.rabbitTemplate.convertAndSend("fanoutExchange","","Hello world topic");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/181459.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Technology Strategy Pattern 学习笔记2-Creating the Strategy-World Context

Creating the Strategy-World Context 1 PESTEL 1.1 从6个方案看外部 PoliticalEconomicSocialTechnologicalEnvironmentalLegal 1.2 参考URL https://zhuanlan.zhihu.com/p/192522082https://www.docin.com/p-449396129.htmlhttps://blog.csdn.net/xiaoyw71/article/deta…

全球首款双模型AI手机METAVERTU2,为用户开发“第二大脑”

在2023年11月1日&#xff0c;英国奢侈手机品牌VERTU在香港举办了一场新品发布会&#xff0c;它推出了一款全新的AI手机称为METAVERTU2&#xff0c;这是全球首款双模型AI手机。此款手机将Web3技术与人工智能相结合&#xff0c;通过AI模型标记数据和AI Agent的方式&#xff0c;将…

sqlsugar查询数据库下的所有表,批量修改表名字

查询数据库中的所有表 using SqlSugar;namespace 批量修改数据库表名 {internal class Program{static void Main(string[] args){SqlSugarClient sqlSugarClient new SqlSugarClient(new ConnectionConfig(){ConnectionString "Data Source(localdb)\\MSSQLLocalDB;In…

VulnHub jarbas

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【python】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏…

Zabbix监控联想服务器的配置方法

简介 图片 随着科技的发展&#xff0c;对于数据的敏感和安全大部分取决于对硬件性能、故障预判的监测&#xff0c;由此可见实时监测保障硬件的安全很重要&#xff0c;从而衍生了很多对硬件的监测软件&#xff0c;Zabbix就一个不错的选择。开源 开源 开源&#xff01; zabbix是…

[极客大挑战 2019]Knife 1(两种解法)

题目环境&#xff1a; 这道题主要考察中国菜刀和中国蚁剑的使用方法 以及对PHP一句话木马的理解 咱们先了解一下PHP一句话木马&#xff0c;好吗&#xff1f; **eval($_POST["Syc"]);** **eval是PHP代码执行函数&#xff0c;**把字符串按照 PHP 代码来执行。 $_POST P…

基于单片机的超声波测距仪

收藏和点赞&#xff0c;您的关注是我创作的动力 文章目录 概要 一、本课题研究的主要内容二、超声波测距仪的整体方案2.2 超声波测距仪设计原理 三、超声波测距仪系统硬件电路的设计3.1 超声波测距仪的基本结构 四、 超声波测距仪系统的软件设计4.1 主程序软件设计仿真 五、结…

大数据之陌陌聊天数据分析案例

目录 目标需求 数据内容 基于Hive数仓实现需求开发 1.建库建表、加载数据 2.ETL数据清洗 3需求指标统计 目标需求 基于Hadoop和hive实现聊天数据统计分析&#xff0c;构建聊天数据分析报表 1.统计今日总消息量 2.统计今日每小时消息量&#xff0c;发送和接收用户数 3.…

用户态内存映射

内存映射不仅仅是物理内存和虚拟内存之间的映射&#xff0c;还包括将文件中的内容映射到虚拟内存空间。这个时候&#xff0c;访问内存空间就能够访问到文件里面的数据。而仅有物理内存和虚拟内存的映射&#xff0c;是一种特殊情况。 对于堆的申请来讲&#xff0c;mmap 是映射内…

Linux Framebuffer驱动框架、接口实现和使用

Linux 驱动-Frame Buffer代码分析 Framebufferfbmem.c部分代码分析初始化 Framebuffer 对于驱动开发人员来说&#xff0c;其实只需要针对具体的硬件平台SOC和具体的LCD&#xff08;通过焊接连接到该SOC引脚上的LCD&#xff09;来进行第一部分的寄存器编程&#xff08;红色部分&…

Git同时配置Gitee和GitHub

Git同时配置Gitee和GitHub 一、删除原先ssh密钥二、生成密钥 这里的同时配置是针对于之前配置过单个gitee或者github而言的&#xff0c;如果需要看git从安装开始的配置&#xff0c;则可以看这一篇文章 git安装配置教程 一、删除原先ssh密钥 在C盘下用户/用户名/.ssh文件下找到…

Python基础入门例程46-NP46 菜品的价格(条件语句)

最近的博文&#xff1a; Python基础入门例程45-NP45 禁止重复注册&#xff08;条件语句&#xff09;-CSDN博客 Python基础入门例程44-NP44 判断列表是否为空&#xff08;条件语句&#xff09;-CSDN博客 Python基础入门例程43-NP43 判断布尔值&#xff08;条件语句&#xff0…

045_第三代软件开发-U盘监测

第三代软件开发-U盘监测 文章目录 第三代软件开发-U盘监测项目介绍U盘监测原理解释源代码 关键字&#xff1a; Qt、 Qml、 USB、 Disk、 文件 项目介绍 欢迎来到我们的 QML & C 项目&#xff01;这个项目结合了 QML&#xff08;Qt Meta-Object Language&#xff09;和…

玩转AIGC:如何选择最佳的Prompt提示词?

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

Redis02-持久化策略

目录 RDB&#xff08;Redis DataBase Backup file&#xff09; RDB执行原理 AOF&#xff08;Append-Only File&#xff09; RDB和AOF对比 Redis支持多种持久化方式&#xff0c;以确保数据在内存中持久存储&#xff0c;以便在Redis服务器重启时数据不会丢失。Redis中持久化的…

Linux第一个小程序进度条

缓冲区 ​ 在写进度条程序之前我们需要介绍一下缓冲区&#xff0c;缓冲区有两种&#xff0c;输入和输出缓冲区&#xff0c;这里主要介绍输出缓冲区。在我们用C语言写代码时&#xff0c;输出一些信息&#xff0c;实际上是先输出到输出缓冲区里&#xff0c;然后才输出到我们的显…

Python笔记——linux/ubuntu下安装mamba,安装bob.learn库

Python笔记——linux/ubuntu下安装mamba&#xff0c;安装bob.learn库 一、安装/卸载anaconda二、安装mamba1. 命令行安装&#xff08;大坑&#xff0c;不推荐&#xff09;2. 命令行下载guihub上的安装包并安装&#xff08;推荐&#xff09;3. 网站下载安装包并安装&#xff08;…

二维码智慧门牌管理系统升级:一键报警让你的生活更安全!

文章目录 前言一、升级解决方案的特点二、实施步骤 前言 随着科技的不断进步&#xff0c;我们的生活正在逐渐变得更加智能化。可以想象一下&#xff0c;如果你家的门牌也能拥有这种智能升级&#xff0c;将会带来怎样的改变&#xff1f;今天&#xff0c;让我们一起探讨这令人兴…

数据处理中的中心化

数据处理中的中心化&#xff0c;就是将原数据减去平均值&#xff0c;得到新的数据&#xff0c;新的数据的平均值为0。 假设原数据是x&#xff08;x可以是多维的&#xff09;&#xff0c;其平均值是&#xff0c;新的数据&#xff0c;那么新数据的平均值是为0的。下面证明下&…

【云原生基础】了解云原生,什么是云原生?

&#x1f4d1;前言 本文主要讲了云原生的基本概念和原则的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#x1f304;每日一句&#x…