Kafka-代码示例

一、构建开发环境

File > New > Project

选择一个最简单的模板

项目和坐标命名

配置maven路径

添加maven依赖

<dependencies><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.1</version></dependency>
</dependencies>

 加载刚刚添加的依赖

此时发现项目还没有包目录,如果遇到这种情况,点击新建目录就会自动提示了

二、创建一个新的topic

kafka-topics --create --topic kafka-study --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
#查看topic详情
kafka-topics --describe --zookeeper cdh1:2181 --topic kafka-study
#查看 topic 指定分区 offset
kafka-run-class kafka.tools.GetOffsetShell --topic kafka-study --time -1 --broker-list cdh1:9092

三、编写生产者

kafka源码中有生产者和消费者的示例,我们简单修改下就直接用了

package org.example.kafkaStudy;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;public class KafkaProducerDemo {public static void main(String[] args) {try{//topic名称String topicName = "kafka-study";//broker列表String bootstrapServers = "cdh1:9092,cdh2:9092,cdh3:9092";//向topic打多少数据int numRecords = 10000;//是否异步推送数据boolean isAsync = true;int key = 0;int sentRecords = 0;//创建生产者KafkaProducer<Integer, String> producer = createKafkaProducer(bootstrapServers,-1,null,false);//判断是否达到生产要求while (sentRecords < numRecords) {if (isAsync) {//异步推送asyncSend(producer,topicName, key, "test" + key,sentRecords);} else {//同步推送syncSend(producer,topicName, key, "test" + key,sentRecords);}key++;sentRecords++;}producer.close();} catch (Throwable e) {e.printStackTrace();}}private static RecordMetadata syncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords)throws ExecutionException, InterruptedException {try {// 发送记录,然后调用get,这会阻止等待来自broker的ackRecordMetadata metadata = producer.send(new ProducerRecord<>(topicName, key, value)).get();Utils.maybePrintRecord(sentRecords, key, value, metadata);return metadata;} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException| OutOfOrderSequenceException | SerializationException e) {Utils.printErr(e.getMessage());} catch (KafkaException e) {Utils.printErr(e.getMessage());}return null;}private static void asyncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords) {//异步发送记录,设置一个回调以通知结果。//请注意,即使使用linger.ms=0设置了一个batch.size 当缓冲区内存已满或元数据不可用时,发送操作仍将被阻止producer.send(new ProducerRecord<>(topicName, key, value), new ProducerCallback(key, value,sentRecords));}private static KafkaProducer<Integer, String> createKafkaProducer(String bootstrapServers ,int transactionTimeoutMs,String transactionalId,boolean enableIdempotency) {Properties props = new Properties();// 生产者连接到broker需要引导服务器配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);// 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/portprops.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());// 设置序列化器props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);if (transactionTimeoutMs > 0) {// 事务协调器主动中止正在进行的事务之前的最长时间props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);}if (transactionalId != null) {// 事务id必须是静态且唯一的,它用于在流程重启过程中标识相同的生产者实例props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);}// 在分区级别启用重复保护props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);return new KafkaProducer<>(props);}static class ProducerCallback implements Callback {private final int key;private final int sentRecords;private final String value;public ProducerCallback(int key, String value,int sentRecords) {this.key = key;this.sentRecords = sentRecords;this.value = value;}/*** 用户可以实现一种回调方法,以提供请求完成的异步处理。当发送到服务器的记录得到确认时,将调用此方法。当回调中的异常不为null时,* 元数据将包含除topicPartition之外的所有字段的特殊-1值,该值将有效。** @param metadata 发送的记录的元数据(即分区和偏移量)。如果发生错误,将返回除topicPartition之外的所有字段的值为-1的空元数据。* @param exception 处理此记录时引发的异常。如果没有发生错误,则为空。*/public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {Utils.printErr(exception.getMessage());if (!(exception instanceof RetriableException)) {// 我们无法从这些异常中恢复过来}} else {Utils.maybePrintRecord(sentRecords, key, value, metadata);}}}
}

四、编写消费者

package org.example.kafkaStudy;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
import static java.util.Collections.singleton;public class KafkaConsumerDemo {public static void main(String[] args) {//topic名称String topicName = "kafka-study";//组名称String groupName = "my-group-1";//broker列表String bootstrapServers =  "cdh1:9092,cdh2:9092,cdh3:9092";//向topci打多少数据int numRecords = 10000;int remainingRecords = 10000;// 消费来自 topic = kafka-study 的数据KafkaConsumer<Integer, String> consumer = createKafkaConsumer(bootstrapServers,groupName,false);//订阅主题列表以获取动态分配的分区此类实现了我们在此处传递的再平衡侦听器,以接收此类事件的通知consumer.subscribe(singleton(topicName));Utils.printOut("Subscribed to %s", topicName);while (remainingRecords > 0) {try {// 如果需要,轮询会更新分区分配并调用配置的重新平衡侦听器,然后尝试使用上次提交的偏移量或auto.offset.reset按顺序获取记录。// 如果有记录或超时返回空记录集,则重置策略会立即返回。下一次轮询必须在session.timeout.ms中调用,以避免组重新平衡ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<Integer, String> record : records) {Utils.maybePrintRecord(numRecords, record);}remainingRecords -= records.count();} catch (AuthorizationException | UnsupportedVersionExceptione) {// 我们无法从这些异常中恢复过来Utils.printErr(e.getMessage());} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {// 在没有auto.reset.policy的情况下,找不到偏移量或偏移量无效Utils.printOut("Invalid or no offset found, using latest");consumer.seekToEnd(e.partitions());consumer.commitSync();} catch (KafkaException e) {// 记录异常并尝试继续Utils.printErr(e.getMessage());}}consumer.close();Utils.printOut("Fetched %d records", numRecords - remainingRecords);}private static KafkaConsumer<Integer, String> createKafkaConsumer(String bootstrapServers,String groupId , boolean readCommitted) {Properties props = new Properties();// 消费者连接到broker需要引导服务器配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);// 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/portprops.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());// 当我们使用订阅(topic)进行组管理时,需要消费者groupIdprops.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//设置静态成员资格以提高可用性(例如滚动重启)
//        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));//启用EOS时禁用自动提交,因为偏移量与事务一起提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");//读取数据用到的反序列化器,需要和生产者对应props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);if (readCommitted) {// 跳过正在进行和已中止的事务props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");}// 在偏移无效或没有偏移的情况下设置重置偏移策略props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");return new KafkaConsumer<>(props);}
}

五、运行程序

生产者日志打印

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(0, test0), partition(kafka-study-0), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(1, test1), partition(kafka-study-0), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(5, test5), partition(kafka-study-0), offset(2)

......

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9940, test9940), partition(kafka-study-0), offset(4979)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9960, test9960), partition(kafka-study-0), offset(4987)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9970, test9970), partition(kafka-study-0), offset(4991)

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(2, test2), partition(kafka-study-1), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(3, test3), partition(kafka-study-1), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(4, test4), partition(kafka-study-1), offset(2)

.......

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9950, test9950), partition(kafka-study-1), offset(4966)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9980, test9980), partition(kafka-study-1), offset(4986)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9990, test9990), partition(kafka-study-1), offset(4991)

我们再次用命令看下每个分区的offset

消费者日志打印

main - Subscribed to kafka-study
main - Sample: record(0, test0), partition(kafka-study-0), offset(0)
main - Sample: record(1000, test1000), partition(kafka-study-0), offset(506)
main - Sample: record(2000, test2000), partition(kafka-study-0), offset(1020)
main - Sample: record(3000, test3000), partition(kafka-study-0), offset(1554)
main - Sample: record(7000, test7000), partition(kafka-study-0), offset(3550)
main - Sample: record(4000, test4000), partition(kafka-study-1), offset(1929)
main - Sample: record(5000, test5000), partition(kafka-study-1), offset(2422)
main - Sample: record(6000, test6000), partition(kafka-study-1), offset(2932)
main - Sample: record(8000, test8000), partition(kafka-study-1), offset(3963)
main - Sample: record(9000, test9000), partition(kafka-study-1), offset(4467)
main - Fetched 10000 records

六、问题说明

从日志中我们可以看到,在异步生产和消费时offset并不是逐个递增上去的,这是为什么呢?

在前面博客中我们提到,生产者在异步的情况下会启用批处理,即:Kafka生产者将尝试在内存中积累数据,并在单个请求中发送更大的批处理。批处理可以配置为积累不超过固定数量的消息,并且等待时间不超过一些固定的延迟限制(例如64k或10毫秒)。这允许积累更多的消息来发送,并且在服务器上几乎没有更大的I/O操作。这种缓冲是可配置的,并提供了一种机制来权衡少量额外的延迟以获得更好的吞吐量。当然如果你选择的是同步推送或者异步中单条消息特别大会导致批处理优化使用不到。

消费者也是从brokers一批一批的拉取数据来消费的

我们也可以看下broker的日志中数据的索引情况

kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.log | head -10

kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.index | head -10

从这里我们可以看到,生产者是一批一批往broker推送的,broker以更大的批次往磁盘写,从而降低推送的频次,也降低与磁盘交互的频次。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/459705.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue2项目在发布后更新,提示用户刷新页面

1、在项目根目录创建resetVersion.js的文件&#xff0c;内容如下 &#xff08;具体路径可能会有点问题&#xff0c;但是不影响&#xff09; const path require(path); const fsExtra require(fs-extra);const runBuild async () > {try {const OUTPUT_DIR public; // …

WebGIS开发丨从入门到进阶,全系列课程分享

WebGIS开发所需的技能 1.前端技能&#xff1a;Html、CSS、 Javascript、WebAPLs、Vue 2.二维技能&#xff1a;WebGIS基础理论及开发、MapGIS二次开发Openlayers、Leaflet、Mapbox 、Echarts、公共开发平台开发等 3.三维技能&#xff1a;Blender、Three.js、Cesium等 Web开发…

17 Docker容器存储架构:docker存储持久化-bind mount

文章目录 三、docker存储持久化-bind mount3.1 将 /root/htdocs 目录下的 index.html 文件挂载给一个 httpd 容器3.2 更新宿主机上的 index.html 文件内容,并查看容器中的内容3.3 查看挂载类型3.4 创建基于 docker volume 的 container 镜像3.5 删除容器,销毁容器后,volume 依…

centos7 zabbix监控nginx的pv和uv和status_code

zabbix监控nginx的pv&#xff1a; pv)cat /var/log/nginx/access.log|awk {print $1}|wc -l;;zabbix-get验证&#xff1a; [rootbogon ~]# zabbix_get -s 192.168.253.231 -k pv_uv[pv] 100zabbix监控nginx的uv uv)cat /var/log/nginx/access.log|awk {print $1}|uniq -c | w…

C++进阶-->多态(Polymorphism)

1. 多态的概念 多态&#xff0c;顾名思义多种形态&#xff1b;多态分为编译时多态&#xff08;静态多态&#xff09;和运行时多态&#xff08;动态多态&#xff09;&#xff0c;静态多态就是就是我们前面讲的函数重载和函数模板&#xff0c;可以通过传不同类型&#xff0c;然后…

虚拟机桥接模式连不上,无法进行SSH等远程操作

说明&#xff1a;以下情况在window10上遇到&#xff0c;解决后顺便做了个笔记&#xff0c;以防后续再次用到&#xff0c;也给同道中人提供一个解决方案 一、首先按照以下步骤进行检查 1、是否连接了对应的wifi 2、是否设置了桥接模式 3、上述1、2确认无误的情况下请查看右上…

Flutter Image和Text图文组件实战案例

In this section, we’ll go through the process of building a user interface that showcases a product using the Text and Image widgets. We’ll follow Flutter’s best practices to ensure a clean and effective UI structure. 在本节中&#xff0c;我们将使用“Te…

使用Kubernetes管理容器化应用

使用Kubernetes管理容器化应用 Kubernetes简介 安装Kubernetes 安装Minikube 启动Minikube集群 创建一个简单的Web应用 创建项目目录 初始化项目 安装Node.js依赖 创建Docker镜像 编写Dockerfile 构建并推送Docker镜像 创建Kubernetes配置文件 创建Deployment 创建Service …

<十六>Ceph mon 运维

Ceph 集群有故障了&#xff0c;你执行的第一个运维命令是什么&#xff1f; 我猜测是ceph -s 。无论执行的第一个命令是什么&#xff0c;都肯定是先检查Mon。 在开始之前我们有必要介绍下Paxos协议&#xff0c;毕竟Mon就是靠它来实现数据唯一性。 一&#xff1a; Paxos 协议 1…

计算机网络-MSTP的基础概念

前面我们大致了解了MSTP的由来&#xff0c;是为了解决STP/RSTP只有一根生成树导致的VLAN流量负载分担与次优路径问题&#xff0c;了解MSTP采用实例映射VLAN的方式实现多实例生成树&#xff0c;MSTP有很多的理论概念需要知道&#xff0c;其实与其它的知识一样理论复杂配置还好的…

电源完整性

电源分配系统 电源分配系统:Power Distribution Network(简称 PDN) 真正用电节点在 Die&#xff0c;所以PDN系统包含 PCB 和 Package上的部分 PCB 上:VRM、大电容、小电容、电源平面、地平面 Package内:电容、电源平面、地平面 电源噪声的产生 稳压电源芯片本身的输出不恒定&a…

基于SpringBoot云养鸡互动平台的设计与实现

前言 对于当今社会的人们来说&#xff0c;互联网技术是必不可少的&#xff0c;随着经济和技术的不断发展&#xff0c;计算机已经深入到各个领域。云养鸡互动平台将人们的时间需求与计算机技术结合起来&#xff0c;架起一座桥梁&#xff0c;使云养鸡互动更加方便快捷。云养鸡互…

枫清科技仲光庆:AI+行业新范式,双轮驱动助力数智化升级

10月23日&#xff0c;由财视传媒主办&#xff0c;未来图灵、尚品新消费、未来企业家俱乐部、传播达人汇联合主办的“赢在大模型时代”2024未来发布论坛暨“向上拾年”财视十周年盛典在北京举行。枫清科技联合创始人兼 CPO 仲光庆应邀出席并分享了“AI行业落地新范式”&#xff…

谷歌云GCP基础概念讲解

概览 云的基础是虚拟化&#xff1a;服务器&#xff0c;存储&#xff0c;网络。服务器是远程计算机的逻辑分区。存储是物理硬盘的逻辑划分。网络则是虚拟私有云。 谷歌是唯一一个拥有全球私有基础设施的公司&#xff1b;他们的谷歌云基础设施没有任何一部分通过公共互联网。换句…

【ACM出版,EI稳定检索,九大高校联合举办, IEEE Fellow支持】2024年计算机视觉与艺术研讨会(CVA 2024)

在线投稿&#xff1a;学术会议-学术交流征稿-学术会议在线-艾思科蓝 2024年计算机视觉与艺术国际学术会议&#xff08;CVA 2024&#xff09;作为2024年人工智能、数字媒体技术与交互设计国际学术会议&#xff08;ICADI 2024)的分会。此次大会旨在汇聚全球在计算机视觉与艺术…

Java知识巩固(十二)

I/O JavaIO流了解吗&#xff1f; IO 即 Input/Output&#xff0c;输入和输出。数据输入到计算机内存的过程即输入&#xff0c;反之输出到外部存储&#xff08;比如数据库&#xff0c;文件&#xff0c;远程主机&#xff09;的过程即输出。数据传输过程类似于水流&#xff0c;因…

基于SSM+小程序的智慧旅游平台登录管理系统(旅游2)

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1、项目介绍 旅游平台开发微信小程序功能有管理员和用户。 1、管理员功能有个人中心&#xff0c;用户管理&#xff0c;景点分类管理&#xff0c;旅游景点管理&#xff0c;景点购票管理&#xff0c;景…

我谈Canny算子

在Canny算子的论文中&#xff0c;提出了好的边缘检测算子应满足三点&#xff1a;①检测错误率低——尽可能多地查找出图像中的实际边缘&#xff0c;边缘的误检率&#xff08;将边缘识别为非边缘&#xff09;低&#xff0c;且避免噪声产生虚假边缘&#xff08;将非边缘识别为边缘…

量子容错计算

基本思想 容错量子计算的基本想法是&#xff0c;在合理编码后的量子态上直接量子计算&#xff0c;以至于不完全需要解码操作。假设有一个简单的量子电路&#xff0c;但不幸的是噪声影响着这个电路的每一个元件&#xff0c;包括量子态的制备、量子逻辑门、对输出的测量&#x…

代码随想录算法训练营第十一天(补) 栈与队列| 后序表达式、滑动窗口、高频元素、链表总结

目录 一、150. 逆波兰表达式求值 二、239. 滑动窗口最大值 三、347.前 K 个高频元素 四、总结 一、150. 逆波兰表达式求值 力扣题目链接(opens new window) 根据 逆波兰表示法&#xff0c;求表达式的值。 有效的运算符包括 , - , * , / 。每个运算对象可以是整数&#x…