【kafka实战】06 kafkaTemplate java代码使用示例

在 Spring Boot 中使用 KafkaTemplate 可以方便地向 Kafka 发送消息。下面为你详细介绍使用步骤和示例代码。

1. 创建 Spring Boot 项目

你可以使用 Spring Initializr(https://start.spring.io/ )来创建一个新的 Spring Boot 项目,添加以下依赖:

  • Spring for Apache Kafka

或者在 pom.xml 中手动添加依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
</dependencies>

2. 配置 Kafka

application.propertiesapplication.yml 中配置 Kafka 的相关信息,示例如下:

application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
application.yml
spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

3. 创建消息生产者服务

创建一个服务类,使用 KafkaTemplate 发送消息。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private static final String TOPIC_NAME = "test_topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send(TOPIC_NAME, message);System.out.println("Message sent: " + message);}public void sendMessageWithKey(String key, String message) {kafkaTemplate.send(TOPIC_NAME, key, message);System.out.println("Message sent with key " + key + ": " + message);}
}

4. 创建控制器(可选)

如果你想通过 RESTful API 触发消息发送,可以创建一个控制器。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/kafka")
public class KafkaController {@Autowiredprivate KafkaProducerService kafkaProducerService;@GetMapping("/send/{message}")public String sendMessage(@PathVariable String message) {kafkaProducerService.sendMessage(message);return "Message sent successfully";}@GetMapping("/send/{key}/{message}")public String sendMessageWithKey(@PathVariable String key, @PathVariable String message) {kafkaProducerService.sendMessageWithKey(key, message);return "Message with key sent successfully";}
}

5. 异步发送消息并处理结果

KafkaTemplatesend 方法返回一个 ListenableFuture,可以用来异步处理消息发送的结果。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;@Service
public class AsyncKafkaProducerService {private static final String TOPIC_NAME = "test_topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendAsyncMessage(String message) {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, message);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {System.out.println("Message sent successfully: " + message + ", Offset: " + result.getRecordMetadata().offset());}@Overridepublic void onFailure(Throwable ex) {System.err.println("Failed to send message: " + message + ", Error: " + ex.getMessage());}});}
}

6. 运行 Spring Boot 应用

启动 Spring Boot 应用程序,你可以通过以下方式测试消息发送:

  • 如果使用了控制器,可以通过访问 http://localhost:8080/kafka/send/your_messagehttp://localhost:8080/kafka/send/your_key/your_message 来发送消息。
  • 也可以在代码中直接调用 KafkaProducerServiceAsyncKafkaProducerService 的方法来发送消息。

7. 注意事项

  • 确保你的 Kafka 服务正在运行,并且 bootstrap-servers 配置正确。
  • 对于不同的数据类型,你可能需要调整 key-serializervalue-serializer 的配置。例如,如果要发送 JSON 数据,可以使用 org.springframework.kafka.support.serializer.JsonSerializer

通过以上步骤,你可以在 Spring Boot 项目中使用 KafkaTemplate 方便地向 Kafka 发送消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/15303.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Shapefile格式文件解析和显示

Java实现GIS SHP文件格式的解析和显示&#xff0c;JDK19下编译&#xff0c;awt图形系统显示。 SHP文件对应的属性存储在DBF格式数据库中&#xff0c;解析见&#xff1a;DBASE DBF数据库文件解析_数据库文件在线解析-CSDN博客 解析SHP文件代码&#xff1a; public static Shap…

Golang 并发机制-7:sync.Once实战应用指南

Go的并发模型是其突出的特性之一&#xff0c;但强大的功能也带来了巨大的责任。sync.Once是由Go的sync包提供的同步原语。它的目的是确保一段代码只执行一次&#xff0c;而不管有多少协程试图执行它。这听起来可能很简单&#xff0c;但它改变了并发环境中管理一次性操作的规则。…

【DeepSeek × Postman】请求回复

新建一个集合 在 Postman 中创建一个测试集合 DeepSeek API Test&#xff0c;并创建一个关联的测试环境 DeepSeek API Env&#xff0c;同时定义两个变量 base_url 和 api_key 的步骤如下&#xff1a; 1. 创建测试集合 DeepSeek API Test 打开 Postman。点击左侧导航栏中的 Co…

如何通过PHP接入DeepSeek的API

想知道如何通过PHP接入DeepSeek的API。看起来他对之前的Python步骤比较熟悉&#xff0c;但这次想用PHP实现。 首先&#xff0c;我需要回顾一下DeepSeek API的文档&#xff0c;确认它支持哪些方法和参数。假设用户已经配置了环境变量&#xff0c;比如API密钥&#xff0c;接下来…

网络工程师 (26)TCP/IP体系结构

一、层次 四层&#xff1a; 网络接口层&#xff1a;TCP/IP协议的最底层&#xff0c;负责网络层与硬件设备间的联系。该层协议非常多&#xff0c;包括逻辑链路和媒体访问控制&#xff0c;负责与物理传输的连接媒介打交道&#xff0c;主要功能是接收数据报&#xff0c;并把接收到…

每日Attention学习22——Inverted Residual RWKV

模块出处 [arXiv 25] [link] [code] RWKV-UNet: Improving UNet with Long-Range Cooperation for Effective Medical Image Segmentation 模块名称 Inverted Residual RWKV (IR-RWKV) 模块作用 用于vision的RWKV结构 模块结构 模块代码 注&#xff1a;cpp扩展请参考作者原…

vscode预览插件

在左侧列表拓展里搜索 Live Preview 安装&#xff0c;然后在html页面点击右键找到show Preview 结果如下图 然后就可以进行代码开发并实时预览了

【04】Java+若依+vue.js技术栈实现钱包积分管理系统项目-若依框架二次开发准备工作-以及建立初步后端目录菜单列-优雅草卓伊凡商业项目实战

【04】Java若依vue.js技术栈实现钱包积分管理系统项目-若依框架二次开发准备工作-以及建立初步后端目录菜单列-优雅草卓伊凡商业项目实战 项目背景 本项目经费43000元&#xff0c;需求文档如下&#xff0c;工期25天&#xff0c;目前已经过了8天&#xff0c;时间不多了&#x…

【DeepSeek】DeepSeek概述 | 本地部署deepseek

目录 1 -> 概述 1.1 -> 技术特点 1.2 -> 模型发布 1.3 -> 应用领域 1.4 -> 优势与影响 2 -> 本地部署 2.1 -> 安装ollama 2.2 -> 部署deepseek-r1模型 1 -> 概述 DeepSeek是由中国的深度求索公司开发的一系列人工智能模型&#xff0c;以其…

Windows下AMD显卡在本地运行大语言模型(deepseek-r1)

Windows下AMD显卡在本地运行大语言模型 本人电脑配置第一步先在官网确认自己的 AMD 显卡是否支持 ROCm下载Ollama安装程序模型下载位置更改下载 ROCmLibs先确认自己显卡的gfx型号下载解压 替换替换rocblas.dll替换library文件夹下的所有 重启Ollama下载模型运行效果 本人电脑配…

使用Pytorch训练一个图像分类器

一、准备数据集 一般来说&#xff0c;当你不得不与图像、文本或者视频资料打交道时&#xff0c;会选择使用python的标准库将原始数据加载转化成numpy数组&#xff0c;甚至可以继续转换成torch.*Tensor。 对图片而言&#xff0c;可以使用Pillow库和OpenCV库对视频而言&#xf…

DeepSeek之Api的使用(将DeepSeek的api集成到程序中)

一、DeepSeek API 的收费模式 前言&#xff1a;使用DeepSeek的api是收费的 免费版&#xff1a; 可能提供有限的免费额度&#xff08;如每月一定次数的 API 调用&#xff09;&#xff0c;适合个人开发者或小规模项目。 付费版&#xff1a; 超出免费额度后&#xff0c;可能需要按…

git fetch和git pull 的区别

git pull 实际上就是 fetch merge 的缩写, git pull 唯一关注的是提交最终合并到哪里&#xff08;也就是为 git fetch 所提供的 destination 参数&#xff09; git fetch 从远程仓库下载本地仓库中缺失的提交记录,并更新远程分支指针 git pull抓取更新再合并到本地分支,相当于…

信息科技伦理与道德3-2:智能决策

2.2 智能推荐 推荐算法介绍 推荐系统&#xff1a;猜你喜欢 https://blog.csdn.net/search_129_hr/article/details/120468187 推荐系统–矩阵分解 https://blog.csdn.net/search_129_hr/article/details/121598087 案例一&#xff1a;YouTube推荐算法向儿童推荐不适宜视频 …

[LVGL] 在VC_MFC中移植LVGL

前言&#xff1a; 0. 在MFC中开发LVGL的优点是可以用多个Window界面做辅助扩展【类似GUIguider】 1.本文基于VC2022-MFC单文档框架移植lvgl8 2. gitee上下载lvgl8.3 源码&#xff0c;并将其文件夹改名为lvgl lvgl: LVGL 是一个开源图形库&#xff0c;提供您创建具有易于使用…

[RabbitMQ] RabbitMQ常见面试题

&#x1f338;个人主页:https://blog.csdn.net/2301_80050796?spm1000.2115.3001.5343 &#x1f3f5;️热门专栏: &#x1f9ca; Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm1001.2014.3001.5482 &#x1f355; Collection与…

《qt easy3d中添加孔洞填充》

《qt easy3d中添加孔洞填充》 效果展示一、创建流程二、核心代码效果展示 参考链接Easy3D开发——点云孔洞填充 一、创建流程 创建动作,并转到槽函数,并将动作放置菜单栏,可以参考前文 其中,槽函数on_actionHoleFill_triggered实现如下:

Git(分布式版本控制系统)系统学习笔记【并利用腾讯云的CODING和Windows上的Git工具来实操】

Git的概要介绍 1️⃣ Git 是什么&#xff1f; Git 是一个 分布式版本控制系统&#xff08;DVCS&#xff09;&#xff0c;用于跟踪代码的变更、协作开发和管理项目历史。 由 Linus Torvalds&#xff08;Linux 之父&#xff09;在 2005 年开发&#xff0c;主要用于 代码管理。…

基于SpringBoot的校园社交平台

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

R语言LCMM多维度潜在类别模型流行病学研究:LCA、MM方法分析纵向数据

全文代码数据&#xff1a;https://tecdat.cn/?p39710 在数据分析领域&#xff0c;当我们面对一组数据时&#xff0c;通常会有已知的分组情况&#xff0c;比如不同的治疗组、性别组或种族组等&#xff08;点击文末“阅读原文”获取完整代码数据&#xff09;。 然而&#xff0c;…