三、Kafka生产者

目录

    • 3.1 生产者消息发送流程
      • 3.1.1 发送原理
    • 3.2 异步发送 API
    • 3.3 同步发送数据
    • 3.4 生产者分区
      • 3.4.1 kafka分区的好处
      • 3.4.2 生产者发送消息的分区策略
      • 3.4.3 自定义分区器
    • 3.5 生产者如何提高吞吐量
    • 3.6 数据可靠性

3.1 生产者消息发送流程

3.1.1 发送原理

3.2 异步发送 API

3.3 同步发送数据

3.4 生产者分区

3.4.1 kafka分区的好处

  • 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果
  • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

3.4.2 生产者发送消息的分区策略

在这里插入图片描述

3.4.3 自定义分区器

1、需求:
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区

2、定义类实现 Partitioner 接口,重写 partition()方法。

public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取数据 atguigu  helloString msgValues = value.toString();int partition;if (msgValues.contains("atguigu")){partition = 0;}else {partition = 1;}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

3、使用分区器的方法,在生产者的配置中添加分区器参数

public class CustomProducerCallbackPartitions {public static void main(String[] args) throws InterruptedException {// 0 配置Properties properties = new Properties();// 连接集群 bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");// 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 关联自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");// 1 创建kafka生产者对象// "" helloKafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("主题: " + metadata.topic() + " 分区: " + metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
}

在这里插入图片描述



3.5 生产者如何提高吞吐量

  • 分批次发送消息
  • 对生产者消息采用压缩

四个重要参数:
在这里插入图片描述

public class CustomProducerParameters {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接kafka集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");// 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 缓冲区大小properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// 批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);// linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");// 1 创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2 发送数据for (int i = 0; i < 50; i++) {kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));}// 3 关闭资源kafkaProducer.close();}
}

3.6 数据可靠性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/98537.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【观察】戴尔科技:构建企业创新“韧性”,开辟数实融合新格局

过去几年&#xff0c;国家高度重视发展数字经济&#xff0c;将其上升为国家战略。其中&#xff0c;“十四五”规划中&#xff0c;就明确提出要推动数字经济和实体经济的深度融合&#xff0c;以数字经济赋能传统产业转型升级&#xff1b;而2023年年初正式发布的《数字中国建设整…

python使用matplotlib实现折线图的绘制

一、意义 数据可视化可以以简洁的方式呈现出数据&#xff0c;发现众多数据中隐藏的规律和意义。Matplotlib是一个数学绘图库。利用它可以制作简单的图表&#xff08;散点图、折线图&#xff09;。然后&#xff0c;将基于漫步概念生成一个更有趣的数据集–根据一系列随机决策生成…

【React学习】—组件三大核心属性: state(七)

【React学习】—组件三大核心属性: state&#xff08;七&#xff09; 2.2.2. 理解 state是组件对象最重要的属性, 值是对象(可以包含多个key-value的组合)组件被称为"状态机", 通过更新组件的state来更新对应的页面显示(重新渲染组件) 2.2.3. 强烈注意 组件中rend…

RocketMQ 消息消费 轮询机制 PullRequestHoldService

1. 概述 先来看看 RocketMQ 消费过程中的轮询机制是啥。首先需要补充一点消费相关的前置知识。 1.1 消息消费方式 RocketMQ 支持多种消费方式&#xff0c;包括 Push 模式和 Pull 模式 Pull 模式&#xff1a;用户自己进行消息的拉取和消费进度的更新Push 模式&#xff1a;Broker…

Redis从基础到进阶篇(一)

目录 一、了解NoSql 1.1 什么是Nosql 1.2 为什么要使用NoSql 1.3 NoSql数据库的优势 1.4 常见的NoSql产品 1.5 各产品的区别 二、Redis介绍 2.1什么是Redis 2.2 Redis优势 2.3 Redis应用场景 2.4 Redis下载 三、Linux下安装Redis 3.1 环境准备 3.2 Redis的…

通过LD_PRELOAD绕过disable_functions

LD_PRELOAD 在UNIX的动态链接库的世界中&#xff0c;LD_PRELOAD就是这样一个环境变量&#xff0c;它可以影响程序的运行时的链接&#xff08;Runtime linker&#xff09;&#xff0c;它允许你定义在程序运行前优先加载的动态链接库。这个功能主要就是用来有选择性的载入不同动态…

udp与can通信的选择与比较

UDP&#xff08;用户数据报协议&#xff09;和CAN&#xff08;控制器局域网&#xff09;是两种不同的通信协议&#xff0c;它们在实时传递性上有一些区别。 UDP是一种无连接的传输协议&#xff0c;它提供了简单的、不可靠的数据传输。UDP不提供可靠性保证、流控制或重传机制。…

根据源码,模拟实现 RabbitMQ - 内存数据管理(4)

目录 一、内存数据管理 1.1、需求分析 1.2、实现 MemoryDataCenter 类 1.2.1、ConcurrentHashMap 数据管理 1.2.2、封装交换机操作 1.2.3、封装队列操作 1.2.4、封装绑定操作 1.2.5、封装消息操作 1.2.6、封装未确认消息操作 1.2.7、封装恢复数据操作 一、内存数据管理…

protobuf+netty自定义编码解码

protobufnetty自定义编 项目背景 protobufnetty自定义编码解码 比如心跳协议&#xff0c;客户端请求的协议是10001&#xff0c;在java端如何解码&#xff0c;心跳返回协议如何编码&#xff0c;将协议号带过去 // 心跳包 //10001 message c2s_heartbeat { }//10002 message …

【C++笔记】C++之类与对象(中)

【C笔记】C之类与对象&#xff08;中&#xff09; 1、类的构造函数1.1、构造函数的基本用法1.2、构造函数的7个特性 2、类的析构函数2.1、析构函数的基本用法2.2、析构函数的6个特性 3、类的拷贝构造函数3.1、拷贝构造的基本用法3.2、拷贝构造的“无限套娃”陷阱3.3、深拷贝与浅…

二叉树搜索

✅<1>主页&#xff1a;我的代码爱吃辣&#x1f4c3;<2>知识讲解&#xff1a;数据结构——二叉搜索树☂️<3>开发环境 &#xff1a;Visual Studio 2022&#x1f4ac;<4>前言&#xff1a;在之前的我们已经学过了普通二叉树&#xff0c;了解了基本的二叉树…

vue导出文件流获取附件名称并下载(在response.headers里解析filename导出)

导出文件流下载&#xff0c;拦截器统一处理配置 需求以往实现的方法&#xff08;各自的业务层写方法&#xff09;现在实现的方法&#xff08;axios里拦截器统一配置处理&#xff09;把文章链接复制粘贴给后端&#xff0c;让大佬自己赏阅。 需求 之前实现的导出都是各自的业务层…

springboot之多数据源配置

文章目录 一、多数据源的典型使用场景1 业务复杂&#xff08;数据量大&#xff09;2 读写分离 二、如何实现多数据源通过AbstractRoutingDataSource动态指定数据源多数据源切换方式AOPMyBatis插件 三、spring集成多个Mybatis框架 实现多数据源控制四、dynamic-datasource 多数据…

01.Django入门

1.创建项目 1.1基于终端创建Django项目 打开终端进入文件路径&#xff08;打算将项目放在哪个目录&#xff0c;就进入哪个目录&#xff09; E:\learning\python\Django 执行命令创建项目 F:\Anaconda3\envs\pythonWeb\Scripts\django-admin.exe&#xff08;Django-admin.exe所…

残差网络实现

代码中涉及的图片实验数据下载地址&#xff1a;https://download.csdn.net/download/m0_37567738/88235543?spm1001.2014.3001.5501 代码&#xff1a; import torch import torch.nn as nn import torch.nn.functional as F #from utils import load_data,get_accur,train i…

茂名 湛江阳江某学校 ibm x3850服务器维修经历

简介&#xff1a;中国广东省阳江市某中学联想 IBM System x3850 x6服务器维修 io板故障处理经历分享&#xff1a; 这一天一位阳江的老师经其他学校老师介绍推荐对接我&#xff0c;说他们学校有一台ibm服务器出问题了&#xff0c;老师大致跟我描述了一下这台服务器发生故障的前…

Android12之com.android.media.swcodec无法生成apex问题(一百六十三)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

Apache DolphinScheduler 支持使用 OceanBase 作为元数据库啦!

DolphinScheduler是一个开源的分布式任务调度系统&#xff0c;拥有分布式架构、多任务类型、可视化操作、分布式调度和高可用等特性&#xff0c;适用于大规模分布式任务调度的场景。目前DolphinScheduler支持的元数据库有Mysql、PostgreSQL、H2&#xff0c;如果在业务中需要更好…

iOS UIAlertController控件

ios 9 以后 UIAlertController取代UIAlertView和UIActionSheet UIAlertControllerStyleAlert和UIAlertControllerStyleActionSheet。 在UIAlertController中添加按钮和关联输入框 UIAlertAction共有三种类型&#xff0c;默认&#xff08;UIAlertActionStyleDefault&#xff0…

【Linux】进程信号篇Ⅰ:信号的产生(signal、kill、raise、abort、alarm)、信号的保存(core dump)

文章目录 一、 signal 函数&#xff1a;用户自定义捕捉信号二、信号的产生1. 通过中断按键产生信号2. 调用系统函数向进程发信号2.1 kill 函数&#xff1a;给任意进程发送任意信号2.2 raise 函数&#xff1a;给调用进程发送任意信号2.3 abort 函数&#xff1a;给调用进程发送 6…