Kafka 之顺序消息

前言:

在分布式消息系统中,消息的顺序性是一个重要的问题,也是一个常见的业务场景,那 Kafka 作为一个高性能的分布式消息中间件,又是如何实现顺序消息的呢?本篇我们将对 Kafka 的顺序消息展开讨论。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 之批量消息发送消费

Kafka 之消息广播消费

Kafka 之消息并发消费

顺序消息的使用场景

顺序消息的使用场景众多,这里我简单列举几个如下:

  • 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致。
  • 电商中下单后,订单创建、支付、订单发货和物流更新的顺序性。
  • 手机充值过程中的扣款短信和重置成功的短信应该有顺序性。
  • 。。。。等等等场景。

Kafka 如何保证消息的顺序性

讨论 Kafka 消息的顺序性,需要分单分区和多分区来讨论,具体如下:

  • 单分区:单分区的消息顺序性相对简单,因为消息在单分区中是相对有序的,只需要保证消息发送顺序和消费顺序即可。
  • 多分区:多分区要保证消息有序,就需要额外的设计来保证消息全局有序了。

根据上面的简单分析,我们知道 Kafka 单分区的消息有序相对简单,接下来我们分析一下 Kafka 如何保证单分区消息有序。

Kafka 如何保证单分区消息有序

Kafka 保证单分区消息有序需要从两个方面来讲,一个是消息生产者,一个是消息消费者,具体如下:

消息生产者:

  • 使用相同的分区键(Partition Key):生产者发送消息时,指定相同的分区键,使得所有消息都发送到同一个分区。
  • 指定消息 key,如果没有指定分区,我们指定一个相同的消息 Key,Kafka 会根据 Key 进行 Hash 计算出一个分区号,如果消息的 Key 相同,那么也会计算一个相同的分区号,消息也会发送到同一个分区了。
  • 自定义分区器:如果想要实现更复杂的分区逻辑,可以实现自定义分区器,来达到消息最终到达同一个分区。

消息消费者:

生产这已经保证了消费的发送有序,因此消息消费者使用单线程消费即可。

Kafka 顺序消息实现案例

上面我们对 Kafka 顺序消息的实现做了基本分析,下面我们就使用代码来实现 Kafka 的顺序消息。

Kafka 顺序消息 Producer

在 Producer 中分别实现了两种顺序消息的方式,分别是指定分区和指定 Key,具体代码如下:

package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.ExecutionException;/*** @ClassName: MyKafkaOrderlyProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 顺序消息发送者*/
@Slf4j
@Component
public class MyKafkaOrderlyProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;//指定分区public void sendOrderlyByPartitionMessage() {try {this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666创建").get();this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666支付").get();this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666发货").get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}//指定 keypublic void sendOrderlyByKeyMessage() {try {this.kafkaTemplate.send("my-topic", "666", "Key--订单666创建").get();this.kafkaTemplate.send("my-topic", "666", "Key--订单666支付").get();this.kafkaTemplate.send("my-topic", "666", "Key--订单666发货").get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}

在 Producer 代码中我们使用了 Kafka 的同步发送消息。

Kafka 顺序消息 Consumer

顺序消息的消费者代码十分简单,还是使用 @KafkaListener 完成消息消费,注意是单线程消费即可。

package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @ClassName: MyKafkaConsumer* @Author: zhangyong* @Date: 2024/10/22 19:22* @Description: MyKafkaOrderlyConsumer*/
@Slf4j
@Component
public class MyKafkaOrderlyConsumer {@KafkaListener(id = "my-kafka-order-consumer",groupId = "my-kafka-consumer-groupId",topics = "my-topic",containerFactory = "myContainerFactory")public void listen(String message) {log.info("消息消费成功消息内容:{}", message);}}

Kafka 顺序消息发送消费验证

验证指定分区情况下的顺序消息:

2024-10-28 20:55:18.495  INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Partition--订单666创建
2024-10-28 20:55:18.599  INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Partition--订单666支付
2024-10-28 20:55:18.704  INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Partition--订单666发货

消息是按照发送顺序来消费的,结果符合预期。

验证指定 Key 情况下的顺序消息:

2024-10-28 20:56:13.238  INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666创建
2024-10-28 20:56:13.341  INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666支付
2024-10-28 20:56:13.443  INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666发货

消息是按照发送顺序来消费的,结果符合预期。

Kafka 自定义分区器

自定义分区器就是按自己的规则来指定消息最终要发送的分区,可以根据自己的需求灵活实现,案例代码中先获取分区数量,然后使用的是 key 的 Hash 值进行 Hash 取模的方式获取分区,具体代码如下:

package com.order.service.kafka;import com.order.service.exception.BusinessException;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;import java.util.List;
import java.util.Map;/*** @ClassName: CustomPartitioner* @Author: Author* @Date: 2024/10/28 20:57* @Description:*/
public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//获取 分区数量List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);if (key == null || keyBytes == null && !(key instanceof String)) {throw new BusinessException("key 不能为空且需要是字符串类型");}String keyStr = key.toString();int partition = keyStr.hashCode() % partitionInfos.size();return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

配置自定义分区器

自定义了分区器后还需要再 Kafka 配置中配置上我们自定义的分区器,关键配置如下:

//自定义分区器配置
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);

完整的配置 KafkaProducerConfig 配置如下:

package com.order.service.config;import com.order.service.kafka.CustomPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;/*** @author :author* @description:* @modified By:* @version: V1.0*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Value("${spring.kafka.producer.batch-size}")private String batchSize;@Value("${spring.kafka.producer.buffer-memory}")private String bufferMemory;@Value("${spring.kafka.producer.properties.linger.ms}")private String lingerMs;@Bean("myProducerKafkaProps")public Map<String, Object> getMyKafkaProps() {Map<String, Object> props = new HashMap<>(10);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//批量发送消息的大小 默认 16KBprops.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);//生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32Mprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);//批量发送的的最大时间间隔,单位是毫秒props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);//自定义分区器配置props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);return props;}@Beanpublic ProducerFactory<String, String> newProducerFactory() {return new DefaultKafkaProducerFactory<>(getMyKafkaProps());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(newProducerFactory());}}

自定义分区 Consumer 代码案例

自定义分区 Consumer 代码没有什么特殊之处,指定一个 key 即可,key 一致就可以保证消息发送到同一个 Partition 中,保证消息的顺序,具体代码如下:

//自定义分区发送消息
public void sendOrderlyByCustomPartitionerMessage() {try {this.kafkaTemplate.send("my-topic", "666", "Key--订单666创建").get();this.kafkaTemplate.send("my-topic", "666", "Key--订单666支付").get();this.kafkaTemplate.send("my-topic", "666", "Key--订单666发货").get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}
}

自定义分区顺序消息验证

触发消息发送后 debugger 如下:

在这里插入图片描述

控制台记录消费日志如下:

2024-10-30 17:24:52.716  INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666创建
2024-10-30 17:24:52.819  INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666支付
2024-10-30 17:24:52.921  INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666发货

消息是按顺序消费的,结果符合预期。

总结:Kafka 只能在单个 Partition 中保持消息的顺序存储,要想保证消息的顺序性就必须让需要保持顺序的消息发送到同一个 Partition,对于消费端,消费消息的顺序性只需要保证使用单线程进行消费即可,一般来说比较少用到 Kafka 的顺序消息,这里分享一下还是希望可以帮助到有需要的朋友。

如有不正确的地方欢迎各位指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/465075.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot day 1105

ok了家人们&#xff0c;今天继续学习spring boot&#xff0c;let‘s go 六.SpringBoot实现SSM整合 6.1 创建工程&#xff0c;导入静态资源 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</…

fastbootd模式刷android固件的方法

1. fastbootd追根溯源 Google在Android 10上正式引入了动态分区机制来提升OTA的可扩展性。动态分区使能后&#xff1a;andorid系统可以在开机阶段动态地进行分区创建、分区销毁、分区大小调整等操作&#xff0c;下游厂商只需要规划好super分区的总大小&#xff0c;其内部的各个…

什么是多因素身份验证(MFA)的安全性?

多因素身份验证(MFA)简介 什么是MFA 多因素身份验证(MFA)是一种安全过程&#xff0c;要求用户在授予对系统、应用程序或账户的访问权限之前提供两种或多种形式的验证。仅使用单个因素&#xff08;通常是用户名和密码&#xff09;保护资源会使它们容易受到泄露&#xff0c;添加…

2024年【汽车修理工(高级)】考试总结及汽车修理工(高级)试题及解析

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 汽车修理工&#xff08;高级&#xff09;考试总结是安全生产模拟考试一点通总题库中生成的一套汽车修理工&#xff08;高级&#xff09;试题及解析&#xff0c;安全生产模拟考试一点通上汽车修理工&#xff08;高级&a…

qt QFontDialog详解

1、概述 QFontDialog 是 Qt 框架中的一个对话框类&#xff0c;用于选择字体。它提供了一个可视化的界面&#xff0c;允许用户选择所需的字体以及相关的属性&#xff0c;如字体样式、大小、粗细等。用户可以通过对话框中的选项进行选择&#xff0c;并实时预览所选字体的效果。Q…

DolphinScheduler告警通知

DolphinScheduler告警通知 Dolphinscheduler支持多种告警媒介&#xff0c;此处以电子邮件为例进行演示。 1 准备邮箱 如需使用DolphinScheduler的电子邮件告警通知功能&#xff0c;需要准备一个电子邮箱账号&#xff0c;并启用SMTP服务。此处以 QQ 邮箱为例。 1.1 开启 SMTP 服…

Spring 中的 Environment 对象

可参考官网&#xff1a;Environment Abstraction 核心概念 Environment 对象对两个关键方面进行建模&#xff1a;profiles 和 属性&#xff08;properties&#xff09;。 Profile 简单来说&#xff1a;profile 机制在 IOC 容器中提供了一种机制&#xff1a;允许在不同的环境…

【论文速读】Optimization-based Prompt Injection Attack to LLM-as-a-Judge

基于优化的提示词注入攻击 摘要引言问题描述LLM-as-a-judge威胁模型攻击者知道什么 JUDGEDECEIVER 细节概述生成影子候选回复公式化为优化问题Target-aligned generation lossTarget-enhancement lossAdversarial perplexity loss优化问题 求解优化问题 摘要 LLM-as-a-Judge 利…

qt QStandardItem详解

1、概述 QStandardItem是Qt框架中QStandardItemModel的一个基础元素&#xff0c;用于在基于项的模型&#xff08;如QStandardItemModel&#xff09;中表示单个数据项。QStandardItem可以存储文本、图标、工具提示等丰富的信息&#xff0c;并且支持数据的编辑和自定义显示。通过…

戴尔电脑 Bios 如何进入?Dell Bios 进入 Bios 快捷键是什么?

BIOS&#xff08;基本输入输出系统&#xff09;是计算机启动时运行的第一个程序&#xff0c;它负责初始化硬件并加载操作系统。对于戴尔电脑用户来说&#xff0c;有时可能需要进入 BIOS 进行一些特定的设置调整&#xff0c;比如更改启动顺序、调整性能选项或解决硬件兼容性问题…

低代码解锁跨平台应用开发新境界

数字化转型中&#xff0c;企业面临应用开发挑战&#xff0c;低代码平台成为理想选择。ZohoCreator提供统一开发环境、拖拽设计、预置模板等&#xff0c;支持高效构建跨平台应用&#xff0c;确保数据安全与合规&#xff0c;助力企业数字化转型。 一、低代码平台是什么&#xff1…

`掌握Python-PPTX,让PPt制作变得轻而易举!`

文章目录 掌握Python-PPTX&#xff0c;让PPT制作变得轻而易举&#xff01;背景介绍python-pptx 是什么&#xff1f;如何安装 python-pptx&#xff1f;简单库函数使用方法应用场景常见Bug及解决方案总结 掌握Python-PPTX&#xff0c;让PPT制作变得轻而易举&#xff01; 背景介绍…

【含文档+源码】基于SpringBoot+Vue的新型吃住玩一体化旅游管理系统的设计与实现

开题报告 本文旨在探讨新型吃住玩一体化旅游管理系统的设计与实现。该系统融合了用户注册与登录、旅游景点管理、旅游攻略发帖、特色旅游路线推荐、附近美食推荐以及酒店客房推荐与预定等多项功能&#xff0c;旨在为游客提供全方位、一体化的旅游服务体验。在系统设计中&#…

[C++]——哈希(附源码)

目录 ​编辑 ​编辑 一、前言 二、正文 2.1 unorder系列关联式容器 2.1.1 unordered_map 2.1.1.1 unorderer_map的介绍 ①unordered_map的构造 ②unordered_map的容量 ③unordered_map的迭代器 ④unordered_map的元素访问 ⑤unordered_map的查询 ⑥unordered_map的修改操…

Oracle视频基础1.4.5练习

1.4.5 看bbk的框架 ls env | grep ORA cd /u01/oradata ls ll cd bbk ll cd /u01/admin/ ll ll bbk cd cd db cd dbs ls vi initbbk.ora clear ls ll env | grep ORA执行创建数据库语句。 sqlplus /nolog conn /as sysdba create spfile from pfile ! ls ll exit startup nom…

Spring Boot 与 Vue 共筑高校网上订餐卓越平台

作者介绍&#xff1a;✌️大厂全栈码农|毕设实战开发&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。 &#x1f345;获取源码联系方式请查看文末&#x1f345; 推荐订阅精彩专栏 &#x1f447;&#x1f3fb; 避免错过下次更新 Springboot项目精选实战案例 更多项目…

【设计模式系列】建造者模式(十)

目录 一、什么是建造者模式 二、建造者模式的角色 三、建造者模式的典型应用 四、建造者模式在StringBuilder中的应用 五、典型建造者模式的案例 一、什么是建造者模式 建造者模式&#xff08;Builder Pattern&#xff09;是一种创建型设计模式&#xff0c;用于构建复杂对…

H7-TOOL的LUA小程序教程第17期:扩展驱动AD7606, ADS1256,MCP3421, 8路继电器和5路DS18B20(2024-11-01)

LUA脚本的好处是用户可以根据自己注册的一批API&#xff08;当前TOOL已经提供了几百个函数供大家使用&#xff09;&#xff0c;实现各种小程序&#xff0c;不再限制Flash里面已经下载的程序&#xff0c;就跟手机安装APP差不多&#xff0c;所以在H7-TOOL里面被广泛使用&#xff…

P10 Pytorch入门实战——Pytorch实现车牌识别

一、前期准备 1. 设置device # import the necessary libraries import torch import torch.nn as nn import torchvision.transforms as transforms from torchvision import transforms, datasets import matplotlib.pyplot as plt import PIL,pathlib from PIL import Im…

基于SSM+小程序的宿舍管理系统(宿舍1)

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1、项目介绍 本宿舍管理系统小程序有管理员和学生两个角色。 1、管理员功能有个人中心&#xff0c;公告信息管理&#xff0c;班级管理&#xff0c;学生管理&#xff0c;宿舍信息管理&#xff0c;宿舍…