KAFKA第二课之生产者(面试重点)

生产者学习

1.1 生产者消息发送流程

在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
生产者如何发送的?
现在Main线程中将数据进行处理,处理成IO型数据,然后调用sender进行发送
Main:
1.读取生产者配置
2.产生数据
3.过滤数据(校验什么的)
4.序列化
5.放入缓冲区 RecordAccumulator
6.发送Sender

细节: 考虑的问题 1.生产者配置的读取和修改 2.数据的过滤与分区, 3.缓冲区是如何设置的,大小
4.发送(发送失败怎么样,请求区的大小)
这里注意一下,可以在缓冲区对数据进行压缩,这样就提高缓冲区的容量和发送的数据量,提高吞吐量

1.2 同步发送与异步发送

1.什么是同步和异步

同步就是,串行,一条龙 异步 一起运行
举例: 餐馆点餐
同步: 需要等服务员过来,让服务员记录,
异步: 点餐APP直接点餐,交给队列,让他自己运行

2.发送的同步异步

同步:需要得到返回值
异步:发送过去不管了

3. 分区好处

啥是分区?
将一个数据块分成多个数据块
将数据分布式处理了
存储: 可以分在多个机器上, 也可以整多个副本。便于存储,同时提高健壮性
IO:多个数据块可以同时进行发送接收消费。生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费

4. 默认分区器

前提条件: 1.分区 2.key值
规则:

  • 1存在,按1分区
  • 1不存在,按2.key值对分区数取余得到的值分区
  • 1.2都不存在 随机选个分区,等这个批次发送完了,再换

3 就是粘性分区
那么粘性分区的缺点是什么?
因为缓冲区溢出的条件是,大小和时间双重判断,如果大小不够,但是时间够了,还是会发走,这样,最后导致,分区上产生数据倾斜
如何解决的?
3.3.1 Kafka去掉粘性分区的时间控制,批次只由大小判断

1.3.自定义分区器

1.思路

  • 1.实现接口Parititoner,重写相关方法
  • 2.修改配置 将partitioner设置为默认配置

2.1 自定义分区器代码

public class MyPartitioner implements Partitioner {//  自定义分区器 实现partitioner接口// 1.分区方法@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String data = value.toString();// 创建partition 作为最后的分区标识int partitions;// 分区逻辑// 根据含有的字符串进行判断 判断进入哪个分区if (data.contains("atguigu")){partitions = 0;} else if (data.contains("shangguigu")){partitions = 1;} else {partitions = 2;}return partitions;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

2.2 主类

package com.atguigu.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class ProducerClientAsync {public static void main(String[] args) {// 0 配置对象Properties properties = new Properties();//  --指定kafka的Broker地址properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");//  -- 1.指定序列化器 序列化器的全限定类名properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//.setProperty(ProducerConfig.LINGER_MS_CONFIG,"0");// -- 2.设置分区器properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());// -- 3.获取客户端连接对象KafkaProducer<String,String> kafkaProducer= new KafkaProducer<String,String>(properties);//  key是主题  v是发送内容  这里注意一下// -- 4.发送数据String[] str= {"atguigu","111","atguigu","shangguigu","222"};for (int i =0; i < str.length; i++) {System.out.println(str[i]);try {kafkaProducer.send(new ProducerRecord<>("first", str[i]), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题:" + metadata.topic() + "->"  + "分区:" + metadata.partition());}else {// 出现异常打印exception.printStackTrace();}}}).get();} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}}kafkaProducer.close();}
}

在这里插入图片描述

3.面试细节

1.如何提高生产者的吞吐量

  • 批次大小调到16
  • 将等待时间改成50-100ms 默认是0
  • 压缩数据量,这样每次发送的数据就多了
  • 加大缓冲区大小,进来的数据变多,发送也能提上去

2.生产者如何保证数据可靠性的

主要通过ack机制

1.什么是ACK机制?

根据ack值来决定Kafka集群服务端的存储应答

  • ack=0 最低 生产者只管发送,不用接收
  • ack=1 中等 生产者发送完需要等待Leader保存后回应,
  • ack=-1 最高 生产者发送完需要等待所有副本保存后回应

2.分析ACK机制

性能与安全是成反比的
所以,-1虽然最安全,但是效率最低

3.如果将ACK调到-1会出现什么问题?

有可能出现数据重复发送与接收
比如,在同步的瞬间,Leader死掉,但是其他副本已经落盘,这时候,就是问题了。
因为Leader死掉了,所以会直接更换Leader,选出一个副本作为Leader,注意,这时显示没有收到内容,所以,send重新发送,这时候,每个副本上,收到的就是2份该数据了。

4.应用场景

acks=0 几乎不用
acks=1 传输普通日志,允许丢失
acks=-1 传输高可靠性数据,一般与钱有关

5.ACK=-1一定可靠么?

不一定
如果分区副本数设置为1 ,或者ISR里应答的最小副本数设置为1(默认也是1),这时候,ack=1效果相同了。
也就是说,应答一个,就能走,就没意义了
所以需要完全可靠就需要配置一下
ACK=-1 & 分区副本大于等于2 & ISR应答最小副本数量大于等于2

3. 数据去重

1.概念

至少一次:一次或者多次 完全可靠
在这里插入图片描述
最多一次:直接不管回复只管发送 ack=0

至少:保证数据不丢失,但是无法保证数据不重复
最多: 无法保证数据不丢失

1.如何解决数据的重复发送与接收的问题,同时保证数据的不丢失

注意,这里解决的是sender和服务端的重复发送与接收,而不是生产者本身发送多个重复消息的问题,这个要搞清楚。
一般重复问题,都是通过标识来判别,从而去重的
Kafka 0.11 引入 幂等性和事务
精确一次: 幂等性 +至少一次(ack=-1 & 分区副本>=2 & ISR最小副本>=2)

4.幂等性

1.概念

啥是幂等性,标识一个消息的唯一标识
<pid,partition,Seqnumber>
Pid 是会话ID,每次重新生成会话,就会重新生成PID
partition是分区 标识 消息是哪个分区的
Seqnumber是单调递增的标识,注意,这是每个分区独享的
这三个在一起,才是唯一标识。

2.如何使用幂等性

开启参数enable.idempotence 默认为true,false关闭。
开启开关就行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/90357.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android学习之路(4) UI控件之输入框

本节引言&#xff1a; 在本节中&#xff0c;我们来学习第二个很常用的控件EditText(输入框)&#xff1b; 和TextView非常类似&#xff0c;最大的区别是&#xff1a;EditText可以接受用户输入&#xff01; 1.设置默认提示文本 如下图&#xff0c;相信你对于这种用户登录的界面并…

RocketMQ消费者可以手动消费但无法主动消费问题,或生成者发送超时

1.大多数是配置问题 修改rocketmq文件夹broker.conf 2.配置与集群IP或本地IPV4一样 重启 在RocketMQ独享实例中支持IPv4和IPv6双栈&#xff0c;主要是通过在网络层面上同时支持IPv4和IPv6协议栈来实现的。RocketMQ的Broker端、Namesrv端和客户端都需要支持IPv4和IPv6协议&…

Cookie、Session、Token的区别

有人或许还停留在它们只是验证身份信息的机制&#xff0c;但是它们之间的关系你真的弄懂了么&#xff1f; 发展史&#xff1a; Coolie: Netscape Communications 公司引入了 Cookie 概念&#xff0c;作为在客户端存储状态信息的一种方法。初始目的是为了解决 HTTP 的无状态性…

电脑剪辑用哪个软件比较好?电脑视频剪辑软件分享

在电脑上剪辑视频可以让您更容易地编辑和组织素材&#xff0c;以及添加音频、标题和其他效果。此外&#xff0c;电脑上的剪辑软件通常比手机上的应用程序更强大&#xff0c;使我们可以进行更精细的编辑&#xff0c;并获得更好的最终产品。那么电脑剪辑视频哪个软件比较好用呢&a…

Tomcat 部署优化

Tomcat Tomcat 开放源代码web应用服务器&#xff0c;是由java代码开发的 tomcat就是处理动态请求和基于java代码的页面开发 可以在html当中写入java代码&#xff0c;tomcat可以解析html页面当中的iava&#xff0c;执行动态请求 动态页面机制有问题&#xff1a;不对tomcat进行优…

Sentinel使用实例

不说了&#xff0c;直接上官方文档 https://github.com/alibaba/spring-cloud-alibaba/blob/master/spring-cloud-alibaba-examples/sentinel-example/sentinel-core-example/readme-zh.md Sentinel Example 项目说明 本项目演示如何使用 Sentinel starter 完成 Spring Clo…

vuejs 设计与实现 - 快速diff算法

Vue.js 2 所采用的双端 Diff 算法。既然快速 Diff 算法如此高效&#xff0c;我们有必要了解它的思路。接下来&#xff0c;我们就着重讨论快速 Diff 算法的实现原理。 相同的前置元素和后置元素 快速 Diff 算法借鉴了纯文本 Diff 算法中预处理的步骤。 案例&#xff1a; 旧的…

学习助手(安卓)

首先&#xff0c;这是一款人工智能的学习软件&#xff0c;功能非常的强大&#xff0c;进入软件就能看见多种功能&#xff0c;它可以根据大家提供的主题&#xff0c;环境&#xff0c;文体&#xff0c;语言等要求进行写作&#xff0c;还有诗歌创作&#xff0c;也可以帮我们进行内…

Unity3D高级编程:主程手记学习1

第一章 软件架构 Untiy 分层设计 分层后再分治

深度学习笔记(kaggle课程《Intro to Deep Learning》)

一、什么是深度学习&#xff1f; 深度学习是一种机器学习方法&#xff0c;通过构建和训练深层神经网络来处理和理解数据。它模仿人脑神经系统的工作方式&#xff0c;通过多层次的神经网络结构来学习和提取数据的特征。深度学习在图像识别、语音识别、自然语言处理等领域取得了…

[PyTorch][chapter 49][创建自己的数据集 1]

前言&#xff1a; 后面几章主要利用DataSet 创建自己的数据集&#xff0c;实现建模&#xff0c; 训练&#xff0c;迁移等功能。 目录: pokemon 数据集深度学习工程步骤 一 pokemon 数据集介绍 1.1 pokemon: 数据集地址&#xff1a; 百度网盘路径: https://pan.baidu.com/s/1…

【EI/SCOPUS检索】第三届计算机视觉、应用与算法国际学术会议(CVAA 2023)

第三届计算机视觉、应用与算法国际学术会议&#xff08;CVAA 2023) The 3rd International Conference on Computer Vision, Application and Algorithm 2023年第三届计算机视觉、应用与算法国际学术会议&#xff08;CVAA 2023&#xff09;主要围绕计算机视觉、计算机应用、计…

[PyTorch][chapter 50][创建自己的数据集 2]

前言&#xff1a; 这里主要针对图像数据进行预处理.定义了一个 class Pokemon(Dataset) 类&#xff0c;实现 图像数据集加载,划分的基本方法. 目录&#xff1a; 整体框架 __init__ load_images save_csv divide_data __len__ denormalize __g…

数据结构——堆

数据结构——堆 堆堆简介堆的分类 二叉堆过程插入操作 删除操作向下调整&#xff1a; 增加某个点的权值实现参考代码&#xff1a;建堆方法一&#xff1a;使用 decreasekey&#xff08;即&#xff0c;向上调整&#xff09;方法二&#xff1a;使用向下调整 应用对顶堆 其他&#…

dirsearch_暴力扫描网页结构

python3 dirsearch 暴力扫描网页结构&#xff08;包括网页中的目录和文件&#xff09; 下载地址&#xff1a;https://gitee.com/xiaozhu2022/dirsearch/repository/archive/master.zip 下载解压后&#xff0c;在dirsearch.py文件窗口&#xff0c;打开终端&#xff08;任务栏…

深入理解索引B+树的基本原理

目录 1. 引言 2. 为什么要使用索引&#xff1f; 3. 索引的概述 4. 索引的优点是什么&#xff1f; 4.1 降低数据库的IO成本&#xff0c;提高数据查找效率 4.2 保证数据库每一行数据的唯一性 4.3 加速表与表之间的连接 4.4 减少查询中分组与排序的执行时间 5. 索引的缺点…

[足式机器人]Part3机构运动微分几何学分析与综合Ch03-1 空间约束曲线与约束曲面微分几何学——【读书笔记】

本文仅供学习使用 本文参考&#xff1a; 《机构运动微分几何学分析与综合》-王德伦、汪伟 《微分几何》吴大任 Ch01-4 平面运动微分几何学 3.1 空间曲线微分几何学概述3.1.1 矢量表示3.1.2 Frenet标架 连杆机构中的连杆与连架杆构成运动副&#xff0c;该运动副元素的特征点或特…

【Stable Diffusion】雨天、湿身

一、Models 1.1、Wet Clothes (Clothing Style) [LoHA] WECL SEE-THROUGH WET WET HAIR BIKINI OR SWIMSUIT UNDER CLOTHES NO BRA BRA VISIBLE THROUGH CLOTHES MISC SHIRTS MISC CLOTHES1.2、Rain 雨 Multiply Style rain style1.3、Wet T-Shirt LORA <lora:wetshirt:…

5.1 web浏览安全

数据参考&#xff1a;CISP官方 目录 Web应用基础浏览器所面临的安全威胁养成良好的Web浏览安全意识如何安全使用浏览器 一、Web应用基础 1、Web应用的基本概念 Web ( World wide Web) 也称为万维网 脱离单机Web应用在互联网上占据了及其重要的地位Web应用的发展&#xf…

最新Kali Linux安装教程:从零开始打造网络安全之旅

Kali Linux&#xff0c;全称为Kali Linux Distribution&#xff0c;是一个操作系统(2013-03-13诞生)&#xff0c;是一款基于Debian的Linux发行版&#xff0c;基于包含了约600个安全工具&#xff0c;省去了繁琐的安装、编译、配置、更新步骤&#xff0c;为所有工具运行提供了一个…