消息队列-kafka-消息发送流程(源码跟踪) 与消息可靠性

官方网址

源码:https://kafka.apache.org/downloads
快速开始:https://kafka.apache.org/documentation/#gettingStarted
springcloud整合

发送消息流程

在这里插入图片描述
主线程:主线程只负责组织消息,如果是同步发送会阻塞,如果是异步发送需要传入一个回调函数。
Map集合:存储了主线程的消息。
Sender线程:真正的发送其实是sender去发送到broker中。

源码阅读

1 首先打开Producer.send()可以看到里面的内容

// 返回值是一个 Future 参数为ProducerRecord
Future<RecordMetadata> send(ProducerRecord<K, V> record);
// ProducerRecord定义了这些信息
// 主题
private final String topic;
// 分区
private final Integer partition;
// header
private final Headers headers;
private final K key;
private final V value;
// 时间戳
private final Long timestamp;

2 发送之前的前置处理

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptions// 这里给开发者提供了前置处理的勾子ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);// 我们最终发送的是经过处理后的消息 并且如果是异步发送会有callback 这个是用户定义的return doSend(interceptedRecord, callback);}

3 进入真正的发送逻辑Future doSend()

  • 由于是网络通信,所以我们要序列化,在这个函数里面就做了序列化的内容。
try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer", cce);}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer", cce);}
  • 然后我们获取分区
// 然后这里又是一个策略者模式 也是由用户可以配置的  DefaultPartitioner UniformStickyPartitioner RoundRobinPartitioner 提供了这样三个分区器
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();return partition != null ?partition :partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

4 到了我们的RecordAccumulator,也就是先由主线程发送到了RecordAccumulator

// 也就是对图中的Map集合
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

我们发现里面是用一个MAP存储的一个分区和ProducerBatch 是讲这个消息写到内存里面MemoryRecordsBuilder 通过这个进行写入

// 可以看到是一个链表实现的双向队列,也就是消息会按append的顺序写到 内存记录中去
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

5 接着我们看,我们append了以后,会有一个判断去唤醒sender线程,见下面的注释

// 如果说哦我们当前的 这个batch满了或者 我们创建了一个新的batch 这个时候唤醒 sender线程去发送数据
if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);// 唤醒sender 去发送数据this.sender.wakeup();}
// 实现了Runnable 所以我们去看一下RUN方法的逻辑
public class Sender implements Runnable 

好上来就是一个循环

while (running) {try {runOnce();} catch (Exception e) {log.error("Uncaught error in kafka producer I/O thread: ", e);}
}

接着进入runOnece方法,直接看核心逻辑

// 从RecordAccumulator 拿数据 然后发送
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);addToInflightBatches(batches);
// 中间省去了非核心逻辑
sendProduceRequests(batches, now);

如果继续跟踪的话最终是走到了selector.send()里面:

Send send = request.toSend(destination, header);InFlightRequest inFlightRequest = new InFlightRequest(clientRequest,header,isInternalRequest,request,send,now);this.inFlightRequests.add(inFlightRequest);selector.send(send);

6 接着我们就要看返回逻辑了,可以看到在sendRequest里面sendProduceRequest方法是通过传入了一个回调函数处理返回的。

RequestCompletionHandler callback = new RequestCompletionHandler() {public void onComplete(ClientResponse response) {handleProduceResponse(response, recordsByPartition, time.milliseconds());}};
// 如果有返回
if (response.hasResponse()) {ProduceResponse produceResponse = (ProduceResponse) response.responseBody();for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {TopicPartition tp = entry.getKey();ProduceResponse.PartitionResponse partResp = entry.getValue();ProducerBatch batch = batches.get(tp);completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());}this.sensors.recordLatency(response.destination(), response.requestLatencyMs());} 

追踪到ProducerBatch

if (this.finalState.compareAndSet(null, tryFinalState)) {completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);return true;}
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {// Set the future before invoking the callbacks as we rely on its state for the `onCompletion` callproduceFuture.set(baseOffset, logAppendTime, exception);// execute callbacksfor (Thunk thunk : thunks) {try {if (exception == null) {RecordMetadata metadata = thunk.future.value();if (thunk.callback != null)thunk.callback.onCompletion(metadata, null);} else {if (thunk.callback != null)thunk.callback.onCompletion(null, exception);}} catch (Exception e) {log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);}}produceFuture.done();}

Thunk 这个其实就是我们在Append的时候的回调:
在这里插入图片描述
至此整个流程就完成了,从发送消息,到响应后回调我们的函数。

消息可靠性

// 所有消费者的配置都在ProducerConfig 里面
public static final String ACKS_CONFIG = "acks";

acks = 0:异步形式,单向发送,不会等待 broker 的响应
acks = 1:主分区保存成功,然后就响应了客户端,并不保证所有的副本分区保存成功
acks = all 或 -1:等待 broker 的响应,然后 broker 等待副本分区的响应,总之数据落地到所有的分区后,才能给到producer 一个响应

在可靠性的保证下,假设一些故障:

  • Broker 收到消息后,同步 ISR 异常:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制
  • Broker 收到消息,并同步 ISR 成功,但是响应超时:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制

可靠性能保证哪些,不能保障哪些?

  • 保证了消息不会丢失
  • 不保证消息一定不会重复(消息有重复的概率,需要消费者有幂等性控制机制)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/272021.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTML—常用标签

常用标签&#xff1a; 标题标签&#xff1a;<h1></h1>......<h6></h6>段落标签&#xff1a;<p></p>换行标签&#xff1a;<br/>列表&#xff1a;无序列表<ul><li></li></ul> 有序列表<ol>&…

人工智能|机器学习——k-近邻算法(KNN分类算法)

1.简介 k-最近邻算法&#xff0c;也称为 kNN 或 k-NN&#xff0c;是一种非参数、有监督的学习分类器&#xff0c;它使用邻近度对单个数据点的分组进行分类或预测。虽然它可以用于回归问题&#xff0c;但它通常用作分类算法&#xff0c;假设可以在彼此附近找到相似点。 对于分类…

【中间件】RabbitMQ入门

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;中间件 ⛺️稳中求进&#xff0c;晒太阳 MQ的优劣&#xff1a; 优势 应用解耦&#xff1a;提升了系统容错性和可维护性异步提速&#xff1a;提升用户体验和系统吞吐量消峰填谷&#xff1…

【LeetCode 算法专题突破】---二分查找(⭐⭐⭐)

前言 我在算法题目的海洋中畅游已久&#xff0c;也曾在算法竞赛中荣获佳绩。然而&#xff0c;我发现自己对于算法的学习&#xff0c;还缺乏一个系统性的总结和归类。尽管我已经涉猎过不少算法类型&#xff0c;但心中仍旧觉得有所欠缺&#xff0c;未能形成完整的算法体系。 因…

问题解决 | vscode无法连接服务器而ssh和sftp可以

解决步骤 进入家目录删除.vscode-server rm -rf .vscode-server 然后再次用vscode连接服务器时&#xff0c;会重新安装&#xff0c;这时可能报出一些缺少依赖的错 需要联系管理员安装相关依赖&#xff0c;比如 sudo apt-get install libstdc6 至此问题解决

【数据结构】单链表的层层实现!! !

关注小庄 顿顿解馋(●’◡’●) 上篇回顾 我们上篇学习了本质为数组的数据结构—顺序表&#xff0c;顺序表支持下标随机访问而且高速缓存命中率高&#xff0c;然而可能造成空间的浪费&#xff0c;同时增加数据时多次移动会造成效率低下&#xff0c;那有什么解决之法呢&#xff…

打造经典游戏:HTML5与CSS3实现俄罗斯方块

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

机器学习开源分子生成系列(1)-DeepFrag的本地部署及使用

欢迎浏览我的CSND博客&#xff01; Blockbuater_drug …进入 文章目录 前言一、DeepFrag是什么&#xff1f;二、conda中安装DeepFrag CLI环境1. 创建环境并激活2. 下载pre-trained model3. DeepFrag CLI 使用方法必需参数&#xff1a;可选参数&#xff1a; 4. DeepFrag CLI 使用…

带摄像头的 AirPods,苹果会怎么做出来?

苹果对智能产品的设计&#xff0c;正在放飞自我。 根爆料&#xff0c;苹果在「未来设备」的规划里&#xff0c;有两个大胆的想法&#xff1a; 一是带有屏幕的 HomePod 正在研发中&#xff0c;当中将集成 Apple TV、FaceTime 等重多功能&#xff1b;二是配备摄像头的 AirPod…

201909青少年软件编程(Scratch)等级考试试卷(三级)

青少年软件编程&#xff08;Scratch&#xff09;等级考试试卷&#xff08;三级&#xff09;2019年9月 第1题&#xff1a;【 单选题】 执行下面的脚本后&#xff0c;变量“分数”的值是多少&#xff1f;&#xff08;&#xff09; A:5 B:6 C:10 D:25 【正确答案】: C 【试题…

[java基础揉碎]super关键字

super关键字: 基本介绍 super代表父类的引用&#xff0c;用于访问父类的属性、方法、构造器 super给编程带来的便利/细节 1.调用父类的构造器的好处(分工明确&#xff0c;父类属性由父类初始化&#xff0c;子类的属性由子类初始化) 2.当子类中有和父类中的成员(属性和方法)重…

新零售SaaS架构:订单履约系统架构设计(万字图文总结)

什么是订单履约系统&#xff1f; 订单履约系统用来管理从接收客户订单到将商品送达客户手中的全过程。 它连接了上游交易&#xff08;客户在销售平台下单环&#xff09;和下游仓储配送&#xff08;如库存管理、物流配送&#xff09;&#xff0c;确保信息流顺畅、操作协同&…

UDP实现文件的发送、UDP实现全双工的聊天、TCP通信协议

我要成为嵌入式高手之3月7日Linux高编第十七天&#xff01;&#xff01; ———————————————————————————— 回顾 重要程序 1、UDP实现文件的发送 发端&#xff1a; #include "head.h"int main(void) {int sockfd 0;struct sockaddr_i…

141 Linux 系统编程18 ,线程,线程实现原理,ps –Lf 进程 查看

一 线程概念 什么是线程 LWP&#xff1a;light weight process 轻量级的进程&#xff0c;本质仍是进程(在Linux环境下) 进程&#xff1a;独立地址空间&#xff0c;拥有PCB 线程&#xff1a;有独立的PCB&#xff0c;但没有独立的地址空间(共享) 区别&#xff1a;在于是否共…

一周学会Django5 Python Web开发-Django5内置模板引擎-模板上下文变量

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计32条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…

瑞芯微 | I2S-音频基础 -1

最近调试音频驱动&#xff0c;顺便整理学习了一下i2s、alsa相关知识&#xff0c;整理成了几篇文章&#xff0c;后续会陆续更新。 喜欢嵌入式、Li怒晓得老铁可以关注一口君账号。 1. 音频常用术语 名称含义ADC&#xff08;Analog to Digit Conversion&#xff09;模拟信号转换…

第五十三回 入云龙斗法破高廉 黑旋风下井救柴进-AI训练数据处理和读取

罗真人教了公孙胜五雷天罡正法&#xff0c;并让他记住“逢幽而止&#xff0c;遇汴而环”八个字。三人辞别了罗真人&#xff0c;戴宗先回去报信&#xff0c;李逵和公孙胜结伴而行。 走了三天&#xff0c;来到了武冈镇&#xff0c;李逵碰到一个铁匠&#xff0c;叫金钱豹子汤隆&a…

启动查看工具总结

启动目标&#xff1a;2s内优秀&#xff0c;2-5s普通&#xff0c;之后的都需要优化&#xff0c;热启动则是1.5s-2s内 1 看下大致串联启动流程&#xff1a; App 进程在 Fork 之后&#xff0c;需要首先执行 bindApplication Application 的环境创建好之后&#xff0c;就开始activ…

去电脑维修店修电脑需要注意什么呢?装机之家晓龙

每当电脑出现故障时&#xff0c;你无疑会感到非常沮丧。 如果计算机已过了保修期&#xff0c;您将无法享受制造商的免费保修服务。 这意味着您必须自费找到一家电脑维修店。 去电脑维修店并不容易。 大家一定要知道&#xff0c;电脑维修非常困难&#xff0c;尤其是笔记本电脑维…

C#,数值计算,解微分方程的龙格-库塔四阶方法与源代码

Carl Runge Martin Wilhelm Kutta 1 龙格-库塔四阶方法 数值分析中&#xff0c;龙格&#xff0d;库塔法&#xff08;Runge-Kutta&#xff09;是用于模拟常微分方程的解的重要的一类隐式或显式迭代法。这些技术由数学家卡尔龙格和马丁威尔海姆库塔于1900年左右发明。 对于一阶…