SpringBoot集成Kafka和avro和Schema注册表

Schema注册表

为了提升kafka的性能,减少网络传输和存储的数据大小,可以把数据的schema部分单独存储到外部的schema注册表中,整体架构如下图所示:
在这里插入图片描述
1)把所有数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema ID。
2)消费者使用 ID 从注册表里拉取 schema 来反序列化记录。
3)序列化器和反序列化器分别负责处理 schema 的注册和拉取。
schema注册表并不属于Kafka,现在已经有一些开源的schema 注册表实现,比如Confluent Schema Registry。

安装confluent

参考:安装手册

# confluent的安装包中已经包含了zookeeper和kafka的安装包,无需单独再下载
# 下载
curl -O https://packages.confluent.io/archive/7.7/confluent-7.7.1.tar.gz
# 解压
tar -xzf confluent-7.7.1.tar.gz

解压以后目录结构如下:

文件夹描述
bin可执行文件
etc配置文件
lib服务
libexec多平台的客户端库
sharejar包和license
src源码
# 设置环境变量
vim /etc/profile
export CONFLUENT_HOME=/usr/local/confluent-7.7.1
export PATH=$CONFLUENT_HOME/bin:$PATH
# 加载环境变量
source /etc/profile
# 验证
confluent --help

启动confluent服务

启动zookeeper

cd /usr/local/confluent-7.7.1/etc/kafka
vim zookeeper.properties
# 可以调整zookeeper的端口和数据的存储目录
# 启动zookeeper
./bin/zookeeper-server-start -daemon ./etc/kafka/zookeeper.properties
# 验证
ps -ef | grep zookeeper

启动kafka

cd /usr/local/confluent-7.7.1/etc/kafka
vim server.properties
broker.id=0
# 监听地址
listeners=0.0.0.0://:9092
# 对外暴漏的地址
advertised.listeners=PLAINTEXT://192.168.200.128:9092
# zookeeper的地址
zookeeper.connect=localhost:2181
# 启动./bin/kafka-server-start  -daemon ./etc/kafka/server.properties
# 验证
netstat -nap | grep 9092

启动confluent

cd /usr/local/confluent-7.7.1/etc/schema-registry
# 修改schema-registry.properties
vim schema-registry.properties
# schema-registry的监听地址
listeners=http://0.0.0.0:8081
# kafka的访问地址
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
# 启动
./bin/schema-registry-start -daemon ./etc/schema-registry/schema-registry.properties
# 验证
netstat -nap | grep 8081

新建springboot项目

新建avro的schema文件User.avsc

{"namespace": "com.github.xjs.protocol","type": "record","name": "UserRecord","fields": [{"name": "id","type": "int"},{"name": "name","type": "string"}]
}

pom中添加avro-maven-plugin插件

<!--https://avro.apache.org/docs/1.11.1/getting-started-java/-->
<!-- 命令行执行:mvn generate-sources 把avsc转化成java文件 
-->
<plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.11.1</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory><outputDirectory>${project.basedir}/src/main/java/</outputDirectory></configuration></execution></executions>
</plugin>

添加avro和kafka的依赖

<dependency><groupId>io.confluent</groupId><artifactId>kafka-avro-serializer</artifactId><version>7.7.1</version>
</dependency>
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

添加对应的配置

server:port: 8080
spring:kafka:bootstrap-servers: 192.168.200.128:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer# 重点关注这里的KafkaAvroSerializervalue-serializer: io.confluent.kafka.serializers.KafkaAvroSerializerconsumer:group-id: testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 重点关注这里的.KafkaAvroDeserializervalue-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializerproperties:# confluent的地址schema.registry.url: http://192.168.200.128:8081

消息生产者

public void send(UserRecord record) {if (Objects.isNull(record)) {return;}log.info("send message, value:{}", record.toString());// 跟发送普通消息一样,可以直接发送UserRecordkafkaTemplate.send("demo-topic", record);
}

消息消费者

@KafkaListener(topics = "demo-topic")
public void consume(ConsumerRecord<String, UserRecord> user){// 跟接收普通消息一样,可以直接接收UserRecordlog.info("receive message, topic:{}, key:{}, value:{}", user.topic(), user.key(), user.value());
}

完整的源码下载:https://github.com/xjs1919/learning-demo/tree/master/springboot-kafka-avro

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/483661.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

http(请求方法,状态码,Cookie与)

目录 1.http中常见的Header(KV结构) 2.http请求方法 2.1 请求方法 2.2 telnet 2.3 网页根目录 2.3.1 概念 2.3.2 构建一个首页 2.4 GET与POST方法 2.4.1 提交参数 2.4.2 GET与POST提交参数对比 2.4.3 GET和POST对比 3.状态码 3.1 状态码分类 3.2 3XXX状态码 3.2 …

十,[极客大挑战 2019]Secret File1

点击进入靶场 查看源代码 有个显眼的紫色文件夹&#xff0c;点击 点击secret看看 既然这样&#xff0c;那就回去查看源代码吧 好像没什么用 抓个包 得到一个文件名 404 如果包含"../"、"tp"、"input"或"data"&#xff0c;则输出"…

pytest自定义命令行参数

实际使用场景&#xff1a;pytest运行用例的时候&#xff0c;启动mitmdump进程试试抓包&#xff0c;pytest命令行启动的时候&#xff0c;传入mitmdump需要的参数&#xff08;1&#xff09;抓包生成的文件地址 &#xff08;2&#xff09;mitm的proxy设置 # 在pytest的固定文件中…

Unity AssetBundles(AB包)

目录 前言 AB包是什么 AB包有什么作用 1.相对Resources下的资源AB包更好管理资源 2.减小包体大小 3.热更新 官方提供的打包工具:Asset Bundle Browser AB包资源加载 AB包资源管理模块代码 前言 在现代游戏开发中&#xff0c;资源管理是一项至关重要的任务。随着游戏内容…

(一)Linux下安装NVIDIA驱动(操作记录)

目录 一、查看CUDA版本 1.输入nvidia-smi&#xff0c;查看驱动支持的最大CUDA版本&#xff0c;这里是11.6 2.输入nvcc --version&#xff0c;查看当前安装的CUDA版本&#xff0c;这里是11.3 二、卸载旧的NVIDIA驱动 1.卸载原有驱动 2.禁用nouveau&#xff08;必须&#x…

用Python做数据分析环境搭建及工具使用(Jupyter)

目录 一、Anaconda下载、安装 二、Jupyter 打开 三、Jupyter 常用快捷键 3.1 创建控制台 3.2 命令行模式下的快捷键 3.3 运行模式下快捷键 3.4 代码模式和笔记模式 3.5 编写Python代码 一、Anaconda下载、安装 【最新最全】Anaconda安装python环境_anaconda配置python…

Ai编程cursor + sealos + devBox实现登录以及用户管理增删改查(十三)

一、什么是 Sealos&#xff1f; Sealos 是一款以 Kubernetes 为内核的云操作系统发行版。它以云原生的方式&#xff0c;抛弃了传统的云计算架构&#xff0c;转向以 Kubernetes 为云内核的新架构&#xff0c;使企业能够像使用个人电脑一样简单地使用云。 二、适用场景 业务运…

重学设计模式-工厂模式(简单工厂模式,工厂方法模式,抽象工厂模式)

在平常的学习和工作中&#xff0c;我们创建对象一般会直接用new&#xff0c;但是很多时候直接new会存在一些问题&#xff0c;而且直接new会让我们的代码变得非常繁杂&#xff0c;这时候就会巧妙的用到设计模式&#xff0c;平常我们通过力扣学习的算法可能并不会在我们工作中用到…

数据结构4——栈和队列

目录 1.栈 1.1.栈的概念及结构 1.2栈的实现 2.队列 2.1队列的概念及结构 2.2队列的实现 1.栈 1.1.栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一段称为栈顶&#xff0c;另一端称为…

家政小程序开发,打造便捷家政生活小程序

目前&#xff0c;随着社会人就老龄化和生活压力的加重&#xff0c;家政服务市场的需求正在不断上升&#xff0c;家政市场的规模也正在逐渐扩大&#xff0c;发展前景可观。 在市场快速发展的影响下&#xff0c;越来越多的企业开始进入到市场中&#xff0c;同时家政市场布局也发…

Spring Cloud Alibaba(六)

目录&#xff1a; 分布式链路追踪-SkyWalking为什么需要链路追踪什么是SkyWalkingSkyWalking核心概念什么是探针Java AgentJava探针日志监控实现之环境搭建Java探针日志监控实现之探针实现编写探针类TestAgent搭建 ElasticsearchSkyWalking服务环境搭建搭建微服务微服务接入Sky…

HTTP 探秘之旅:从入门到未来

文章目录 导言&#xff1a;目录&#xff1a;第一篇&#xff1a;HTTP&#xff0c;互联网的“快递员”第二篇&#xff1a;从点开网页到看到内容&#xff0c;HTTP 究竟做了什么&#xff1f;第三篇&#xff1a;HTTP 的烦恼与进化史第四篇&#xff1a;HTTP 的铠甲——HTTPS 的故事第…

万字长文解读深度学习——多模态模型BLIP2

&#x1f33a;历史文章列表&#x1f33a; 深度学习——优化算法、激活函数、归一化、正则化 深度学习——权重初始化、评估指标、梯度消失和梯度爆炸 深度学习——前向传播与反向传播、神经网络&#xff08;前馈神经网络与反馈神经网络&#xff09;、常见算法概要汇总 万字长…

qt QToolBox详解

1、概述 QToolBox是Qt框架中的一个控件&#xff0c;它提供了一个带标签页的容器&#xff0c;用户可以通过点击标签页标题来切换不同的页面。QToolBox类似于一个带有多页选项卡的控件&#xff0c;但每个“选项卡”都是一个完整的页面&#xff0c;而不仅仅是标签。这使得QToolBo…

如何把阿里云ECS里的文件下载到本地(免登录免配置)

如何把阿里云ECS里的文件下载到本地&#xff08;免登录免配置&#xff09; 作为一个阿里云ECS的用户&#xff0c;Up时长会遇到希望把ECS里的文件下载到自己的个人电脑&#xff0c;然后在自己的电脑里面查看&#xff0c;保存或者发送给别人。最近发现阿里云新上了一个功能&…

Centos7安装MySQL8.0详细教程(压缩包安装方式)

本章教程&#xff0c;主要介绍如何在Centos7上安装MySQL8.0版本数据库&#xff08;压缩包安装方式&#xff09; 一、卸载系统自带的 Mariadb 1、查询 rpm -qa|grep mariadb2.、卸载 如果有查询结果&#xff0c;就进行卸载&#xff0c;没有就跳过该步骤。 rpm -e --nodeps mar…

c++预编译头文件

文章目录 c预编译头文件1.使用g编译预编译头文件2.使用visual studio进行预编译头文件2.1visual studio如何设置输出预处理文件&#xff08;.i文件&#xff09;2.2visual studio 如何设置预编译&#xff08;初始创建空项目的情况下&#xff09;2.3 visual studio打开输出编译时…

MySql:理解数据库

目录 一、什么是数据库 第一层理解 第二层理解 第三层理解 二、Linux下的数据库 三、基本认识 登录数据库时&#xff0c; mysql -u root -h 127.0.0.1 -P 3306 -p -h指定MySql服务器所在主机&#xff0c;若在本地则为回环地址。-P表示目标主机上MySql服务端口号 一般简单…

Spire.PDF for .NET【页面设置】演示:旋转 PDF 中的页面

在某些情况下&#xff0c;您可能需要旋转 PDF 页面。例如&#xff0c;当您收到包含混乱页面的 PDF 文档时&#xff0c;您可能希望旋转页面以便更轻松地阅读文档。在本文中&#xff0c;您将学习如何使用Spire.PDF for .NET在 C# 和 VB.NET 中旋转 PDF 中的页面。 Spire.PDF for…

【JavaEE初阶 — 网络编程】实现基于TCP协议的Echo服务

TCP流套接字编程 1. TCP &#xff06; UDP 的区别 TCP 的核心特点是面向字节流&#xff0c;读写数据的基本单位是字节 byte 2 API介绍 2.1 ServerSocket 定义 ServerSocket 是创建 TCP 服务端 Socket 的API。 构造方法 方法签名 方法说明 ServerS…