Kafka学习笔记(一)Kafka基准测试、幂等性和事务、Java编程操作Kafka

文章目录

  • 前言
  • 4 Kafka基准测试
    • 4.1 基于1个分区1个副本的基准测试
    • 4.2 基于3个分区1个副本的基准测试
    • 4.3 基于1个分区3个副本的基准测试
  • 5 Java编程操作Kafka
    • 5.1 引入依赖
    • 5.2 向Kafka发送消息
    • 5.3 从Kafka消费消息
    • 5.4 异步使用带有回调函数的生产消息
  • 6 幂等性
    • 6.1 幂等性介绍
    • 6.2 Kafka幂等性实现原理
  • 7 Kafka事务
    • 7.1 Kafka事务介绍
    • 7.2 事务操作API
    • 7.3 Kafka事务编程
      • 7.3.1 需求
      • 7.3.2 创建Topic
      • 7.3.3 编写生产者
      • 7.3.4 创建消费者
      • 7.3.5 消费旧Topic数据并生产到新Topic
      • 7.3.6 测试
      • 7.3.7 模拟异常测试事务

前言

Kafka学习笔记(一)Linux环境基于Zookeeper搭建Kafka集群、Kafka的架构

4 Kafka基准测试

基准测试(benchmark testing)是一种测量和评估软件性能指标的活动。 我们可以通过基准测试,了解到软件、硬件的性能水平,主要测试负载的执行时间、传输速度、吞吐量、资源占用率等。

4.1 基于1个分区1个副本的基准测试

  • 1)创建1个分区1个副本的Topic

  • 2)生产消息基准测试
bin/kafka-producer-perf-test.sh --topic topic_1_1 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 acks=1

命令解释:

  • bin/kafka-producer-perf-test.sh 性能测试脚本
  • –topic Topic的名称
  • –num-records 指定生产数据量(默认5000W)
  • –throughput 指定吞吐量,即限流(-1不指定)
  • –record-size record数据大小(字节)
  • –producer-props bootstrap.servers 指定Kafka集群地址
  • acks=1 ACK模式

执行以上命令,结果如下:

  • 3)消费消息基准测试
bin/kafka-consumer-perf-test.sh --broker-list 192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 --topic topic_1_1 --fetch-size 1048576 --messages 5000000 --timeout 100000

命令解释:

  • bin/kafka-consumer-perf-test.sh 消费消息基准测试脚本
  • –broker-list 集群Broker列表
  • –topic Topic的名称
  • –fetch-size 每次拉取的数据大小
  • –messages 总共要消费的消息个数
  • –timeout 超时时间

执行以上命令,结果如下:

4.2 基于3个分区1个副本的基准测试

  • 1)创建3个分区1个副本的Topic

  • 2)生产消息基准测试
bin/kafka-producer-perf-test.sh --topic topic_3_1 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 acks=1
指标3分区1副本1分区1副本性能(对比1分区1副本)
吞吐量10900.822140 records/sec8994.536718 records/sec提升↑
吞吐速率10.40 MB/sec8.58 MB/sec提升↑
平均延迟时间2508.37 ms avg latency3418.50 ms avg latency提升↑
最大延迟时间47436.00 ms max latency50592.00 ms max latency
  • 3)消费消息基准测试
bin/kafka-consumer-perf-test.sh --broker-list 192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 --topic topic_3_1 --fetch-size 1048576 --messages 5000000 --timeout 100000
指标3分区1副本1分区1副本性能(对比1分区1副本)
data.consumed.in.MB 共计消费数据量4768.40214768.3716
MB.sec 每秒消费数据量28.563721.1589提升↑
data.consumed.in.nMsg 共计消费消息数量50000325000000
nMsg.sec 每秒消费消息数量29951.251722186.7235提升↑

4.3 基于1个分区3个副本的基准测试

  • 1)创建1个分区3个副本的Topic

  • 2)生产消息基准测试
bin/kafka-producer-perf-test.sh --topic topic_1_3 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 acks=1
指标1分区3副本1分区1副本性能(对比1分区1副本)
吞吐量4323.273652 records/sec8994.536718 records/sec下降↓
吞吐速率4.12 MB/sec8.58 MB/sec下降↓
平均延迟时间7533.70 ms avg latency3418.50 ms avg latency下降↓
最大延迟时间32871.00 ms max latency50592.00 ms max latency

可见,副本越多,生产消息的性能反而下降。

  • 3)消费消息基准测试
bin/kafka-consumer-perf-test.sh --broker-list 192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 --topic topic_1_3 --fetch-size 1048576 --messages 5000000 --timeout 100000
指标1分区3副本1分区1副本性能(对比1分区1副本)
data.consumed.in.MB 共计消费数据量4768.37164768.3716
MB.sec 每秒消费数据量46.950421.1589下降↓
data.consumed.in.nMsg 共计消费消息数量50000005000000
nMsg.sec 每秒消费消息数量49231.011622186.7235下降↓

同样,副本越多,消费消息的性能也下降。

5 Java编程操作Kafka

创建一个Maven项目,测试Java变成操作Kafka。

5.1 引入依赖

<!-- kafka_demo\pom.xml --><!-- kafka客户端工具 -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version>
</dependency>

5.2 向Kafka发送消息

public class KafkaProducerTest {public static void main(String[] args) {// 1.创建用于连接Kafka的Properties配置Properties props = new Properties();props.put("bootstrap.servers", "192.168.245.130:9092");props.put("acks", "all");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2.创建一个生产者对象KafkaProducerKafkaProducer<String, String> producer = new KafkaProducer<>(props);// 3.调用send发送1-100消息到`my_topic`主题for(int i = 0; i < 100; ++i) {try {// 获取返回值Future,该对象封装了返回值Future<RecordMetadata> future = producer.send(new ProducerRecord<>("my_topic", null, i + ""));// 调用一个Future.get()方法等待响应future.get

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/433572.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

搜索引擎onesearch3实现解释和升级到Elasticsearch v8系列(一)-概述

简介 此前的专栏介绍onesearch1.0和2.0&#xff0c;详情参看4 参考资料&#xff0c;本文解释onesearch 3.0&#xff0c;从Elasticsearch6升级到Elasticsearch8代码实现 &#xff0c;Elasticsearch8 废弃了high rest client&#xff0c;使用新的ElasticsearchClient&#xff0c;…

AI驱动的智能运维:行业案例与挑战解析

华为、蚂蚁、字节跳动如何引领智能运维&#xff1f; ©作者|潇潇 来源|神州问学 引言 OpenAI 发布的 ChatGPT 就像是打开了潘多拉的魔盒&#xff0c;释放出了生产环境中的大语言模型&#xff08;LLMs&#xff09;。一些新的概念&#xff1a;“大语言模型运维 (LLMOps)”…

统信服务器操作系统进入【单用户模式】

统信服务器操作系统D版、E版、A版进入单用户模式的方式。 文章目录 前言一、问题现象二、问题原因三、解决方案1. D版问题解决方案2. E版及A版问题解决方案前言 D版又称企业版、E版又称欧拉版、A版又称龙蜥版。 单用户模式主要是在 grub2 引导时编辑内核引导,一般用于修改用…

mysql索引结构操作(主键/唯一键/普通索引的创建/查询/删除),复合索引介绍(索引覆盖,索引最左匹配原则)

目录 索引操作 创建索引 主键索引 介绍 在创建表时设置主键 创建表后添加主键 唯一键索引 介绍 在创建表时设置唯一键 创建表后添加唯一键 普通索引 在创建表时指定某列为索引 创建表后添加普通索引 自主命名索引 索引创建原则 哪些列适合创建索引 不适合作为…

【Linux:共享内存】

共享内存的概念&#xff1a; 操作系统通过页表将共享内存的起始虚拟地址映射到当前进程的地址空间中共享内存是由需要通信的双方进程之一来创建但该资源并不属于创建它的进程&#xff0c;而属于操作系统 共享内存可以在系统中存在多份&#xff0c;供不同个数&#xff0c;不同进…

14 vue3之内置组件trastion全系列

前置知识 Vue 提供了 transition 的封装组件&#xff0c;在下列情形中&#xff0c;可以给任何元素和组件添加进入/离开过渡: 条件渲染 (使用 v-if)条件展示 (使用 v-show)动态组件组件根节点 自定义 transition 过度效果&#xff0c;你需要对transition组件的name属性自定义。…

基于BeagleBone Black的网页LED控制功能(flask+gpiod)

目录 项目介绍硬件介绍项目设计开发环境功能实现控制LED外设构建Webserver 功能展示项目总结 &#x1f449; 【Funpack3-5】基于BeagleBone Black的网页LED控制功能 &#x1f449; Github: EmbeddedCamerata/BBB_led_flask_web_control 项目介绍 基于 BeagleBoard Black 开发板…

ChatGPT 推出“Auto”自动模式:智能匹配你的需求

OpenAI 最近为 ChatGPT 带来了一项新功能——“Auto”自动模式&#xff0c;这一更新让所有用户无论使用哪种设备都能享受到更加个性化的体验。简单来说&#xff0c;当你选择 Auto 模式后&#xff0c;ChatGPT 会根据你输入的提示词复杂程度&#xff0c;自动为你挑选最适合的AI模…

DataGrip远程连接Hive

学会用datagrip远程操作hive 连接前提条件&#xff1a; 注意&#xff1a;mysql是否是开启状态 启动hadoop集群 start-all.sh 1、启动hiveserver2服务 nohup hiveserver2 >> /usr/local/soft/hive-3.1.3/hiveserver2.log 2>&1 & 2、beeline连接 beelin…

缓存装饰器@cached_property

这个装饰器好像在好多包里都有&#xff0c;我在阅读源码的过程中&#xff0c;transformers.utils也有这个。查阅资料&#xff0c;大体上了解了它的用法。参考&#xff1a;[python]cached_property缓存装饰器 - faithfu - 博客园 这个装饰器用在类里面的某个方法前面&#xff0…

统信服务器操作系统【qcow2 镜像空间扩容】方案

使用 qcow2 镜像安装系统,当默认安装系统存储空间不够用时,进行自定义扩容 文章目录 准备环境扩容步骤一、检查环境信息1.查看镜像信息2.查看镜像分区信息3.确认需要扩容的分区名二、扩容1.备份镜像2.创建新的镜像文件,并指定空间3.将系统扩容到新的镜像三、扩容 lvm 分区四…

用5款AI帮你写论文,只需10分钟(附详细工具)

在当前的学术写作领域&#xff0c;AI技术的应用已经变得越来越普遍。借助这些工具&#xff0c;学生和研究人员可以显著提高写作效率&#xff0c;并在短时间内生成高质量的论文初稿。以下是五款值得推荐的AI论文写作工具&#xff0c;它们可以帮助你在10分钟内完成一篇论文&#…

【笔记】自动驾驶预测与决策规划_Part4_时空联合规划

文章目录 0. 前言1. 时空联合规划的基本概念1.1 时空分离方法1.2 时空联合方法 2.基于搜索的时空联合规划 &#xff08;Hybrid A* &#xff09;2.1 基于Hybrid A* 的时空联合规划建模2.2 构建三维时空联合地图2.3 基于Hybrid A*的时空节点扩展2.4 Hybrid A* &#xff1a;时空节…

Centos 7 搭建Samba

笔记&#xff1a; 环境&#xff1a;VMware Centos 7&#xff08;网络请选择桥接模式&#xff0c;不要用NAT&#xff09; 遇到一个问题就是yum 安装404&#xff0c;解决办法在下面&#xff08;没有遇到可以无视这句话&#xff09; # 安装Samba软件 yum -y install samba# 创建…

性能测试:性能测试计划

性能测试计划是在进行软件或系统的性能测试之前制定的详细计划和指导文件。它描述了所需性能测试的目标、范围、测试环境、资源需求、测试策略、测试用例、时间表等重要信息。 为什么要制定性能测试计划 制定性能测试计划的主要目的是确保性能测试的有效性和可靠性。以下是制…

THREE.JS法线Shader

以普通情况而论 vNormal normal;//...gl_FragColor vec4( vNormal, 1. );vNormal normal;//...gl_FragColor vec4( normalize( vNormal ) * 0.5 0.5, 1. );vNormal normalMatrix * normal;//...gl_FragColor vec4( normalize( vNormal ) * 0.5 0.5, 1. );normalMa…

【Android】布局优化—include,merge,ViewStub的使用方法

引言 1.重要性 在Android应用开发中&#xff0c;布局是用户界面的基础。一个高效的布局不仅能提升用户体验&#xff0c;还能显著改善应用的性能。随着应用功能的复杂性增加&#xff0c;布局的优化变得尤为重要。优化布局能够减少渲染时间&#xff0c;提高响应速度&#xff0c…

JavaWeb纯小白笔记02:Tomcat的使用:发布项目的三种方式、配置虚拟主机、配置用户名和密码

通过Tomcat进行发布项目的目的是为了提供项目的访问能力&#xff1a;Tomcat作为Web服务器&#xff0c;能够处理HTTP请求和响应&#xff0c;将项目的内容提供给用户进行访问和使用。 一.Tomcat发布项目的三种方式&#xff1a; 第一种&#xff1a;直接在Tomcat文件夹里的webapp…

K8s Calico替换为Cilium,以及安装Cilium过程(鲁莽版)

迁移CNI插件的3种办法&#xff1a; 1、创建一个新的集群&#xff0c;通过Gitops的方式迁移负载&#xff0c;然而&#xff0c;这可能涉及大量的准备工作和潜在的中断。 2、另一种方法是重新配置/etc/cni/net.d/指向Cilium。但是&#xff0c;现有的pod仍将由旧的…

Stable Diffusion 优秀博客转载

初版论文地址&#xff1a;https://arxiv.org/pdf/2112.10752 主要流程图&#xff1a; Latent Diffusion Models&#xff08;LDMs&#xff09; DDPM是"Denoising Diffusion Probabilistic Models"的缩写&#xff0c; 去噪扩散概率模型 博客&#xff1a; https://ja…