Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

1、在pom.xml中加入依赖

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency>

2、配置application.yml

加入Kafka的配置

springkafka:#Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095producer:# 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认acks: 1# 发送失败时的重试次数,0表示不重试retries: 0# 批量发送时的批次大小(字节)batch-size: 30720000 # 30MB# 生产者的内存缓冲区大小(字节)buffer-memory: 33554432 # 32MB# Key的序列化器类key-serializer: org.apache.kafka.common.serialization.StringSerializer# Value的序列化器类value-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# 消费者所属的组IDgroup-id: test-kafka# 禁用自动提交offset,改为手动提交enable-auto-commit: false# 偏移量重置策略:# earliest:从最早的记录开始消费# latest:从最新的记录开始消费auto-offset-reset: earliest# Key的反序列化器类key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# Value的反序列化器类value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 每次poll()调用返回的最大消息条数max-poll-records: 2session:# 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)timeout:ms: 300000 # 5分钟listener:# 如果指定的主题不存在,是否让应用启动失败,false表示不会报错missing-topics-fatal: false# 消费模式:single=单条消息,batch=批量消费type: single# 消费确认模式:# manual_immediate:手动确认消息,立即提交offsetack-mode: manual_immediate

这里的生产者value的序列化器用org.apache.kafka.common.serialization.StringSerializer
 ,消费者value的序列化器用org.apache.kafka.common.serialization.StringDeserializer即可。

(这里不需要自定义序列化器,但在代码需要将JAVA对象转化为JSON字符串发送)

3、config、producer、consumer代码

3.1、User.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {private int id;private String name;
}

3.2、Task.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public
class Task {private int id;private String description;private User assignedUser;
}

模拟嵌套类 

3.3、KafkaConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;@EnableKafka
@Configuration
public class KafkaConfig {// 单条消费监听器工厂,手动提交offset@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}

3.4、KafkaProducer.java

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootApplication
public class KafkaProducer {public static void main(String[] args) {SpringApplication.run(KafkaProducer.class, args);}@BeanCommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {return args -> {String topic = "task-topic";ObjectMapper objectMapper = new ObjectMapper();for (int i = 1; i <= 5; i++) {// 定义一个对象实例User user = User.builder().id(1).name("Alice").build();Task task = Task.builder().id(101).description("Complete report").assignedUser(user).build();//JAVA对象转化为JSON字符串String message =  objectMapper.writeValueAsString(task);kafkaTemplate.send(topic, message);System.out.println("Sent: " + message);Thread.sleep(500); // 模拟消息发送间隔}};}
}

序列化:使用 Jackson 的 ObjectMapperTask 对象转化为 JSON 字符串,方法 writeValueAsString() 将 Java 对象转为 JSON 字符串。

3.5、SingleConsumer.java

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class SingleConsumer {@KafkaListener(topics = "task-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws JsonProcessingException {String message = record.value();ObjectMapper objectMapper = new ObjectMapper();Task task = objectMapper.readValue(message,Task.class);// 取出System.out.println("User - Received: " + task.getAssignedUser());// 手动提交offsetacknowledgment.acknowledge();}
}

反序列化: 使用 ObjectMapper 将 JSON 字符串 message 转换回 Task 对象,方法 readValue() 可以将 JSON 字符串解析为指定的 Java 对象类型。

4、测试

启动KafkaProducer.java

可以解析出JAVA对象中User

 

成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/487305.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

物品识别 树莓派 5 YOLO v5 v8 v10 11 计算机视觉

0. 要实现的效果 让树莓派可以识别身边的一些物品&#xff0c;比如电脑&#xff0c;鼠标&#xff0c;键盘&#xff0c;杯子&#xff0c;行李箱&#xff0c;双肩包&#xff0c;床&#xff0c;椅子等 1. 硬件设备 树莓派 5 raspberrypi.com/products/raspberry-pi-5/树莓派官方摄…

大数据-245 离线数仓 - 电商分析 缓慢变化维 与 拉链表 SCD Slowly Changing Dimensions

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; Java篇开始了&#xff01; 目前开始更新 MyBatis&#xff0c;一起深入浅出&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff0…

【LeetCode: 160. 相交链表 + 链表】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

从爱尔兰歌曲到莎士比亚:LSTM文本生成模型的优化之旅

上一篇&#xff1a;《再用RNN神经网络架构设计生成式语言模型》 序言&#xff1a;本文探讨了如何通过多种方法改进模型的输出&#xff0c;包括扩展数据集、调整模型架构、优化训练数据的窗口设置&#xff0c;以及采用字符级编码。这些方法旨在提高生成文本的准确性和合理性&am…

51c大模型~合集86

我自己的原文哦~ https://blog.51cto.com/whaosoft/12772867 #MILP-StuDio 拆解高复杂运筹问题的砖石&#xff0c;打破数据稀缺的瓶颈&#xff0c;中科大提出高质量运筹数据生成方法 论文作者刘昊洋是中国科学技术大学 2023 级硕士生&#xff0c;师从王杰教授&#xff0c;…

从零用java实现 小红书 springboot vue uniapp (1)

前言 偶尔会用小红书发一些笔记 闲来无事 想自己实现一个小红书 正好可以学习一下 帖子 留言 im 好友 推送 等功能 下面我们就从零 开发一个小红书 后台依旧用我们的会员系统的脚手架 演示 http://120.26.95.195:8889/ 客户端我们使用uniapp 我们首先对主页进行一个分解 顶部我…

pyside6学习专栏(一)常用控件的使用(非QML方式)

前段业余时间在用pythonpyqt5边学边作一些小程序&#xff0c;总算作到了一个相对复杂的基本VTK三维显示地形图并计算挖填方工程量&#xff0c;作完后&#xff0c;又发现pyqt又是要收费的&#xff0c;就又看了下对应的替代库pyside6,对用此库的一些基本技能分享到此专栏中&#…

活动|华院计算董事长宣晓华应邀出席2024科创大会并作圆桌嘉宾

2024科创大会在上海举行&#xff0c;由中央广播电视总台和上海市人民政府共同主办。本次大会以“创新驱动 新质未来”为主题&#xff0c;来自知名院校、科研机构的专家学者以及科技企业、金融机构的相关负责人共聚一堂&#xff0c;探讨人工智能、生物医药等产业应用前景&#x…

计算机网络-IPSec VPN工作原理

一、IPSec VPN工作原理 昨天我们大致了解了IPSec是什么&#xff0c;今天来学习下它的工作原理。 IPsec的基本工作流程如下&#xff1a; 通过IKE协商第一阶段协商出IKE SA。 使用IKE SA加密IKE协商第二阶段的报文&#xff0c;即IPsec SA。 使用IPsec SA加密数据。 IPsec基本工作…

leetcode 3001. 捕获黑皇后需要的最少移动次数 中等

现有一个下标从 1 开始的 8 x 8 棋盘&#xff0c;上面有 3 枚棋子。 给你 6 个整数 a 、b 、c 、d 、e 和 f &#xff0c;其中&#xff1a; (a, b) 表示白色车的位置。(c, d) 表示白色象的位置。(e, f) 表示黑皇后的位置。 假定你只能移动白色棋子&#xff0c;返回捕获黑皇后…

linux 系统常用指令

1、查看内核版本 uname -r 2、列出占用空间最大的 10 个文件或目录 du -ah / | sort -rh | head -n 10 终于找到我虚拟机硬盘空间越来越少的原因了&#xff0c;类目......

【OpenDRIVE_Python】使用python脚本更新OpenDRIVE数据中路口Junction名称

示例代码说明&#xff1a; 遍历OpenDRIVE数据中每个路口JunctionID,读取需要变更的路口ID和路口名称的TXT文件,若JunctionID与TXT文件中的ID一致&#xff0c;则将TXT对应的点位名称更新到OpenDRIVE数据中Junction name字段。补充&#xff1a;需要保持TXT和OpenDRIVE数据文件编…

PySpark3.4.4_基于StreamingContext实现网络字节流统计分析

网络字节流与嵌套字节流的区别 概念解释 网络嵌套字节流&#xff1a; 在网络编程的情境下&#xff0c;网络嵌套字节流通常是指将字节流&#xff08;字节序列&#xff09;以一种分层或者包含的方式进行组织&#xff0c;用于在网络传输过程中更好地处理数据。例如&#xff0c;在一…

【JS】简单CSS简单JS写的上传进度条

纯JS写的&#xff0c;简单的上传进度条&#xff0c;当上传的文件较大&#xff0c;加一个动态画面&#xff0c;就不会让人觉得出错了或网络卡了 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"v…

47 基于单片机的书库环境监测

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于51单片机&#xff0c;采用DHT11湿度传感器检测湿度&#xff0c;DS18B20温度传感器检测温度&#xff0c; 采用滑动变阻器连接数模转换器模拟二氧化碳和氧气浓度检测&#xff0c;各项数值通过lc…

解决:IDEA中@Autowired自动注入MyBatis Mapper报红警告的几种解决方法

文章目录 解决&#xff1a;IDEA中Autowired自动注入MyBatis Mapper报红警告的几种解决方法问题描述&#xff1a;解决办法&#xff1a;1.将Autowired注解改成Resource2.给Autowired(required false)设置属性3.给Mapper层加注解Mapper/Repository4.改变写法,用RequiredArgsConst…

Spring Boot中实现JPA多数据源配置指南

本文还有配套的精品资源&#xff0c;点击获取 简介&#xff1a;本文详细介绍了在Spring Boot项目中配置和使用JPA进行多数据源管理的步骤。从引入依赖开始&#xff0c;到配置数据源、创建DataSource bean、定义实体和Repository&#xff0c;最后到配置事务管理器和使用多数据…

Ubuntu 安装 web 服务器

安装 apach sudo apt install apache2 -y 查看 apach2 版本号 apache2 -v 检查是否启动服务器 sudo service apache2 status 检查可用的 ufw 防火墙应用程序配置 sudo ufw app list 关闭防火墙 sudo ufw disable 更改允许通过端口流量 sudo ufw allow Apache Full 开启…

go语言的成神之路-标准库篇-fmt标准库

目录 一、三种类型的输出 print&#xff1a; println&#xff1a; printf&#xff1a; 总结&#xff1a; 代码展示&#xff1a; 二、格式化占位符 %s&#xff1a;用于格式化字符串。 %d&#xff1a;用于格式化整数。 %f&#xff1a;用于格式化浮点数。 %v&#xff1…

【Linux操作系统】Linux常用一键脚本

Linux网络加速脚本 Linux网络加速脚本可以替换Linux内核和更改TCP拥塞算法的一键脚本&#xff0c;包括安装BBR内核、XANMOD官方内核&#xff0c;开启BBR加速等功能&#xff0c;总之非常强大。 不卸载内核脚本&#xff08;一般用这个&#xff09; wget -O tcpx.sh "http…