RockerMQ学习

消息中间件以前常用RabbitMQ和ActiveMQ,由于业务需要,后期业务偏向大数据,现着重学习一下RocketMQ(RocketqMQ原理同ctg-mq),后续更新Kafka

一、RocketMQ特性

在这里插入图片描述

  • Kafka特性 (高性能分布式)
    吞吐量大,支持消息大量推挤,支持topic离线,支持分布式,使用ZooKeeper实现负载均衡,支持Hadoop数据并行加载

  • RocketMQ特性
    (1)Broker 服务器:Broker 服务器是RocketMQ的核心,主要功能:消息的处理(接收生产者Producer发送过来的消息(持久化),推送消息给消费者Consumer),消息的存储。NameServer 服务器:记录Producert信息、Broker信息、Consumer信息、Topic主题信息,NameServer服务器在这里作为控制中心、注册中心、路由,
    服务启动顺序,先启动NameServer再启动Broker,将Broker服务器的ip注册到NameServer服务中
    业务流程:如果生产者Producer需要推送消息至Broker服务器中,需要先去NameServer服务器中查找到对应的Broker服务器,然后生产者端Producer与Broker服务器建立连接
    (2)能够保证严格的消息顺序(顺序消费、顺序拉取)
    丰富的消息拉取模式:push模式(等待Broker推送消息,推荐使用,),pull模式(主动向Broker主动拉取消息),pull与push模式同时可以满足使用需求的情况下,建议优先使用push模式
    (3)可以多节点生产和多节点消费
    (4)消息事务机制,目前只有RocketMQ支持,Kafka和RabbitMQ不支持
    (5)亿级消息堆积
    (6)吞吐量高,但比Kafka低
    (7)消息重推、死信队列

  • RabbitMQ特性
    吞吐量比Kafka、RocketMQ低…
    在这里插入图片描述

二、RocketMQ消费模式

1.Push推模式-DefaultMQPushConsumer原理

Consumer消费者向Broker服务器发送请求,Consumer通过请求与Broker服务器保持一种长连接的形式,Broker服务器每5s检查一次是否存在消息,如果有就推送给Consumer消费者

2.Pull拉模式-DefaultMQPullConsumer原理

Consumer消费者主动去Broker服务器拉取数据,一般使用本地定时任务去拉取,由于需要保证消息的及时性,一般推荐使用Push推模式订阅消息

3. 轮询监控机制

在这里插入图片描述
RocketMQ默认将Producer生产者消息发送至4(不一定4个)个队列中进行存储,Consumer消费方通过轮询的方式去监控这个4个队列(轮询监控机制)

4.ack机制

LocalTransactionState标识消息的状态,通过判断返回的枚举值enum做出相应处理

  1. COMMIT_MESSAGE
    消息可见,目前事务消息分为提交不可见消息和可见消息
  2. ROLLBACK_MESSAGE
    消息需要回滚
  3. UNKNOW
    消息异常或超时时返回该枚举值,重复回查信息
    在这里插入图片描述

三、RocketMQ实战

(1)消息发送实现流程

  1. 引入pom依赖,目前最新版本为5.3.0,推荐使用4.4.0
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>版本</version>
</dependency>

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
  1. 创建DefualtMQProducer实例对象
  2. 设置NaemServer地址
  3. 开启DefaultMQProducer
  4. 创建消息Message
  5. 发送消息
  6. 关闭DefaultMQProducer

(2)消息消费流程实现

目前业务需要实现消费业务,着重学习消费端逻辑

  1. 创建DefaultMQPushConsumer
  2. 设置NameServer地址
  3. 设置subscribe,这里是要读取的主题信息
  4. 创建监听器MessageListener
  5. 获取消息信息
  6. 返回消息读取状态

消费者流程代码,这里使用Push推模式实现,并设置了消息拉取最大上限setConsumeMessageBatchMaxSize(2)为2条消息
监听器这么选择普通监听器MessageListenerConcurrently,如果需要实现顺序消费可以选用MessageListenerOrderly
消息消费时如果有异常出现,注意这里不要抛出异常,打印异常日志即可,直接返回消息失败枚举值 RECONSUME_LATER 触发RocketMQ消息重推机制
如果消息消费成功,只需返回枚举类enum ConsumeConcurrentlyStatus.CONSUME_SUCCESS即可表示消息推送成功

public class Consumer {public static void main(String[] args) throws MQClientException {//1. 创建DefaultMQPushConsumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");//2. 设置NameServer地址consumer.setNamesrvAddr("192.168.211.141:9876");//3. 设置subscribe,这里是要读取的主题信息consumer.subscribe("Topic_Name",//执行要消费的主题"Tags || TagsA || TagsB");//过滤规则  "*"则表示全部订阅//4. 创建监听器MessageListener//4.1 设置消息拉取最大数(上限)-最大拉取两条consumer.setConsumeMessageBatchMaxSize(2);consumer.setMessageListener(new MessageListenerConcurrently() {/*MessageListenerConcurrently 普通消息的接收MessageListenerOrderly 顺序消息的接收*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {/*** List<MessageExt> msgs 可以从Broker获取多条数据,默认是32条,可以设置上限*///5. 获取消息信息//迭代消息信息for (MessageExt msg : msgs) {//获取主题String topic = msg.getTopic();//获取标签String tags = msg.getTags();//获取信息byte[] body = msg.getBody();try {String result = new String(body, RemotingHelper.DEFAULT_CHARSET);//todo 实现业务...System.out.println("Consumer消费信息--topic: " + topic + ", tags: " + tags + ", result: "+ result);} catch (UnsupportedEncodingException e) {//throw new RuntimeException(e);//注意这里不要抛出异常,打印异常日志即可,直接返回消息失败枚举值 RECONSUME_LATER 触发重推机制e.printStackTrace();//消息消费失败,重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}//6. 返回消息读取状态//消息消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return null;}});//开启RockerMQ消费端consumer.start();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/402431.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

day34-nginx常用模块

## 0. 网络面试题 网络面试题: TCP三次握手 TCP四次挥手 DNS解析流程 OSI七层模型 抓包工具 tcpdump RAID级别区别 开机启动流程 如何实现不同的网段之间通信(路由器) ip route add 192.168.1.0 255.255.255.0 下一跳的地址或者接口 探测服务器开启了哪些端口(无法登录服务器…

渗透测试-行业术语

1.肉鸡 肉鸡用来比喻哪些可以随意被我们控制的计算机&#xff0c;可以是普通的个人电脑&#xff0c;也可以是大型服务器或者其他网络设备&#xff0c;我们可以像操作自己的电脑一样操作他们而不被发觉。 2.木马 表面上伪装成了正常程序&#xff0c;但是当程序被运行的时候&a…

配置MySQL主从,配置MySQL主主 +keeplive高可用

在大数据-Hadoop体系中 配置MySQL主主keeplive高可用 注意&#xff1a;这个是我两年前的word文档&#xff0c;可以当作参考文档有个思路参考一下&#xff0c;但是里面可能有些地方有误 另外 :关于一些企业级实战技术可以参考这篇mysql 物理备份 MySQL 全量备份 增量备份 差异…

Linux_vi vim的使用

目录 vi和vim的基本介绍 vi和vim常用的三种模式 案例演示 vim的快捷键 快捷键使用练习 vi和vim的基本介绍 linux系统内会内置vi文本编译器。vim可以简单认为是vi的增强版本。 vi和vim常用的三种模式 有正常模式&#xff0c;移动光标&#xff0c;删除字符等。插入模式可以进…

Flask 线上高并发部署方案实现

目录 1、Flask默认多线程执行 2、使用gevent.pywsgi实现 3、是用uWSGI服务器实现 1、Flask默认多线程执行 前言&#xff1a;在Flask的较早版本中&#xff0c;默认并不支持多线程模式。然而&#xff0c;从Flask 0.9版本开始&#xff0c;引入了多线程模式的支持&#xff0c;并…

【自动驾驶】ROS中的TF坐标变换(一):静态坐标变换

目录 引子ros中的右手坐标系补充&#xff1a;欧拉角及四元数理解旋转平移操作复合操作 运行坐标变换的例子坐标转换 静态坐标变换-发布坐标系信息创建功能包 静态坐标变换-订阅坐标系信息添加cpp订阅者主文件修改cmakelist文件编译报错的解决方案运行程序进行测试 引子 机器人…

【MySQL数据库】单机、集群、分布式的区别

单机、集群和分布式是计算机系统中三种不同的架构模型,它们在资源管理、任务执行和性能优化方面有显著区别。 图片来源 1. 单机(Standalone) 单机指的是单一计算机系统,即所有的计算任务和数据都在一台计算机上处理。单机系统的特点包括: 硬件限制:受限于单台机器的计…

Visual Studio 2022 无法打开源文件atlimage.h

最近在搞tcp socket 通信demo&#xff0c;网上抄了一下源码&#xff08;代码参考&#xff1a;C中的Socket编程使用协议发送图片_快速传输 照片 c-CSDN博客&#xff09;&#xff0c;还没开始编译就提示 无法打开源文件atlimage.h&#xff0c;全局搜了一下没有这个文件&#xff0…

JSON Web Token (JWT): 理解与应用

JWT&#xff08;JSON Web Token&#xff09;是一种开放标准&#xff08;RFC 7519&#xff09;&#xff0c;它定义了一种紧凑且自包含的方式&#xff0c;用于在各方之间以JSON对象的形式安全地传输信息。JWT通常用于身份验证和授权目的&#xff0c;因为它可以使用JSON对象在各方…

Unity开发抖音小游戏广告部分接入

Unity开发抖音小游戏广告部分接入 介绍环境确保开通流量主获取广告位广告部分代码测试如下总结 介绍 最近在使用Unity做抖音小游戏这块的内容&#xff0c;因为要接入广告&#xff0c;所以这里我把我接入广告的部分代码和经验分享一下。 环境确保 根据抖音官方的文档我们是先…

Linux网络编程—socket、bind

一、socket创建套接字 socket是用来创建网络通信或本地通信的套接字&#xff0c;跟文件有关&#xff1a;告诉系统&#xff0c;PCB&#xff08;进程控制块&#xff09;控制的数据应该向哪个套接字写入、或读取&#xff1b;这个套接字是在TCP/IP协议下运行的 #include <sys/t…

选择排序(附动图)

1.思路 基本思想&#xff1a; 每一次从待排序的数据元素中选出最小&#xff08;或最大&#xff09;的一个元素&#xff0c;存放在序列的起始位置&#xff0c;直到全部待排序的数据元素排完 。 1.1双向选择排序&#xff08;升序&#xff09; 头尾指针&#xff08;索引&#xf…

初识C++

一、C的由来 C的起源可以追溯到1979年&#xff0c;当时Bjarne Stroustrup(本贾尼斯特劳斯特卢普&#xff0c;这个翻译的名字不同的地方可能有差异)在贝尔实验室从事计算机科学和软件工程的研究工作。面对项目中复杂的软件开发任务&#xff0c;特别是模拟和操作系统的开发⼯作&…

涉案财物管理系统DW-S405|实现人员随身物品智能化管理

涉案财物管理系统DW-S405系统基于物联网技术规范涉案财物管理流程&#xff0c;确保涉案财物的安全性、完整性和合法性&#xff1b;可以提高办案效率&#xff0c;减少办案成本&#xff0c;实现资源共享。 财物管理 管理员可通过个人账号和指纹验证两种登录方式进入财物管理系统…

1. 数据结构——顺序表的主要操作

1. 内容 顺序表的初始化、插入、删除、按值查找、输出以及其时间复杂度的计算。 2.代码 #include<stdio.h> #include<stdlib.h> //函数结果状态代码 #define OK 1 #define OVERFLOW -2 #define ERROR 0 #define MAXSIZE 100typedef int ElemType; //顺序表每个…

使用Nexus搭建Maven私服仓库

一、私服仓库简介 在Java的世界中&#xff0c;我们通常使用Maven的依赖体系来管理构件&#xff08;artifact&#xff0c;又称为二方库或三方库&#xff09;的依赖&#xff0c;Maven仓库用于存储这些构件。一般的远程仓库&#xff08;比如Maven Central&#xff09;只提供下载功…

OpenCV Python 图像处理入门

OpenCV入门 OpenCV&#xff1a;轻量、高效、开源。最广泛使用的计算机视觉工具。 下面涉及图片的读取&#xff0c;RGB彩色通道&#xff0c;区域裁剪&#xff0c;绘制图形和文字&#xff0c;均值滤波&#xff0c;特征提取&#xff0c;模板匹配&#xff0c;梯度算法&#xff0c…

获奖方案|趋动科技:资源池化释放AI算力价值

“据统计&#xff0c;GPU的平均利用率不超过30%&#xff0c;会产生巨大的算力资源浪费。我们用软件定义的方式通常可以把用户GPU的利用率提升3-8倍&#xff0c;甚至可以到10倍。” 这是算力池化软件公司趋动科技援引行业报告数据并结合自身企业最佳实践经验给出的最新数据。通…

在 SOCKS 和 HTTP 代理之间如何选择?

在 SOCKS 和 HTTP 代理之间进行选择需要彻底了解每种代理的工作原理以及它们传达的配置。只有这样&#xff0c;您才能轻松地在不同类型的代理之间进行选择。 本文概述了 HTTP 和 SOCKS 代理是什么、它们如何运作以及它们各自带来的好处。此外&#xff0c;我们将比较这两种代理类…

Java算法解析一:二分算法及其衍生出来的问题

这个算法的前提是&#xff0c;数组是升序排列的 算法描述&#xff1a; i和j是指针可以表示查找范围 m为中间值 当目标值targat比m大时&#xff0c;设置查找范围在m右边&#xff1a;i m-1 当目标值targat比m小时&#xff0c;设置查找范围在m左边&#xff1a;j m1 当targat的…