在 Java 中实现 Kafka Producer 的单例模式


在这里插入图片描述
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。
在这里插入图片描述

  • 推荐:「stormsha的主页」👈,「stormsha的知识库」👈持续学习,不断总结,共同进步,为了踏实,做好当下事儿~

  • 专栏导航

    • Python系列: Python面试题合集,剑指大厂
    • Git系列: Git操作技巧
    • GO系列: 记录博主学习GO语言的笔记,该笔记专栏尽量写的试用所有入门GO语言的初学者
    • 数据库系列: 详细总结了常用数据库 mysql 技术点,以及工作中遇到的 mysql 问题等
    • 运维系列: 总结好用的命令,高效开发
    • 算法与数据结构系列: 总结数据结构和算法,不同类型针对性训练,提升编程思维

    非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨

    💖The Start💖点点关注,收藏不迷路💖

    📒文章目录

      • 一、前言
      • 二、Kafka Producer 的基本配置
      • 三、实现 Kafka Producer 的单例模式
      • 四、使用 Kafka Producer 发送消息
      • 五、总结


一、前言

在分布式系统中,Apache Kafka 是一个非常受欢迎的消息中间件。它提供了高吞吐量、低延迟的消息传递机制,非常适合处理实时数据流。本文将介绍如何在 Java 中使用 Kafka Producer 并实现单例模式,以确保资源的有效管理。

Kafka 是一个分布式流处理平台,它的核心功能包括发布和订阅记录流、存储流记录、以及处理流记录。为了充分利用 Kafka 的功能,一个高效的 Kafka 生产者(Producer)是必要的。在生产环境中,使用单例模式可以确保 Kafka Producer 资源的唯一性和线程安全性。

二、Kafka Producer 的基本配置

在开始之前,我们需要引入一些必要的依赖。假设我们使用 Maven 项目,pom.xml 文件中需要添加以下依赖:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.30</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.30</version></dependency>
</dependencies>

三、实现 Kafka Producer 的单例模式

下面是完整的 QuoteKafkaProducer 类,包含了 Kafka Producer 的配置、消息发送和关闭方法。

package com.stormsha.util;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Properties;public class QuoteKafkaProducer {private static final Logger logger = LoggerFactory.getLogger(QuoteKafkaProducer.class);private static String bootstrapServers;  // Kafka 服务器的地址private static KafkaProducer<String, String> producer;  // Kafka Producer 实例private static final Object lock = new Object();  // 锁对象,用于线程安全的单例模式/*** 获取 Kafka Producer 单例实例** @param servers Kafka 服务器地址* @return KafkaProducer 实例*/public static KafkaProducer<String, String> getInstance(String servers) {if (isLocalEnvironment()) {  // 本地启动不需要实例化kafkareturn producer;}if (producer == null) {  // 双重检查锁定机制,确保单例实例的唯一性和线程安全synchronized (lock) {if (producer == null) {bootstrapServers = servers;  // 设置 Kafka 服务器地址// 配置 Kafka Producer 属性Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  // 设置 Kafka 服务器地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  // 设置键的序列化器props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  // 设置值的序列化器// 创建 Kafka Producer 实例producer = new KafkaProducer<>(props);// JVM 关闭时,确保 Kafka producer 被关闭Runtime.getRuntime().addShutdownHook(new Thread(() -> {logger.info("关闭 Kafka Producer");close();}));}}}return producer;}/*** 发送 Kafka 消息** @param topic 目标主题* @param key   消息键* @param value 消息值*/public static void sendMessage(String topic, String key, String value) {if (isLocalEnvironment()) {  // 本地启动不需要实例化kafkareturn;}// 获取 Kafka Producer 单例实例KafkaProducer<String, String> producer = QuoteKafkaProducer.getInstance(bootstrapServers);// 创建一条消息,包含topic、key 和 valueProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);// 异步发送消息producer.send(record, (metadata, exception) -> {if (exception != null) {// 发送失败的处理logger.error("消息发送失败", exception);} else {// 发送成功的处理logger.info("消息发送成功: 主题: {}, 分区: {}, 偏移量: {}", metadata.topic(), metadata.partition(), metadata.offset());}});}/*** 关闭 Kafka Producer 实例*/public static void close() {if (producer != null) {synchronized (lock) {if (producer != null) {producer.close();  // 关闭 Kafka Producer 实例producer = null;  // 清空 Kafka Producer 实例}}}}/*** 判断当前程序是否在本地环境启动** @return true 如果在本地环境启动,否则返回 false*/private static boolean isLocalEnvironment() {// 获取环境变量String env = System.getenv().getOrDefault("ENV", "dev");// 返回是否为本地环境标志return "local".equals(env);}
}

四、使用 Kafka Producer 发送消息

以下是如何使用 QuoteKafkaProducer 类发送 Kafka 消息的示例:

// 使用示例
String servers = "127.0.0.1:9092,127.0.0.1:9092";
QuoteKafkaProducer.getInstance(servers);
QuoteKafkaProducer.sendMessage("topicId", "dataKey", "发送的内容");

五、总结

通过实现 Kafka Producer 的单例模式,我们可以确保 Kafka Producer 在整个应用程序中是唯一的,并且在多线程环境下是安全的。同时,通过引入日志记录,我们可以更好地监控消息的发送状态和处理潜在的异常。这种模式不仅提高了资源的利用效率,还简化了资源管理,特别是在处理高并发和大规模数据流的应用中。

希望本文对你在实际项目中使用 Kafka Producer 提供一些帮助。如果有任何问题或建议,欢迎在评论区讨论。


🔥🔥🔥道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙

💖The End💖点点关注,收藏不迷路💖

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/424238.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

怎么让手机ip地址变化?介绍几种实用方法

随着网络技术的发展&#xff0c;IP地址作为网络设备的唯一标识&#xff0c;其变动对于保护个人隐私、规避网络限制等方面具有重要意义。本文将介绍几种实用的方法&#xff0c;帮助用户实现手机IP地址的变化&#xff0c;并提醒注意事项。 一、连接不同的WiFi网络‌ 连接不同的W…

【初阶数据结构】详解树和二叉树(一) - 预备知识(我真的很想进步)

文章目录 前言1. 树1.1 树的概念1.2 树的相关概念1.3 树的表示1.4 树在实际中的运用 2. 二叉树2.1 二叉树的概念2.2 现实中的二叉树2.3 特殊的二叉树2.4 二叉树的性质2.5 二叉树概念和性质的一些习题 前言 初阶数据结构篇马上要迎来了一个新的成员&#xff0c;那就是"二叉…

Linux基础---07文件传输(网络和Win文件)

Linux文件传输地图如下&#xff0c;先选取你所需的场景&#xff0c;若你是需要Linux和Linux之间传输文件就查看SCP工具即可。 一.下载网站文件 前提是有网&#xff1a; 检查网络是否畅通命令&#xff1a;ping www.baidu.com&#xff0c;若有持续的返回值就说明网络畅通。Ctr…

stm32 W25Q数据存储

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、cubemx配置二、keil中文件修改与配置三、几个重要函数的说明四、DMA方式传输&#xff08;待写&#xff09;总结 前言 W25Q128 容量为128位 128/8 16 也就…

0x07 Nginx越界读取缓存漏洞 CVE-2017-7529 复现

参考&#xff1a; Nginx越界读取缓存漏洞 CVE-2017-7529 | PeiQi文库 (wgpsec.org)Nginx越界读取缓存漏洞&#xff08;CVE-2017-7529&#xff09;复现分析 - qweg_focus - 博客园 (cnblogs.com) 一、漏洞描述&#xff1a; 在Nginx的反向代理配置中&#xff0c;静态文件缓存包…

国产化中间件正在侵蚀开源中间件

开源中间件的发展趋势表明&#xff0c;它们将继续在技术创新和生态建设中发挥重要作用&#xff0c;尤其是在云计算、大数据等新兴技术领域。开源中间件如Apache Kafka、RabbitMQ、ActiveMQ和RocketMQ等在市场上有着广泛的应用。它们在技术社区中得到了良好的支持&#xff0c;并…

SpringBoot 处理 @KafkaListener 消息

消息监听容器 1、KafkaMessageListenerContainer 由spring提供用于监听以及拉取消息&#xff0c;并将这些消息按指定格式转换后交给由KafkaListener注解的方法处理&#xff0c;相当于一个消费者&#xff1b; 看看其整体代码结构&#xff1a; 可以发现其入口方法为doStart(),…

Linux 访问控制列表(Access Control List)

在Linux中&#xff0c;目录或文件的权限是针对的所有者(owner)&#xff0c;所属组(group)&#xff0c;其他人(others)这3种类别来设置的。这种根据类别控制权限的方法无法精确控制每个用户的行为。为了解决这个问题&#xff0c;Linux引入了访问控制列表&#xff08;Access Cont…

Navicat 17 新特性 | 聚焦 MongoDB

随着 Navicat 17 的盛大发布&#xff0c;其一系列创新特性赢得了广大用户的热烈反响。它不仅在模型设计上实现了突破性优化&#xff0c;提升了查询与配置的效率&#xff0c;还大幅优化了用户界面的交互体验&#xff0c;原生支持国产平台与操作系统&#xff0c;同时增强 BI 能力…

JAVA毕业设计170—基于Java+Springboot+vue3+小程序的房屋租赁小程序系统(源代码+数据库)

毕设所有选题&#xff1a; https://blog.csdn.net/2303_76227485/article/details/131104075 基于JavaSpringbootvue3小程序的房屋租赁小程序系统(源代码数据库)170 一、系统介绍 本项目前后端分离(可以改为ssm版本)&#xff0c;分为用户、房东、管理员三种角色 1、用户&am…

Qt+FFmpeg开发视频播放器笔记(三):音视频流解析封装

音频解析 音频解码是指将压缩的音频数据转换为可以再生的PCM(脉冲编码调制)数据的过程。 FFmpeg音频解码的基本步骤如下: 初始化FFmpeg解码器(4.0版本后可省略): 调用av_register_all()初始化编解码器。 调用avcodec_register_all()注册所有编解码器。 打开输入的音频流:…

Idea springboot项目热部署

使用 spring-boot-devtools spring-boot-devtools 是 Spring Boot 提供的开发工具模块&#xff0c;它可以自动检测到代码的变化并重启应用&#xff0c;实现热部署。 配置步骤&#xff1a; 添加依赖&#xff1a; 在项目的 pom.xml 中加入 spring-boot-devtools 依赖&#xff1…

Redis(主从复制、哨兵模式、集群)概述及部署测试

目录 一、Redis 主从复制 1.1、Redis 主从复制概念 1.2、主从复制的作用 1.3、主从复制流程 1.4、搭建Redis 主从复制 二、Redis 哨兵模式 2.1、Redis 哨兵模式概念 2.2、哨兵模式原理 2.3、哨兵模式的作用 2.4、哨兵模式的结构 2.5、故障转移机制 2.6、主节点的选…

Node.js 多版本安装与切换指南

一.使用nvm的方法 1. 卸载nodejs 如果你的电脑有安装nodejs&#xff0c;需要先卸载掉&#xff1b;若没有请直接下一步。 2. 前往官网下载nvm nvm&#xff1a;一个nodejs版本管理工具&#xff01; 官网地址&#xff1a;nvm文档手册 - nvm是一个nodejs版本管理工具 - nvm中文…

MySQL详解:数据类型、约束

MySQL 1. 数据类型1.1 数值类型1.1.1 bit 位类型1.1.2 整数数据类型1.1.3 小数类型floatdecimal 1.2 字符类型1.2.1 char1.2.2 varchar 可变长字符串1.2.3 日期和时间类型datedatetimetimestamp 1.2.4 enum1.2.5 set集合查询函数 find_in_set 2. 表的约束2.1 NULL 空属性2.2 默…

基于鸿蒙API10的RTSP播放器(七:亮度调节功能测试)

目标&#xff1a; 当我的手指在设备左方进行上下移动的时候&#xff0c;可以进行屏幕亮度的调节&#xff0c;在调节的同时&#xff0c;有实时的调节进度条显示 步骤&#xff1a; 界面逻辑&#xff1a;使用Stack() 组件&#xff0c;完成音量图标和进度条的组合显示&#xff0c…

Linux echo,printf 命令

参考资料 【Linux】ハイフンをいっぱい出したかっただけなのに【printfコマンド】 目录 一. echo命令1.1 -n 选项1.2 -e 选项1.3 配合扩展实现批量换行输出1.3.1 xargs -n 11.3.2 tr \n1.3.3 xargs printf "%s\n"1.4 ANSI转义序列1.5 彩色文本输出 二. printf 命令…

C# System.BadImageFormatException问题及解决

C# System.BadImageFormatException问题 出现System.BadImageFormatException 异常有两种情况&#xff1a;程序目标平台不一致&引用dll文件的系统平台不一致。 异常参考 BadImageFormatException 程序目标平台不一致&#xff1a; 项目>属性>生成&#xff1a;x86 …

【吊打面试官系列-Redis面试题】使用过 Redis 做异步队列么,你是怎么用的?

大家好&#xff0c;我是锋哥。今天分享关于【使用过 Redis 做异步队列么&#xff0c;你是怎么用的&#xff1f;】面试题&#xff0c;希望对大家有帮助&#xff1b; 使用过 Redis 做异步队列么&#xff0c;你是怎么用的&#xff1f; 一般使用 list 结构作为队列&#xff0c;rpus…

【Redis】redis5种数据类型(list)

目录 基本介绍 命令 LPUSH LPUSHX RPUSH RPUSHX LRANGE LPOP RPOP LINDEX LINSERT LLEN LREM LTRIM LSET 阻塞版本的命令 BLPOP 内部编码 基本介绍 list相当于c的双端队列deque 区分获取和删除的区别 lindex能获取到元素的值lrem也能返回被删除元素的值 命…