rabbitmq详解

有需要的直接看狂神的视频,讲得很好

简介

RabbitMQ 是一个开源的 消息队列中间件,实现了 AMQP(Advanced Message Queuing Protocol,先进消息队列协议)。它允许 应用程序、服务、系统之间异步地传递消息,并通过消息队列来实现消息的 存储、转发、处理。

它的核心功能是解耦,异步,削峰

常用的消息队列
activemq(基于JMS协议,基本不用了)
rabbitmq(基于AMQP协议,主要是Exchange组件)
kafka(kafka协议,主要是Topic和partition组件)
rocketmq(自定义的 RPC 协议)

rabbitmq 和kafka的异同点

1.kafka消费者是拉取kafka中的数据,rabbitmq 是推送数据到消费者

rabbitMQ核心组成

组件

produce/chnanle/broker/exchange/key/bondings/queue/consumer
在这里插入图片描述
核心概念:
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力),这里我理解和kafka的topic类似,都是处理和隔离不同的数据
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

Exchange(核心)

因为rabbitmq是基于AMQP协议的,所有最重要的第一点就是Excahnge(交换机)
这里有不同种类的交换机来处理不同的业务场景

fanout模式

在这里插入图片描述
Fanout—发布与订阅模式,是一种广播机制,它是没有路由key的模式。
这种模式的特点是不需要执行key,生产者会将生产的数据转发到该Exchange的所有queue中

direct模式

在这里插入图片描述
Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。
什么意思呢,意思是之前fanout模式不需要指定RoutingKey,而direct模式是需要指定Routingkey,对应每一个队列
例如3个queue,配置Routingkey为key1,key2,key3,然后生成的时候讲routingkey带上key1就会只向queue1发送数据

topic模式

在这里插入图片描述
Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。

意思是routingkey可以进行模糊匹配

消费者端消费模式-轮询模式

上面的的模式可以说是生成者端生成数据到queue的策略
这里模式是指多个消费者消费queue是怎么消费的

轮询模式是指多个消费者,消费一个queue,会公平的分发相同数量的数据到不同的消费者,让他们消费
简单来说就是一个消费者消费一条数据

公平分发模式

这里就是按照各自的处理速度来分发数据到不同的消费者
根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

实现(fanout举例)

配置yml文件

# 服务端口
server:port: 8080
# 配置rabbitmq服务
spring:rabbitmq:username: adminpassword: adminvirtual-host: /host: 47.104.141.27port: 5672

配置生产者

@Component
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate; //创建了rbbitmq对象// 1: 定义交换机private String exchangeName = "direct_order_exchange";// 2: 路由keyprivate String routeKey = "";public void makeOrder(Long userId, Long productId, int num) {// 1: 模拟用户下单String orderNumer = UUID.randomUUID().toString();// 2: 根据商品id productId 去查询商品的库存// int numstore = productSerivce.getProductNum(productId);// 3:判断库存是否充足// if(num >  numstore ){ return  "商品库存不足..."; }// 4: 下单逻辑// orderService.saveOrder(order);// 5: 下单成功要扣减库存// 6: 下单完成以后System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);// 发送订单信息给RabbitMQ fanoutrabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);}
}

创建配置类绑定交换机队列关系

@Configuration
public class DirectRabbitConfig {//队列 起名:TestDirectQueue@Beanpublic Queue emailQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("email.fanout.queue", true);}@Beanpublic Queue smsQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("sms.fanout.queue", true);}@Beanpublic Queue weixinQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("weixin.fanout.queue", true);}//Direct交换机 起名:TestDirectExchange@Beanpublic DirectExchange fanoutOrderExchange() {//  return new DirectExchange("TestDirectExchange",true,true);return new DirectExchange("fanout_order_exchange", true, false);}//绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting@Beanpublic Binding bindingDirect1() {return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");}@Beanpublic Binding bindingDirect2() {return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");}@Beanpublic Binding bindingDirect3() {return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");}
}

消费者消费

// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。value = @Queue(value = "email.fanout.queue",autoDelete = "false"),// order.fanout 交换机的名字 必须和生产者保持一致exchange = @Exchange(value = "fanout_order_exchange",// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式type = ExchangeTypes.FANOUT)
))
@Component
public class EmailService {// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublic void messagerevice(String message){// 此处省略发邮件的逻辑System.out.println("email-------------->" + message);}
}

过期时间TTL、超长处理

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息队列设置TTL。目前有两种方法可以设置。

过期的时候怎么处理,可以直接抛弃或者加入死信队列

rabbitmq还可以设置queue长度,超长的也是抛出到死信队列

死信队列

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:
消息被拒绝、消息过期、超出队列长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。

归纳:
简单来说就是咱们正常queue的数据由于过期,例如一个订单,长时间未支付一直存在queue中就会占用额外的内存空间,且无法得到处理,这时候就定义一个死信队列来存放过期的数据,然后这个死信队列也是正常的Exchange+queue,我们可以定义一个消费者来接受这些异常的消息,例如刚才订单超时就可以返回超时的处理

MQ解决分布式事务

分布式系统中,使用到了不同数据库,不同数据库之间事务不互通,因此需要使用分布式事务,
我们业务中应该尽量避免核心业务使用分布式事务,这样会导致响应速度慢,进行在非核心业务上使用

解决分布式事务的方法有很多例如
1.使用seata管理分布式事务
2.使用TCC方式
3.最大努力通知
一般建议使用最大努力通知

生产者可靠

当生产者发送消息到Exchange-》queue之后可能出现网络波动等因素,导致消息没有应答,获取消息根本没有发送到rabbitmq中去

方案 try-catch+冗余表
声明rabbitmq的回调函数,是否正确响应,如果没有则放入冗余表(存放为正确进入queue中的数据),然后用定时任务重新执行冗余表

消费者可靠

一般我们会基于try-catch+手动ack+死信队列来保证消息的可靠
例如当我们的消费者为正常消费到数据,报错了,这时候rabbitmq会干啥,会一直重新发送数据,导致内存崩溃
这时候就需要关闭重试机制,手动ack,然后让未能正确处理的数据进入死信队列,然后又后续的程序处理死信队列的数据

如果死信队列处理消息也失败,就像进行人工处理了

消费重复问题

幂等性解决重复消费问题

问题

消费者消费应答机制ACK改为false
使用手动应答机制否则会造成死循环

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/19276.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

moveable 一个可实现前端海报编辑器的 js 库

目录 缘由-胡扯本文实验环境通用流程1.基础移动1.1 基础代码1.1.1 data-* 解释 1.2 操作元素创建1.3 css 修饰1.4 cdn 引入1.5 js 实现元素可移动1.6 图片拖拽2.缩放3.旋转4.裁剪 懒得改文案了,海报编辑器换方案了,如果后面用别的再更。 缘由-胡扯 导火…

计算机视觉中图像的基础认知

第一章:计算机视觉中图像的基础认知 第二章:计算机视觉:卷积神经网络(CNN)基本概念(一) 第三章:计算机视觉:卷积神经网络(CNN)基本概念(二) 第四章:搭建一个经典的LeNet5神经网络 一、图像/视频的基本属性…

java八股文-mysql

1. 索引 1.1 什么是索引 索引(index)是帮助Mysql高效获取数据的数据结构(有序).提高数据的检索效率,降低数据库的IO成本(不需要全表扫描).通过索引列对数据进行排序,降低数据排序成本,降低了CPU的消耗. 1.2 mysql索引使用的B树? 1. 没有使用二叉树,最坏情况o&…

Next.js【详解】CSS 样式方案

全局样式 Global CSS 默认已创建,即 src\app\globals.css,可根据需要修改 默认在全局布局中导入 src\app\layout.tsx import "./globals.css";组件样式 CSS Modules 新建文件 src\app\test\styles.module.css .red {color: red;}导入目标页面…

彻底解决Idea控制台中文乱码问题

中文乱码我相信每一个程序员都会遇到这种问题。 但有时候我们按照网上教程去设置,确实编码好了,但是有时候按照教程来却没能达到我们的预期。 在此之前我将所有编码都设置成了UTF-8,文件编码,项目编码,尝试(最终不需要…

[实现Rpc] 客户端划分 | 框架设计 | common类的实现

目录 3. 客户端模块划分 3.1 Network模块 3.2 Protocol模块 3.3 Dispatcher模块 3.4 Requestor模块 3.5 RpcCaller模块 3.6 Publish-Subscribe模块 3.7 Registry-Discovery模块 3.8 Client模块 4. 框架设计 4.1 抽象层 4.2 具象层 4.3 业务层 ⭕4.4 整体设计框架…

Java里ArrayList和LinkedList有什么区别?

大家好,我是锋哥。今天分享关于【Java里ArrayList和LinkedList有什么区别?】面试题。希望对大家有帮助; Java里ArrayList和LinkedList有什么区别? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 ArrayList 和 LinkedL…

【Java】分布式锁Redis和Redisson

https://blog.csdn.net/weixin_44606481/article/details/134373900 https://www.bilibili.com/video/BV1nW421R7qJ Redis锁机制一般是由 setnx 命令实现,set if not exists,语法setnx key value,将key设置值为value,如果key不存在…

c++TinML转html

cTinML转html 前言解析解释转译html类定义开头html 结果这是最终效果(部分): ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/6cf6c3e3c821446a84ae542bcc2652d4.png) 前言 在python.tkinter设计标记语言(转译2-html)中提到了将Ti…

2.2 反向传播:神经网络如何“学习“?

一、神经网络就像小学生 想象一个刚学算术的小学生,老师每天布置练习题,学生根据例题尝试解题,老师批改后指出错误。神经网络的学习过程与此相似: 输入层:相当于练习题(如数字图片)输出层&…

QEMU 通过网络实现共享文件

系列文章目录 Linux内核学习 Linux 知识(1) Linux 知识(2) WSL Ubuntu QEMU 虚拟机 Linux 调试视频 PCIe 与 USB 的补充知识 vscode 使用说明 树莓派 4B 指南 设备驱动畅想 Linux内核子系统 Linux 文件系统挂载 QEMU 通过网络实现…

当时只道是寻常

晴,2025年2月16日 卸载了油管、脸书和 X 手机 app ,太浪费我时间,以后再去经营吧。 教学技能大赛材料需要在明天之内搞定——《教学实施方案》。感觉玄,同部门有经验的老师说至少花一周时间。 只能明天早点继续接着弄&#xff…

Hive之分区表

Hive之分区表 文章目录 Hive之分区表写在前面分区表分区表基本操作引入分区表创建分区表语法加载数据到分区表中查询分区表中数据增加分区删除分区查看分区表有多少分区查看分区表结构 二级分区正常的加载数据分区表和数据产生关联 动态分区开启动态分区参数设置案例实操 写在前…

【线段树模板】

介绍 这段代码看起来是一个基于树结构的数据结构,可能是线段树或者其他类似的数据结构。主要包含了构建数据结构、查询和修改等基本操作的实现函数。以下是对每个函数的简要介绍: pushup(int u): 用于计算结点u的属性。build(int u, int l, int r): 用于…

DeepSeek辅助测试测试一 -- DeepSeek加MaxKB知识库本地部署

文章目录 前言任务拆解最终目标两种技术路径对比知识库检索增强(RAG) 大语言模型 构建知识库加本地部署DeepSeek目前的问题 前言 开工已经两周啦,开始慢慢的进入工作状态了,新的一年大家一起加油吧~ 任务拆解 最终目标 训练一…

Yuque-DL:一款强大的语雀资源下载工具

语雀是一款常用的文档管理工具,但官方未提供直接下载文档的功能。为此,可以使用第三方工具下载语雀文档。以下是使用步骤: 1. 安装工具 GitHub - gxr404/yuque-dl: yuque 语雀知识库下载 安装步骤: 确保已安装 Node.js&#xff…

【Java 面试 八股文】Spring Cloud 篇

Spring Cloud 篇 1. Spring Cloud 5大组件有哪些?2. 服务注册和发现是什么意思?Spring Cloud 如何实现服务注册发现?3. 我看你之前也用过nacos,你能说下nacos与eureka的区别?4. 你们项目负载均衡如何实现的&#xff1f…

国内外网络安全政策动态(2025年1月)

▶︎ 1.国家互联网信息办公室发布《个人信息出境个人信息保护认证办法(征求意见稿)》 1月3日,国家互联网信息办公室发布《个人信息出境个人信息保护认证办法(征求意见稿)》。根据《意见稿》,个人信息出境个…

图论入门算法:拓扑排序(C++)

上文中我们了解了图的遍历(DFS/BFS), 本节我们来学习拓扑排序. 在图论中, 拓扑排序(Topological Sorting)是对一个有向无环图(Directed Acyclic Graph, DAG)的所有顶点进行排序的一种算法, 使得如果存在一条从顶点 u 到顶点 v 的有向边 (u, v) , 那么在排序后的序列中, u 一定…

Anaconda +Jupyter Notebook安装(2025最新版)

Anaconda安装(2025最新版) Anaconda简介安装1:下载anaconda安装包2: 安装anaconda3:配置环境变量4:检查是否安装成功5:更改镜像源6:更新包7:检查 Jupyter Notebook一.Jup…