消息队列实战应用

适用场景

耗时长,非核心业务,生产者不会用到消息处理结果的情况下,可以将消息交给异步服务去缓存与消费

部署MQ服务

version: "3.0"
services:rabbitmq:container_name: rabbitmq-15672-1image: rabbitmq:3-managementports:- "15672:15672"- "5672:5672"environment:RABBITMQ_DEFAULT_USER: rootRABBITMQ_DEFAULT_PASS: root123

15672是rabbitmq server的图形化界面端口
5672是向rabbitmq server发送消息的端口

架构

exchange交换机维护生产者列表和队列列表,queue消息队列维护消费者列表
生产者只需要把消息交给交换机,由交换机决定将消息转发给哪一个队列,最后再由队列根据消费者列表,转发消息

在这里插入图片描述
类似于数据库或者容器,不同的项目或者服务独占一组交换机和队列,为了避免各组交换机和队列相互影响,采用虚拟主机进行隔离
一个管理员用户对应一个虚拟主机,每个管理员只能操作自己对应的那个虚拟主机中的交换机和队列
消息只能缓存在队列中,如果消息无法到达队列,就会出现消息丢失的问题

客户端

rabbitMQ提供了多种语言的代码客户端,这些客户端通过网络与rabbitMQ服务端进行交互

SpringAMQP

Spring封装了官方提供的API
AMQP:advanced MQ protocol
是一种有关消息队列的高级网络协议,规定了消息如何从生产者出发,经过交换机和队列,最终到达消费者的过程
换句话说,只要遵循了AMQP规范,就能用任何语言,实现消息队列的功能
在这里插入图片描述

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>3.2.4</version>
</dependency>

配置文件

RabbitMQ暴露的用于发消息的端口是5672

spring:rabbitmq:host: localhostport: 5672virtual-host: /v1username: u1password: u1

RabbitTemplate

Spring将功能封装在模板类RabbitTemplate中

生产者

convertAndSend方法实现消息发送到RabbitMQ服务器,并且缓存起来
这个方法需要指定服务端的一个消息队列,以及消息内容

String queue = "test.queue1";String message = "支付成功3";rabbitTemplate.convertAndSend(queue, message);

底层是单独开一个异步线程用来发送消息

	/*** 将指定消息发送到交换机** @param channel 单个网络连接中的一个缓存区* @param exchangeArg 目标交换机名称* @param routingKeyArg 路由键值* @param message 待发送的消息* @param mandatory 标记字段* @param correlationData 数据*/public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,boolean mandatory, @Nullable CorrelationData correlationData) {String exch = nullSafeExchange(exchangeArg);String rKey = nullSafeRoutingKey(routingKeyArg);Message messageToUse = message;MessageProperties messageProperties = messageToUse.getMessageProperties();if (mandatory) {messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);}if (this.beforePublishPostProcessors != null) {for (MessagePostProcessor processor : this.beforePublishPostProcessors) {messageToUse = processor.postProcessMessage(messageToUse, correlationData, exch, rKey);}}setupConfirm(channel, messageToUse, correlationData);if (this.userIdExpression != null && messageProperties.getUserId() == null) {String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);if (userId != null) {messageProperties.setUserId(userId);}}if (logger.isDebugEnabled()) {logger.debug("Publishing message [" + messageToUse+ "] on exchange [" + exch + "], routingKey = [" + rKey + "]");}observeTheSend(channel, messageToUse, mandatory, exch, rKey);// Check if commit neededif (isChannelLocallyTransacted(channel)) {// Transacted channel created by this template -> commit.RabbitUtils.commitIfNecessary(channel);}}

消费者

在IoC容器中注册一个监听器bean,绑定消息队列,当消息队列接收到消息时,会通知Spring,由Spring将对应的消息交给监听器处理

/**
* 消息队列监听器bean
* */
@Component
@Slf4j
public class SpringRabbitListener {@RabbitListener(queues = {"test.queue1"})public void listenOnTestQueue1(String message){log.info("接收到test.queue1队列的消息:{}",message);}}

消息分配策略

RabbitMq默认采用轮询的机制向同一队列的多个消费者分配消息,属于平均分配消息,但是,实际生产环境中,各个消费者的处理速度并不一样,那么,最优性能的做法,应该是让处理速度快的消费者处理更多的消息,而不是平均分配
需要设置预分配消息数量,消息队列默认一次性将消息全部平均分配给消费者,这样会导致消费者一次接受过多的消息而处理不过来,而且也不是一种最优的消费方式
可以限制发送给消费者的消息数量,这本质上是一种限流策略
底层是交给channel的basicQos实现限流


void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
  • 请求特定的“服务质量”设置。这些设置对服务器在要求确认之前将交付给使用者的数据量施加了限制。因此,它们提供了一种由消费者发起的流量控制手段。请注意,预取计数必须介于
    0 和 65535 之间(AMQP 0-9-1 中的无符号短行)。Params: prefetchSize –
    服务器将交付的最大内容量(以八位字节为单位),0 if unlimited prefetchCount – 服务器将传递的最大消息数,0
    if unlimited global – true 如果设置应应用于整个通道而不是每个使用者 Throws: IOException

队列

一个队列负责绑定同一类服务的消费者,执行同一个功能
一个交换机可以给多个队列发送消息,也就是可以通知多个不同类型的服务

代码创建队列

SpringAMQP提供了一套用于创建队列的Api


/**
* 创建交换机以及绑定交换机关系
* */
@Configuration
public class FanoutConfiguration {@Beanpublic FanoutExchange fanoutExchange(){return ExchangeBuilder.fanoutExchange("v1.fanout2").build();}@Beanpublic Queue fanoutQueue3(){return QueueBuilder.durable("fanout.queue1").build();}@Beanpublic Binding fanoutBinding1(Queue queue,FanoutExchange exchange){return BindingBuilder.bind(queue).to(exchange);}}

也可以适用注解指定交换机和队列的定义信息和绑定信息

@RabbitListener(bindings = {@QueueBinding(value = @Queue(name = "direct.queue1",durable = "true"),exchange = @Exchange(name = "v1.direct",type = ExchangeTypes.DIRECT),key = {"china.#","japan.#"}),@QueueBinding(value = @Queue(name = "direct.queue2",durable = "true"),exchange = @Exchange(name = "v1.direct2",type = ExchangeTypes.DIRECT),key = {"china.#","japan.#"}),@QueueBinding(value = @Queue(name = "direct.queue3",durable = "true"),exchange = @Exchange(name = "v1.direct3",type = ExchangeTypes.DIRECT),key = {"china.#","japan.#"})})

交换机

交换机只负责将同样的消息发给多个队列,并且不会缓存消息
根据交换机转发消息的策略的不同,可以将交换机分类
RabbitMQ也提供了convertAndSend的重载方法,用于指定交换机

Fanout广播交换机

广播:交换机会将同样的消息发送给所有与它绑定的消费者

public void publish() throws Exception{String exchange = "v1.fanout";for (int i = 0; i < 50; i++) {String message = "消费者消息:"+i;rabbitTemplate.convertAndSend(exchange,"", message);Thread.sleep(20);}}

Direct定向交换机

通过进一步划分消息队列,实现消息的精准投递,划分的关键是RoutingKey和BindingKey的配对,只有RoutingKey和BindingKey匹配时,交换机才会向该队列发送消息
生产者发送的消息都带上一个RoutingKey,那么,这条消息只能发送给拥有配对的BindingKey的队列
类似于,客户端向某个端口发送消息,服务端监听某个端口

Topic交换机

与Direct交换机的投递机制相同,只不过,Topic交换机的key可以是以点分隔的多个单词,并且,支持通配符匹配
*匹配一个单词
#匹配0个或多个单词

序列化和反序列化

消息通过网络传输,以二进制数据形式传输,所以,需要先将消息序列化成字节数组
RabbitMQ客户端默认先将消息转成message对象,再进行序列化,适用默认的SimpleMessageConverter

	/*** 根据提供的消息对象创建message对象*/@Overrideprotected Message createMessage(Object object, MessageProperties messageProperties)throws MessageConversionException {byte[] bytes = null;if (object instanceof byte[]) {bytes = (byte[]) object;messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);}else if (object instanceof String) {try {bytes = ((String) object).getBytes(this.defaultCharset);}catch (UnsupportedEncodingException e) {throw new MessageConversionException("failed to convert to Message content", e);}messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);messageProperties.setContentEncoding(this.defaultCharset);}else if (object instanceof Serializable) {try {bytes = SerializationUtils.serialize(object);}catch (IllegalArgumentException e) {throw new MessageConversionException("failed to convert to serialized Message content", e);}messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);}if (bytes != null) {messageProperties.setContentLength(bytes.length);return new Message(bytes, messageProperties);}throw new IllegalArgumentException(getClass().getSimpleName()+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());}

核心序列化代码:

new ObjectOutputStream(stream).writeObject(object);

可以看出rabbitMQ默认使用jdk的序列化流将对象转化成二进制数据,底层实际是通过字节码进行计算得到一串二进制数
jdk序列化缺点太多,在很多业务中都不适用,缺点有以下:
篡改序列化后的数据,将导致无法通过反序列化还原数据
序列化后的数据体积会增大很多倍
序列化后的数据可读性差

最优的解决方案是选择将对象序列化成Json字符串
通过配置类,来替换rabbitMQ默认的序列化器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/330057.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于新配置的adb,设备管理器找不到此设备问题

上面页面中一开始没有找到此android设备&#xff0c; 可能是因为我重新配置的adb和设备驱动&#xff0c; 只把adb配置了环境变量&#xff0c;驱动没有更新到电脑中&#xff0c; 点击添加驱动&#xff0c; 选择路径&#xff0c;我安装时都放在了SDK下面&#xff0c;可以尝试…

Python操作MySQL

文章导读 阅读本文需要一定的Python基础和MySQL基础&#xff0c;如果阅读过程中感到吃力&#xff0c;可以阅读我的Python入门篇学习记录和MySQL学习记录填补知识漏洞&#xff0c;本文使用VS Code操作pymysql驱动&#xff0c;使用navicat查看数据库&#xff0c;实操偏多&#xf…

Parasoft C++Test软件静态分析操作指南_软件质量度量

系列文章目录 Parasoft CTest软件安装指南 Parasoft CTest软件静态分析操作指南_编码规范/标准检查 Parasoft CTest软件静态分析操作指南_软件质量度量 Parasoft CTest软件静态分析_自动提取静态分析数据生成文档 Parasoft CTest软件单元测试_操作指南 Parasoft CTest软件单元…

8个实用网站和软件,收藏起来一定不后悔~

整理了8个日常生活中经常能用得到的网站和软件&#xff0c;收藏起来一定不会后悔~ 1.ZLibrary zh.zlibrary-be.se/这个网站收录了超千万的书籍和文章资源&#xff0c;国内外的各种电子书资源都可以在这里搜索&#xff0c;98%以上都可以在网站内找到&#xff0c;并且支持免费下…

错误0xc0000022的3种解决方法

程序无法正常启动&#xff0c;报错代码为0xc0000022。当你的电脑运行程序出现这种情形&#xff0c;多半是由于系统的权限问题引起的。 原因一&#xff1a;应用程序的访问权限不足 有时候&#xff0c;直接打开文件时会遇到“0xc0000022” 错误&#xff0c;但是右键“以管理员身份…

力扣刷题---409. 最长回文串【简单】

题目描述 给定一个包含大写字母和小写字母的字符串 s &#xff0c;返回 通过这些字母构造成的 最长的回文串 。 在构造过程中&#xff0c;请注意 区分大小写 。比如 “Aa” 不能当做一个回文字符串。 示例 1: 输入:s “abccccdd” 输出:7 解释: 我们可以构造的最长的回文串…

抖店一件代发,从0到1操作全流程

我是王路飞。 先说明一点&#xff0c;新手不需要纠结抖店一件代发&#xff08;即无货源模式&#xff09;还能不能做的问题。 无货源只是前期帮助新手阶段的你进入到这个市场里来的一种方式&#xff0c;不是你长期做店的思路。 入门之后&#xff0c;基本就转型为有货源去玩了…

光照模型技术在AI去衣中的重要作用

引言&#xff1a; 在数字图像处理和计算机视觉领域&#xff0c;AI去衣技术正逐渐成为研究和应用的热点。这项技术依赖于人工智能算法&#xff0c;尤其是深度学习模型&#xff0c;来识别和处理图像或视频中的衣物。在这个过程中&#xff0c;光照模型技术扮演着至关重要的角色。本…

【话题】你眼中的IT行业现状与未来趋势

大家好&#xff0c;我是全栈小5&#xff0c;欢迎阅读小5的系列文章&#xff0c;这是《话题》系列文章 目录 引言一、IT行业的现状1.1 云计算与大数据1.2 人工智能与机器学习1.3 物联网与5G通信1.4 区块链技术 二、IT行业未来发展趋势2.1 边缘计算与智能设备2.2 深度学习与自然语…

K8S中YAML案例

目录 案例&#xff1a;自主式创建service并关联上面的pod 案例&#xff1a;部署redis 案例&#xff1a;部署myapp 案例&#xff1a;部署MySQL数据库 总结 1.K8S集群中访问流向 K8S集群外部&#xff1a;客户端——nodeIP&#xff1a;nodeport——通过target port——podIP…

数据可视化第十天(爬虫爬取某瓣星际穿越电影评论,并且用词云图找出关键词)

开头提醒 本次爬取的是用户评论&#xff0c;只供学习使用&#xff0c;不会进行数据的传播。希望大家合法利用爬虫。 获得数据 #总程序 import requests from fake_useragent import UserAgent import timefuUserAgent()headers{User-Agent:fu.random }page_listrange(0,10) …

概率论统计——大数定律

大数定律 弱大数定律&#xff08;辛钦大数定律&#xff09; 利用切比雪夫不等式&#xff0c;证明弱大数定律 应用 伯努利大数定理&#xff0c;&#xff08;辛钦大数定理的推论&#xff09; 证明伯努利大数定理 注意&#xff1a;这里将二项分布转化成0,1分布来表示&#xff0c;…

【C++】牛客——美团 奇数位丢弃

✨题目链接&#xff1a; MT8 奇数位丢弃 ✨题目描述 对于一个由 0..n 的所有数按升序组成的序列&#xff0c;我们要进行一些筛选&#xff0c;每次我们丢弃去当前所有数字中第奇数位个的数。重复这一过程直到最后剩下一个数。请求出最后剩下的数字。 数据范围&#xff1a; 1≤…

Softing工业推出新品edgeGate:一款用于工业边缘和云应用的硬件网关

2024年4月17日&#xff08;哈尔&#xff09;&#xff0c;Softing工业自动化在2024年汉诺威工业博览会上首次展示了新品edgeGate。该产品是一个无需维护的硬件物联网网关解决方案&#xff0c;可将生产数据从PLC和数控机床控制器传输至工业边缘及物联网云平台。 &#xff08;edge…

Docker简单使用

1.简单认识 软件的打包技术&#xff0c;就是将打乱的多个文件打包为一个整体&#xff0c;比如想使用nginx&#xff0c;需要先有一台linux的虚拟机&#xff0c;然后在虚拟机上安装nginx.比如虚拟机大小1G&#xff0c;nginx100M。当有了docker后我们可以下载nginx 的镜像文件&am…

四信云-设备维保管理系统上线,实现设备全生命周期管理

在当今的制造业中&#xff0c;设备是企业生产的核心要素&#xff0c;是企业竞争力的基石。 随着企业发展规模不断扩大&#xff0c;设备数量急速增长&#xff0c;传统的手工管理方式已经无法满足企业需求&#xff0c;设备管理系统的出现则填补了市场需求空白&#xff0c;其目标…

做OZON怎么选择物流,OZON物流Xingyuan

随着跨境电商的蓬勃发展&#xff0c;OZON作为俄罗斯领先的电商平台&#xff0c;吸引了大量中国卖家入驻。然而&#xff0c;物流作为跨境电商的关键环节&#xff0c;其选择对于卖家来说至关重要。本文将围绕“做OZON怎么选择物流”这一问题&#xff0c;深度解析OZON物流Xingyuan…

Mysql教程(0):学习框架

1、Mysql简介 MySQL 是一个开放源代码的、免费的关系型数据库管理系统。在 Web 开发领域&#xff0c;MySQL 是最流行、使用最广泛的关系数据库。MySql 分为社区版和商业版&#xff0c;社区版完全免费&#xff0c;并且几乎能满足全部的使用场景。由于 MySQL 是开源的&#xff0…

WordPress插件Disable WP REST API,可根据是否登录来禁用REST API

前面跟大家分享了代码版禁用WordPress REST API的方法&#xff08;详见『WordPress4.7以上版本如何禁用JSON REST API&#xff1f;』&#xff09;&#xff0c;不过有些站长不太敢折腾自己的网站代码&#xff0c;那么建议试试这款Disable WP REST API&#xff0c;它可以&#xf…