Kafka生产者使用案例

 本文代码链接:https://download.csdn.net/download/shangjg03/88422633

1.生产者发送消息的过程

首先介绍一下 Kafka 生产者发送消息的过程:

1)Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。

2) 接下来,数据被传给分区器。如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据 ProducerRecord 对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。

3) 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。

2.创建生产者

2.1 项目依赖

本项目采用 Maven 构建,想要调用 Kafka 生产者 API,需要导入 `kafka-clients` 依赖,如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>

2.2 创建生产者

创建 Kafka 生产者时,以下三个属性是必须指定的:

bootstrap.servers:指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错;

key.serializer:指定键的序列化器;

value.serializer:指定值的序列化器。

创建的示例代码如下:

public class SimpleProducer {public static void main(String[] args) {String topicName = "Hello-Kafka";Properties props = new Properties();props.put("bootstrap.servers", "hadoop001:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");/*创建生产者*/Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i, "world" + i);/* 发送消息*/producer.send(record);}/*关闭生产者*/producer.close();}
}

2.3 测试

2.3.1启动Kakfa

Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:

zookeeper启动命令
bin/zkServer.sh start# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties

启动单节点 kafka 用于测试:

bin/kafka-server-start.sh config/server.properties

2.3.2 创建topic

# 创建用于测试主题
bin/kafka-topics.sh --create \--bootstrap-server hadoop001:9092 \--replication-factor 1 --partitions 1 \--topic Hello-Kafka# 查看所有主题bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

2.3.3 启动消费者

 启动一个控制台消费者用于观察写入情况,启动命令如下:

bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning

2.3.4 运行项目

此时可以看到消费者控制台,输出如下,这里 `kafka-console-consumer` 只会打印出值信息,不会打印出键信息。

2.4 可能出现的问题

在这里可能出现的一个问题是:生产者程序在启动后,一直处于等待状态。这通常出现在你使用默认配置启动 Kafka 的情况下,此时需要对 `server.properties` 文件中的 `listeners` 配置进行更改:

hadoop001 为我启动kafka服务的主机名,你可以换成自己的主机名或者ip地址
listeners=PLAINTEXT://hadoop001:9092

3.发送消息

上面的示例程序调用了 `send` 方法发送消息后没有做任何操作,在这种情况下,我们没有办法知道消息发送的结果。想要知道消息发送的结果,可以使用同步发送或者异步发送来实现。

3.1 同步发送

在调用 `send` 方法后可以接着调用 `get()` 方法,`send` 方法的返回值是一个 Future<RecordMetadata>对象,RecordMetadata 里面包含了发送消息的主题、分区、偏移量等信息。改写后的代码如下:

for (int i = 0; i < 10; i++) {try {ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);/*同步发送消息*/RecordMetadata metadata = producer.send(record).get();System.out.printf("topic=%s, partition=%d, offset=%s \n",metadata.topic(), metadata.partition(), metadata.offset());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}
}

此时得到的输出如下:偏移量和调用次数有关,所有记录都分配到了 0 分区,这是因为在创建 `Hello-Kafka` 主题时候,使用 `--partitions` 指定其分区数为 1,即只有一个分区。

topic=Hello-Kafka, partition=0, offset=40 
topic=Hello-Kafka, partition=0, offset=41 
topic=Hello-Kafka, partition=0, offset=42 
topic=Hello-Kafka, partition=0, offset=43 
topic=Hello-Kafka, partition=0, offset=44 
topic=Hello-Kafka, partition=0, offset=45 
topic=Hello-Kafka, partition=0, offset=46 
topic=Hello-Kafka, partition=0, offset=47 
topic=Hello-Kafka, partition=0, offset=48 
topic=Hello-Kafka, partition=0, offset=49 

3.2 异步发送

通常我们并不关心发送成功的情况,更多关注的是失败的情况,因此 Kafka 提供了异步发送和回调函数。 代码如下:

for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);/*异步发送消息,并监听回调*/producer.send(record, new Callback() {
        @Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.out.println("进行异常处理");} else {System.out.printf("topic=%s, partition=%d, offset=%s \n",metadata.topic(), metadata.partition(), metadata.offset());}}});
}

4.自定义分区器

Kafka 有着默认的分区机制:

如果键值为 null, 则使用轮询 (Round Robin) 算法将消息均衡地分布到各个分区上;

如果键值不为 null,那么 Kafka 会使用内置的散列算法对键进行散列,然后分布到各个分区上。

某些情况下,你可能有着自己的分区需求,这时候可以采用自定义分区器实现。这里给出一个自定义分区器的示例:

4.1 自定义分区器

/*** 自定义分区器*/
public class CustomPartitioner implements Partitioner {private int passLine;    @Overridepublic void configure(Map<String, ?> configs) {/*从生产者配置中获取分数线*/passLine = (Integer) configs.get("pass.line");}    @Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {/*key 值为分数,当分数大于分数线时候,分配到 1 分区,否则分配到 0 分区*/return (Integer) key >= passLine ? 1 : 0;}    @Overridepublic void close() {System.out.println("分区器关闭");}
}

需要在创建生产者时指定分区器,和分区器所需要的配置参数:

public class ProducerWithPartitioner {public static void main(String[] args) {String topicName = "Kafka-Partitioner-Test";Properties props = new Properties();props.put("bootstrap.servers", "hadoop001:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");/*传递自定义分区器*/props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner");/*传递分区器所需的参数*/props.put("pass.line", 6);Producer<Integer, String> producer = new KafkaProducer<>(props);for (int i = 0; i <= 10; i++) {String score = "score:" + i;ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score);/*异步发送消息*/producer.send(record, (metadata, exception) ->System.out.printf("%s, partition=%d, \n", score, metadata.partition()));}producer.close();}
}

4.2 测试

需要创建一个至少有两个分区的主题:

 bin/kafka-topics.sh --create \--bootstrap-server hadoop001:9092 \--replication-factor 1 --partitions 2 \--topic Kafka-Partitioner-Test

此时输入如下,可以看到分数大于等于 6 分的都被分到 1 分区,而小于 6 分的都被分到了 0 分区。

score:6, partition=1, 
score:7, partition=1, 
score:8, partition=1, 
score:9, partition=1, 
score:10, partition=1, 
score:0, partition=0, 
score:1, partition=0, 
score:2, partition=0, 
score:3, partition=0, 
score:4, partition=0, 
score:5, partition=0, 
分区器关闭

5.生产者其他属性

上面生产者的创建都仅指定了服务地址,键序列化器、值序列化器,实际上 Kafka 的生产者还有很多可配置属性,如下:

1. acks

acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的:

acks=0: 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;

acks=1: 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;

acks=all:只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。

2. buffer.memory

设置生产者内存缓冲区的大小。

3. compression.type

默认情况下,发送的消息不会被压缩。如果想要进行压缩,可以配置此参数,可选值有 snappy,gzip,lz4。

4. retries

发生错误后,消息重发的次数。如果达到设定值,生产者就会放弃重试并返回错误。

5. batch.size

当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。

6. linger.ms

该参数制定了生产者在发送批次之前等待更多消息加入批次的时间。

7. clent.id

客户端 id,服务器用来识别消息的来源。

8. max.in.flight.requests.per.connection

指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量,把它设置为 1 可以保证消息是按照发送的顺序写入服务器,即使发生了重试。

9. timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms

- timeout.ms 指定了 borker 等待同步副本返回消息的确认时间;

- request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间;

- metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如分区首领是谁)时等待服务器返回响应的时间。

10. max.block.ms

指定了在调用 `send()` 方法或使用 `partitionsFor()` 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。

11. max.request.size

该参数用于控制生产者发送的请求大小。它可以指发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1000K ,那么可以发送的单个最大消息为 1000K ,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1K。 

12. receive.buffer.bytes & send.buffer.byte

这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 代表使用操作系统的默认值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/159614.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何正确维护实验室超声波清洗器?

实验室一直被视为一个严谨而严肃的场所&#xff0c;实验应遵循一定的步骤&#xff0c;使用的设备也经历了详细的选择&#xff0c;如实验室超声波清洗机&#xff0c;其特点远强于一般类型的清洗机。专门负责采购的实验室人员一般对优质服务的实验室超声波清洗机印象深刻&#xf…

计算机毕业设计 无人智慧超市管理系统的设计与实现 Javaweb项目 Java实战项目 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

MySQL学习(二)——MySQL内置函数

文章目录 1. 函数1.1 字符串函数1.2 数值函数1.3 日期函数1.4 流程函数 2. 约束2.1 概述2.2 外键约束2.2.1 外键使用2.2.2 删除/更新行为 1. 函数 和其他编程语言一样&#xff0c;MySQL也有函数的定义。函数 是指一段可以直接被另一段程序调用的程序或代码。 也就意味着&#…

竞赛 深度学习OCR中文识别 - opencv python

文章目录 0 前言1 课题背景2 实现效果3 文本区域检测网络-CTPN4 文本识别网络-CRNN5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习OCR中文识别系统 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;…

xcode打包macos报错:FlutterInputs.xcfilelist 和 FlutterOutputs.xcfilelist

xcode 打包macos的时候&#xff0c;报错如下&#xff1a; Unable to load contents of the file list: ‘macos/ephemeral/FlutterInputs.xcfilelist’ ‘macos/ephemeral/FlutterOutputs.xcfilelist’ 解决方案&#xff1a; 我的项目macos下没有找到FlutterInputs.xcfilelis…

k8s-10 cni 网络

k8s通过CNI接口接入其他网络插件来实现网络通讯。目前比较流行的插件有flannel,calico等。 CNI插件存放位置: # cat /etc/cni/net.d/10-flannel.conflist 插件使用的解决方案如下: 虚拟网桥&#xff0c;虚拟网卡&#xff0c;多个容器共用一个虚拟网卡进行通信。多路复用: Mac…

零经验想跳槽转行网络安全,需要准备什么?

最近在后台看到很多私信都是有关转行网络安全的问题&#xff0c;目前咨询最多的都是&#xff1a;觉得现在的工作没有发展空间&#xff0c;替代性强&#xff0c;工资低&#xff0c;想跳槽转行网络安全。其中&#xff0c;他们主要关心的是&#xff1a;没有经验怎么学习&#xff1…

2022最新版-李宏毅机器学习深度学习课程-P14 批次(batch)与动量(momentum)

一、batch 回顾epoch、shuffle batch size大还是小&#xff1f;都有好处 大batchsize的好处 由于GPU有并行计算的能力&#xff0c;左边并不一定用时更长 反而是&#xff0c;batch size小的时候&#xff0c;要跑完一个epoch所用的update时间更长&#xff0c;所以时间方面的比较…

java 两个list比较,删除相同的元素

概述 在Java开发中&#xff0c;经常需要比较两个List并删除相同的元素。本文将介绍整个流程&#xff0c;并提供相应的代码示例&#xff0c;帮助新手开发者完成这个任务。 流程 下面是比较两个List并删除相同元素的流程&#xff1a; 代码示例 创建两个List 我们首先需要创建两…

[开源]企业级流程中心,基于flowable和bpmn.js封装的流程引擎

一、开源项目简介 企业级流程中心&#xff08;基于flowable和bpmn.js封装的流程引擎&#xff0c;采用Springboot&#xff0c;Mybatis-plus, Ehcache, sa-token 等框架技术,前端采用Vue3&Antd&#xff0c;Vben&#xff09;。 二、开源协议 使用Apache-2.0开源协议 三、界…

docker 复习

文章目录 docker 安装配置镜像加速器拉取镜像的仓库&#xff1a; docker 部署Mysql 镜像命令的详细解释docker 相关命令总结 docker 安装 查看是否安装的镜像已经在系统中存在&#xff1a; docker images &#xff08;存在2&#xff0c;不存在 3&#xff09; 卸载旧版本 yum r…

技术分享:深入浅出讲解GLSB是什么?

在互联网早期&#xff0c;由于网络不是很发达&#xff0c;流量也相对比较小&#xff0c;单体架构已经能足够满足需求。但伴随着互联网越来越&#xff0c;网站的流量请求甚至能达到上千亿。为了实现高可用&#xff0c;需要用到多台机器来提升处理流量的能力。在这种环境下&#…

C++项目实战——基于多设计模式下的同步异步日志系统-⑫-日志宏全局接口设计(代理模式)

文章目录 专栏导读日志宏&全局接口设计全局接口测试项目目录结构整理示例代码拓展示例代码 专栏导读 &#x1f338;作者简介&#xff1a;花想云 &#xff0c;在读本科生一枚&#xff0c;C/C领域新星创作者&#xff0c;新星计划导师&#xff0c;阿里云专家博主&#xff0c;C…

DevExpress Reporting中文教程 - 如何在macOS等系统中生成导出报表文档

DevExpress Reporting是.NET Framework下功能完善的报表平台&#xff0c;它附带了易于使用的Visual Studio报表设计器和丰富的报表控件集&#xff0c;包括数据透视表、图表&#xff0c;因此您可以构建无与伦比、信息清晰的报表。 在本文中&#xff0c;我们将讨论如何在.NET MA…

如何制作有专业水准的的电子杂志:专家教你秘籍

​随着数字化时代的到来&#xff0c;电子杂志作为一种新型的传媒形式&#xff0c;越来越受到人们的关注和喜爱。但是&#xff0c;如何制作一份具有专业水准的电子杂志呢&#xff1f; 今天&#xff0c;给大家分享一款在线就能制作的电子杂志------FLBOOK&#xff0c;让你轻松打造…

宝塔面板部署express以及MySql项目

第一次在宝塔面板上部署express和MySql项目&#xff0c;部署过程一直跑不通接口&#xff0c;特此记录一下。 在部署的时候&#xff0c;建议第一步把数据库MySql给跑通&#xff0c;中间好多原因是由于数据库的原因给引起的。 一.连接数据库 &#xff08;1&#xff09;在宝塔面…

CSS变量 var()的用法

写在前面 这里介绍一下开发中常用的css变量var()&#xff0c;它可以实现样式的动态设置&#xff0c;使用方法主要分为全局使用和局部使用两种。 如何定义CSS变量var() 在CSS文件中&#xff0c;变量需要使用 – 作为前缀来定义&#xff0c;后面跟上变量名和值&#xff0c;如&a…

从0开始学go第八天

gin获取URL路径参数 package main//获取path&#xff08;URL&#xff09;参数 import ("net/http""github.com/gin-gonic/gin" )func main() {r : gin.Default()r.GET("/:name/:age", func(c *gin.Context) {//获取路径参数name : c.Param(&quo…

Python 连接数据库添加字段

任务需求&#xff1a; 数据库hospital集合所有数据添加一个八位数的编码 import pymongo# 连接数据customer&#xff08;库&#xff09;hospital&#xff08;集合&#xff09; client pymongo.MongoClient(host127.0.0.1) db client.customer collection db.hospitalhospit…

开源六轴机械臂myCobot 280末端执行器实用案例解析

Intrduction 大家好&#xff0c;今天这篇文章的主要内容是讲解以及使用一些myCobot 280 的配件&#xff0c;来了解这些末端执行器都能够完成哪些功能&#xff0c;从而帮助大家能够正确的选择一款适合的配件来进行使用。 本文中主要介绍4款常用的机械臂的末端执行器。 Product m…