2. kafka 生产者

一. 生产者消息发送流程

在这里插入图片描述

在消息发送的过程中,涉及到了两个线程:main线程和Sender线程。Producer发送的消息会分别经过Interceptors(拦截器),Serializer(序列化器),Partitioner(分区器)最终到达RecordAccumulator,RecordAccumulator是一个双端队列,主要起缓冲区的作用。Sender线程不断从RecordAccumulator中拉取消息发送到 Kafka集群。

二. 异步发送
1. 普通异步发送

普通异步发送指生产者在完成消息发送后不会等待Kafka集群的响应,而是继续去发送下一条消息。

代码实现:

package kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaSenderDemo  {private final static String BOOTSTRAP_SERVERS = "192.168.205.154:9092,192.168.205.155:9092,192.168.205.156:9092";public static void main(String[] args) throws Exception {Properties properties = new Properties();// 设置bootstrap server地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 设置消息的key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>("first", "message: " + i));}producer.close();}
}

maven依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>

启动kafka-console-consumer消费者

[root@hadoop1 kafka-3.6.0]# ./bin/kafka-console-consumer.sh  --bootstrap-server 192.168.205.154:9092 --topic first

运行结果:
在这里插入图片描述

2. 带回调的异步发送

带回调的异步发送是指异步发送后,生产者收到Kafka集群返回的Ack时会执行回调函数。

代码实现:

package kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaSenderDemo  {private final static String BOOTSTRAP_SERVERS = "192.168.205.154:9092,192.168.205.155:9092,192.168.205.156:9092";public static void main(String[] args) throws Exception {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>("first", String.valueOf(i), "message: " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("主题:" + recordMetadata.topic() + ", 分区: " + recordMetadata.partition());} else {e.printStackTrace();}}});}producer.close();}
}

运行结果:
在这里插入图片描述

主题:first, 分区: 1
主题:first, 分区: 1
主题:first, 分区: 0
主题:first, 分区: 0
主题:first, 分区: 0
主题:first, 分区: 0
主题:first, 分区: 2
主题:first, 分区: 2
主题:first, 分区: 2
主题:first, 分区: 2
三. 同步发送

在同步发送模式下,生产者发送完消息后会阻塞等待Kafka集群的响应,生产者收到Kafka集群的Ack才会进行下一步操作,同步发送的方式大大提高了消息的可靠性,但是也会因此损失性能。同步发送只需要执行完send方法后再调用一下 get()方法即可。

代码实现:

package kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaSenderDemo  {private final static String BOOTSTRAP_SERVERS = "192.168.205.154:9092,192.168.205.155:9092,192.168.205.156:9092";public static void main(String[] args) throws Exception {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>("first", "sync message: " + i)).get();}producer.close();}
}

运行结果:
在这里插入图片描述

四. 自定义分区器

Kafka中的Topic是可以分区的,使用分区的好处是显而易见的,它可以合理的使用存储资源,提高并行度,一个Topic的多个分区分散在不同的主机上,可以充分利用集群资源。

生产者发送的每一条消息最终只会进入某一个分区,决定消息和分区映射关系的就是Partitioner。Kafka默认分区器是DefaultPartitioner。

以下是DefaultPartitioner的部分源代码:

public class DefaultPartitioner implements Partitioner {
...public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {if (keyBytes == null) {return stickyPartitionCache.partition(topic, cluster);}return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);}
...
}

根据源码可以得出DefaultPartitioner的映射规则如下:

  1. 指明partition的情况下,直接将指明的值作为partition的值。
  2. 没有指明partition但有key的情况下,将key的hash值与topic的分区数取余得到partition的值。
  3. 既没有指明key又没有partition值的情况下,kafka采用Sticky Partition(粘性分区器),会随机选择一个分区,并一致尽可能使用该分区,待该分区的batch已满或者已完成,kafka再随机选择一个分区进行使用(和上一次的分区不同)

如果DefaultPartitioner不能满足实际业务的分区要求,那么可以自定义分区器,要自定义分区器只需要实现Partitioner类即可。下面以一个需求来说明如何实现自定义分区器。
需求:

将包含hello字符串的消息发送到分区0
将包含world字符串的消息发送到分区1

代码实现:

KafkaOwnPartitionerDemo.java

package kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class KafkaOwnPartitionerDemo {private final static String BOOTSTRAP_SERVERS = "192.168.205.154:9092,192.168.205.155:9092,192.168.205.156:9092";public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 10; i++) {String[] msgPrefix = {"hello", "world"};String msg = msgPrefix[i % 2] + i;producer.send(new ProducerRecord<String, String>("first", String.valueOf(i), msg), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("消息:" + msg + ", 主题: " + recordMetadata.topic() + ", 分区: " + recordMetadata.partition());} else {e.printStackTrace();}}});}producer.close();}
}

MyPartitioner.java

package kafka;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String msg = new String(valueBytes);if (msg.contains("hello")) {return 0;} else {return 1;}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

运行结果:
在这里插入图片描述

消息:hello0, 主题: first, 分区: 0
消息:hello2, 主题: first, 分区: 0
消息:hello4, 主题: first, 分区: 0
消息:hello6, 主题: first, 分区: 0
消息:hello8, 主题: first, 分区: 0
消息:world1, 主题: first, 分区: 1
消息:world3, 主题: first, 分区: 1
消息:world5, 主题: first, 分区: 1
消息:world7, 主题: first, 分区: 1
消息:world9, 主题: first, 分区: 1
五. 生产者重要参数列表
参数名称参数描述
bootstrap.servers生 产 者 连 接 集 群 所 需 的 broker 地 址 清 单
key.serializer 和 value.serializer指定发送消息的key和value的序列化类型
buffer.memoryRecordAccumulator 缓冲区总大小,默认 32m。
batch.size缓冲区一批数据最大值, 默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到 batch.size, sender 等待 linger.time之后就会发送数据。单位 ms, 默认值是 0ms,表示没有延迟。
acks- 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据, Leader 收到数据后应答。-1(all):生产者发送过来的数据, Leader+和 ISR队列里面的所有节点收齐数据后应答

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/474338.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

web应用安全和信息泄露预防

文章目录 1&#xff1a;spring actuator导致的信息泄露1.1、Endpoint配置启用检测1.2、信息泄露复现1.3、防御 2&#xff1a;服务端口的合理使用3&#xff1a;弱口令&#xff08;密码&#xff09;管理4&#xff1a;服务端攻击4.1、短信业务&#xff0c;文件上传等资源型接口1、…

智慧安防丨以科技之力,筑起防范人贩的铜墙铁壁

近日&#xff0c;贵州省贵阳市中级人民法院对余华英拐卖儿童案做出了一审宣判&#xff0c;判处其死刑&#xff0c;剥夺政治权利终身&#xff0c;并处没收个人全部财产。这一判决不仅彰显了法律的威严&#xff0c;也再次唤起了社会对拐卖儿童犯罪的深切关注。 余华英自1993年至2…

计算机编程中的测试驱动开发(TDD)及其在提高代码质量中的应用

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 计算机编程中的测试驱动开发&#xff08;TDD&#xff09;及其在提高代码质量中的应用 计算机编程中的测试驱动开发&#xff08;T…

【MYSQL】锁详解(全局锁、表级锁、行级锁)【快速理解】

目录 一、全局锁 二、表级锁 1.表锁 2.元数据锁 3.意向锁 三、行级锁 1. 行锁 2.间隙锁 3.临建锁 锁是处理并发情况下&#xff0c;对数据的一致性的关键因素&#xff0c;也是并发情况下对效率影响非常大的。 1、全局锁&#xff1a;锁定表中所有数据。 2、表级锁&#xff1a;…

thinkphp6 入门(2)--视图、渲染html页面、赋值

use think\facade\View;View::assign([name > ThinkPHP,email > thinkphpqq.com]);View::assign(data,[name > ThinkPHP,email > thinkphpqq.com]); View::fetch(index);助手函数 view(index, [name > ThinkPHP,email > thinkphpqq.com ]); 模板输出 {$na…

百度智能云 VectorDB 优势数量 TOP 1

近日&#xff0c;IDC 发布了《RAG 与向量数据库市场前景预测》报告&#xff0c;深入剖析了检索增强生成&#xff08;RAG&#xff09;技术和向量数据库市场的发展趋势。报告不仅绘制了 RAG 技术的发展蓝图&#xff0c;还评估了市场上的主要厂商。在这一评估中&#xff0c;百度智…

MySQL索引的底层实现原理是什么?

MySQL索引的底层实现主要基于B树数据结构。B树是一种平衡多路查找树&#xff0c;具有以下特点&#xff1a; 1、树的所有叶子节点都位于同一层&#xff1a; 这确保了从根节点到每个叶子节点的路径长度相同&#xff0c;保证了查询效率的一致性。 2、节点中的数据按键值大小有序…

手搓神经网络(MLP)解决MNIST手写数字识别问题 | 数学推导+代码实现 | 仅用numpy,tensor和torch基本计算 | 含正反向传播数学推导

手写数字识别&#xff08;神经网络入门&#xff09; 文章目录 手写数字识别&#xff08;神经网络入门&#xff09;实验概述实验过程数据准备模型实现线性变换层前向传播反向传播更新参数整体实现 激活函数层&#xff08;ReLU&#xff09;前向传播反向传播整体实现 Softmax层&am…

在MATLAB中导入TXT文件的若干方法

这是一篇关于如何在MATLAB中导入TXT文件的文章&#xff0c;包括示例代码和详细说明 文章目录 在MATLAB中导入TXT文件1. 使用readtable函数导入TXT文件示例代码说明 2. 使用load函数导入TXT文件示例代码说明 3. 使用importdata函数导入TXT文件示例代码说明 4. 自定义导入选项示例…

ks 小程序sig3

前言 搞了app版的快手之后 &#xff08;被风控麻了&#xff09; 于是试下vx小程序版的 抓包调试 小程序抓包问题 网上很多教程&#xff0c; github也有开源的工具代码 自行搜索 因为我们需要调试代码&#xff0c;所以就用了下开源的工具 &#xff08;可以用chrome的F12功能&a…

解决Spring Boot整合Redis时的连接问题

前言 在使用Spring Boot整合Redis的过程中&#xff0c;经常会遇到连接问题&#xff0c;尤其是当Redis服务部署在远程服务器上时。 问题描述 当你尝试连接到Redis服务器时&#xff0c;可能会遇到以下错误&#xff1a; org.springframework.data.redis.connection.PoolExcept…

算法--“汽车加油”问题.

def greedy():n 100 # 汽车满油后可行驶的最大距离d [50, 80, 39, 60, 40, 32] # 加油站的距离k len(d) # 加油站的数量# 检查是否有加油站距离超过汽车的最大行驶距离for dist in d:if dist > n:print(no solution)returnnum 0 # 加油次数current_position 0 # 当…

道陟科技EMB产品开发进展与标准设计的建议|2024电动汽车智能底盘大会

11月12日&#xff0c;2024电动汽车智能底盘大会在重庆开幕。会议由中国汽车工程学会主办&#xff0c;电动汽车产业技术创新战略联盟、中国汽车工程学会智能底盘分会、智能绿色车辆与交通全国重点实验室承办。本届大会围绕电动汽车智能底盘相关技术发展与融合&#xff0c;满足高…

sqli—labs靶场 5-8关 (每日4关练习)持续更新!!!

Less-5 上来先进行查看是否有注入点&#xff0c;判断闭合方式&#xff0c;查询数据列数&#xff0c;用union联合注入查看回显位&#xff0c;发现到这一步的时候&#xff0c;和前四道题不太一样了&#xff0c;竟然没有回显位&#xff1f;&#xff1f;&#xff1f; 我们看一下源…

【qt】控件3

1.setToolTip和setToolTipDuration setToolTip这个函数用来设置提醒内容 setToolTipDuration这个函数用来设置提醒时间 Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);ui->help->setToolTip("按下这个按键就可以提…

STM32 使用 STM32CubeMX HAL库实现低功耗模式

STM32 使用 HAL 库的低功耗模式测试使用 ...... 矜辰所致前言 上次画了一个 STM32L010F4 最小系统的板子&#xff0c;也做了一些基本测试&#xff0c;但是最重要的低功耗一直拖到现在&#xff0c;以前在使用 STM32L151 的时候用标准库做过低功耗的项目&#xff0c;现在都使…

js实现导航栏鼠标移入时,下划线跟随鼠标滑动

话不多说&#xff0c;上代码&#xff1a; html代码&#xff1a; <div class"nav clearfix"><div class"bottomLine"></div><ul class"clearfix"><li class"nav__item"><a href"./index.html&…

React教程第二节之虚拟DOM与Diffing算法理解

1、什么是虚拟DOM 虚拟DOM 是javascript的一个对象&#xff0c;是内存中的一种数据结构&#xff0c;以树的形式存储UI的状态&#xff0c;树中的每个节点都代表着真实的DOM&#xff0c;用来描述我们希望在页面看到的 HTML结构&#xff1b; 现在的MVVM 框架&#xff0c;大多使用…

多线程4:线程池、并发、并行、综合案例-抢红包游戏

欢迎来到“雪碧聊技术”CSDN博客&#xff01; 在这里&#xff0c;您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者&#xff0c;还是具有一定经验的开发者&#xff0c;相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导&#xff0c;我将…

【WPF】Prism库学习(一)

Prism介绍 1. Prism框架概述&#xff1a; Prism是一个用于构建松耦合、可维护和可测试的XAML应用程序的框架。它支持WPF、.NET MAUI、Uno Platform和Xamarin Forms等多个平台。对于每个平台&#xff0c;Prism都有单独的发布版本&#xff0c;并且它们在不同的时间线上独立开发。…