zookeperkafka学习

1、why kafka

优点   缺点
kafka
  • 吞吐量高,对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的性能。
延迟也会高,不适合电商场景。
RabbitMQ
  • 如果有大量消息堆积在队列中,性能会急剧下降
  • 每秒处理几万到几十万的消息。如果应用要求高的性能,不要选择RabbitMQ。
性能RocketMQ低
RocketMQ
  • 性能比RabbitMQ高一个数量级,适合电商场景。
  • RocketMQ主要用于有序,事务,流计算,消息推送,日志流处理,binlog分发等场景。
  • 每秒处理几十万的消息,同时响应在毫秒级。如果应用很关注响应时间,可以使用RocketMQ。

2、Broker:

缓存代理(可以把Broker理解为Kafka的服务器),Kafka 集群中的一台或多台服务器统称为 broker。kafka中支持消息持久化的,生产者生产消息后,kafka不会直接把消息传递给消费者,而是先要在broker中进行存储,持久化是保存在kafka的日志文件中。 

3、分区:

一个消费者可以对应多个分区,一个分区只能对应一个消费者。

topic分区有leader和follower。 Log的分区被分布到集群中的多个服务器上。每个服务器处理它分到的分区。 根据配置每个分区还可以复制到其它服务器作为备份容错。 每个分区有一个leader,零或多个follower。Leader处理此分区的所有的读写请求,而follower被动的复制数据。如果leader宕机,其它的一个follower会被推举为新的leader。 一台服务器可能同时是一个分区的leader,另一个分区的follower。 这样可以平衡负载,避免所有的请求都只让一台或者某几台服务器处理。

4、消费者组 :

topic实现JMS模型中消费者组中只有一个消费者,这种情况下topic的消费的offset是无序的。当单个消费者无法跟上数据生成的速度,就可以增加更多的消费者分担负载,每个消费者只处理部分partition的消息,从而实现单个应用程序的横向伸缩。但是不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应用程序从同一个Topic读取所有的消息,此时只要保证每个应用程序有自己的消费者组即可。

kafka为什么读写快?

利用零拷贝和页面缓存技术,零拷贝技术读取文件数据并发送到网络的步骤如下:

  • 将磁盘文件的数据复制到页面缓存。
  • 将数据从页面缓存直接发送到网卡从而发到网络中。

rebalance

主要是对partition的个数和group当中的consumer个数重新统计,再重新对应consumer和partition的关系。一个消费者可以对应多个分区。一个分区只能对应一个消费者。

kafka producer API

生产者的分区由key决定

我们创建消息的时候,必须要提供主题和消息的内容,而消息的key是可选的,当不指定key时默认为null。消息的key有两个重要的作用:1)提供描述消息的额外信息;2)用来决定消息写入到哪个分区,所有具有相同key的消息会分配到同一个分区中。

如果key为null,那么生产者会使用默认的分配器,该分配器使用轮询(round-robin)算法来将消息均衡到所有分区。

如果key不为null而且使用的是默认的分配器,那么生产者会对key进行哈希并根据结果将消息分配到特定的分区。

案例:

Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384); //默认是16kB, 每个Batch要存放batch.size大小的数据后,才可以发送出去。props.put("linger.ms", 1); //一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了。props.put("buffer.memory", 33554432); //默认是32MB,KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的。props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for(int i = 0; i < 100; i++)producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));producer.close();

kafka consumer API

案例一:手动同步提交

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "false");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));final int minBatchSize = 200;List<ConsumerRecord<String, String>> buffer = new ArrayList<>();while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {buffer.add(record);}if (buffer.size() >= minBatchSize) {insertIntoDb(buffer);consumer.commitSync();buffer.clear();}}

案例二:每个partition手动同步提交

try {while(running) {ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);for (TopicPartition partition : records.partitions()) {//拿到这个partition下面的所有数据List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ": " + record.value());}//通过这个partition的list获取最后一个数据的offsetlong lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}}} finally {consumer.close();}

Kafka文件存储:

知道通过分片和索引机制找到offset的就行了。index和log文件以当前的第一条消息的offset命名。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/196991.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

若依框架数据源切换为pg库

一 切换数据源 在ruoyi-admin项目里引入pg数据库驱动 <dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.2.18</version> </dependency>修改配置文件里的数据源为pg spring:d…

交通 | 神奇动物在哪里?Operations Research经典文章

论文作者&#xff1a;Robert G. Haight, Charles S. Revelle, Stephanie A. Snyder​ 论文原文&#xff1a;Robert G. Haight, Charles S. Revelle, Stephanie A. Snyder, (2000) An Integer Optimization Approach to a Probabilistic Reserve Site Selection Problem. Operat…

解决java在idea运行正常,但是打成jar包后中文乱码问题

目录 比如&#xff1a; 打包命令使用utf-8编码&#xff1a; 1.当在idea中编写的程序,运行一切正常.但是当被打成jar包时,执行的程序会中文乱码.产生问题的原因和解决方案是什么呢? 一.问题分析 分别使用idea和jar包形式打印出System中所有的jvm参数---代码如下: public static…

H5ke11--1登录界面一直保存--用本地localStorage存储

目录 代码详解 localStage优点 :一直保存着 注意事项: storage属性们 代码详解 ke8学校陈老师H5-CSDN博客文章浏览阅读76次。实现H5中新增的三个元素&#xff1a;forEach的使用方法。https://blog.csdn.net/m0_72735063/article/details/134019012即此之后 当然可以分为按…

快速入门:构建您的第一个 .NET Aspire 应用程序

##前言 云原生应用程序通常需要连接到各种服务&#xff0c;例如数据库、存储和缓存解决方案、消息传递提供商或其他 Web 服务。.NET Aspire 旨在简化这些类型服务之间的连接和配置。在本快速入门中&#xff0c;您将了解如何创建 .NET Aspire Starter 应用程序模板解决方案。 …

Postman接收列表、数组参数@RequestParam List<String> ids

示例如下: 接口定义如下: GetMapping(value "/queryNewMoviePath")public List<Map<String, Object>> queryNewMoviePath(RequestParam List<String> ids ) {return service.queryNewMoviePath(ids);}postman中测试如下&#xff1a; http://loc…

MFA多因子认证

什么是多因子认证&#xff08;MFA&#xff09;&#xff1f;为什么需要MFA&#xff1f; 同义词 多因子认证或者多因素验证 [尤其是需要做等级保护测评的时候需要用到] 摘要 多因子认证MFA&#xff08;Multi Factor Authentication&#xff09;是一种安全认证过程&#xff0c;需…

k8s-部署Redis-cluster(TLS)

helm pull bitnami/redis-cluster v8.3.8拉取源码生成证书 git clone https://github.com/redis/redis.git #文档 https://redis.io/docs/management/security/encryption/#getting-started生成你的TLS证书用官网的工具生成 1 Run ./utils/gen-test-certs.sh 生成根CA和服务…

springboot321基于java的校园服务平台设计与开发

交流学习&#xff1a; 更多项目&#xff1a; 全网最全的Java成品项目列表 https://docs.qq.com/doc/DUXdsVlhIdVlsemdX 演示 项目功能演示&#xff1a; ————————————————

【双指针】复写0

复写0 1089. 复写零 - 力扣&#xff08;LeetCode&#xff09; 给你一个长度固定的整数数组 arr &#xff0c;请你将该数组中出现的每个零都复写一遍&#xff0c;并将其余的元素向右平移。 注意&#xff1a;请不要在超过该数组长度的位置写入元素。请对输入的数组 就地 进行上…

python趣味编程-5分钟实现一个益智数独游戏(含源码、步骤讲解)

Puzzle Game In Python是用 Python 编程语言Puzzle Game Code In Python编写的,有一个 4*4 的棋盘,有 15 个数字。然后将数字随机洗牌。 在本教程中,我将教您如何使用Python 创建记忆谜题游戏。 Python Puzzle Game游戏需要遵循以下步骤,首先是将图块数量移动到空的图块空…

软件开发、网络空间安全、人工智能三个方向的就业和前景怎么样?哪个方向更值得学习?

软件开发、网络空间安全、人工智能这三个方向都是当前及未来的热门领域&#xff0c;每个领域都有各自的就业前景和价值&#xff0c;以下是对这三个方向的分析&#xff1a; 1、软件开发&#xff1a; 就业前景&#xff1a;随着信息化的加速&#xff0c;软件开发的需求日益增长。…

重生之我是一名程序员 34

哈喽啊大家晚上好&#xff01; 今天给大家带来的知识是——库函数qsort。首先&#xff0c;给大家介绍一下qsort函数&#xff0c; qsort函数是C标准库中的一种排序函数&#xff0c;用于对数组中的元素进行快速排序。它接受四个参数&#xff1a;待排序数组的基地址&#xff0c;数…

某60区块链安全之重入漏洞实战记录

区块链安全 文章目录 区块链安全重入漏洞实战实验目的实验环境实验工具实验原理实验内容 重入漏洞实战 实验目的 学会使用python3的web3模块 学会以太坊重入漏洞分析及利用 实验环境 Ubuntu18.04操作机 实验工具 python3 实验原理 以太坊智能合约的特点之一是能够调用和…

若依前后端分离版,快速上手

哈喽~大家好&#xff0c;这篇来看看若依前后端分离版&#xff0c;快速上手&#xff08;肝了挺久的&#xff09;。 &#x1f947;个人主页&#xff1a;个人主页​​​​​ &#x1f948; 系列专栏&#xff1a;【Springboot和Vue全栈开发】…

Spring Boot - filter 的顺序

定义过滤器的执行顺序 1、第一个过滤器 import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; impor…

⑩③【MySQL】详解SQL优化

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ SQL优化 ⑩③【MySQL】了解并掌握SQL优化1. 插…

【C++】类与对象(上)

目录 1. 面向过程和面向对象初步认识 2. 类的引入 3. 类的定义 4. 类的访问限定符及封装 4.1 访问限定符 4.2 封装 5. 类的作用域 6. 类的实例化 7. 类对象模型 7.1 如何计算类对象的大小 7.2 类对象的存储方式猜测 7.3 结构体内存对齐规则 8. this指针 8.1 this指…

大数据毕业设计选题推荐-机房信息大数据平台-Hadoop-Spark-Hive

✨作者主页&#xff1a;IT研究室✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Python…

OpenGL 坐标投影与反投影(Qt)

文章目录 一、简介1.1投影1.2反投影二、应用代码三、实现效果参考资料一、简介 在学习OpenGL一段时间之后,我们都会了解坐标的转换过程,如下图所示: 1.1投影 正如图中所述,OpenGL将一个3D坐标投影到一个2D空间主要有以下几个步骤,这也是我们比较熟知的几个步骤: 现实局部…