Kafka(三)生产者发送消息

文章目录

  • 生产者发送思路
  • 自定义序列化类
  • 配置生产者参数
    • 提升吞吐量
  • 发送消息
  • 关闭生产者
  • 结语
  • 示例源码仓库

生产者发送思路

如何确保消息格式正确的前提下最终一定能发送到Kafka? 这里的实现思路是

  1. ack使用默认的all
  2. 开启重试
  3. 在一定时间内重试不成功,则入库,后续由定时任务继续发送
  4. 这里在某些异常情况下一定会生产重复消息,如何确保消息只消费一次,后续在Consumer实现中详细展开
  5. 这里我们只要确保生产的消息,不论重试多少次,最终都只会被发送到同一分区。Kafka的确定消息的分区策略是: 如果提供了key,则根据hash(key)计算分区。由于我们每个消息都有一个消息ID,不管是重试多少次,ID是不会变的,同时我们不会在消息高峰阶段调整分区数量。所以基于这些,我们保证一个消息无论多少次,都会发送到同一分区。

自定义序列化类

消息格式为JSON, 使用Jackson将类序列化为JSON字符串

public class UserDTOSerializer implements Serializer<UserDTO> {@Override@SneakyThrowspublic byte[] serialize(final String s, final UserDTO userDTO) {ObjectMapper objectMapper = new ObjectMapper();return objectMapper.writeValueAsBytes(userDTO);}
}

配置生产者参数

有几点需要注意

  1. 开启压缩
  2. retries 官方建议不配置, 官方建议使用delivery.timeout.ms 参数来控制重试时间, 默认2分钟
  3. buffer.memory 如果没有什么特别情况,使用默认的即可, 32MB
  4. ack使用默认的all
    /*** 以下配置建议搭配 官方文档 + kafka权威指南相关章节 + 实际业务场景吞吐量需求 自己调整* 如果是本地, bootstrap.server的IP地址和docker-compose.yml中的EXTERNAL保持一致* @return*/public static Properties loadProducerConfig(String valueSerializer) {Properties result = new Properties();result.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "l192.168.0.102:9093");result.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);result.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");// 每封邮件消息大小大约20KB, 使用默认配置吞吐量不高,下列配置增加kafka的吞吐量// 默认16384 bytes,太小了,这会导致邮件消息一个一个发送到kafka,达不到批量发送的目的,不符合发送邮件的场景result.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576 * 10);// 默认1048576 bytes,限制的是一个batch的大小,对于20KB的消息来说,消息太小result.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576 * 10);// 等10ms, 为了让更多的消息聚合到一个batch中,提高吞吐量result.put(ProducerConfig.LINGER_MS_CONFIG, 10);return result;}

提升吞吐量

  • 在实际场景中,我们的邮件消息一个大概20KB,而batch.size默认是16KB,也就是说,在不修改该参数的情况下,生产者只能一个一个的发消息,这会导致我们的吞吐量上不去, 所以修改batch.size为10MB
  • 只修改这个参数还不行, max.request.size 限制了单次请求的大小,默认为1MB,也就是说即使batch.size为10MB,但是由于一次只能最多发1MB,吞吐量也上不去,所以这里将max.request.size也改为10MB
  • 由于我们将一个批次可发送的数量大大提高,所以可以让生产者等一会再发,等更多的数据到达。linger.ms默认是为0,也就是立刻发送,根据实际情况适当增加等待时间

发送消息

@Log
public class MessageProducer {public static final KafkaProducer<String, UserDTO> PRODUCER = new KafkaProducer<>(KafkaConfiguration.loadProducerConfig(UserDTOSerializer.class.getName()));private MessageFailedService messageFailedService = new MessageFailedService();/*** kafka producer 发送失败时会进行重试,相关参数 retries 和 delivery.timeout.ms, 官方建议使用delivery.timeout.ms,默认2分钟* callback函数只有在最后一次重试之后才会调用, 如果你想在本地测试Kafka生产者的重试,详情可以看https://lists.apache.org/thread/nwg326bxpo7ry116nqhxmsmc3bokc6hm* @param userDTO*/public void sendMessage(final UserDTO userDTO) {ProducerRecord<String, UserDTO> user = new ProducerRecord<>("email", userDTO.getMessageId(),  userDTO);try {PRODUCER.send( user, (recordMetadata, e) -> {if (Objects.nonNull(e)) {log.severe("message has sent failed");MessageFailedEntity messageFailedEntity = new MessageFailedEntity();messageFailedEntity.setMessageId(userDTO.getMessageId());ObjectMapper mapper = new ObjectMapper();try {messageFailedEntity.setMessageContentJsonFormat(mapper.writeValueAsString(userDTO));} catch (JsonProcessingException jsonProcessingException) {log.severe("message content json format failed");}messageFailedEntity.setMessageType(MessageType.EMAIL);messageFailedEntity.setMessageFailedPhrase(MessageFailedPhrase.PRODUCER);messageFailedEntity.setFailedReason(e.getMessage());// 如果sendMessage传进来的是个list,也同理,不能放到list.foreach外面// 如果放在主线程里,由于kafka producer是异步的,// kafka producer的执行速度可能慢于主线程,可能拿到的值是空的是有问题的,例如拿到的failedReason是空的messageFailedService.saveOrUpdateMessageFailed(messageFailedEntity);} else {log.info("message has sent to topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition() );}});} catch (TimeoutException e) {log.info("send message to kafka timeout, message: ");// TODO: 自定义逻辑,比如发邮件通知kafka管理员}}
}

对上述代码做几点解释

  1. 我们使用异步的方式发送,如果发送成功,打印一条消息
  2. 关键在于重试,callback函数只有在最后一次重试之后才会调用。不会重试多少次就调用多少次callback, 这个问题我发邮件问过社区, 详情见这里的 邮件

关闭生产者

实现ServletContextListener接口, 然后在web.xml的listener元素中配置

public class KafkaListener implements ServletContextListener {private static final List<KafkaProducer> KAFKA_PRODUCERS = new LinkedList<>();@Overridepublic void contextInitialized(ServletContextEvent sce) {KAFKA_PRODUCERS.add(MessageProducer.PRODUCER);}@Overridepublic void contextDestroyed(ServletContextEvent sce) {KAFKA_PRODUCERS.forEach(KafkaProducer::close);}
}
<?xml version="1.0" encoding="UTF-8" ?>
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaeehttps://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsd"version="6.0"><listener><listener-class>com.business.server.listener.KafkaListener</listener-class></listener>
</web-app>

结语

  1. 在实际编码过程中,可以参考官方写的Kafka权威指南对应章节书写,或者参考各大云服务厂商的Kafak的开发者文档。不过我建议还是看Kafka权威指南, 我看了阿里云和华为云的,虽然都号称兼容开源Kafka,但是发现其版本和开源版本之间存在一定的滞后性,许多最佳实践已经过时
  2. Kafka生产者端没什么特别的,主要是根据业务场景设计消息格式,以及如何尽可能的减小消息体积
  3. 如果你的消息很大,比我的场景还大,达到了1M以上,生产者的吞吐量是个问题,消费者的消费速度也是个问题。你要是问我有什么好的想法,没有具体场景,我确实想不出什么好的方式

示例源码仓库

  1. Github地址
  2. 项目下business-server module代表生产者
  3. 运行时IDEA配置如下在这里插入图片描述
    注意Application context的路径, 启动之后访问端口+Application context, 例如
http://localhost:8999/business-server

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/193179.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VS Code画流程图:draw.io插件

文章目录 简介快捷键 简介 Draw.io是著名的流程图绘制软件&#xff0c;开源免费&#xff0c;对标Visio&#xff0c;用过的都说好。而且除了提供常规的桌面软件之外&#xff0c;直接访问draw.io就可以在线使用&#xff0c;堪称百分之百跨平台&#xff0c;便捷性直接拉满。 那么…

TOUGH系列软件教程

查看原文>>>全流程TOUGH系列软件实践技术应用 TOUGH系列软件是由美国劳伦斯伯克利实验室开发的&#xff0c;旨在解决非饱和带中地下水、热运移的通用模拟软件。和传统地下水模拟软件Feflow和Modflow不同&#xff0c;TOUGH系列软件采用模块化设计和有限积分差网格剖分…

(八)Spring源码解析:Spring MVC

一、Servlet及上下文的初始化 1.1> DispatcherServlet的初始化 对于Spring MVC来说&#xff0c;最核心的一个类就是DispatcherServlet&#xff0c;它负责请求的行为流转。那么在Servlet的初始化阶段&#xff0c;会调用init()方法进行初始化操作&#xff0c;在DispatcherSe…

第三天课程 RabbitMQ

RabbitMQ 1.初识MQ 1.1.同步和异步通讯 微服务间通讯有同步和异步两种方式&#xff1a; 同步通讯&#xff1a;就像打电话&#xff0c;需要实时响应。 异步通讯&#xff1a;就像发邮件&#xff0c;不需要马上回复。 两种方式各有优劣&#xff0c;打电话可以立即得到响应&am…

数据库事务相关问题

1. 什么是数据库事务&#xff1f; 事务&#xff0c;由一个有限的数据库操作序列构成&#xff0c;这些操作要么全部执行,要么全部不执行&#xff0c;是一个不可分割的工作单位。 假如A转账给B 100 元&#xff0c;先从A的账户里扣除 100 元&#xff0c;再在 B 的账户上加上 100 …

python自动化第一篇—— 带图文的execl的自动化合并

简述 最近接到一个需求&#xff0c;需要为公司里的一个部门提供一个文件上传自动化合并的系统&#xff0c;以供用户稽核&#xff0c;谈到自动化&#xff0c;肯定是选择python&#xff0c;毕竟python的轮子多。比较了市面上几个用得多的python库&#xff0c;我最终选择了xlwings…

SOME/IP学习笔记3

目录 1.SOMEIP Transformer 1.1 SOME/IP on-wire format 1.2 协议指定 2. SOMEIP TP 2.1 SOME/IP TP Header 3.小结 1.SOMEIP Transformer 根据autosar CP 相关规范&#xff0c;SOME/IP Transformer主要用于将SOME/IP格式的数据序列化&#xff0c;相当于一个转换器。总体…

uniapp+vite+vue3开发跨平台app,运行到安卓模拟器调试方法

因为没有使用hbuilder开发uniapp&#xff0c;而是使用了vscode和vite来开发的&#xff0c;所以怎么将这个程序运行到安卓模拟器调试开发呢&#xff1f;其实方法很简单&#xff0c;使用android studio创建一个模拟器或者其他mumu模拟器&#xff0c;然后将项目使用hbuilder打开&a…

macos死机后IDEA打不开,Cannot connect to already running IDE instance.

Cannot connect to already running IDE instance. Exception: Process 573 is still running 解决办法 进入&#xff1a;/Users/lzq/Library/Application Support/JetBrains 找到IDEA的目录删除隐藏文件夹 .lock rm -rf .lock

黑马程序员微服务 第五天课程 分布式搜索引擎2

分布式搜索引擎02 在昨天的学习中&#xff0c;我们已经导入了大量数据到elasticsearch中&#xff0c;实现了elasticsearch的数据存储功能。但elasticsearch最擅长的还是搜索和数据分析。 所以今天&#xff0c;我们研究下elasticsearch的数据搜索功能。我们会分别使用DSL和Res…

入选《人工智能领域内容榜》

入选《人工智能领域内容榜》第 23名 C# OpenCvSharp DNN HybridNets 同时处理车辆检测、可驾驶区域分割、车道线分割-CSDN博客

Playwright UI 自动化测试实战

&#x1f4e2;专注于分享软件测试干货内容&#xff0c;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;交流讨论&#xff1a;欢迎加入我们一起学习&#xff01;&#x1f4e2;资源分享&#xff1a;耗时200小时精选的「软件测试」资…

Semantic Kernel 学习笔记1

1. 挂代理跑通openai API 2. 无需魔法跑通Azure API 下载Semantic Kernel的github代码包到本地&#xff0c;主要用于方便学习python->notebooks文件夹中的内容。 1. Openai API&#xff1a;根据上述文件夹中的.env.example示例创建.env文件&#xff0c;需要填写下方两个内…

Vue 简单的语法

1.插值表达式 1.插值表达式的作用是什么&#xff1f; 利用表达式进行插值&#xff0c;将数据渲染到页面中&#xff1b; 2.语法结构&#xff1f; {{表达式}} 3.插值表达式的注意点是什么&#xff1f; &#xff08;1&#xff09;使用的数据要存在&#xff0c;在data中&…

错误:ERROR:torch.distributed.elastic.multiprocessing.api:failed

在多卡运行时&#xff0c;会出现错误&#xff08;ERROR:torch.distributed.elastic.multiprocessing.api:failed&#xff09;&#xff0c;但是单卡运行并不会报错&#xff0c;通常在反向梯度传播时多卡梯度不同步。但我是在多卡处理数据进行tokenizer阶段报错&#xff0c;这竟然…

仿真算法收敛与初值的关系

问题&#xff1a; 当电路中存在大电容时&#xff0c;由于初值设置不合理可能导致的仿真算法不收敛的问题。 解决方法&#xff1a;设置初始节点值。 疑问&#xff1a;Node set和Initial Condition的区别。 [求助] node set 和initial condition有很么区别呢&#xff1f; 注&…

Kafka 的应用场景

Kafka 是一个开源的分布式流式平台&#xff0c;它可以处理大量的实时数据&#xff0c;并提供高吞吐量&#xff0c;低延迟&#xff0c;高可靠性和高可扩展性。 Kafka 最初是为分布式系统中海量日志处理而设计的。它可以通过持久化功能将消息保存到磁盘&#xff0c;并让消费者按…

Xshell+Xftp通过代理的方式访问局域网内网服务器

最近在部署项目时遇到只有1台服务器拥有公网ip&#xff0c;其它服务器只有局域网ip&#xff0c;当然其它服务器可以正常访问网络&#xff0c;例如如下模型。之前访问其它几台服务器&#xff0c;都是先通过登录公网IP服务器&#xff0c;然后在Xshell里面执行ssh远程连接&#xf…

uniapp: 实现pdf预览功能

目录 第一章 实现效果 第二章 了解并解决需求 2.1 了解需求 2.2 解决需求 2.2.1 方法一 2.2.2 方法二 第三章 资源下载 第一章 实现效果 第二章 了解并解决需求 2.1 了解需求 前端需要利用后端传的pdf临时路径实现H5端以及app端的pdf预览首先我们别像pc端一样&#…

单相过压继电器DVR-G-100-1 0~500V AC/DC220V 导轨安装

系列型号 DVR-G-100-1X3数字式过压继电器&#xff1b; DVR-G-100-3三相过压继电器&#xff1b; DVR(H)-G-100-1单相过压继电器&#xff1b; DVR-Q-100-3三相欠压继电器&#xff1b; DVR(H)-Q-100-3三相欠压继电器 一、用途 主要应用于电机、变压器等主设备以及输配电系统的继…