【大数据学习 | kafka】producer的参数与结构

1. producer的结构

producer:生产者

它由三个部分组成

interceptor:拦截器能拦截到数据,处理完毕以后发送给下游,它和过滤器不同并不是丢弃数据,而是将数据处理完毕再次发送出去,这个默认是不存在的

serialiazer:序列化器kafka中存储的数据是二进制的,所以数据必须经过序列化器进行处理,这个是必须要有的,将用户的数据转换为byte[]的工具类,其中k和v要分别指定

partitioner: 分区器主要是控制发送的数据到topic的哪个分区中,这个默认也是存在的

record accumulator

本地缓冲累加器 默认32M

producer的数据不能直接发送到kafka集群中,因为producer和kafka集群并不在一起,远程发送的数据不是一次发送一条这样太影响发送的速度和性能,所以我们发送都是攒一批数据发一次,record accumulator就是一个本地缓冲区,producer将发送的数据放入到缓冲区中,另外一个线程会去拉取其中的数据,远程发送给kafka集群,这个异步线程会根据linger.msbatch-size进行拉取数据。如果本地累加器中的数据达到batch-size或者是linger.ms的大小阈值就会拉取数据到kafka集群中,这个本地缓冲区不仅仅可以适配两端的效率,还可以批次形式执行任务,增加效率

batch-size 默认16KB

linger.ms 默认为0

生产者部分的整体流程

首先producer将发送的数据准备好

经过interceptor的拦截器进行处理,如果有的话

然后经过序列化器进行转换为相应的byte[]

经过partitioner分区器分类在本地的record accumulator中缓冲

sender线程会自动根据linger.ms和batch-size双指标进行管控,复制数据到kafka

2. producer的简单代码

2.1 准备:

引入maven依赖:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>
</dependencies>

在resources文件中创建log4j.properties

log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n

2.2 生产者中的设定参数

参数含义
bootstrap.serverskafka集群的地址
key.serializerkey的序列化器,这个序列化器必须和key的类型匹配
value.serializervalue的序列化器,这个序列化器必须和value的类型匹配
batch.size批次拉取大小默认是16KB
linger.ms拉取的间隔时间默认为0,没有延迟
partitioner分区器存在默认值
interceptor拦截器选的

2.3 全部代码

public class producer_test {public static void main(String[] args) {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");//设定集群地址pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设定两个序列化器,其中StringSerializer是系统自带的序列化器,要和数据的类型完全一致pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);//batch-size默认是16KB,参数的单位是bytepro.put(ProducerConfig.LINGER_MS_CONFIG, 0);//默认等待批次时长是0KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");//发送数据的时候有kv两个部分,但是一般k我们什么都不放,只放value的值producer.send(record);producer.close();}
}

在x-shell中观察消费的数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/458853.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【c++篇】:探索c++中的std::string类--掌握字符串处理的精髓

✨感谢您阅读本篇文章&#xff0c;文章内容是个人学习笔记的整理&#xff0c;如果哪里有误的话还请您指正噢✨ ✨ 个人主页&#xff1a;余辉zmh–CSDN博客 ✨文章所属专栏&#xff1a;c篇–CSDN博客 文章目录 前言一.std::string对象的创建二.std::string对象的访问三.std::str…

读取有空格的string对象(getline)

文章目录 读取有空格的string对象1.使用标准库中的iostream来写2.**使用getline读取一整行** 读取有空格的string对象 1.使用标准库中的iostream来写 #include<iostream> using namespace std; int main() {string s;cin >> s;cout << s << endl;ret…

探索Python安全字符串处理的奥秘:MarkupSafe库揭秘

文章目录 探索Python安全字符串处理的奥秘&#xff1a;MarkupSafe库揭秘第一部分&#xff1a;背景介绍第二部分&#xff1a;MarkupSafe是什么&#xff1f;第三部分&#xff1a;如何安装MarkupSafe&#xff1f;第四部分&#xff1a;MarkupSafe的简单使用方法1. 使用escape函数2.…

Tomcat安装与使用

Tomcat优点 1、开源免费&#xff1a;是一个免费、开源的Web服务器&#xff0c;可以在任何环境下自由使用&#xff0c;无需支付任何费用。 2、轻量级&#xff1a;是一个轻量级的Web服务器&#xff0c;其核心仅有几百K&#xff0c;启动速度非常快。 3、易于安装和配置&#xff1a…

【笔记】LLM位置编码之标准位置编码

标准位置编码 起源原理证明&#xff1a;对于任何固定的偏移量 k k k&#xff0c; P E p o s k PE_{posk} PEposk​可以表示为 P E p o s PE_{pos} PEpos​的线性函数。计算 P E p o s k 与 P E p o s PE_{posk} 与PE_{pos} PEposk​与PEpos​的内积结论 通俗理解缺点 起源 由…

深度学习之降维和聚类

1 降维和聚类 1.1 图解为什么会产生维数灾难 ​ 假如数据集包含10张照片&#xff0c;照片中包含三角形和圆两种形状。现在来设计一个分类器进行训练&#xff0c;让这个分类器对其他的照片进行正确分类&#xff08;假设三角形和圆的总数是无限大&#xff09;&#xff0c;简单的…

Typora一款极简Markdown文档编辑器和阅读器,实时预览,序列号生成!免费!最新可用!

文章目录 一、Typora下载和安装二、Typora序列号生成 Typora是一款Markdown编辑器和阅读器&#xff0c;风格极简&#xff0c;实时预览&#xff0c;所见即所得&#xff0c;支持MacOS、Windows、Linux操作系统&#xff0c;有图片和文字、代码块、数学公式、图表、目录大纲、文件管…

异常处理与调试:如何编写稳健的代码(8/10)

目录 异常处理与调试&#xff1a;如何编写稳健的代码&#xff08;8/10&#xff09; 介绍 异常概述 常见的异常类型 使用 try...except 处理异常 基本结构 示例&#xff1a;读取文件内容 捕获多个异常 自定义异常 示例&#xff1a;自定义异常类 调试代码 使用 print…

AI跟踪报道第62期-本周AI新闻: 微软推出Copilot的AI Agent和Computer Control

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

重写(外壳不变)

重写&#xff1a;是子类对父类非静态、非private修饰、非final修饰、非构造方法等的实现过程进行重新编写返回值和形参都不能改变。 重写的好处&#xff1a;子类可以根据需要&#xff0c;定义专属于自己的行为。&#xff08;子类能够根据需要实现父类的方法&#xff09; 方法…

封装echarts组件,即插即用(附源码)

前言&#xff1a;最近一个项目刚收工&#xff0c;分享一个常用的封装echarts的组件。 一、直接上组件代码 <template><el-card class"echart-card" shadow"hover"><template v-slot:header><div class"card-header">&…

JS面试八股文(三)

&#x1f60a;文章目录 21.说一下事件循环22.ajax是什么&#xff1f;怎么实现&#xff1f;23.get和post有什么区别&#xff1f;24.Promise的内部原理是什么&#xff1f;它的缺点是什么&#xff1f;25.Promise和async await的区别是什么&#xff1f;26.浏览器的存储方式有哪些&a…

python实战(二)——房屋价格回归建模

一、任务背景 本章将使用一个经典的Kaggle数据集——House Prices - Advanced Regression Techniques进行回归建模的讲解。这是一个房价数据集&#xff0c;与我们熟知的波士顿房价数据集类似&#xff0c;但是特征数量要更多&#xff0c;数据也要更为复杂一些。下面&#xff0c;…

Linux 命令行查看当前目录的总大小/总磁盘空间/磁盘清理

一、du 查看目录空间大小 &#xff08;一&#xff09; du 命令解析 在Linux命令行可以使用 du 命令来查看当前目录的总大小。du 是 disk usage 的缩写&#xff0c;表示磁盘使用情况。 命令解释&#xff1a;总结每个文件的磁盘使用情况&#xff0c;递归地用于目录。 使用格式…

以通俗易懂的仓库来讲解JVM内存模型

JVM内存模型可以想象成一个大型的仓库&#xff0c;这个仓库被分成了几个不同的区域&#xff0c;每个区域都有特定的用途和规则。下面我们用一个仓库的比喻来介绍JVM内存模型&#xff1a; 仓库大门&#xff08;JVM启动&#xff09;&#xff1a; 当JVM启动时&#xff0c;就像打开…

自动化抖音点赞取消脚本批量处理

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

多个立方体盒子组成

效果&#xff1a; 知识了解&#xff1a; 在同一水平上&#xff0c;盒子经纬度计算&#xff1a;经度有误差&#xff0c;纬度没有误差 纬度计算&#xff1a;lat50/111320 约等于0.000449 经度计算&#xff1a;lon50/111320*cos(纬度) 约等于0.000519 一个立方体&#xff1a; // 添…

CentOS进入单用户模式进行密码重置

一、单用户模式介绍 单用户模式是一种特殊的启动模式&#xff0c;主要用于系统维护和故障排除。在单用户模式下&#xff0c;系统以最小化的状态启动&#xff0c;只有最基本的系统服务会被加载&#xff0c;通常只有root用户可以登录。这种模式提供了对系统的完全控制&#xff0…

模型训练识别手写数字(一)

一、模型训练数据集 1. 导入所需库 import numpy as np from sklearn.datasets import fetch_openmlnumpy 是用于数值计算的库。 fetch_openml 是用于从 OpenML 下载数据集的函数。 2. 获取 MNIST 数据集 X, y fetch_openml(mnist_784, version1, return_X_yTrue)fetch_ope…

Spring Boot与Flyway实现自动化数据库版本控制

一、为什么使用Flyway 最简单的一个项目是一个软件连接到一个数据库&#xff0c;但是大多数项目中我们不仅要处理我们开发环境的副本&#xff0c;还需要处理其他很多副本。例如&#xff1a;开发环境、测试环境、生产环境。想到数据库管理&#xff0c;我们立刻就能想到一系列问…