Kafka-消费者-KafkaConsumer分析

与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。

为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。

这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。

例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单、快速。

还可以使用两个线程池实现“生产者—消费者”模式,解耦消息消费和消息处理的逻辑。

其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,而另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。

下面开始对KafkaConsumer的分析。

KafkaConsumer实现了Consumer接口,Consumer接口中定义了KafkaConsumer对外的API,其核心方法可以分为下面六类。

  • subscribe()方法:订阅指定的Topic,并为消费者自动分配分区。
  • assign()方法:用户手动订阅指定的Topic,并且指定消费的分区。此方法与subscribe()方法互斥。
  • commit*()方法:提交消费者已经消费完成的offset。
  • seek*()方法:指定消费者起始消费的位置。
  • poll()方法:负责从服务端获取消息。
  • pause()、resume()方法:暂停/继续Consumer,暂停后poll方法会返回空。

了解了Consumer接口定义的功能之后,我们下面就来分析KafkaConsumer的具体实现。首先,我们需要了解KafkaConsumer中重要的字段,如图所示。

在这里插入图片描述

  • PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果没有明确指定client的Id,则使用字段生成一个ID。
  • clientld:Consumer的唯一标示。
  • coordinator:控制着Consumer与服务端GroupCoordinator之间的通信逻辑,可以将其理解成Consumer与服务端GroupCoordinator通信的门面。
  • keyDeserializer和valueDeserializer:key反序列化器和value反序列化器。
  • fetcher:负责从服务端获取消息。
  • interceptors:Consumerlnterceptor集合,ConsumerInterceptor.onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptor.onCommit()方法也可以在服务端返回提交offset成功的响应时对其进行拦截或修改。
  • client:负责消费者与Kafka服务端的网络通信。
  • subscriptions:维护了消费者的消费状态。
  • metadata:记录了整个Kafka集群的元信息。
  • currentThread和refcount:分别记录了当前使用KafkaConsumer的线程Id和重入次数,KafkaConsumer的acquire()方法和release()方法实现了一个“轻量级锁”,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已。

在后面的分析过程中,我们会逐个分析KafkaConsumer依赖的组件的功能和实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/241021.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTML 表单

文章目录 表单什么是表单GET和POST两种提交方式有什么不同?表单元素表单项外文本单行文本输入框单行文本密码框单选框复选框下拉列表框上传文件隐藏域填写邮箱填写电话填写数字填写日期进度条多行文本输入框提交按钮取消按钮 用户注册案例 表单 什么是表单 form:表单元素 此…

navigateTo失效-跳转不了页面解决办法!uniapp\vue

改了一个小时多的错误,跳转页面无论怎么样都跳转不了,有2个问题: 注意:uniapp的报错可以在console里检查! 1.pages.json文件没有配置路径, 在pages:[ ]里面加 (根据自己的路径进行修改 {&qu…

Python爬虫---scrapy框架---当当网管道封装

项目结构: dang.py文件:自己创建,实现爬虫核心功能的文件 import scrapy from scrapy_dangdang_20240113.items import ScrapyDangdang20240113Itemclass DangSpider(scrapy.Spider):name "dang" # 名字# 如果是多页下载的话, …

Byrdhouse AI实时语音翻译工具,可以在视频通话中翻译100多种语言

你是否曾经在跨国会议或与外国友人聊天时,因为语言不通而感到尴尬或困扰?是不是还在找可以实时翻译的软件或者APP?现在,有了这款语音翻译神器,一切都将变得简单! 免费使用链接:https://byrdhous…

2809. 使数组和小于等于 x 的最少时间

2809. 使数组和小于等于 x 的最少时间 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/minimum-time-to-make-array-sum-at-most-x/?envTypedaily-question&envId2024-01-19 经典动态规划 class Solution { public:int minimumTime(vector<int&…

合并K个升序链表(LeetCode 23)

文章目录 1.问题描述2.难度等级3.热门指数4.解题思路方法一&#xff1a;顺序合并方法二&#xff1a;分治合并方法三&#xff1a;使用优先队列合并 参考文献 1.问题描述 给你一个链表数组&#xff0c;每个链表都已经按升序排列。 请你将所有链表合并到一个升序链表中&#xff…

uniapp踩坑之项目:canvas第一次保存是空白图片

在ctx.draw()回调生成图片&#xff0c;参考canvasToTempFilePath接口文档 // data imgFilePath: null,// 缓存二维码图片canvas路径//js // 首先在draw&#xff08;&#xff09;里进行本地存储 ...... ctx.draw(false, () >{uni.canvasToTempFilePath({ // 把画布转化成临时…

c JPEG 1D DCT

步骤&#xff1a; 1. 对yuv 88 数据 8行分别1D DCT 2, 用8行 1D DCT 得到的数据生成中间88 块 Zj 3,对Zj 的8列再 1D DCT 后生成8列,用这8列组合成8*8的2D DCT 系数 准备用此1D DCT程序代替以前写的2D DCT,看能减少多少编码时间。 看网上文章&#xff0c;ffmpeg用…

翻译: Streamlit从入门到精通七 缓存Cache控制缓存大小和持续时间

Streamlit从入门到精通 系列&#xff1a; 翻译: Streamlit从入门到精通 基础控件 一翻译: Streamlit从入门到精通 显示图表Graphs 地图Map 主题Themes 二翻译: Streamlit从入门到精通 构建一个机器学习应用程序 三翻译: Streamlit从入门到精通 部署一个机器学习应用程序 四翻译…

移动端开发进阶之蓝牙通讯(四)

移动端开发进阶之蓝牙通讯&#xff08;四&#xff09; 在移动端开发实践中&#xff0c;可能会要求在不同的设备之间切换&#xff0c;从而提升用户体验&#xff1b; 或者为了提升设备的利用率&#xff0c;实现设备之间的连接和协同工作&#xff1b; 不得不通过多端连接&#xf…

RT-Thread experimental 代码学习(1)thread_sample

RTOS的最基础功能是线程。 线程的调度是如何工作的&#xff1f;RT-thread官方的实验文档是最好的参考。 老规矩&#xff0c;先放法国人doxygen。 thread_sample 代码的调用关系图 有意思的是&#xff0c;RT有两种创建线程的方式 - 静态和动态&#xff0c;粗略的理解是&…

vue+elementui实现12个日历平铺,初始化工作日,并且可点击

<template><div class"app-container"><el-form :model"queryParams" ref"queryForm" size"small" :inline"true"><el-form-item label"年份" prop"holidayYear"><el-date-…

93.乐理基础-记号篇-装饰音记号(一)级进、跳进、经过音、辅助音

内容参考于&#xff1a;三分钟音乐社 上一个内容&#xff1a;92.乐理基础-记号篇-演奏记号&#xff08;三&#xff09;刮奏、琶音-CSDN博客 首先 级进 与 跳进 1.级进指的是忽略掉所有升降号&#xff0c;如果两个音之间不存在其它的唱名&#xff0c;那前一个音到后一个音就成…

Android中矩阵Matrix实现平移,旋转,缩放和翻转的用法详细介绍

一&#xff0c;矩阵Matrix的数学原理 矩阵的数学原理涉及到矩阵的运算和变换&#xff0c;是高等代数学中的重要概念。在图形变换中&#xff0c;矩阵起到关键作用&#xff0c;通过矩阵的变换可以改变图形的位置、形状和大小。矩阵的运算是数值分析领域的重要问题&#xff0c;对…

面试题 05.06. 整数转换(力扣)(OJ题)

题目链接&#xff1a;面试题 05.06. 整数转换 - 力扣&#xff08;LeetCode&#xff09; 所属专栏&#xff1a;刷题 整数转换。编写一个函数&#xff0c;确定需要改变几个位才能将整数A转成整数B。 示例1: 输入&#xff1a;A 29 &#xff08;或者0b11101&#xff09;, B 15…

浅谈对Maven的理解

一、什么是Maven Maven——是Java社区事实标准的项目管理工具&#xff0c;能帮你从琐碎的手工劳动中解脱出来&#xff0c;帮你规范整个组织的构建系统。不仅如此&#xff0c;它还有依赖管理、自动生成项目站点等特性&#xff0c;已经有无数的开源项目使用它来构建项目并促进团队…

供应链共舞:数字化协同推动服装企业商品计划的无缝衔接

在数字化时代&#xff0c;服装企业不再是孤立经营的个体&#xff0c;而是在供应链共舞的大舞台上实现了商品计划的无缝衔接。数字化协同不仅改变了企业内部的运营方式&#xff0c;更深刻地重塑了整个供应链的协同模式。以下探讨数字化协同如何推动服装企业商品计划实现无缝衔接…

MySQL存储函数与存储过程习题

创建表并插入数据&#xff1a; 字段名 数据类型 主键 外键 非空 唯一 自增 id INT 是 否 是 是 否 name VARCHAR(50) 否 否 是 否 否 glass VARCHAR(50) 否 否 是 否 否 ​ ​ sch 表内容 id name glass 1 xiaommg glass 1 2 xiaojun glass 2 1、创建一个可以统计表格内记录…

mybatisPlus注解将List集合插入到数据库

1.maven引入依赖&#xff08;特别注意版本&#xff0c;3.1以下不支持&#xff09; <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.3.1</version></dependency&g…

Docker容器添加映射端口

方式一 简单粗暴&#xff08;需要等一段时间&#xff09; 直接给现在容器停了&#xff08;当然你要不想停也可以&#xff0c;只是打包会慢一点&#xff0c;当然我是没出意外&#xff0c;如果你怕出现特殊情况&#xff0c;那就先把容器停了&#xff09;&#xff0c;然后把这个容…