Kafka消息自定义序列化

文章目录

  • 1. 默认序列化
  • 2.自定义序列化
  • 3.示例
  • 4.自定义解序列化器


1. 默认序列化

在网络中发送数据都是以字节的方式,Kafka也不例外。Apache Kafka支持用户给broker发送各种类型的消息。它可以是一个字符串、一个整数、一个数组或是其他任意的对象类型。序列化器(serializer)负责在producer发送前将消息转换成字节数组;而与之相反,解序列化器(deserializer)则用于将consumer接收到的字节数组转换成相应的对象。
常见的serializer有:

  • ByteArraySerializer:本质上什么都不用做,因为已经是字节数组了。
  • ByteBufferSerializer:列化ByteBuffer。
  • BytesSerializer:序列化Kafka自定义的 Bytes 类。
  • DoubleSerializer:列化 Double 类型
  • IntegerSerializer:列化Integer 类型
  • LongSerializer:序列化Long类型。
  • StringSerializer:序列化 String 类型。

producer的序列化机制使用起来非常简单,只需要在构造producer时同时指定参数key.serializer 和 value.serializer的值即可,用户可以为消息的key和value 指定不同类型的 serializer,只要与解序列类型分别保持一致就可以。

2.自定义序列化

Kafka支持用户自定义消息序列化。若要编写一个自定义的serializer,需要完成以下3件事情。
1)定义数据对象格式。
2)创建自定义序列化类,实现 org.apache.kafka.common.serialization.Serializer 接口,在serializer方法中实现序列化逻辑。
3)在用于构造KafkaProducer 的Properties 对象中设置 key.serializer 或 value.serializer取决于是为消息key还是 value 做自定义序列化。

3.示例

下面结合一个实例来说明如何创建自定义的serializer。首先定义待序列化的数据对象。本例中使用一个简单的Java POJO对象,如下面的代码所示:

public class User {private String firstName;private String lastName;private int age;private String address;public User(String firstName, String lastName, int age, String address) {this.firstName = firstName;this.lastName = lastName;this.age = age;this.address = address;}@Overridepublic String toString() {return "User{" +"firstName='" + firstName + '\'' +", lastName='" + lastName + '\'' +", age=" + age +", address='" + address + '\'' +'}';}
}

接下来创建 serializer。本例中使用了jackson-mapper-asl包的 ObjectMapper 帮助我们直接把对象转成字节数组。为了使用该类,你需要在producer工程中增加依赖:

<dependency><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId><version>1.9.13</version>
</dependency>

UserSerializer代码如下:

import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.Map;
public class UserSerializer implements Serializer<User> {private ObjectMapper objectMapper;@Overridepublic void configure(Map configs, boolean isKey) {objectMapper=new ObjectMapper();}@Overridepublic byte[] serialize(String topic, User data) {byte[] ret =null;try {if (data == null){System.out.println("Null received at serializing");return null;}ret=objectMapper.writeValueAsString(data).getBytes();} catch (IOException e) {e.printStackTrace();}return ret;}@Overridepublic void close() {}
}

指定Serializer,然后构建消息发送:

import com.exm.collectcodenew.kafka.producer.customSerializer.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");//必须指定props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定props.put("value.serializer", "com.exm.collectcodenew.kafka.producer.customSerializer.UserSerializer");//必须指定props.put("acks", "-1");props.put("retries", 3);props.put("batch.size", 323840);props.put("linger.ms", 10);props.put("buffer.memory", 33554432);props.put("max.block.ms", 3000);props.put("partitioner.class","com.exm.collectcodenew.kafka.producer.customPartitioner.AuditPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);//构建User对象User user = new User("Z","tt",18,"Beijing,China");ProducerRecord record = new ProducerRecord("topic-test",user);producer.send(record);producer.close();}
}

4.自定义解序列化器

Kafka支持用户自定义消息的deserializer。成功编写一个自定义的deserializer需要完成以下3件事情。
1)定义或复用 serializer 的数据对象格式,
2) 创建自定义 deserializer 类,令其实现 org.apache.kafka.common.serialization.Deserializer接口。在deserializer方法中实现 deserialize 逻辑。
3)在构造KafkaConsumer的Properties对象中设置key.deserializer和(或)value.deserializer为上一步的实现类。
依然使用序列化中的User 例子来实现自定义的 deserializer。代码如下。

import org.apache.kafka.common.serialization.Deserializer;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.Map;
public class UserDeserializer implements Deserializer {private ObjectMapper objectMapper;@Overridepublic void configure(Map configs, boolean isKey) {objectMapper = new ObjectMapper();}@Overridepublic Object deserialize(String topic, byte[] data) {User user =null;try {user=objectMapper.readValue(data,User.class);} catch (IOException e) {throw new RuntimeException(e);}finally {return user;}}@Overridepublic void close() {}
}

消费者代码:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerTest {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");//必须指定props.put("group.id","test-group");//必须指定props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//必须指定props.put("value.deserializer", "com.exm.collectcodenew.kafka.producer.customSerializer.UserDeserializer");//必须指定props.put("enable.auto.commit","true");props.put("auto.commit.interval.ms","1000");props.put("auto.offset.reset","earliest");//从最早的消息开始读取KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//创建consumer实例consumer.subscribe(Arrays.asList("topic-test"));while(true){ConsumerRecords<String,String> records=consumer.poll(1000);for (ConsumerRecord<String, String> record: records){System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/37410.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

避坑指南 | 阿里云服务器centos7上MySQL部署优化指南

目录 1 检查阿里云是否安装mysql 1.1使用 rpm 命令 1.2检查 MySQL 服务状态 2 卸载mysql 2.1停止 MySQL 服务 2.2 检查已安装的 MySQL 包 2.3 卸载 MySQL 包 2.4 删除 MySQL 数据和配置文件 2.5 清理残留的依赖包 2.6 验证卸载 2.7 &#xff08;可选&#xff09;删除…

位运算--求二进制中1的个数

位运算–求二进制中1的个数 给定一个长度为 n 的数列&#xff0c;请你求出数列中每个数的二进制表示中 1 的个数。 输入格式 第一行包含整数 n。 第二行包含 n 个整数&#xff0c;表示整个数列。 输出格式 共一行&#xff0c;包含 n 个整数&#xff0c;其中的第 i 个数表…

Go语言的基础类型

一基础数据类型 一、布尔型&#xff08;Bool&#xff09; 定义&#xff1a;表示逻辑真 / 假&#xff0c;仅有两个值&#xff1a;true 和 false内存占用&#xff1a;1 字节使用场景&#xff1a;条件判断、逻辑运算 二、数值型&#xff08;Numeric&#xff09; 1. 整数类型&…

SpringBoot整合MQTT最详细版(亲测有效)

一、导入pom.xml依赖 <!--mqtt依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.in…

记一次发短信接口分析

忘记密码接口 数据包 GET /api/weatherforcast/user/send/17777777777 HTTP/2 Host: Cookie: SECKEY_ABVKd1GnERPtEFYSs7fL9W7VzoxAG0rjit7K8hAiMGIySpo522Wig70mdKRZQlvXNuqUTh9sBTWXG6XJ7miFZtA%3D%3D; Hm_lvt_018467e59f9d76a72cdbed870456819b1742445251,1742456927,1742…

dfs刷题排列问题 + 子集问题 + 组和问题总结

文章目录 一、排列问题全排列II题解代码 优美的排列题解代码 二、子集问题字母大小写全排列题解代码 找出所有子集的异或总和再求和题解代码 三、组合问题电话号码的字母组合题解代码 括号生成题解代码 组合题解代码 目标和题解代码 组合总和题解代码 总结 一、排列问题 全排列…

【AVRCP】蓝牙链路控制器(LC)与AVRCP互操作性要求深度解析

目录 一 、Link Controller&#xff08;LC&#xff09;概述 1.1 LC的定义与功能 1.2 LC在蓝牙技术中的重要性 二、Link Controller&#xff08;LC&#xff09;互操作性要求 2.1 互操作性要求概述 2.2 物理层互操作性要求 2.3 链路管理互操作性要求 2.4 其他互操作性要求…

go + vscode + cline +qwen 快速构建 MCP Server

go 编译自定义 mcp tool current time tool 代码 package mainimport ("context""fmt""time""github.com/mark3labs/mcp-go/mcp""github.com/mark3labs/mcp-go/server" )func main() {// Create MCP servers : server.New…

C语言-动态内存管理

1.为什么要有动态内存分配 我们现如今已经掌握的内存开辟方式有 int main() {int a 0;int arr[30] { 0 };return 0; } 这两种方式&#xff0c;但是这种开辟空间的方式有两个特点&#xff1a; 1.空间开辟大小是固定的 2.数组在申明的时候&#xff0c;必须指定数组的长度&…

Java复习

在开篇前首先申明一下&#xff0c;本文虽不够系统&#xff0c;但复习够用&#xff0c;尤其是快速回忆( •̀ ω •́ )✧与提问。 主打一个速度。 本文将会从Java的基础语法、面向对象、API、字符串、集合、进阶...等六方面讲起。 一、Java的基础语法&#xff1a; 1、Java入门…

Vue+ElementUI 字符串数组标签化展示组件

一. 效果 数据&#xff1a;‘[“苹果”,“香蕉”]’ 可添加&#xff0c;编辑&#xff0c;删除。 二. 组件源码 <template><div><div v-for"(item, index) in items":key"index"><el-inputv-if"inputVisible && ed…

识别并脱敏上传到deepseek/chatgpt的文本文件中的身份证/手机号

本文将介绍一种简单高效的方法解决用户在上传文件到DeepSeek、ChatGPT,文心一言,AI等大语言模型平台过程中的身份证号以及手机号等敏感数据识别和脱敏问题。 DeepSeek、ChatGPT,Qwen,Claude等AI平台工具快速的被接受和使用,用户每天上传的文本数据中潜藏着大量敏感信息,…

UR5e机器人位姿

UR5e 作为一款 6 自由度协作机器人&#xff0c;其末端执行器的位姿&#xff08;位置与姿态的组合&#xff09;控制是实现精准操作的核心。在笛卡尔坐标系中&#xff0c;位姿通常用齐次变换矩阵表示&#xff0c;包含末端的三维位置&#xff08;x, y, z&#xff09;和三维姿态&am…

小白闯AI:Llama模型Lora中文微调实战

文章目录 0、缘起一、如何对大模型进行微调二、模型微调实战0、准备环境1、准备数据2、模型微调第一步、获取基础的预训练模型第二步:预处理数据集第三步:进行模型微调第四步:将微调后的模型保存到本地4、模型验证5、Ollama集成部署6、结果测试三、使用总结AI是什么?他应该…

Linux基础开发工具——gdb/cgdb(7)

文章目录 前言一、生成可调试文件二、调试打开与关闭启动调试l 查看代码退出调试运行与断点单行与单步 三、查看变量bt 查看调用堆栈p 临时查看变量display 常显示变量 四、快速跳转until 指定行finish 函数c 断点 五、其他指令disable 断点使能set var 设置条件ptype 查看变量…

Python 3.13.2安装教程(安装包)Python 3.13.2 快速安装指南

文章目录 前言一 、Python 3.13.2下载二、Python 3.13.2安装教程1.运行安装程序2.选择安装方式3.自定义安装选项4.开始安装5.安装完成6.打开程序7.验证安装 前言 Python 作为一门通用编程语言&#xff0c;在全球拥有庞大的用户群体。其简洁易读的语法和丰富的库&#xff0c;使…

游戏MOD伴随盗号风险,仿冒网站借“风灵月影”窃密【火绒企业版V2.0】

游戏MOD&#xff08;即游戏修改器&#xff09;是一种能够对游戏进行修改或增强的程序&#xff0c;因其能够提升游戏体验&#xff0c;在玩家群体中拥有一定的市场。然而&#xff0c;这类程序大多由第三方开发者制作&#xff0c;容易缺乏完善的安全保障机制&#xff0c;这就为不法…

【读点论文】Chain Replication for Supporting High Throughput and Availability

在分布式系统中&#xff0c;强一致性往往和高可用、高吞吐是矛盾的。比如传统的关系型数据库&#xff0c;其保证了强一致性&#xff0c;但往往牺牲了可用性和吞吐量。而像 NoSQL 数据库&#xff0c;虽然其吞吐量、和扩展性很高&#xff0c;但往往只支持最终一致性&#xff0c;无…

新书速览|云原生Kubernetes自动化运维实践

《云原生Kubernetes自动化运维实践》 本书内容&#xff1a; 《云原生Kubernetes自动化运维实践》以一名大型企业集群运维工程师的实战经验为基础&#xff0c;全面系统地阐述Kubernetes&#xff08;K8s&#xff09;在自动化运维领域的技术应用。《云原生Kubernetes自动化运维实践…

Linux驱动学习笔记(六)

平台总线 1.平台总线模型也叫platform总线模型&#xff0c;平台总线是Linux系统虚拟出来的总线, 引入总线的概念可以对驱动代码和设备信息进行分离。平台总线模型将一个驱动分成了两个部分&#xff1a;platform_device和platform_driver&#xff0c;例如可使用文件device.c和d…