💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。
-
推荐:「stormsha的主页」👈,「stormsha的知识库」👈持续学习,不断总结,共同进步,为了踏实,做好当下事儿~
-
专栏导航
- Python系列: Python面试题合集,剑指大厂
- Git系列: Git操作技巧
- GO系列: 记录博主学习GO语言的笔记,该笔记专栏尽量写的试用所有入门GO语言的初学者
- 数据库系列: 详细总结了常用数据库 mysql 技术点,以及工作中遇到的 mysql 问题等
- 运维系列: 总结好用的命令,高效开发
- 算法与数据结构系列: 总结数据结构和算法,不同类型针对性训练,提升编程思维
非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨
💖The Start💖点点关注,收藏不迷路💖📒文章目录
- 一、前言
- 二、Kafka Producer 的基本配置
- 三、实现 Kafka Producer 的单例模式
- 四、使用 Kafka Producer 发送消息
- 五、总结
一、前言
在分布式系统中,Apache Kafka 是一个非常受欢迎的消息中间件。它提供了高吞吐量、低延迟的消息传递机制,非常适合处理实时数据流。本文将介绍如何在 Java 中使用 Kafka Producer 并实现单例模式,以确保资源的有效管理。
Kafka 是一个分布式流处理平台,它的核心功能包括发布和订阅记录流、存储流记录、以及处理流记录。为了充分利用 Kafka 的功能,一个高效的 Kafka 生产者(Producer)是必要的。在生产环境中,使用单例模式可以确保 Kafka Producer 资源的唯一性和线程安全性。
二、Kafka Producer 的基本配置
在开始之前,我们需要引入一些必要的依赖。假设我们使用 Maven 项目,pom.xml
文件中需要添加以下依赖:
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.30</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.30</version></dependency>
</dependencies>
三、实现 Kafka Producer 的单例模式
下面是完整的 QuoteKafkaProducer
类,包含了 Kafka Producer 的配置、消息发送和关闭方法。
package com.stormsha.util;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Properties;public class QuoteKafkaProducer {private static final Logger logger = LoggerFactory.getLogger(QuoteKafkaProducer.class);private static String bootstrapServers; // Kafka 服务器的地址private static KafkaProducer<String, String> producer; // Kafka Producer 实例private static final Object lock = new Object(); // 锁对象,用于线程安全的单例模式/*** 获取 Kafka Producer 单例实例** @param servers Kafka 服务器地址* @return KafkaProducer 实例*/public static KafkaProducer<String, String> getInstance(String servers) {if (isLocalEnvironment()) { // 本地启动不需要实例化kafkareturn producer;}if (producer == null) { // 双重检查锁定机制,确保单例实例的唯一性和线程安全synchronized (lock) {if (producer == null) {bootstrapServers = servers; // 设置 Kafka 服务器地址// 配置 Kafka Producer 属性Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // 设置 Kafka 服务器地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置键的序列化器props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置值的序列化器// 创建 Kafka Producer 实例producer = new KafkaProducer<>(props);// JVM 关闭时,确保 Kafka producer 被关闭Runtime.getRuntime().addShutdownHook(new Thread(() -> {logger.info("关闭 Kafka Producer");close();}));}}}return producer;}/*** 发送 Kafka 消息** @param topic 目标主题* @param key 消息键* @param value 消息值*/public static void sendMessage(String topic, String key, String value) {if (isLocalEnvironment()) { // 本地启动不需要实例化kafkareturn;}// 获取 Kafka Producer 单例实例KafkaProducer<String, String> producer = QuoteKafkaProducer.getInstance(bootstrapServers);// 创建一条消息,包含topic、key 和 valueProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);// 异步发送消息producer.send(record, (metadata, exception) -> {if (exception != null) {// 发送失败的处理logger.error("消息发送失败", exception);} else {// 发送成功的处理logger.info("消息发送成功: 主题: {}, 分区: {}, 偏移量: {}", metadata.topic(), metadata.partition(), metadata.offset());}});}/*** 关闭 Kafka Producer 实例*/public static void close() {if (producer != null) {synchronized (lock) {if (producer != null) {producer.close(); // 关闭 Kafka Producer 实例producer = null; // 清空 Kafka Producer 实例}}}}/*** 判断当前程序是否在本地环境启动** @return true 如果在本地环境启动,否则返回 false*/private static boolean isLocalEnvironment() {// 获取环境变量String env = System.getenv().getOrDefault("ENV", "dev");// 返回是否为本地环境标志return "local".equals(env);}
}
四、使用 Kafka Producer 发送消息
以下是如何使用 QuoteKafkaProducer
类发送 Kafka 消息的示例:
// 使用示例
String servers = "127.0.0.1:9092,127.0.0.1:9092";
QuoteKafkaProducer.getInstance(servers);
QuoteKafkaProducer.sendMessage("topicId", "dataKey", "发送的内容");
五、总结
通过实现 Kafka Producer 的单例模式,我们可以确保 Kafka Producer 在整个应用程序中是唯一的,并且在多线程环境下是安全的。同时,通过引入日志记录,我们可以更好地监控消息的发送状态和处理潜在的异常。这种模式不仅提高了资源的利用效率,还简化了资源管理,特别是在处理高并发和大规模数据流的应用中。
希望本文对你在实际项目中使用 Kafka Producer 提供一些帮助。如果有任何问题或建议,欢迎在评论区讨论。
🔥🔥🔥道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙
💖The End💖点点关注,收藏不迷路💖 |