Kafka收发消息核心参数详解

文章目录

  • 1、从基础的客户端说起
    • 1.1、消息发送者主流程
    • 1.2、消息消费者主流程
  • 2、从客户端属性来梳理客户端工作机制
    • 2.1、消费者分组消费机制

1、从基础的客户端说起

Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:

  <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.4.0</version></dependency>

1.1、消息发送者主流程

​ 然后可以使用Kafka提供的Producer类,快速发送消息。

public class MyProducer {private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//单向发送:不关心服务端的应答。producer.send(record);System.out.println("message "+i+" sended");//同步发送:获取服务端应答消息前,会阻塞当前线程。RecordMetadata recordMetadata = producer.send(record).get();String topic = recordMetadata.topic();int partition = recordMetadata.partition();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);//异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e){System.out.println("消息发送失败,"+e.getMessage());e.printStackTrace();}else{String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();producer.close();}
}

​ 整体来说,构建Producer分为三个步骤:

  • 设置Producer核心属性 :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
  • 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。
  • 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

1.2、消息消费者主流程

​ 接下来可以使用Kafka提供的Consumer类,快速消费消息。

public class MyConsumer {private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(TOPIC));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));//PART3:处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不会重复推送。consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。}}
}

​ 整体来说,Consumer同样是分为三个步骤:

  • 设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
  • 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
  • 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。
    ​ Kafka的客户端基本就是固定的按照这三个大的步骤运行。在具体使用过程中,最大的变数基本上就是给生产者和消费者的设定合适的属性。这些属性极大的影响了客户端程序的执行方式。

2、从客户端属性来梳理客户端工作机制

​ 渔与鱼:Kafka的客户端API的重要目的就是想要简化客户端的使用方式,所以对于API的使用,尽量熟练就可以了。对于其他重要的属性,都可以通过源码中的描述去学习,并且可以设计一些场景去进行验证。其重点,是要逐步在脑海之中建立一个Message在Kafka集群中进行流转的基础模型。

​ 其实Kafka的设计精髓,是在网络不稳定,服务也随时会崩溃的这些作死的复杂场景下,如何保证消息的高并发、高吞吐,那才是Kafka最为精妙的地方。但是要理解那些复杂的问题,都是需要建立在这个基础模型基础上的。

2.1、消费者分组消费机制

​ 这是我们在使用kafka时,最为重要的一个机制,因此最先进行梳理。

​ 在Consumer中,都需要指定一个GROUP_ID_CONFIG属性,这表示当前Consumer所属的消费者组。他的描述是这样的:

  public static final String GROUP_ID_CONFIG = "group.id";public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";

既然这里提到了kafka-based offset management strategy,那是不是也有非Kafka管理Offset的策略呢?

另外,还有一个相关的参数GROUP_INSTANCE_ID_CONFIG,可以给组成员设置一个固定的instanceId,这个参数通常可以用来减少Kafka不必要的rebalance。

​ 从这段描述中看到,对于Consumer,如果需要在subcribe时使用组管理功能以及Kafka提供的offset管理策略,那就必须要配置GROUP_ID_CONFIG属性。这个分组消费机制简单描述就是这样的:

在这里插入图片描述

​ 生产者往Topic下发消息时,会尽量均匀的将消息发送到Topic下的各个Partition当中。而这个消息,会向所有订阅了该Topic的消费者推送。推送时,每个ConsumerGroup中只会推送一份。也就是同一个消费者组中的多个消费者实例,只会共同消费一个消息副本。而不同消费者组之间,会重复消费消息副本。这就是消费者组的作用。

​ 与之相关的还有Offset偏移量。这个偏移量表示每个消费者组在每个Partiton中已经消费处理的进度。在Kafka中,可以看到消费者组的Offset记录情况。

[oper@worker1 bin]$ ./kafka-consumer-groups.sh --bootstrap-server worker1:9092 --describe --group test

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/147393.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

读书笔记|《数据压缩入门》—— 柯尔特·麦克安利斯 亚历克斯·海奇

前言&#xff1a;在接触文本隐写研究领域时了解到这本书。本书可算作《数据压缩》的入门书籍之一&#xff0c;这本书对熵编码、变长编码、统计编码、自适应统计编码、字典编码、上下文编码等常用编码方式的定义及来源进行介绍&#xff0c;对不同场景下不同格式的压缩数据有针对…

【数据结构---排序】很详细的哦

本篇文章介绍数据结构中的几种排序哦~ 文章目录 前言一、排序是什么&#xff1f;二、排序的分类 1.直接插入排序2.希尔排序3.选择排序4.冒泡排序5.快速排序6.归并排序总结 前言 排序在我们的生活当中无处不在&#xff0c;当然&#xff0c;它在计算机程序当中也是一种很重要的操…

java学生成绩管理信息系统

一、 引言 学生成绩管理信息系统是一个基于Java Swing的桌面应用程序&#xff0c;旨在方便学校、老师和学生对学生成绩进行管理和查询。本文档将提供系统的详细说明&#xff0c;包括系统特性、使用方法和技术实现。 二、 系统特性 2.1 学生管理 添加学生信息&#xff1a;录…

【HUAWEI】单臂路由

目录 ​ &#x1f96e;写在前面 &#x1f96e;2.1、拓扑图 &#x1f96e;2.2、操作思路 &#x1f96e;2.3、配置操作 &#x1f363;2.3.1、LSW4配置 &#x1f363;2.3.2、R2配置 &#x1f363;2.3.3、测试网络 &#x1f990;博客主页&#xff1a;大虾好吃吗的博客 &…

【知识梳理】多级页表的原理分析【地址形成过程】【扩充思考】

多级页表的地址形成过程 首先每个进程中都至少有一个页表&#xff08;段页式可以有多个页表&#xff09;&#xff0c;都有一个页表基地址寄存器&#xff08;PTBR&#xff09;&#xff0c;以下针对三级页表进行分析。 level1&#xff1a;PTBR代表的是一级页表的基地址&#xf…

SLAM面试笔记(7) — Linux面试题

目录 问题1&#xff1a;Linux系统基本组件&#xff1f; 问题2&#xff1a;Linux和Unix有什么区别&#xff1f; 问题3&#xff1a;Linux下编译程序 问题4&#xff1a;gcc基本格式和常用指令 问题5&#xff1a;用什么命令查找内存和交换使用情况&#xff1f; 问题6&#xf…

此芯科技加入百度飞桨硬件生态共创计划,加速端侧AI生态布局

近日&#xff0c;此芯科技&#xff08;上海&#xff09;有限公司&#xff08;以下简称“此芯科技”&#xff09;与百度签署硬件生态共创计划合作协议&#xff0c;正式加入由百度发起的硬件生态共创计划。双方将共同推动端侧AI和大模型在个人计算、车载计算以及元宇宙计算等领域…

Autowired和Resource的关系

相同点对于下面的代码来说&#xff0c;如果是Spring容器的话&#xff0c;两个注解的功能基本是等价的&#xff0c;他们都可以将bean注入到对应的field中 不同点但是请注意&#xff0c;这里说的是基本相同&#xff0c;说明还是有一些不同点的&#xff1a; byName和byType匹配顺…

结构型设计模式——组合模式

摘要 组合模式(composite pattern): 允许你将对象组合成树形结构来表现"整体/部分"层次结构. 组合能让客户以一致的方式处理个别对象以及对象组合。 一、组合模式的意图 将对象组合成树形结构来表示“整体/部分”层次关系&#xff0c;允许用户以相同的方式处理单独…

C++list模拟实现

list模拟实现 1.链表结点2.类模板基本框架3.构造4.插入普通迭代器实现4.1尾插4.2普通迭代器实现4.3对比list和vector的iterator4.4迭代器的价值4.5insert4.6尾插头插复用写法 5.删除erase5.1erase5.2尾删头删复用写法 6.析构emptysizeclear6.1clear6.2size6.3 empty6.4 析构 7.…

idea清空缓存类

解决办法 网上有很多是让你去清空什么maven依赖&#xff0c;但假如这个项目是你不可以大刀阔斧的话 可以清空idea缓存 选择 Invalidate 开头的 然后全选 运行重启idea OK

你写过的最蠢的代码是?——后端篇

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页: &#x1f405;&#x1f43e;猫头虎的博客&#x1f390;《面试题大全专栏》 &#x1f995; 文章图文并茂&#x1f996…

保姆级Anaconda安装教程

一.anaconda下载 建议使用清华大学开源软件镜像站进行下载&#xff0c;使用官网下载速度比较慢。 anaconda清华大学开源软件镜像站 &#xff1a; https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/ 一路next即可&#xff0c;注意添加环境变量得选项都勾上。 二.验证…

简化数据库操作:探索 Gorm 的约定优于配置原则

文章目录 使用 ID 作为主键数据库表名TableName临时指定表名列名时间戳自动填充CreatedAtUpdatedAt时间戳类型Gorm 采用约定优于配置的原则,提供了一些默认的命名规则和行为,简化开发者的操作。 使用 ID 作为主键 默认情况下,GORM 会使用 ID 作为表的主键: type User st…

excel提取单元格中的数字

excel取单元格中的数字excel取出单元格中的数字快速提取单元格中有文本的数字如何提取文本左侧的数字、文本右侧的数字、文本中的数字以及文本中混合的数字 RIGHT(C2,11)从右边开始在C2单元格中取出11位字符 LEFT(C2,2)&#xff0c;引用获取单元格总长度的函数LEN&#xff0c;…

【数据结构】排序(2)—冒泡排序 快速排序

目录 一. 冒泡排序 基本思想 代码实现 时间和空间复杂度 稳定性 二. 快速排序 基本思想 代码实现 hoare法 挖坑法 前后指针法 时间和空间复杂度 稳定性 一. 冒泡排序 基本思想 冒泡排序是一种交换排序。两两比较数组元素&#xff0c;如果是逆序(即排列顺序与排序后…

基于PHP+MySQL的家教平台

摘要 设计和实现基于PHP的家教平台是一个复杂而令人兴奋的任务。这个项目旨在为学生、家长和教师提供一个便捷的在线学习和教授平台。本文摘要将概述这个项目的关键方面&#xff0c;包括用户管理、课程管理、支付处理、评价系统、通知系统和安全性。首先&#xff0c;我们将建立…

openGauss学习笔记-88 openGauss 数据库管理-内存优化表MOT管理-内存表特性-使用MOT-MOT使用将磁盘表转换为MOT

文章目录 openGauss学习笔记-88 openGauss 数据库管理-内存优化表MOT管理-内存表特性-使用MOT-MOT使用将磁盘表转换为MOT88.1 前置条件检查88.2 转换88.3 转换示例 openGauss学习笔记-88 openGauss 数据库管理-内存优化表MOT管理-内存表特性-使用MOT-MOT使用将磁盘表转换为MOT …

阿里云PolarDB自研数据库详细介绍_兼容MySQL、PostgreSQL和Oracle语法

阿里云PolarDB数据库是阿里巴巴自研的关系型分布式云原生数据库&#xff0c;PolarDB兼容三种数据库引擎&#xff1a;MySQL、PostgreSQL、Oracle&#xff08;语法兼容&#xff09;&#xff0c;目前提供云原生数据库PolarDB MySQL版、云原生数据库PolarDB PostgreSQL版和云原生数…

Django之十二:模板的继承+用户列表

模板的继承 新建layout.html&#xff1a; {% load static %} <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><link rel"stylesheet" href"{% static plugins…