kafka 发送文件二进制流及使用header发送附属信息

文章目录

  • 背景
  • 案例
    • 发送方
    • 接收方

背景

需要使用kafka发送文件二进制以及附属信息

案例

发送方

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;public class SendFileToKafka {public static void main(String[] args) {String filePath = "com/example/kafka/file/ConsumerFileByteArrayFromKafka.java";Properties kafkaProps = new Properties();kafkaProps.put("bootstrap.servers", "192.168.56.112:9092");kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");KafkaProducer<String, byte[]> producer = new KafkaProducer<>(kafkaProps);InputStream in = SendFileToKafka.class.getResourceAsStream("/com/example/kafka/file/ConsumerFileByteArrayFromKafka.java");try {byte[] buffer = new byte[in.available()];// 读到buffer字节数组中in.read(buffer);ProducerRecord<String, byte[]> record = new ProducerRecord<>("dataTopic", buffer);String header = "aaa";record.headers().add("test_header", header.getBytes(StandardCharsets.UTF_8));producer.send(record);in.close();producer.close();} catch (Exception e) {e.printStackTrace();}}
}

接收方

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;import java.util.Arrays;
import java.util.Properties;public class ConsumerFileByteArrayFromKafka {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.56.112:9092");props.put("group.id", "group1");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("dataTopic"));try {while (true) {ConsumerRecords<String, byte[]> records = consumer.poll(100);for (ConsumerRecord<String, byte[]> record : records) {Headers headers = record.headers();Iterable<Header> testHeader = headers.headers("test_header");for (Header header : testHeader) {String recordHeader = new String(header.value(), "UTF-8");System.out.println("recordHeader => " + recordHeader);}byte[] message = record.value();System.out.println(new String(message));}}} catch (Exception e) {e.printStackTrace();} finally {consumer.close();}}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/338548.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu20.04安装ffmpeg,并捕获视频流

工控机&#xff1a;幻影峡谷 系统&#xff1a;Ubuntu20.04 摄像头&#xff1a;杰瑞微通环星光USB摄像头记录一下使用ffmpeg拉取视频流的原因&#xff1a;刚开始用的是ubuntu系统自带的 茄子 软件&#xff0c;但是视频流很卡&#xff08;非常卡&#xff0c;基本上不能用&#xf…

开箱即用的Spring Boot 企业级开发平台【毕设项目推荐】

项目概述 基于 Spring 实现的通用权限管理平台&#xff08;RBAC模式&#xff09;。整合最新技术高效快速开发&#xff0c;前后端分离模式&#xff0c;开箱即用。 核心模块包括&#xff1a;用户、角色、职位、组织机构、菜单、字典、日志、多应用管理、文件管理、定时任务等功能…

【kubernetes】关于k8s集群如何将pod调度到指定node节点(亲和与反亲和等)

目录 一、调度约束 1.1K8S的 List-Watch 机制 ⭐⭐⭐⭐⭐ 1.1.1Pod 启动典型创建过程 二、调度过程 2.1Predicate&#xff08;预选策略&#xff09; 常见的算法 2.2priorities&#xff08;优选策略&#xff09;常见的算法 三、k8s将pod调度到指定node的方法 3.1指定…

css动态导航栏鼠标悬停特效

charset "utf-8"; /*科e互联特效基本框架CSS*/ body, ul, dl, dd, dt, ol, li, p, h1, h2, h3, h4, h5, h6, textarea, form, select, fieldset, table, td, div, input {margin:0;padding:0;-webkit-text-size-adjust: none} h1, h2, h3, h4, h5, h6{font-size:12px…

为什么GD32F303代码运行在flash比sram更快?

我们知道一般MCU的flash有等待周期&#xff0c;随主频提升需要插入flash读取的等待周期&#xff0c;以stm32f103为例&#xff0c;主频在72M时需要插入2个等待周期&#xff0c;故而代码效率无法达到最大时钟频率。 所以STM32F103将代码加载到sram运行速度更快。 但使用GD32F30…

20.Redis之缓存

1.什么是缓存&#xff1f; Redis 最主要的用途,三个方面:1.存储数据(内存数据库)2.缓存 【redis 最常用的场景】3.消息队列【很少见】 缓存 (cache) 是计算机中的⼀个经典的概念. 在很多场景中都会涉及到. 核⼼思路就是把⼀些常⽤的数据放到触⼿可及(访问速度更快)的地⽅, ⽅…

前端逆向之查看接口调用栈

一、来源 再分析前端请求接口数据的时候&#xff0c;其中有一个sid不知道是前端如何获取的&#xff0c;一般情况下只需要全局搜搜sid这个字符串或者请求接口的名称就可以了&#xff0c;基本都能找到sid的来源&#xff0c;但是今天这个不一样&#xff0c;搜什么都搜不到 接口地…

安装 ArchLinux 和 KDE Plasma 6 | 双系统/虚拟机

注&#xff1a;本文写于 2024/06/02 &#xff0c;ArchLinux 最新版为 2024.06.01 &#xff08;为什么用 Arch 懒得写了&#xff0c;给个别人写的链接&#xff1a;写在主力使用archlinux一年之后&#xff08;一&#xff09;Why Arch Linux? &#xff0c;总之就是pacman真香&…

【UnityShader入门精要学习笔记】第十六章 Unity中的渲染优化技术 (下)

本系列为作者学习UnityShader入门精要而作的笔记&#xff0c;内容将包括&#xff1a; 书本中句子照抄 个人批注项目源码一堆新手会犯的错误潜在的太监断更&#xff0c;有始无终 我的GitHub仓库 总之适用于同样开始学习Shader的同学们进行有取舍的参考。 文章目录 减少需要处…

数据结构复习指导之交换排序(冒泡排序,快速排序)

目录 交换排序 复习提示 1.冒泡排序 1.1基本思想 1.2算法代码 1.3性能分析 2.快速排序 2.1基本思想 2.2算法代码 2.3性能分析 交换排序 复习提示 所谓交换&#xff0c;是指根据序列中两个元素关键字的比较结果来对换这两个记录在序列中的位置。 基于交换的排序算法很…

ROS无人机追踪小车项目开发实战 | 第四届中国智能汽车创新大会圆满结束

2024年5月26日&#xff0c;阿木实验室在深圳第四届中国智能汽车创新大会上&#xff0c;开展的《Prometheus开源平台-ROS无人机追踪小车项目开发实战课》圆满结束。 该实战课从初学者的角度出发&#xff0c;通过实践性讲解和开发&#xff0c;使开发者们系统地学习了硬件系统架构…

vue3使用vue3-print-nb打印

打印效果 1.下载插件 Vue2.0版本安装方法 npm install vue-print-nb --saveVue3.0版本安装方法&#xff1a; npm install vue3-print-nb --save2.main.js引入 vue2引入 import Print from vue-print-nb Vue.use(Print)vue3引入 import print from vue3-print-nb // 打印…

在Windows安装Flutter

一、安装 Android Studio 官网&#xff1a; 下载 Android Studio 和应用工具 - Android 开发者 | Android Developers 教程&#xff1a;Android Studio 安装配置教程 - Windows(详细版)-CSDN博客 Flutter 官网&#xff1a;Windows | Flutter 中文文档 - Flutter 中文开发…

JVM学习-详解类加载器(一)

类加载器 类加载器是JVM执行类加载机制的前提 ClassLoader的作用 ClassLoader是Java的核心组件&#xff0c;所有的Class都是由ClassLoader进行加载的&#xff0c;ClassLoader负责通过各种方式将Class信息的二进制数据流读入JVM内部&#xff0c;转换为一个与目标类型对应的ja…

打开C语言常用的内存函数大门(三) —— memset()函数(内含讲解用法和模拟实现)

文章目录 1. 前言2. memset函数2.1 memset函数原型2.2 memset函数参数的介绍2.3 memset函数的使用演示 3. memset函数的模拟实现4. 总结 1. 前言 哈喽&#xff0c;我们又见面了。通过前面两个内存函数(memcpy、memmove函数)讲解的锤炼后&#xff0c;对如何解析一个自己从来没有…

V90PN伺服驱动器支持的标准报文介绍

1、V90 PN总线伺服通过FB285实现速度控制 V90 PN总线伺服通过FB285速度控制实现正弦位置轨迹运动(解析法和数值法对比测试)-CSDN博客文章浏览阅读448次。上面的位置函数有明确的解析函数&#xff0c;这里我们可以利用解析法求解其导数(微分),当然我们这里借助第三方数学软件求…

【LIN】STM32新能源汽车LIN通信实现过程

【LIN】STM32新能源汽车LIN通信实现过程 文章目录 前言一、软件二、接线图三、硬件原理图四、上位机五、PICO示波器串行解码1.软件中的LIN波特率设置-192002.PIC设置3.PIC串行解码 六.引用总结 前言 【电机控制】直流有刷电机、无刷电机汇总——持续更新 使用工具&#xff1a;…

opencv笔记(13)—— 停车场车位识别

一、所需数据介绍 car1.h5 是训练后保存的模型 class_directionary 是0&#xff0c;1的分类 二、图像数据预处理 对输入图片进行过滤&#xff1a; def select_rgb_white_yellow(self,image): #过滤掉背景lower np.uint8([120, 120, 120])upper np.uint8([255, 255, 255])#…

单片机原理及应用复习

单片机原理及应用 第二章 在AT89S52单片机中&#xff0c;如果采用6MHz晶振&#xff0c;一个机器周期为 2us 。 时钟周期Tocs1focs 机器周期 Tcy12focs 指令周期&#xff1a;一条指令所用的时间&#xff0c;单字和双字节指令一般为单机器周期和双机器周期。 AT89S5…

Unity2D横版摄像机跟随

在Unity2D横版游戏中&#xff0c;摄像机跟随是一个非常重要的功能。一个流畅的摄像机跟随系统可以让玩家更好地沉浸在游戏世界中。本文将介绍如何在Unity中实现2D横版摄像机跟随&#xff0c;并分享一些优化技巧。 一、准备工作 在开始实现摄像机跟随之前&#xff0c;请确保您…