Spring中基于redis stream 的消息队列实现方法

     本文主要介绍了消息队列的概念性质和应用场景,介绍了kafka、rabbitMq常用消息队列中间件的应用模型及消息队列的实现方式,并实战了在Spring中基于redis stream 的消息队列实现方法。

一、消息队列

      消息队列是一种进程间通信或者同一个进程中不同线程间的通信方式,主要解决异步处理、应用耦合、流量消峰、负载均衡等问题,实现高性能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件。

1、异步处理

收到订单消息后,各子系统(库存、支付、消息)可以同步进行。

2、应用解耦

收到订单消息后,各子系统(库存、支付、消息)可以不用被调用或按顺序进行,解决调用失败造成的数据错误

3、流量削峰

在应用和数据库操作之间设置消息队列,消息队列配置请求最大数(低于数据库最大并发数),避免数据库超负荷运行。

4、负载均衡

Kafka、rabbitMq等支持主从架构,在多台服务器进行同步和自动选主。

二、消息队列实现方法

1、四大类方法

内存队列:消息队列通常在内存中实现

文件系统队列:消息可以被写入到文件系统中,持久化存储消息,但需要额外的磁盘空间和I/O操作。

数据库队列:消息可以被添加到数据库的特定表中,然后由另一个进程或线程从表中读取并处理。例如redis、tdengine都可以实现

消息队列中间件:如RabbitMQ、 Kafka等

2、kafka概念

Kafka 的核心架构由以下几个主要组件组成:

  1. Producer(生产者):发送消息的一方,负责发布消息到 Kafka 主题(Topic)。
  2. Consumer(消费者):接受消息的一方,订阅主题并处理消息。
  3. Broker(代理):服务代理节点,Kafka 集群中的一台服务器就是一个 broker,可以水平无限扩展,同一个 Topic 的消息可以分布在多个 broker 中。
  4. Topic(主题):Kafka 中的消息以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。
  5. Partition(分区):主题的物理分片,提高了并行处理能力。
  6. Replica(副本):副本,是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在 broker 崩溃或发生网络一场,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务。
  7. ZooKeeper:管理 Kafka 集群的元数据和分布式协调。

3、rabbitMq概念

1.Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

2.Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

3.Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

4.Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 5.Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

6.Connection 网络连接,比如一个TCP连接。

7.Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

8.Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

4、Redis

发布订阅、list 队列、zset 队列、Stream 队列

三、基于redis Stream的实现案例

如业务需要发送邮件和短信时,可引入消息队列,不影响业务进行。

1、依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

2、redisConfig.java连接配置

主要配置连接地址数据库

sync-stream-redis:host: 10.110.1.1password: aaaadatabase: 0port: 6379timeout: 10s# 连接超时时间lettuce:shutdown-timeout: 60spool:# 连接池中的最小空闲连接min-idle: 0# 连接池中的最大空闲连接max-idle: 8# 连接池的最大数据库连接数max-active: 8# #连接池最大阻塞等待时间(使用负值表示没有限制)max-wait: -1mssyncKey: stream_vir_name

@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport
{/*** 数据库连接配置* @return*/
@Bean(name = "syncDataRedisProperties")
@ConfigurationProperties(prefix = "spring.sync-stream-redis")
public RedisProperties syncDataRedisProperties() {return new RedisProperties();
}@Bean(name = "syncDataRedisConnectionFactory")
public RedisConnectionFactory syncDataRedisConnectionFactory(@Qualifier("syncDataRedisProperties") RedisProperties redisProperties) {RedisProperties.Sentinel sentinel = redisProperties.getSentinel();RedisConfiguration redisConfig = null;if (sentinel == null) {// redis单体模式连接配置RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration();standaloneConfig.setHostName(redisProperties.getHost());standaloneConfig.setPort(redisProperties.getPort());standaloneConfig.setDatabase(redisProperties.getDatabase());standaloneConfig.setPassword(RedisPassword.of(redisProperties.getPassword()));standaloneConfig.setDatabase(redisProperties.getDatabase());redisConfig = standaloneConfig;}// lettuce连接池配置GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();RedisProperties.Lettuce lettuce = redisProperties.getLettuce();if(lettuce.getPool() != null) {RedisProperties.Pool pool = redisProperties.getLettuce().getPool();// 连接池最大连接数poolConfig.setMaxTotal(pool.getMaxActive());// 连接池中的最大空闲连接poolConfig.setMaxIdle(pool.getMaxIdle());// 连接池中的最小空闲连接poolConfig.setMinIdle(pool.getMinIdle());// 连接池最大阻塞等待时间(使用负值表示没有限制)poolConfig.setMaxWaitMillis(pool.getMaxWait().toMillis());}LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder builder = LettucePoolingClientConfiguration.builder();// timeoutif(redisProperties.getTimeout() != null) {builder.commandTimeout(redisProperties.getTimeout());}// shutdownTimeoutif(lettuce.getShutdownTimeout() != null) {builder.shutdownTimeout(lettuce.getShutdownTimeout());}// 创建Factory对象LettuceClientConfiguration clientConfig = builder.poolConfig(poolConfig).build();return new LettuceConnectionFactory(redisConfig, clientConfig);
}}

3、RedisStreamConfig.java 监听配置及消费者注册 启动监听

主要配置监听容器设置(最大消息数-流量削峰可重点关注)和消费者组的注册,程序运行时启动监听,配置监听的topic(streamName示例中为syncKey)

主要用到以下类和方法

StreamMessageListenerContainer 、createGroup、register、createautoAcknowledge

@SuppressWarnings({"rawtypes", "unchecked"})
public class RedisStreamConfig {//  监听性质配置@Bean( name = "syncListenerContainer", initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer syncListenerContainer(@Qualifier("syncDataRedisConnectionFactory") RedisConnectionFactory factory) {StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小, 设置为0会导致CPU飙升.pollTimeout(Duration.ofSeconds(2))// 一次最多获取多少条消息.batchSize(10)// 运行 Stream 的 poll task.executor(emsThreadPoolTaskExecutor)// 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理.errorHandler(e -> {logger.error("streamMessageListenerContainer异常", e);}).build();return StreamMessageListenerContainer.create(factory, options);
}

@Bean
public Subscription syncDeviceDataSubscription(@Qualifier("syncListenerContainer") StreamMessageListenerContainer listenerContainer) {String groupName = syncKey + "ems";StreamOperations streamOperations = syncDataRedisTemplate.opsForStream();RecordId recordId = null;// 如果队列不存在,则创建队列if (Boolean.FALSE.equals(syncDataRedisTemplate.hasKey(syncKey))) {recordId = streamOperations.add(syncKey, Collections.singletonMap("_up", "up"));// 删除创建队列时的测试消息streamOperations.delete(syncKey, recordId);}// 如果分组不存在,则创建分组StreamInfo.XInfoGroups groups = streamOperations.groups(syncKey);long groupCount = groups.stream().filter(xInfoGroup -> xInfoGroup.groupName().equals(groupName)).count();if (groupCount <= 0) {streamOperations.createGroup(syncKey, groupName);}StreamMessageListenerContainer.StreamReadRequest<String> readRequest =StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(syncKey, ReadOffset.lastConsumed())).consumer(Consumer.from(groupName, "consumer_" + System.currentTimeMillis())).cancelOnError(t -> false)// 自动确认消息.autoAcknowledge(true).build();return  listenerContainer.register(readRequest, syncDataStreamListener);}}

4、消费者接受消息

主要实现接口StreamListener,并重写onMessage, 在onMessage可调用其他业务方法进行处理(如短信邮箱发送等),接受到的消息格式MapRecord,<id, map<string,data>>.

@Component
public class SyncDataStreamListener implements StreamListener<String, MapRecord<String,String,String>> {private final Logger logger = LoggerFactory.getLogger(SyncDataStreamListener.class);@Overridepublic void onMessage(MapRecord<String, String, String> message) {try {String stream = message.getStream();RecordId messageId = message.getId();Map<String, String> value = message.getValue();//   业务处理(如短信、邮箱发送)} catch (Exception e) {logger.error("处理异常", e);}}
}

5、生产者

@GetMapping("/redis/ps")
public String redisPublish(String content,Integer count){StreamOperations streamOperations = redisTemplate.opsForStream();for (int i = 0; i < count; i++) {AtomicInteger num = new AtomicInteger(i);Map msgMap = new HashMap();msgMap.put("count", i);msgMap.put("sID", num);//新增消息streamOperations.add(syncKey,msgMap);return "success";
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/415086.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uni-app 获取当前位置的经纬度以及地址信息

文章目录 uni.getLocation(objc)获取经纬度和地址调试结果问题 uni-app 获取当前位置的经纬度以及地址信息 uni.getLocation(objc) uni-app官方文档定位API: uni.getLocation(OBJECT) uni.getLocation({type: wgs84,success: function (res) {console.log(当前位置的经度&…

【系统架构设计】嵌入式系统设计(1)

【系统架构设计】嵌入式系统设计&#xff08;1&#xff09; 嵌入式系统概论嵌入式系统的组成硬件嵌入式处理器总线存储器I/O 设备与接口 软件 嵌入式开发平台与调试环境交叉平台开发环境交叉编译环境调试 嵌入式系统概论 嵌入性、专用性、计算机系统是嵌入式系统的三个基本的核…

【话题讨论】VS Code:倍增编程动力,实现效率飞跃

目录 引言 一、详情介绍 功能特点 使用场景 提高工作效率 二、效率对比 2.1 高度可定制性与丰富的插件生态 2.2 智能的代码补全与导航 2.3 内置的调试器与版本控制集成 2.4 轻量级与跨平台 2.5 选择合适工具的重要性 2.6 实际案例或数据展示 三、未来趋势 3.1 编…

能见度监测站—实时监测道路能见度情况

型号&#xff1a;TH-NJD10】能见度监测站是一种专门用于自动观测和存储气象观测数据的设备&#xff0c;它通过高科技手段实时监测大气能见度的变化&#xff0c;为多个领域提供重要的数据支持。主要基于光在大气中的衰减规律。传感器系统中的发射器发出光线&#xff0c;照射到空…

shell编程--正则表达式

正则表达式 正则表达式都被置于两个正斜杠之间&#xff1b;如/l[oO]ve/ 示例 匹配数字的脚本&#xff0c;用户输入创建账号的数量 语法&#xff1a; [[ ^[0-9]$ ]] 表示必须输入数字 #!/bin/bashwhile : do read -p "输入数字&#xff1a;" numif [[ $num ~ ^[…

产品需求过程管理重要性

产品需求过程管理重要性 背景 以下都是真实事项经历回顾&#xff0c;在产品开发过程中&#xff0c;产品经理与研发团队之间的沟通至关重要。然而&#xff0c;沟通不畅或信息缺失常常导致需求无法准确传达&#xff0c;最终影响产品的成功。以下是一些常见的问题&#xff1a; 1.需…

Jmeter执行多机联合负载

1、注意事项&#xff0c;负载机必须要安装jre&#xff0c;控制机则必须安装jdk。要配置同网段ip&#xff0c;双向关闭防火墙。 每个负载机要平均承担线程数。 具体执行事项查看上面截图所示&#xff0c;控制机和负载机配置。 2、先给负载机设置ip地址&#xff0c;保持与控制…

C++项目详细分析_WebServer

前言 项目地址 项目介绍 源码详细分析 项目路径如下&#xff1a; 1.webserver.cpp 头文件和构造函数 #include "webserver.h"WebServer::WebServer() {// http_conn类对象users new http_conn[MAX_FD];// root文件夹路径char server_path[200];getcwd(server…

prometheus基于文件的服务发现

之间讲到&#xff0c;prometheus监控的对象就来自于他的配置文件里面的targets&#xff0c;如果要新增被监控对象&#xff0c;就继续往targets里面加。 但这个缺点是&#xff0c;每次修改完后都得重启prometheus。有没有什么办法&#xff0c;能在不重启的情况下增加target呢&a…

【Qt】 QComboBox | QSpinBox

文章目录 QComboBox —— 下拉框QComboBox 属性核心方法核心信号QComboBox 使用 QSpinBox —— 微调框QSpinBox 属性核心信号QSpinBox 使用 QComboBox —— 下拉框 QComboBox 属性 QComboBox —— 表示下拉框 currentText ——当前选中的文本 currentindex ——当前选中的条…

【硬件知识】从零开始认识GPU

【硬件知识】从零开始认识GPU 一、GPU的发展史简介二、GPU主要构成三、GPU与AI的关系 一、GPU的发展史简介 GPU&#xff08;图形处理器&#xff09;的发展史是一段充满创新与变革的历程&#xff0c;它不仅改变了计算机图形显示的方式&#xff0c;还推动了高性能计算、人工智能…

盘点大模型中转 API 平台,并比较费用

1. 大模型中转 API 平台集合 1.1 DevAGI DevAGI开放平台 Open AI 价格 1.2 Deepbricks 官网价格 1.3 AiHubMix AiHubMix 官网 使用教程 价格&#xff1a; 1.4 WildCard 开卡订阅 WildCard官网 价格 有3.5% 的充值手续费&#xff0c;API 价格与 Open AI 一样 2. 价…

机器学习:opencv--图像边缘检测

目录 前言 一、图像边缘检测 1.边缘检测 2.边缘检测的方法 二、Sobel算子 1.Sobel算子 2.计算 3.代码实现 4.代码步骤解析 1.导入图片 2.处理x轴和y轴的边缘并相加 三、Scharr算子 1.Scharr算子 2.计算 3.代码实现 四、Laplacian算子 1.Lapla…

PHP 项目流水线部署与错误问题解决

在现代软件开发中&#xff0c;持续集成&#xff08;CI&#xff09;和持续部署&#xff08;CD&#xff09;已成为确保代码质量和加快发布速度的关键实践。本文将介绍如何构建一个 PHP 项目的流水线部署&#xff0c;涵盖从代码提交到生产环境的自动化流程。 #### 1. 什么是流水线…

高效能低延迟:EasyCVR平台WebRTC支持H.265在远程监控中的优势

TSINGSEE青犀视频EasyCVR视频汇聚平台在WebRTC方面确实支持H.265编码&#xff0c;尽管标准的WebRTC API在大多数浏览器中默认并不支持H.265&#xff08;也称为HEVC&#xff0c;高效视频编码&#xff09;编码。EasyCVR平台通过一系列创新的技术手段&#xff0c;实现了在WebRTC协…

深入Redis:细谈持久化

Redis的数据是保存在内存中的&#xff0c;内存里面的数据是不持久的&#xff0c;要想做到持久化&#xff0c;必须要把在内存中的数据储存到硬盘上。 Redis速度非常快&#xff0c;数据只有在内存中才有这样的速度&#xff0c;但是为了持久&#xff0c;数据还是要想办法保存到硬…

WordPress 资源展示型下载类主题 CeoMax-Pro_v7.6 开心版

WordPress 资源展示型下载类主题 CeoMax-Pro_v7.6 开心版&#xff1b; CeoMax-Pro是一款极致美观强大的WordPress付费资源下载主题&#xff0c;它能满足您所有付费资源下载的业务需求&#xff01; 你的想法与业务不能被主题所限制&#xff01;CeoMax-Pro强大的功能&#xff0…

Unity(2022.3.41LTS) - UI详细介绍- Button(按钮)TMP

目录 零.简介 一、基本功能与重要性 二、属性和设置详解 三、使用方法深入探讨 四、优化和注意事项 零.简介 在 Unity 中&#xff0c;按钮&#xff08;Button&#xff09;是用户界面中非常重要的交互元素之一。以下是对 Unity 中按钮的更详细介绍&#xff1a; 一、基本功…

使用session实现单用户多端登录限制

基本流程&#xff1a; 首先获得当前浏览器访问服务器的session&#xff0c;然后根据用户的信息&#xff08;如id等&#xff09;在redis中查找&#xff0c;如果找到&#xff0c;并且和查找对应的session不同&#xff0c;则可以判断已经有其他设备登录过了&#xff0c;这个时候就…

基于生成对抗模型GAN蒸馏的方法FAKD及其在EdgesSRGAN中的应用

文章目录 FAKD系列论文paper1: FAKD&#xff1a;用于高效图像超分辨率的特征亲和知识蒸馏&#xff08;2020&#xff09;ABSTRACT1. INTRODUCTION2. PROPOSED METHOD2.1. Feature Affinity-based Distillation (FAKD) 2.2. Overall Loss Function3. EXPERIMENTAL RESULTS3.1. Ex…