消息队列篇--原理篇--RocketMQ(NameServer,Broker,单机上每秒处理数百万条消息性能)

1、概述

RocketMQ是阿里巴巴开源的一个分布式消息中间件,具有高吞吐量、低延迟和强一致性等特点。它特别适合大规模分布式系统的消息传递,广泛应用于电商、金融、物流等领域的实时数据处理和异步通信。

RocketMQ是用Java语言实现,在设计时参考了Kafka,并做出了自己的一些改进,消息可靠性上比Kafka更好。RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理等。

它最初是为了解决阿里巴巴双11大促期间的海量消息传递问题而设计的,后来被捐赠给Apache基金会,成为了一个广泛使用的开源项目。RocketMQ不仅是一个消息队列系统,还可以作为日志收集、实时数据处理等场景下的分布式消息总线。

2、特点

  • 高吞吐量:RocketMQ能够在单机上每秒处理数百万条消息,适用于大数据量的场景。
  • 低延迟:RocketMQ的设计目标是低延迟,能够在毫秒级别内完成消息传递。
  • 强一致性:RocketMQ支持消息的顺序传递,确保消息的顺序性和一致性。
  • 分布式架构:RocketMQ采用主从复制和分布式架构,提供高可用性和水平扩展能力。
  • 灵活的消息模型:支持P2P和Pub/Sub模式,满足不同的业务需求。
  • 丰富的生态:RocketMQ提供了多种客户端库和工具,支持多种编程语言和平台。
  • 插件机制:RocketMQ提供了插件机制,用户可以根据需要扩展其功能,例如添加自定义的序列化器、压缩算法等。
  • 持久化存储:RocketMQ使用高效的文件系统进行消息持久化,确保消息不会丢失,并且支持磁盘预读、零拷贝等优化技术。
  • 集群管理:RocketMQ支持多种集群模式,包括同步复制、异步复制、混合复制等,确保系统的高可用性和容错性。

3、架构设计

结构示例图:
在这里插入图片描述
RocketMQ的架构设计非常精巧,主要由以下几个组件组成:

(1)、NameServer

NameServer是RocketMQ的路由注册中心,负责管理和维护Broker的元数据信息(如IP地址、端口等)。生产者和消费者通过NameServer获取Broker的地址信息,从而建立连接。

特点:

  • 无状态:NameServer是无状态的,多个NameServer实例可以独立运行,互不影响。
  • 轻量级:NameServer的职责相对简单,只负责路由信息的管理和更新,不参与消息的存储和转发。
  • 高可用:可以通过部署多个NameServer实例来实现高可用性,生产者和消费者会自动选择可用的NameServer。

(2)、Broker

Broker是RocketMQ的核心组件,负责消息的存储、转发和查询。每个Broker实例可以包含多个Topic,每个Topic又可以分为多个Queue(队列),用于实现消息的分区存储和负载均衡。

Broker类型:

  • Master Broker:主Broker负责接收生产者的消息并将其持久化到磁盘,同时将消息分发给从Broker。
  • Slave Broker:从Broker通过异步或同步的方式从主Broker复制消息,确保数据的高可用性和容错性。

特点:

  • 高吞吐量:Broker使用高效的文件系统进行消息持久化,并通过内存映射、零拷贝等技术优化性能。
  • 持久化存储:Broker将消息存储在磁盘上,确保消息不会丢失。支持磁盘预读、批量写入等优化技术。
  • 负载均衡:Broker支持消息的分区存储和负载均衡,能够根据不同的策略将消息分配到不同的Queue中。
  • 主从复制:Broker支持主从复制机制,确保数据的高可用性和容错性。可以通过配置同步复制或异步复制来平衡性能和可靠性。

(3)、Producer(生产者)

Producer负责将消息发送到Broker。生产者可以选择将消息发送到指定的Topic和Queue,或者让Broker自动选择合适的Queue。

特点:

  • 负载均衡:Producer可以根据不同的策略选择合适的Broker和Queue,实现负载均衡。
  • 事务消息:RocketMQ支持事务消息,确保消息的可靠传递。生产者可以在发送消息后执行本地事务,并根据事务结果决定是否提交或回滚消息。
  • 批量发送:Producer支持批量发送消息,减少网络开销,提升性能。
  • 消息压缩:Producer可以对消息进行压缩,减少网络传输的数据量,提升性能。

(4)、Consumer(消费者)

Consumer负责从Broker拉取消息并进行处理。消费者可以选择从指定的Topic和Queue中拉取消息,或者让Broker自动分配消息。

消费模式:

  • 集群消费:多个消费者实例可以同时消费同一个Topic的消息,消息会被均匀分配给不同的消费者实例。适用于需要高吞吐量的场景。
  • 广播消费:每个消费者实例都会收到Topic中的所有消息。适用于需要每个消费者都处理所有消息的场景。

特点:

  • 负载均衡:Consumer可以根据不同的策略选择合适的Broker和Queue,实现负载均衡。
  • 消息重试:如果消费者处理消息失败,RocketMQ会自动将消息重新放入队列中,供消费者再次处理。
  • 消息顺序:RocketMQ支持消息的顺序传递,确保消息按照发送的顺序被消费。可以通过配置顺序消息队列来实现这一功能。
  • 消息过滤:Consumer可以根据消息的标签(Tag)或其他属性进行过滤,只消费符合条件的消息。

(5)、Message(消息)

结构:

  • Topic:消息的主题,用于区分不同类型的消息。一个Topic可以包含多个Queue。
  • Queue:消息的队列,用于实现消息的分区存储和负载均衡。每个Queue对应一个物理文件,消息按顺序写入文件中。
  • Tag:消息的标签,用于对消息进行分类和过滤。消费者可以根据Tag来选择消费哪些消息。
  • Key:消息的唯一标识符,用于快速查找和定位消息。
  • Body:消息的主体内容,通常是一个字节数组,可以包含任意格式的数据。
  • 属性:消息的附加属性,用于存储一些额外的元数据信息,如消息的优先级、延迟时间等。

4、工作流程

(1)、启动NameServer:首先启动NameServer,NameServer作为路由注册中心,负责管理和维护Broker的元数据信息。
(2)、启动Broker:启动Master和Slave Broker,Broker会向NameServer注册自己的地址信息。生产者和消费者通过NameServer获取Broker的地址信息,建立连接。
(3)、生产者发送消息:

  • 生产者根据Topic和Tag构建消息,并选择合适的Broker和Queue。
  • 生产者将消息发送到Broker,Broker将消息持久化到磁盘,并返回确认信息。
    (4)、消费者拉取消息:
  • 消费者根据Topic和Tag订阅消息,并选择合适的Broker和Queue。
  • 消费者定期从Broker拉取消息,并进行处理。处理完成后,消费者向Broker发送确认信息,表示消息已被成功消费。
    (5)、消息重试:如果消费者处理消息失败,RocketMQ会自动将消息重新放入队列中,供消费者再次处理。
    (6)、消息顺序:对于顺序消息,RocketMQ会确保消息按照发送的顺序被消费。消费者可以配置顺序消息队列,确保消息的顺序性。

5、关键概念

  • Topic:消息的主题,用于区分不同类型的消息。一个Topic可以包含多个Queue。
  • Queue:消息的队列,用于实现消息的分区存储和负载均衡。每个Queue对应一个物理文件,消息按顺序写入文件中。
  • Tag:消息的标签,用于对消息进行分类和过滤。消费者可以根据Tag来选择消费哪些消息。
  • Group:消费者组,用于区分不同的消费者实例。同一Group内的消费者会共享消息,不同Group的消费者可以独立消费消息。类似kafka。
  • Offset:消息的偏移量,表示消费者已经消费到的消息位置。每个消费者组都有一个独立的Offset,用于记录消费进度。
  • Message Key:消息的唯一标识符,用于快速查找和定位消息。可以通过Message Key查询特定的消息。
  • Transaction Message:事务消息,确保消息的可靠传递。生产者可以在发送消息后执行本地事务,并根据事务结果决定是否提交或回滚消息。

6、应用场景

  • 电商促销:RocketMQ适合用于电商促销活动中的订单处理、库存更新等场景,能够应对高并发和大流量,其最初设计就是为了淘宝双十一活动准备的。
  • 实时数据处理:适用于日志收集、监控数据传输等实时数据处理场景。
  • 异步通信:可以用于系统之间的解耦和异步通信,避免阻塞主线程。

7、代码示例

生产者示例:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class RocketMQProducer {public static void main(String[] args) throws Exception {// 创建生产者实例DefaultMQProducer producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr("localhost:9876");  // 设置NameServer地址producer.start();// 发送消息for (int i = 0; i < 10; i++) {String messageBody = "Hello, RocketMQ! " + i;// 参数(topic,标签tag,消息体)Message msg = new Message("topic-test", "tag-a", messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf(" [x] Sent message: %s, result: %s%n", messageBody, sendResult);}// 关闭生产者producer.shutdown();}
}

消费者示例:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;public class RocketMQConsumer {public static void main(String[] args) throws Exception {// 创建消费者实例DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");consumer.setNamesrvAddr("localhost:9876");  // 设置NameServer地址consumer.subscribe("topic-test", "");// 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf(" [x] Received message: %s, content: %s%n",new String(msg.getBody()), msg.getMsgId());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();System.out.println(" [x] Consumer started.");}
}

乘风破浪会有时,直挂云帆济沧海!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/4515.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

简述mysql 主从复制原理及其工作过程,配置一主两从并验证。

MySQL 主从同步是一种数据库复制技术&#xff0c;它通过将主服务器上的数据更改复制到一个或多个从服务器&#xff0c;实现数据的自动同步。 主从同步的核心原理是将主服务器上的二进制日志复制到从服务器&#xff0c;并在从服务器上执行这些日志中的操作。 MySQL主从同步是基…

Web前端开发技术之HTMLCSS知识点总结

学习路线 一、新闻网界面1. 代码示例2. 效果展示3. 知识点总结3.1 HTML标签和字符实体3.2 超链接、颜色描述与标题元素3.3 关于图片和视频标签&#xff1a;3.4 CSS引入方式3.5 CSS选择器优先级 二、flex布局1. 代码示例2. 效果展示3. 知识点总结3.1 span标签和flex容器的区别3.…

内存故障原因与诊断(Reasons and Diagnosis of Memory Failure)

内存故障原因与诊断 您是否曾遇到过电脑无法启动、黑屏、死机&#xff0c;或者系统卡顿的情况&#xff1f;这些问题看起来很复杂&#xff0c;实际上大多数都是内存故障引起的。内存是电脑的核心组成部分之一&#xff0c;任何小东西问题都可能导致系统死机&#xff0c;严重时甚…

vulnhub靶机(ReconForce)

一.信息收集: 使用nmap进行端口扫描,发现其开放了ftp,http,ssh服务 nmap -sS -O -sV -p- 192.168.80.142访问其80端口发现是一个网页,点击TroubleShoot后发现其需要登录 在去尝试使用ftp的匿名登录发现无法执行任何命令,发现了他的欢迎语有点特别 在扫描目录后没有发现什么有…

54,【4】BUUCTF WEB GYCTF2020Ezsqli

进入靶场 吓我一跳&#xff0c;但凡放个彭于晏我都不说啥了 提交个1看看 1 and 11 1# 还尝试了很多&#xff0c;不过都被过滤了&#xff0c;头疼 看看别人的WP 竟然要写代码去跑&#xff01;&#xff01;&#xff01;&#xff0c;不会啊&#xff0c;先用别人的代码吧&#xf…

vue2使用flv.js在浏览器打开flv格式视频

组件地址&#xff1a;GitHub - bilibili/flv.js: HTML5 FLV Player flv.js 仅支持 H.264 和 AAC/MP3 编码的 FLV 文件。如果视频文件使用了其他编码格式就打不开。 flv.vue <template><div><el-dialog :visible.sync"innerVisibleFlv" :close-on-pre…

Git原理与应用(三)【远程操作 | 理解分布式 | 推送拉取远程仓库 | 标签管理】

Git 理解分布式版本控制系统远程仓库新建远程仓库克隆远程仓库向远程仓库推送配置Git忽略特殊文件 标签管理理解标签创建标签操作标签删除标签 理解分布式版本控制系统 我们⽬前所说的所有内容&#xff08;工作区&#xff0c;暂存区&#xff0c;版本库等等&#xff09;&#x…

网络安全:信息时代的守护者

随着互联网的快速发展&#xff0c;网络安全问题日益成为全球关注的焦点。无论是个人用户、企业组织还是政府部门&#xff0c;网络安全都已成为保障信息安全、保护隐私、确保社会秩序的基石。在这个数字化时代&#xff0c;如何应对复杂多变的网络安全威胁&#xff0c;成为了我们…

BUUCTF_Web([GYCTF2020]Ezsqli)

1.输入1 &#xff0c;正常回显。 2.输入1 &#xff0c;报错false&#xff0c;为字符型注入&#xff0c;单引号闭合。 原因&#xff1a; https://mp.csdn.net/mp_blog/creation/editor/145170456 3.尝试查询字段&#xff0c;回显位置&#xff0c;数据库&#xff0c;都是这个。…

HTML知识点复习

1.src 和 href 的区别 src&#xff1a;表示对资源的引用&#xff0c; src指向的内容会嵌入到其标签里。 当浏览器解析到该元素时候&#xff0c;会暂停其他资源的下载和处理&#xff0c; 直到将该资源加载、编译、执行完毕&#xff0c;所以js脚本一般会放在页面底部 href&…

Windows11电脑总是一闪一闪的,黑一下亮一些怎么解决

Windows11电脑总是一闪一闪的&#xff0c;黑一下亮一些怎么解决 1. 打开设备管理器2. 点击显示适配器3. 更新下方两个选项的驱动3.1 更新驱动Inter(R) UHD Graphixs3.2 更新驱动NVIDIA GeForce RTX 4060 Laptop GPU 4. 其他文章快来试试吧&#x1f970; 1. 打开设备管理器 在电…

WPS计算机二级•高效操作技巧

听说这里是目录哦 斜线表头 展示项目名称&#x1f34b;‍&#x1f7e9;横排转竖排&#x1f350;批量删除表格空白行&#x1f348;方法一方法二建辅助列找空值 能量站&#x1f61a; 斜线表头 展示项目名称&#x1f34b;‍&#x1f7e9; 选中单元格&#xff0c;单击右键➡️“设…

使用Torchvision框架实现对象检测:从Faster-RCNN模型到自定义数据集,训练模型,完成目标检测任务。

引言 对象检测是一项计算机视觉中的核心任务&#xff0c;其目标是识别图像中的目标并标记它们的位置和类别。在Pytorch生态系统中&#xff0c;Torchvision提供了多种预训练的对象检测模型&#xff08;如Faster-RCNN、Mask-RCNN等&#xff09;&#xff0c;为开发者快速构建应用…

SSM课设-学生管理系统

【课设者】SSM课设-学生管理系统 技术栈: 后端: SpringSpringMVCMybatisMySQLJSP 前端: HtmlCssJavaScriptEasyUIAjax 功能: 学生端: 登陆 学生信息管理 个人信息管理 老师端: 多了教师信息管理 管理员端: 多了班级信息管理 多了年级信息管理 多了系统用户管理

C语言之装甲车库车辆动态监控辅助记录系统

&#x1f31f; 嗨&#xff0c;我是LucianaiB&#xff01; &#x1f30d; 总有人间一两风&#xff0c;填我十万八千梦。 &#x1f680; 路漫漫其修远兮&#xff0c;吾将上下而求索。 C语言之装甲车库车辆动态监控辅助记录系统 目录 一、前言 1.1 &#xff08;一&#xff09;…

【STM32-学习笔记-4-】PWM、输入捕获(PWMI)

文章目录 1、PWMPWM配置 2、输入捕获配置3、编码器 1、PWM PWM配置 配置时基单元配置输出比较单元配置输出PWM波的端口 #include "stm32f10x.h" // Device headervoid PWM_Init(void) { //**配置输出PWM波的端口**********************************…

Kinova仿生机械臂Gen3搭载BOTA 力矩传感器SeneOne:彰显机器人触觉 AI 与六维力传感的融合力量

随着工业4.0时代的到来&#xff0c;自动化和智能化成为制造业的趋势。机器人作为实现这一趋势的重要工具&#xff0c;其性能和智能水平直接影响到生产效率和产品质量。然而&#xff0c;传统的机器人系统在应对复杂任务时往往缺乏足够的灵活性和适应性。为了解决这一问题&#x…

有限元分析学习——Anasys Workbanch第一阶段笔记(13)网格单元分类、物理场与自由度概念

目录 0 序言 1 网格单元分类 2 各类单元的应用 3 massage与帮助和查看 4 物理场和自由度 4.1 各种单元自由度 4.2 结构自由度 0 序言 本章主要讲解网格单元的分类及物理场和自由度的相关概念。 1 网格单元分类 按单元的形状分类&#xff1a;实体单元、壳单元和杆梁单元…

python3GUI--仿崩坏三二次元登录页面(附下载地址) By:PyQt5

文章目录 一&#xff0e;前言二&#xff0e;预览三&#xff0e;实现方案1.实现原理1.PyQt52. 具体实现 2.UI设计1.UI组件化、模块化2.UI设计风格思路 3.项目代码结构4.使用方法3.代码分享1.支持跳转网页的QLabel组件2.三角形ICON按钮 四&#xff0e;总结 大小&#xff1a;33.3 …

Pytorch使用教程(12)-如何进行并行训练?

在使用GPU训练大模型时&#xff0c;往往会面临单卡显存不足的情况。这时&#xff0c;通过多卡并行的形式来扩大显存是一个有效的解决方案。PyTorch主要提供了两个类来实现多卡并行&#xff1a;数据并行torch.nn.DataParallel&#xff08;DP&#xff09;和模型并行torch.nn.Dist…