Kafka相关API开发

(一)引入依赖

        用API直接去操作kafka(读写数据)在实际开发中用的并不多,学习它主要还是为了加深对Kafka功能的理解。kafka的读写操作,实际开发中,是通过各类更上层的组件去实现。而这些组件在读写kafka数据时,用的当然是kafka的java api,比如flink、spark streaming和flume等。

<properties> <kafka.version>2.4.1</kafka.version>
</properties>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version>       
</dependency>

(二)API 开发——producer 生产者

1.构造一个生产者,可以持续发送大量数据

2.构造一个生产者,有必须设置的参数:

bootstrap.server

key.seralizer

value.seralizer

其他的,可选

3.使用特定接口

kafka的生产者发送用户的业务数据时,必须使用org.apache.kafka.common.serialization.Serializer接口的实现类这一序列化框架来序列化用户的数据。

4.发往指定topic

构造一个Kafka生产者后,并没有固定数据要发往的topic,因此,可以将不同的数据发往不同的topic

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** kafka生产者API代码示例*/
public class ProducerDemo {public static void main(String[] args) throws InterruptedException {// 泛型K:要发送的数据中的key// 泛型V:要发送的数据中的value// 隐含之意:kafka中的message,是Key-Value结果的(可以没有Key)Properties props = new Properties();props.setProperty("bootstrap.servers", "node141:9092,node142:9092");// 因为,kafka底层存储没有类型维护机制,用户所发的所有数据类型,都必须变成 序列化后的byte[]// 所以,kafka的producer需要一个针对用户要发送的数据类型的序列化工具类// 且这个序列化工具类,需要实现kafka所提供的序列化工具接口:org.apache.kafka.common.serialization.Serializerprops.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");/*** 代码中进行客户端参数配置的另一种写法*/props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node141:9092,node142:9092");props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.setProperty(ProducerConfig.ACKS_CONFIG, "all");// 消息发送应答级别// 构造一个生产者客户端KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++) {// 将业务数据封装成客户端所能发送的封装格式// 0->abc0// 1->abc1// TODO:奇数发往abcx,偶数发往abcyProducerRecord<String, String> message = null;if (i % 2 == 0) {message = new ProducerRecord<>("abcy", "user_id" + i, "doit_edu" + i);} else {message = new ProducerRecord<>("abcx", "user_id" + i, "doit_edu" + i);}// 消费时只会打印value的值,key并没有读到// 调用客户端去发送// 数据的发送动作在producer的底层是异步线程去异步发送的,即调用send方法立即执行完毕,直接走之后的代码,不代表数据发送成功producer.send(message);Thread.sleep(100);}// 关闭客户端
//        producer.flush();producer.close();}
}

5.消费消息

(三)API开发——consumer消费者

kafka消费者的起始消费位置有两种决定机制:

1.手动指定了起始位置,它肯定从指定的位置开始

2.如果没有手动指定起始位置,它去找消费者组之前所记录的偏移量开始

3.如果之前的位置也获取不到,就看参数:auto.offset.reset所指定的重置策略

4.如果指定的offset>原有分区内的最大offset,就自动重置到最大的offset

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;/*** kafka消费者API代码示例*/
public class ConsumerDemo {public static void main(String[] args) {Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node141:9092,node142:9092");props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// kafka的消费者,默认是从所属组之前所记录的偏移量开始消费,如果找不到之前记录的偏移量,则从如下参数配置的策略来确定消费起始偏移量:// 1.earliest:自动重置到每个分区的最前一条消息// 2.latest:自动重置到每个分区的最新一条消息// 3.none:如果没有为使用者的组找到以前的偏移,则向使用者抛出异常// 如果输入除了上述三种之外的,会向使用者抛出异常props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");// 如果latest消息找不到,consumer.seek就起作用了// 设置消费者所属的组idprops.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "d30-1");// 设置消费者自动提交最新的的消费位移——默认是开启的props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 设置自动提交位移的时间间隔——默认是5000msprops.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");// 构造一个消费者客户端KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题(可以是多个)
//        consumer.subscribe(Collections.singletonList("abcx"));consumer.subscribe(Arrays.asList("abcx","abcy"));// 正则订阅主题
//        consumer.subscribe(Pattern.compile ("abc.*" ));// 显式指定消费起始偏移量/*TopicPartition abcxP0 = new TopicPartition("abcx", 0);TopicPartition abcxP1 = new TopicPartition("abcx", 1);consumer.seek(abcxP0,10);consumer.seek(abcxP1,15);*/// 循环往复拉取数据boolean condition = true;while (condition) {// 客户端去拉取数据的时候,如果服务端没有数据响应,会保持连接等待服务端响应// poll中传入的超时时长参数,是指等待的最大时长ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));// Iterator:迭代器// Iterable:可迭代的,是迭代器的再封装// 实现了Iterable的对象,可以用增强for循环去遍历迭代,也可以从对象上取到iterator,来用iterator.hasNext来迭代// Iterator<ConsumerRecord<String, String>> iterator = records.iterator();// 直接用for循环来迭代本次取到的一批数据for (ConsumerRecord<String, String> record : records) {// ConsumerRecord中,不光有用户的业务数据,还有Kafka注入的元数据String key = record.key();String value = record.value();// 本条消息所属的topic:拉取的时候可能不止一个topic,所以会有这个方法String topic = record.topic();// 本条数据所属的分区int partition = record.partition();// 本条数据的偏移量long offset = record.offset();//key的长度int keySize = record.serializedKeySize();//value的长度int valueSize = record.serializedValueSize();// 当前这条数据所在分区的leader的朝代纪年Optional<Integer> leaderEpoch = record.leaderEpoch();// kafka的数据底层存储中,不光有用户的业务数据,还有大量元数据// timestamp就是其中之一:记录本条数据的时间戳// 时间戳有两种类型:一个是CreateTime(这条数据创建的时间——生产者), LogAppendTime(broker往log里面追加的时间)TimestampType timestampType = record.timestampType();long timestamp = record.timestamp();// 数据头:是生产者在写入数据时附加进去的,相当于用户自定义的元数据// 在生产者写入消息时,可以自定义元数据,所以record.headers()方法就能够消费到// public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)// 如果生产者写入消息时,没有定义元数据,record.headers()方法就不会消费到Headers headers = record.headers();//            for (Header header : headers) {
//                String hKey = header.key();
//                byte[] hValue = header.value();
//                String valueString = new String(hValue);
//                System.out.println("header的key值 = " + hKey + "header的value的值 = "+ valueString);
//            }System.out.println(String.format("key = %s, value = %s,topic = %s , partition = %s, offset = %s," +"leader的纪元 = %s, timestampType = %s ,timestamp = %s," +" key序列化的长度 = %s, value 序列化的长度 = %s",key, value, topic, partition, offset,leaderEpoch.get(), timestampType.name, timestamp,keySize, valueSize));}}// 对数据进行业务逻辑处理// 关闭客户端// consumer.close();}
}

有了上面两个API,先开启消费者,然后开启生产者,消费者控制就会输出消息。

 // 当前这条数据所在分区的leader的朝代纪年
Optional<Integer> leaderEpoch = record.leaderEpoch();

当leader有变化,leaderEpoch.get()的值就会+1,初始值为0

(四)API开发——指定偏移量订阅消息

1.subscribe与assign订阅

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;/*** 指定偏移量*/
public class ConsumerDemo2 {public static void main(String[] args) throws IOException {Properties props = new Properties();// 从配置文件中加载props.load(ConsumerDemo2.class.getClassLoader().getResourceAsStream("consumer.properties"));props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "doit30-5");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);/*  // subscribe订阅,会参与消费者组的再均衡机制才能真正获得自己要消费的topic及其分区的consumer.subscribe(Collections.singletonList("ddd"));// 这里无意义地去拉一次数据,主要就是为了确保:分区分配动作已完成consumer.poll(Duration.ofMillis(Long.MAX_VALUE));// 然后再定义到指定的偏移量,开始正式消费consumer.seek(new TopicPartition("ddd",0),2);*/// 既然要自己指定一个确定的起始消费位置,那通常隐含之意是不需要去参与消费者组自动再均衡机制,该方法比较常用// 那么,就不要使用subscribe来订阅主题consumer.assign(Arrays.asList(new TopicPartition("ddd", 0)));consumer.seek(new TopicPartition("ddd", 0), 4);while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));for (ConsumerRecord<String, String> record : records) {int keySize = record.serializedKeySize();int valueSize = record.serializedValueSize();System.out.println(String.format("key = %s, value = %s,topic = %s , partition = %s, offset = %s," +"leader的纪元 = %s, timestampType = %s ,timestamp = %s," +" key序列化的长度 = %s, value 序列化的长度 = %s",record.key(), record.value(), record.topic(), record.partition(), record.offset(),record.leaderEpoch().get(), record.timestampType().name, record.timestamp(),keySize, valueSize));}}}
}

2.subscribe与assign订阅具体区别

  • 通过subscribe()方法订阅主题具有消费者自动再均衡功能:

        在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。

  • assign()方法订阅分区时,是不具备消费者自动均衡的功能的:

        其实这一点从assign()方法参数可以看出端倪,两种类型 subscribe()都有 ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。

3.取消订阅

        如果将subscribe(Collection)或 assign(Collection)集合参数设置为空集合,作用与unsubscribe()方法相同,如下示例中三行代码的效果相同:

consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>()) ;
consumer.assign(new ArrayList<TopicPartition>());

组协调器就是x组写消费位移的leader副本所在的broker。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/460503.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Backtrader 数据篇 02

Backtrader 数据篇 本系列是使用Backtrader在量化领域的学习与实践&#xff0c;着重介绍Backtrader的使用。Backtrader 中几个核心组件&#xff1a; Cerebro&#xff1a;BackTrader的基石&#xff0c;所有的操作都是基于Cerebro的。Feed&#xff1a;将运行策略所需的基础数据…

Leetcode224 -- 基本计算器及其拓展

题目分析&#xff1a; 其实这个计算器的实现并不难&#xff0c;因为除了括号就剩下加减法嘛&#xff0c;括号肯定比加减法先执行&#xff0c;但是加减法是同级的&#xff0c;只是会改变数字的正负号而已&#xff0c;所以实现的逻辑并不是很难&#xff0c;我们只需要一个栈&…

【jvm】为什么Xms和Xmx的值通常设置为相同的?

目录 1. 说明2. 避免性能开销3. 提升稳定性4. 简化配置5. 优化垃圾收集6. 获取参数6.1 代码示例6.2 结果示例 1. 说明 1.-Xms 和 -Xmx 参数分别用于设置堆内存的初始大小&#xff08;最小值&#xff09;和最大大小。2.在开发环境中&#xff0c;开发人员可能希望快速启动应用程…

瑞芯微RK3566/RK3568 Android11下该如何默认屏蔽导航栏/状态栏?看这篇文章就懂了

本文介绍瑞芯微RK3566/RK3568在Android11系统下&#xff0c;默认屏蔽导航栏/状态栏方法&#xff0c;使用触觉智能Purple Pi OH鸿蒙开发板演示&#xff0c;搭载了瑞芯微RK3566芯片&#xff0c;类树莓派设计&#xff0c;Laval官方社区主荐&#xff0c;已适配全新OpenHarmony5.0 R…

使用AIM对SAP PO核心指标的自动化巡检监控

一、背景 由于SAP PO系统维护成本较高&#xff0c;各类型异常报错等都需要人员进行时刻监控和响应&#xff0c;遂由AIM平台进行自动化巡检SAP PO的各指标&#xff0c;然后告警通知用户&#xff0c;节省维护成本和提高工作效率 二、核心指标监控 SAP PO失败消息 适用于S…

openpnp - 手工修改配置文件(元件高度,size,吸嘴)

文章目录 openpnp - 手工修改配置文件(元件高度,size,吸嘴)概述笔记parts.xmlpackages.xml 手工将已经存在的NT1,NT2拷贝出来改名备注END openpnp - 手工修改配置文件(元件高度,size,吸嘴) 概述 载入新板子贴片准备时&#xff0c;除了引入Named CSV文件&#xff0c;还要在ope…

Centos下安装Maven(无坑版)

Linux 安装 Maven Maven 压缩包下载与解压 华为云下载源&#xff0c;自行选择版本 下面的示例使用的是 3.8.1 版本 wget https://repo.huaweicloud.com/apache/maven/maven-3/3.8.1/binaries/apache-maven-3.8.1-bin.tar.gz解压 tar -zxvf apache-maven-3.8.1-bin.tar.gz移…

算法:排序

排序算法 1. 简单排序1.1 直接插入排序1.2 冒泡排序1.3 简单选择排序 2. 希尔排序3. 快速排序4. 堆排序5. 归并排序 将文件的内容按照某种规则进行排列。 排序算法的稳定判定&#xff1a;若在待排序的一个序列中&#xff0c; R i R_i Ri​和 R j R_j Rj​的关键码相同&#xf…

Topaz Photo AI for Mac人工智能图像降噪软件 安装教程【保姆级教程,简单操作轻松上手】

Mac分享吧 文章目录 Topaz Photo AI for Mac人工智能图像降噪软件 安装完成&#xff0c;软件打开效果一、Topaz Photo AI 人工智能图像降噪软件 Mac电脑版——v3.3.0⚠️注意事项&#xff1a;1️⃣&#xff1a;下载软件2️⃣&#xff1a;安装软件&#xff0c;根据步骤完成操作…

k8s部署redis远程连接示例

一、环境 节点 IP 服务 master 192.168.126.46 docker、kubeadm、kubelet、kubectl、flannel、telnet node1 192.168.126.47 docker、kubeadm、kubelet、kubectl、flannel、telnet node2 192.168.126.48 docker、kubeadm、kubelet、kubectl、flannel、telnet ubunt…

ubuntu内核更新导致显卡驱动掉的解决办法

方法1&#xff0c;DKMS指定内核版本 用第一个就行 1&#xff0c;借鉴别人博客解决方法 2&#xff0c;借鉴别人博客解决方法 方法2&#xff0c;删除多于内核的方法 系统版本&#xff1a;ubuntu20.24 这个方法是下下策&#xff0c;如果重装驱动还是不行&#xff0c;就删内核在…

Apache Hive分布式容错数据仓库系统

Apache Hive™是一个分布式的、容错的数据仓库系统&#xff0c;它支持大规模的分析&#xff0c;并使用SQL方便地读取、写入和管理驻留在分布式存储中的pb级数据。 Apache Hive Apache Hive是什么 Apache Hive是一个分布式的、容错的数据仓库系统&#xff0c;支持大规模的分析…

运用AI视频拍摄技术生成3D场景:适用于建模、XR及文旅项目Demo制作

利用AI技术从拍摄的视频中生成3D场景&#xff0c;这种创新方法非常适合用于快速构建高质量的3D模型。生成的3D场景不仅能够用于建筑和设计行业的模型展示&#xff0c;还能应用于扩展现实&#xff08;XR&#xff09;技术的大空间体验开发。此外&#xff0c;在文化旅游领域&#…

论文提交步骤 | 2024年第五届MathorCup大数据竞赛

2024年第五届MathorCup数学应用挑战赛—大数据竞赛于2024年10月25日下午6点正式开赛。 论文和承诺书、支撑材料&#xff08;可选&#xff09;及计算结果文档由各参赛队队长电脑登录下方报名主页提交&#xff1a; https://www.saikr.com/vse/bigdata2024 初赛作品提交截止时间为…

11-Dockerfile

11-Dockerfile Dockerfile Dockerfile是用来构建Docker镜像的文本文件&#xff0c;是由一条条构建镜像所需的指令和参数构成的脚本。 构建步骤&#xff1a; 编写Dockerfile文件docker build命令构建镜像docker run依据镜像运行容器实例 构建过程 Dockerfile编写&#xff1a…

如何合并几个pdf文件?值得推荐几个PDF文件的方法

如何合并几个pdf文件&#xff1f;这些PDF文件既是智慧的宝库&#xff0c;也是错综复杂的迷宫&#xff0c;它们如同夜空中散落的繁星&#xff0c;虽然各自闪耀&#xff0c;却因缺乏联系而难以汇聚成照亮前行之路的璀璨星河。当我们急需从这片信息的海洋中捕捞到关键的智慧珍珠时…

rabbitmq高级特性(2)TTL、死信/延迟队列、事务与消息分发

目录 1.TTL 1.1.设置消息过期时间 1.2.设置队列过期时间 2.死信队列 2.1.介绍 2.2.演示 3.延迟队列 3.1.模拟实现延迟队列 3.2.延迟队列插件 4.事务与消息分发 4.1.事务 4.2.消息分发 1.TTL 所谓的ttl&#xff0c;就是过期时间。对于rabbitmq&#xff0c;可以设置…

Java集合常见面试题总结(5)

HashSet 如何检查重复? 当你把对象加入HashSet时&#xff0c;HashSet 会先计算对象的hashcode值来判断对象加入的位置&#xff0c;同时也会与其他加入的对象的 hashcode 值作比较&#xff0c;如果没有相符的 hashcode&#xff0c;HashSet 会假设对象没有重复出现。但是如果发…

二十二、MySQL 8.0 主从复制原理分析与实战

文章目录 一、复制&#xff08;Replication&#xff09;1、什么是复制2、复制的方式3、复制的数据同步类型3.1、异步复制3.2、半同步复制3.3、设计理念&#xff1a;复制状态机——几乎所有的分布式存储都是这么复制数据的 4、基于binlog位点同步的主从复制原理4.1、异步复制示例…

测试管理|如何做好质量管理、提高研发的代码质量?

以下为作者观点&#xff1a; 起因是领导一直在提的一个观点&#xff1a;测试不能只测试系统&#xff0c;重点要放到质量管理上&#xff0c;要管理、监督研发的开发质量。 公司是乙方&#xff0c;接项目过日子&#xff0c;算是外包企业吧。项目时间一般都比较紧张&#xff08;…