整合Spring Boot和Pulsar实现可扩展的消息处理

整合Spring Boot和Pulsar实现可扩展的消息处理

大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!

在现代分布式系统中,消息队列是实现异步通信和解耦的重要组件。Apache Pulsar作为一个分布式消息流平台,具备高吞吐、低延迟、多租户支持等优势,是很多高性能消息处理场景的理想选择。本文将介绍如何在Spring Boot项目中整合Pulsar,实现可扩展的消息处理功能。

什么是Apache Pulsar

Apache Pulsar是一个开源的分布式消息流平台,支持多租户、多主题和持久化。Pulsar的架构包括Brokers、Bookies(Apache BookKeeper的存储节点)和ZooKeeper协调服务,提供了高可用性和高性能的消息传递和存储服务。

在Spring Boot中集成Pulsar

为了在Spring Boot项目中使用Pulsar,我们需要以下几个步骤:

  1. 添加Maven依赖
  2. 配置Pulsar客户端
  3. 创建消息生产者
  4. 创建消息消费者

1. 添加Maven依赖

首先,我们需要在pom.xml中添加Pulsar的依赖:

<dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.9.1</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>
</dependencies>

2. 配置Pulsar客户端

接下来,我们需要创建一个配置类来初始化Pulsar客户端。创建一个名为PulsarConfig的配置类:

package cn.juwatech.config;import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class PulsarConfig {@Beanpublic PulsarClient pulsarClient() throws PulsarClientException {return PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();}
}

3. 创建消息生产者

我们需要一个消息生产者来发送消息到Pulsar。创建一个名为PulsarProducer的生产者类:

package cn.juwatech.producer;import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class PulsarProducer {private final PulsarClient pulsarClient;private Producer<byte[]> producer;@Autowiredpublic PulsarProducer(PulsarClient pulsarClient) {this.pulsarClient = pulsarClient;initProducer();}private void initProducer() {try {ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer();this.producer = producerBuilder.topic("my-topic").create();} catch (PulsarClientException e) {e.printStackTrace();}}public void sendMessage(String message) {try {producer.send(message.getBytes());} catch (PulsarClientException e) {e.printStackTrace();}}
}

4. 创建消息消费者

我们需要一个消息消费者来接收来自Pulsar的消息。创建一个名为PulsarConsumer的消费者类:

package cn.juwatech.consumer;import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class PulsarConsumer {private final PulsarClient pulsarClient;private Consumer<byte[]> consumer;@Autowiredpublic PulsarConsumer(PulsarClient pulsarClient) {this.pulsarClient = pulsarClient;}@PostConstructprivate void initConsumer() {try {this.consumer = pulsarClient.newConsumer().topic("my-topic").subscriptionName("my-subscription").subscribe();startConsumer();} catch (PulsarClientException e) {e.printStackTrace();}}private void startConsumer() {new Thread(() -> {while (true) {try {Message<byte[]> msg = consumer.receive();String message = new String(msg.getData());System.out.println("Received message: " + message);consumer.acknowledge(msg);} catch (PulsarClientException e) {e.printStackTrace();}}}).start();}
}

5. 测试Pulsar生产者和消费者

最后,我们编写一个简单的测试类来验证生产者和消费者的工作。创建一个名为PulsarTest的测试类:

package cn.juwatech;import cn.juwatech.producer.PulsarProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class PulsarApplication implements CommandLineRunner {@Autowiredprivate PulsarProducer pulsarProducer;public static void main(String[] args) {SpringApplication.run(PulsarApplication.class, args);}@Overridepublic void run(String... args) throws Exception {pulsarProducer.sendMessage("Hello, Pulsar!");}
}

运行上述代码后,您应该会在控制台上看到消费者接收到的消息。

总结

通过以上步骤,我们成功地在Spring Boot项目中整合了Pulsar,实现了可扩展的消息处理功能。Pulsar的高性能和可扩展性使其非常适合分布式系统中的消息传递和流处理。在实际项目中,可以根据需求进一步优化和扩展Pulsar的使用,例如配置不同的主题和分区、实现更复杂的消息处理逻辑等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/367018.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

eclipse断点调试(用图说话)

eclipse断点调试&#xff08;用图说话&#xff09; debug方式启动项目&#xff0c;后端调试bug调试 前端代码调试&#xff0c;请参考浏览器断点调试&#xff08;用图说话&#xff09; 1、前端 选中一条数据&#xff0c;点击删除按钮 2、后端接口打断点 断点按钮 介绍 resum…

前端知识点

HTML、CSS 相关 1、 BFC 1、BFC 是什么&#xff1f; BFC&#xff08;Block Formatting Context&#xff09; 格式化上下文&#xff1b; 指一个独立的渲染区域&#xff0c;或者说是一个隔离的独立容器&#xff1b;可以理解为一个独立的封闭空间。无论如何不会影响到它的外面 …

Elasticsearch-Rest-Client

Elasticsearch-Rest-Client&#xff1a;官方RestClient&#xff0c;封装了ES操作&#xff0c;API层次分明&#xff0c;上手简单。 1. 导入依赖 <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high…

BUG TypeError: GPT2Model.forward() got an unexpected keyword argument ‘past’

TypeError: GPT2Model.forward() got an unexpected keyword argument past’ 环境 transformers 4.38.1详情 这是由于新版的transformers 对GPT2Model.forward() 参数进行了改变导致的错误。具体是past名称改为了 past_key_values 。 解决方法 找到错误语…

【Windows】draw.io(免费的开源跨平台绘图软件)软件介绍

软件介绍 draw.io 是一款免费且易于使用的在线流程图绘图软件&#xff0c;后来更名为 diagrams.net。它最初作为一个基于 Web 的应用程序提供&#xff0c;支持用户创建各种类型的图表、流程图、网络图、组织结构图、UML 图等。它是完全免费的、强大的、专业的、易于使用的和高…

分享:Motionity-开源的Web端动画编辑器

Motionity是一个免费且开源的Web端动画编辑器&#xff0c;它结合了After Effects和Canva的优点&#xff0c;为用户提供了强大的动画编辑功能。支持视频剪切、图像搜索过滤、文本动画库、图层蒙版等功能。 一、项目背景与特点 开源项目&#xff1a;Motionity是一个开源项目&…

黄子韬vs徐艺洋卫生间风波

【热搜爆点】黄子韬VS徐艺洋&#xff1a;卫生间风波背后的职场与友情界限探讨在这个充满欢笑与意外的综艺时代&#xff0c;《跟我出游吧》再次以它独有的魅力&#xff0c;引爆了一个既尴尬又引人深思的话题——“黄子韬要上徐艺洋的卫生间&#xff1f;”这不仅仅是一句简单的调…

Yarn的安装与配置

Yarn 是一个快速、可靠且安全的 JavaScript 包管理器&#xff0c;最初由 Facebook 开发&#xff0c;旨在提供比 npm 更快的依赖安装速度和更一致的包管理体验。以下是 Yarn 的安装与配置教程&#xff1a; 安装 Yarn 方法 1: 使用 npm 安装 如果你的系统已经安装了 Node.js …

基于YOLOv10深度学习的CT扫描图像肾结石智能检测系统【python源码+Pyqt5界面+数据集+训练代码】深度学习实战、目标检测

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

【ajax实战02】数据管理网站—验证码登录

一&#xff1a;数据提交&#xff08;提交手机验证码&#xff09; 核心思路整理 利用form-serialize插件&#xff0c;收集对象形式的表单数据后&#xff0c;一并提交给服务器。后得到返回值&#xff0c;进一步操作 基地址&#xff1a; axios.defaults.baseURL http://geek.…

Keil5 ST-LINK setting闪退问题解决

1. 官网下载新版驱动文件 MDK uVision crashes when using ST-Link debugger 2. 解压替换 STLinkUSBDriver6.1.2.0Signed 我的库文件目录&#xff1a; D:\Tool\Keil5\ARM\STLink

开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(六)

一、前言 使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。 FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,F…

【JS】纯web端使用ffmpeg实现的视频编辑器-视频合并

纯前端实现的视频合并 接上篇ffmpeg文章 【JS】纯web端使用ffmpeg实现的视频编辑器 这次主要添加了一个函数&#xff0c;实现了视频合并的操作。 static mergeArgs(timelineList) {const cmd []console.log(时间轴数据,timelineList)console.log("文件1",this.readD…

MatLab三维图形绘制基础

三维图形绘制 三维曲线 plot3 螺旋图绘制 % %三维图像:螺旋图绘制 clear; clc; t [0:0.1:10*pi];% 向量 x sin(t) t.*cos(t);%t是向量&#xff0c;用点乘 y cos(t) - t.*sin(t); z t; plot3(x,y,z); grid on;plot3 绘制同型矩阵 %% % plot3绘制同型矩阵 t [0:0.1:10*…

游戏AI的创造思路-技术基础-tanh函数详解

又来搞事情&#xff0c;总想着把sigmoid函数替换成其他函数作为激活函数&#xff0c;或者找到更合适某一段训练的函数&#xff0c;所以今天来聊聊tanh函数&#xff08;谁让咱当年差点去了数学系&#xff0c;结果还是在数学系转过去计算机的&#xff09; 目录 3.9. tanh函数详解…

【Rust基础入门】Hello Cargo

文章目录 前言Cargo是什么&#xff1f;Cargo的作用查看cargo版本使用cargo创建项目Cargo.toml文件cargo build命令cargo runcargo check为发布构建 总结 前言 在Rust编程中&#xff0c;Cargo扮演着至关重要的角色。它是Rust的包管理器&#xff0c;负责处理许多任务&#xff0c…

uniapp入门

一、新建项目 进入到主界面&#xff0c;左上角点击新建——1.项目 输入项目名称&#xff0c;Vue版本选择3 二、创建页面 选中左侧文件目录里的pages文件夹&#xff0c;右键&#xff0c;选择新建页面 1输入名称 2选中“创建同名目录” 3选择模板&…

昇思25天学习打卡营第7天|网络构建

网络构建 神经网络模型由tensor操作和神经网络层构成。 MIndSporezhong&#xff0c;Cell是构建所有网络的基类&#xff0c;也是网络的基本单元。cell也由子cell构成。 定义模型类 # 继承nn.Cell类 class Network(nn.Cell):def __init__(self):super().__init__()self.flatte…

ElasticSearch 和 MySQL的区别

MySQLElasticSearch 数据库&#xff08;database&#xff09;索引&#xff08;index&#xff09;数据表&#xff08;table&#xff09; 类型&#xff08;type&#xff09; 记录文档&#xff08;document&#xff0c;json格式&#xff09; 一、ES基础命令 1. ES cat查询命令 2.…

分布式技术专题 | TCP在分布式网络中的通信机制与底层实现

深入解析分布式网络中的TCP通信协议实现 跨地域通信与资源共享网络节点与主机的定义网络技术通信机制TCP/IP协议模型TCP/IP分层机制TCP的Socket链接处理控制TCP的优势和特性自动差错控制正确性和有序性 TCP的Socket使用端口在应用程序间通信TCP的Socket使用端口套接字操作 跨地…