消息队列篇--原理篇--Pulsar(Namespace,BookKeeper,类似Kafka甚至更好的消息队列)

Apache Pulusar是一个分布式、多租户、高性能的发布/订阅(Pub/Sub)消息系统,最初由Yahoo开发并开源。它结合了Kafka和传统消息队列的优点,提供高吞吐量、低延迟、强一致性和可扩展的消息传递能力,适用于大规模分布式系统的实时数据处理和异步通信。
Pulsar的架构设计结合了消息队列和流处理的特点,既可以作为传统消息队列使用,也可以作为流处理平台支持实时数据处理。

主要特点:

  • 分布式架构:Pulsar采用分层架构,将消息存储与代理服务分离,提供了更好的水平扩展能力和故障隔离。
  • 多租户支持:Pulsar支持多租户部署,不同租户可以共享同一集群,同时保证资源隔离和安全性。
  • 持久化和一致性:Pulsar支持消息的持久化存储,并通过BookKeeper提供强一致性保障。
  • 灵活的消息模型:Pulsar支持多种消息传递模式,包括Pub/Sub、P2P和Key_Shared订阅模式。
  • 多语言支持:Pulsar提供了多种编程语言的客户端库,如Java、Python、Go、C++等。
  • 丰富的生态:Pulsar拥有活跃的社区和丰富的生态系统,支持与其他工具和服务集成,如Kafka Connect、Flink、Spark等。

1、核心概念

(1)、命名空间(Namespace)

命名空间是Pulsar中的一个逻辑单元,用于组织和管理主题(Topic)。每个命名空间可以包含多个主题,并且可以为不同的命名空间设置不同的配置,例如保留策略、订阅类型等。命名空间通常用于实现多租户隔离。

(2)、主题(Topic)

主题是Pulsar中的消息通道,生产者(Producer)将消息发送到主题,消费者(Consumer)从主题中消费消息。

Pulsar支持两种类型的主题:

  • 持久化主题(Persistent Topic):消息会被持久化存储,确保即使在broker故障的情况下也不会丢失。
  • 非持久化主题(Non-Persistent Topic):消息不会被持久化存储,适用于对延迟敏感但对可靠性要求较低的场景。

(3)、订阅(Subscription)

订阅是消费者与主题之间的绑定关系。Pulsar支持多种订阅类型,每种订阅类型决定了消息的分发方式:

  • 独占订阅(Exclusive Subscription):只有一个消费者可以订阅该主题,其他消费者无法订阅。
  • 共享订阅(Shared Subscription):多个消费者可以订阅同一个主题,消息会被轮询分发给不同的消费者。
  • 故障转移订阅(Failover Subscription):多个消费者可以订阅同一个主题,但只有一个是活跃的消费者,其他消费者作为备用。当活跃消费者失败时,备用消费者会接管消息消费。
  • Key_Shared 订阅:基于消息的key进行分区,确保相同key的消息总是被分发给同一个消费者。

(4)、消息(Message)

消息是Pulsar中的基本数据单位,由生产者发送到主题。

每个消息可以包含以下属性:

  • 消息体(Payload):消息的实际内容,可以是任意二进制数据。
  • 消息ID(Message ID):唯一标识每条消息的ID,用于确认消息的消费状态。
  • 属性(Properties):用户可以为消息添加自定义的键值对属性,方便后续处理。
  • 时间戳(Timestamp):消息的创建时间或发送时间。

(5)、分区(Partition)

Pulsar支持主题分区,即将一个主题划分为多个分区,每个分区可以独立地处理消息。分区可以提高主题的吞吐量和并发性,特别是在高负载场景下。Pulsar会自动将消息均匀分布到不同的分区中。

(6)、Broker

Broker是Pulsar的核心组件之一,负责接收生产者的消息并将其分发给消费者。注意,Broker不直接存储消息,而是将消息委托给BookKeeper进行持久化存储。Broker负责管理主题、订阅和消费者的连接,并处理消息的路由和分发。

(7)、BookKeeper

BookKeeper是Pulsar的持久化存储层,负责将消息持久化到磁盘。BookKeeper采用分布式日志存储机制,提供了高可用性和强一致性保障。每个消息会被写入多个BookKeeper节点,确保即使部分节点故障也不会丢失数据。

(8)、ZooKeeper

ZooKeeper是Pulsar的元数据管理组件,用于存储集群的配置信息、主题和命名空间的元数据、以及Broker和BookKeeper的状态信息。ZooKeeper提供了分布式协调服务,确保Pulsar集群的一致性和可靠性。

2、架构设计

Pulsar的架构设计采用了分层结构,将消息存储与代理服务分离,使得系统更加模块化和可扩展。

结构示例图:
在这里插入图片描述

Pulsar的主要组件及其作用:

  • Broker:负责接收生产者的消息并将其分发给消费者。Broker不直接存储消息,而是将消息委托给BookKeeper进行持久化存储。Broker还负责管理主题、订阅和消费者的连接。

  • BookKeeper:即上图BK Client。负责将消息持久化到磁盘,提供高可用性和强一致性保障。BookKeeper采用分布式日志存储机制,确保消息的安全性和可靠性。

  • Bookie:Bookie是BookKeeper的存储节点组成,持久化地存储消息。BookKeeper采用分布式日志存储的方式,将消息以日志的形式存储在多个Bookie节点上。这种设计确保了消息的可靠性和持久性,即使在节点故障的情况下也能保证消息不丢失。

  • ZooKeeper:负责存储集群的元数据,包括主题、命名空间、Broker和BookKeeper的状态信息。ZooKeeper提供了分布式协调服务,确保集群的一致性和可靠性。

  • Proxy(可选):Pulsar提供了一个可选的代理层(Proxy),允许客户端通过HTTP或WebSocket协议与Pulsar集群进行通信。Proxy可以简化客户端的连接管理,并提供跨区域访问的能力。

  • Function:Pulsar提供了一个轻量级的流处理框架(Pulsar Functions),允许用户编写简单的流处理逻辑并将其部署到Pulsar集群中。Pulsar Functions可以用于实时数据处理、事件驱动计算等场景。

  • SQL:Pulsar提供了一个SQL查询引擎(Pulsar SQL),允许用户通过SQL语句查询Pulsar中的消息数据。Pulsar SQL可以用于数据分析、监控和告警等场景。

3、特性与优势

(1)、高吞吐量和低延迟

Pulsar采用了分层架构,将消息存储与代理服务分离,使得系统能够同时具备高吞吐量和低延迟。Broker负责处理消息的路由和分发,而BookKeeper负责持久化存储,两者相互协作,确保消息的高效传递。

(2)、多租户支持

Pulsar支持多租户部署,不同租户可以共享同一集群,同时保证资源隔离和安全性。每个租户可以拥有自己的命名空间,并可以根据需要设置不同的配置,例如保留策略、订阅类型等。
即:类似Nacos的命名空间,实现配置,服务等隔离。

(3)、持久化和一致性

Pulsar支持消息的持久化存储,并通过BookKeeper提供强一致性保障。每个消息会被写入多个Bookie节点,确保即使部分节点故障也不会丢失数据。Pulsar还支持事务和幂等性,确保消息的可靠传递。

(4)、灵活的消息模型

Pulsar支持多种消息传递模式,包括Pub/Sub、P2P和Key_Shared订阅模式。用户可以根据实际需求选择合适的订阅类型,满足不同的业务场景。Pulsar还支持消息的重播、回溯和跳过等功能,方便用户进行调试和故障排查。

(5)、多语言支持

Pulsar提供了多种编程语言的客户端库,包括Java、Python、Go、C++等。用户可以根据自己的技术栈选择合适的客户端库,快速集成Pulsar到应用程序中。

(6)、丰富的生态

Pulsar拥有活跃的社区和丰富的生态系统,支持与其他工具和服务集成。例如,Pulsar可以与Kafka Connect、Flink、Spark等工具集成,实现数据的实时处理和分析。Pulsar还提供了Pulsar Functions和Pulsar SQL等功能,进一步扩展了其应用场景。

4、应用场景

(1)、实时数据处理

Pulsar的高吞吐量和低延迟特性使其非常适合用于实时数据处理场景。例如,电商网站可以使用Pulsar来处理订单、支付、库存等实时数据,确保数据的及时性和准确性。

(2)、物联网(IoT)

Pulsar的分布式架构和多租户支持使其非常适合用于物联网场景。物联网设备可以将传感器数据发送到Pulsar,Pulsar可以将这些数据分发给不同的消费者进行处理。Pulsar还支持消息的重播和回溯功能,方便用户进行历史数据分析。

(3)、微服务架构

Pulsar可以作为微服务之间的消息总线,实现服务间的异步通信。微服务可以通过Pulsar发送和接收消息,避免阻塞主线程,提高系统的响应速度和稳定性。

(4)、日志收集和监控

Pulsar可以用于日志收集和监控场景,将应用的日志数据发送到Pulsar,Pulsar可以将这些数据分发给不同的消费者进行处理。Pulsar还支持消息的持久化存储,确保日志数据不会丢失。

(5)、事件驱动架构

Pulsar支持事件驱动架构,用户可以将事件发送到Pulsar,Pulsar可以将这些事件分发给不同的消费者进行处理。Pulsar还支持消息的重播和回溯功能,方便用户进行事件的回放和调试。

5、代码示例

(1)、生产者示例

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.MessageId;public class PulsarProducerExample {public static void main(String[] args) throws Exception {// 1、创建Pulsar客户端try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {// 2、创建生产者try (Producer<byte[]> producer = client.newProducer().topic("persistent://public/default/example-topic")    // 指定主题.create()) {// 3、发送消息for (int i = 0; i < 10; i++) {String message = "Hello, Pulsar! " + i;MessageId msgId = producer.send(message.getBytes());    // 发送消息System.out.println(" [x] Sent message: " + message + ", msgId: " + msgId);}}}}
}

(2)、消费者示例

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;public class PulsarConsumerExample {public static void main(String[] args) throws Exception {// 1、创建Pulsar客户端try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {// 2、创建消费者try (Consumer<byte[]> consumer = client.newConsumer().topic("persistent://public/default/example-topic")   // 监听的主题.subscriptionName("example-subscription").subscriptionType(SubscriptionType.Shared).subscribe()) {// 3、接收和消费消息while (true) {       // 利用循环接收消息Message<byte[]> msg = consumer.receive();      // 具体接收消息try {System.out.println(" [x] Received message: " + new String(msg.getData()));consumer.acknowledge(msg);  // 4、确认消息已消费} catch (Exception e) {consumer.negativeAcknowledge(msg);  // 5、处理失败,重新投递}}}}}
}

6、Pulsar总结

Apache Pulsar是一个功能强大、架构灵活的消息系统,特别适合大规模分布式系统的实时数据处理和异步通信。它的分层架构、多租户支持、持久化和一致性保障、灵活的消息模型等特点,使其在性能、可靠性和可扩展性方面表现出色。Pulsar还拥有丰富的生态系统,支持与其他工具和服务集成,适用于多种应用场景。

乘风破浪会有时,直挂云帆济沧海!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/6450.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

当 Facebook 窥探隐私:用户的数字权利如何捍卫?

随着社交平台的普及&#xff0c;Facebook 已经成为全球用户日常生活的一部分。然而&#xff0c;伴随而来的隐私问题也愈发严峻。近年来&#xff0c;Facebook 频频被曝出泄露用户数据、滥用个人信息等事件&#xff0c;令公众对其隐私保护措施产生质疑。在这个信息化时代&#xf…

OSCP - Proving Grounds - Quackerjack

主要知识点 端口转发 具体步骤 执行nmap扫描,开了好多端口&#xff0c;我先试验80和8081&#xff0c;看起来8081比较有趣 Nmap scan report for 192.168.51.57 Host is up (0.0011s latency). Not shown: 65527 filtered tcp ports (no-response) PORT STATE SERVICE …

Hive之加载csv格式数据到hive

场景&#xff1a; 今天接了一个需求&#xff0c;将测试环境的hive数据导入到正式环境中。但是不需要整个流程的迁移&#xff0c;只需要迁移ads表 解决方案&#xff1a; 拿到这个需求首先想到两个方案&#xff1a; 1、将数据通过insert into语句导出&#xff0c;然后运行脚本 …

PHP如何封装项目框架达到高可用、高性能、高并发

很多初创公司为了快速上线业务&#xff0c;开发时间由本来的6个月压缩到3个月甚至2个月。开发人员只能根据时间及业务需求去git上找现有的项目二次开发或者是一个空框架根据业务一点一点的去做&#xff0c;上述两种方案虽然也可以上线但是对于业务本身存在的问题也是很大的&…

探究 Facebook 隐私安全发展方向,未来走向何方?

随着社交媒体的普及&#xff0c;隐私和数据安全问题成为了全球关注的焦点。Facebook&#xff0c;作为全球最大的社交平台之一&#xff0c;其隐私安全问题尤其引人注目。近年来&#xff0c;随着用户数据泄露事件的不断发生&#xff0c;Facebook 不断调整其隐私政策&#xff0c;探…

【Linux】其他备选高级IO模型

其他高级 I/O 模型 以上基本介绍的都是同步IO相关知识点&#xff0c;即在同步I/O模型中&#xff0c;程序发起I/O操作后会等待I/O操作完成&#xff0c;即程序会被阻塞&#xff0c;直到I/O完成。整个I/O过程在同一个线程中进行&#xff0c;程序在等待期间不能执行其他任务。下面…

R语言学习笔记之开发环境配置

一、概要 整个安装过程及遇到的问题记录 操作步骤备注&#xff08;包含遇到的问题&#xff09;1下载安装R语言2下载安装RStudio3离线安装pacman提示需要安装Rtools4安装Rtoolspacman、tidyfst均离线安装完成5加载tidyfst报错 提示需要安装依赖&#xff0c;试错逐步下载并安装…

数据分析 six库

目录 起因 什么是six库 智能识别py2或3 ​编辑 起因 ModuleNotFoundError: No module named sklearn.externals.six sklearn.externals.six 模块在较新版本的 scikit-learn 中已经被移除。如果你在尝试使用这个模块时遇到了 ModuleNotFoundError: No module named sklear…

HTML5使用favicon.ico图标

目录 1. 使用favicon.ico图标 1. 使用favicon.ico图标 favicon.ico一般用于作为网站标志&#xff0c;它显示在浏览器的地址栏或者标签上 制作favicon图标 选择一个png转ico的在线网站&#xff0c;这里以https://www.bitbug.net/为例。上传图片&#xff0c;目标尺寸选择48x48&a…

Langchain+文心一言调用

import osfrom langchain_community.llms import QianfanLLMEndpointos.environ["QIANFAN_AK"] "" os.environ["QIANFAN_SK"] ""llm_wenxin QianfanLLMEndpoint()res llm_wenxin.invoke("中国国庆日是哪一天?") print(…

Linux系统下速通stm32的clion开发环境配置

陆陆续续搞这个已经很久了。 因为自己新电脑是linux系统无法使用keil&#xff0c;一开始想使用vscode里的eide但感觉不太好用&#xff1b;后面想直接使用cudeide但又不想妥协&#xff0c;想趁着这个机会把linux上的其他单片机开发配置也搞明白&#xff1b;而且非常想搞懂cmake…

Unity自学之旅05

Unity自学之旅05 Unity学习之旅⑤&#x1f4dd; AI基础与敌人行为&#x1f94a; AI导航理论知识&#xff08;基础&#xff09;开始实践 &#x1f383; 敌人游戏机制追踪玩家攻击玩家子弹碰撞完善游戏失败条件 &#x1f917; 总结归纳 Unity学习之旅⑤ &#x1f4dd; AI基础与敌…

java常量池

目录 1 Class常量池 2 运行时常量池 3 字符串常量池 3.1 为什么要设计字符串常量池 3.2 字符串对象三种创建姿势 3.3 字符串的操作 3.4 字符串的不可变性 4 包装类型常量池 1 Class常量池 class 文件的资源仓库javap命令可以查看class常量池 主要包含字面量和符号引用字面量 由…

ES6语法

一、Let、const、var变量定义 1.let 声明的变量有严格局部作用域 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"&g…

windows平台通过命令行安装前端开发环境

访问node.js官网 访问node.js官网https://nodejs.org/en/download/&#xff0c;可以看到类似画面&#xff1a; 可以获取以下命令 # Download and install fnm: winget install Schniz.fnm # Download and install Node.js: fnm install 22 # Verify the Node.js version: no…

【2025小年源码免费送】

&#x1f496;学习知识需费心&#xff0c; &#x1f4d5;整理归纳更费神。 &#x1f389;源码免费人人喜&#xff0c; &#x1f525;码农福利等你领&#xff01; &#x1f496;山高路远坑又深&#xff0c; &#x1f4d5;大军纵横任驰奔&#xff0c; &#x1f389;谁敢横刀立马行…

深入 Flutter 和 Compose 的 PlatformView 实现对比,它们是如何接入平台控件

在上一篇《深入 Flutter 和 Compose 在 UI 渲染刷新时 Diff 实现对比》发布之后&#xff0c;收到了大佬的“催稿”&#xff0c;想了解下 Flutter 和 Compose 在 PlatformView 实现上的对比&#xff0c;恰好过去写过不少 Flutter 上对于 PlatformView 的实现&#xff0c;这次恰好…

小游戏源码开发搭建技术栈和服务器配置流程

近些年各种场景小游戏开发搭建版本层出不穷,山东布谷科技拥有多年海内外小游戏源码开发经验&#xff0c;现为从事小游戏源码开发或游戏运营的朋友们详细介绍小游戏开发及服务器配置流程。 一、可以对接到app的小游戏是如何开发的 1、小游戏源码开发的需求分析&#xff1a; 明…

【Linux网络编程】传输层协议

目录 一&#xff0c;传输层的介绍 二&#xff0c;UDP协议 2-1&#xff0c;UDP的特点 2-2&#xff0c;UDP协议端格式 三&#xff0c;TCP协议 3-1&#xff0c;TCP报文格式 3-2&#xff0c;TCP三次握手 3-3&#xff0c;TCP四次挥手 3-4&#xff0c;滑动窗口 3-5&#xf…

五、华为 RSTP

RSTP&#xff08;Rapid Spanning Tree Protocol&#xff0c;快速生成树协议&#xff09;是 STP 的优化版本&#xff0c;能实现网络拓扑的快速收敛。 一、RSTP 原理 快速收敛机制&#xff1a;RSTP 通过引入边缘端口、P/A&#xff08;Proposal/Agreement&#xff09;机制等&…