kafka+spring cloud stream 发送接收消息

方案 1:使用旧版 @StreamListener(适用于 Spring Cloud Stream <= 2.x)

1. 添加依赖(pom.xml

<!-- Spring Cloud Stream + Kafka Binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2. 定义消息通道接口

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MyChannels {
    String INPUT = "myInput";

    @Input(INPUT)
    SubscribableChannel input();
}

3. 使用 @StreamListener 监听

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(MyChannels.class) // 绑定消息通道
public class KafkaStreamListener {

    @StreamListener(MyChannels.INPUT)
    public void handleMessage(String message) {
        System.out.println("Received via @StreamListener: " + message);
    }
}

4. 配置 application.properties

# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092

# 绑定输入通道到 Kafka Topic
spring.cloud.stream.bindings.myInput.destination=my-topic
spring.cloud.stream.bindings.myInput.group=my-group
spring.cloud.stream.bindings.myInput.content-type=text/plain

方案 2:新版函数式编程模型(推荐,Spring Cloud Stream >= 3.x)

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;

@Component
public class KafkaStreamListener {

    @Bean
    public Consumer<String> myInput() {
        return message -> {
            System.out.println("Received via Function: " + message);
        };
    }
}

# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092

# 绑定函数到 Kafka Topic
spring.cloud.stream.bindings.myInput-in-0.destination=my-topic
spring.cloud.stream.bindings.myInput-in-0.group=my-group
spring.cloud.stream.bindings.myInput-in-0.content-type=text/plain

生产者代码示例(发送消息)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    @Autowired
    private StreamBridge streamBridge;

    public void sendMessage(String topic, String message) {
        streamBridge.send(topic, message);
    }
}

测试步骤

  1. 启动 Kafka:确保 Kafka 和 Zookeeper 服务运行。

  2. 创建 Topic

    kafka-topics --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

  3. 发送消息

    kafkaProducer.sendMessage("my-topic", "Hello Kafka Stream!");

  4. 查看消费者日志

    Received via @StreamListener: Hello Kafka Stream! // 或 Received via Function: Hello Kafka Stream!


常见问题

  1. 版本兼容性

    • Spring Cloud Stream 3.x 后需使用函数式编程。

    • 检查 Spring Boot 版本与 Spring Cloud Stream 的匹配关系(如 Spring Boot 2.6.x + Spring Cloud 2021.x)。

  2. 绑定配置

    • 函数式模型中,绑定名称格式为 <functionName>-in-<index>(如 myInput-in-0)。

  3. 序列化配置

    • 若传递 JSON 对象,需配置 content-type=application/json 并添加 Jackson 依赖。


总结

  • 旧版:使用 @StreamListener + 通道接口(适合遗留代码升级)。

  • 新版:推荐函数式编程模型(更简洁,符合现代 Spring 设计)。

  • 根据实际 Spring Cloud Stream 版本选择方案!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/22533.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JAVAweb-标签选择器,盒模型,定位,浮动

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>标签</title><style type"text/css&q…

计算机视觉:主流数据集整理

第一章&#xff1a;计算机视觉中图像的基础认知 第二章&#xff1a;计算机视觉&#xff1a;卷积神经网络(CNN)基本概念(一) 第三章&#xff1a;计算机视觉&#xff1a;卷积神经网络(CNN)基本概念(二) 第四章&#xff1a;搭建一个经典的LeNet5神经网络(附代码) 第五章&#xff1…

二级公共基础之数据结构与算法篇(五)树和二叉树

目录 前言 一、树的基本概念 1.父结点和根节点 2.子节点和叶子节点 3.度和深度 4.子树 二、二叉树及其基本性质 1. 二叉树的定义 2. 二叉树的基本性质 性质1 性质2 性质3 性质4 性质5 性质6 三、二叉树的存储结构 四、二叉树的遍历 1.遍历二叉树的概念 1. 前…

自制操作系统学习第七天

今天要做什么&#xff1f; 实现HLT&#xff0c;不让计算机处于HALT&#xff08;HLT&#xff09;.用C语言实现内存写入&#xff08;错误&#xff0c;需要分析&#xff09; 一:使用HLT&#xff0c;让计算机处于睡眠状态 写了下面这个程序&#xff0c;naskfunc.nas 函数名叫io_h…

Python Django系列—入门实例(二)

数据库配置 现在&#xff0c;打开 mysite/settings.py 。这是个包含了 Django 项目设置的 Python 模块。 默认情况下&#xff0c;​ DATABASES 配置使用 SQLite。如果你是数据库新手&#xff0c;或者只是想尝试 Django&#xff0c;这是最简单的选择。SQLite 包含在 Python 中…

DeepSeek接入Siri(已升级支持苹果手表)完整版硅基流动DeepSeek-R1部署

DeepSeek接入Siri&#xff08;已升级支持苹果手表&#xff09;完整版硅基流动DeepSeek-R1部署 **DeepSeek** 是一款专注于深度学习和人工智能的工具或平台&#xff0c;通常与人工智能、机器学习、自动化分析等领域有关。它的主要功能可能包括&#xff1a;深度学习模型搜索&…

抗辐照加固CAN FD芯片的商业航天与车规级应用解析

在工业自动化、智能汽车、航空航天及国防装备等关键领域&#xff0c;数据传输的安全性、可靠性与极端环境适应能力是技术升级的核心挑战。国科安芯推出全新一代CANFD&#xff08;Controller Area Network Flexible Data Rate&#xff09;芯片&#xff0c;以高安全、高可靠、断电…

Java数据结构第十二期:走进二叉树的奇妙世界(一)

专栏&#xff1a;数据结构(Java版) 个人主页&#xff1a;手握风云 目录 一、树型结构 1.1. 树的定义 1.2. 树的基本概念 1.3. 树的表示形式 二、二叉树 2.1. 概念 2.2. 两种特殊的二叉树 2.3. 二叉树的性质 2.4. 二叉树的存储 三、二叉树的基本操作 一、树型结构 1.…

nginx 反向代理 配置请求路由

nginx | 反向代理 | 配置请求路由 nginx简介 Nginx&#xff08;发音为“Engine-X”&#xff09;是一款高性能、开源的 Web 服务器和反向代理服务器&#xff0c;同时也支持邮件代理和负载均衡等功能。它由俄罗斯程序员伊戈尔西索夫&#xff08;Igor Sysoev&#xff09;于 2004…

ath9k(Atheros芯片)开源驱动之wifi连接

为什么会推荐这个wifi 驱动进行学习&#xff1f; ath9k&#xff08;Atheros芯片&#xff09;&#xff1a;代码结构清晰&#xff0c;适合学习实践 为什么我只在开篇写了一个wifi连接的操作&#xff1f; 先让一个开源驱动在你的硬件上跑起来&#xff0c;再逐步修改&#xff0c…

LLaMA-Factory|微调大语言模型初探索(4),64G显存微调13b模型

上篇文章记录了使用lora微调deepseek-7b&#xff0c;微调成功&#xff0c;但是微调llama3-8b显存爆炸&#xff0c;这次尝试使用qlora微调HQQ方式量化&#xff0c;微调更大参数体量的大语言模型&#xff0c;记录下来微调过程&#xff0c;仅供参考。 对过程不感兴趣的兄弟们可以直…

知识管理平台如何实现高效数据整合?

内容概要 现代知识管理平台通过架构化的四库体系&#xff08;资源库、规则库、模型库、知识库&#xff09;驱动数据智能整合进程。核心机制依托智能数据工具集对异构数据进行自动化清洗与语义标注&#xff0c;其跨源数据汇聚能力支持超过200种结构化与非结构化数据源的接入&am…

近10年气象分析(深度学习)

这是一个气象数据分析程序&#xff0c;主要用于分析和可视化气象数据。以下是该文件的主要功能&#xff1a; 1. 数据加载 在线数据&#xff1a;尝试从 GitHub 加载气象数据。 示例数据&#xff1a;如果无法加载在线数据&#xff0c;程序会自动生成示例数据。 2. 数据分析 …

DeepSeek最新开源动态:核心技术公布

2月21日午间&#xff0c;DeepSeek在社交平台X发文称&#xff0c;从下周开始&#xff0c;他们将开源5个代码库&#xff0c;以完全透明的方式与全球开发者社区分享他们的研究进展。并将这一计划定义为“Open Source Week”。 DeepSeek表示&#xff0c;即将开源的代码库是他们在线…

wps中zotero插件消失,解决每次都需要重新开问题

参考 查看zotero目录 D:\zotero\integration\word-for-windows 加载项点击 dotm即可 长期解决 把dom 复制到 C:\Users\89735\AppData\Roaming\kingsoft\office6\templates\wps\zh_CN还是每次都需要重新开的话 重新加载一下

洛谷B3629

B3629 吃冰棍 - 洛谷 代码区&#xff1a; #include<algorithm> #include<iostream>using namespace std; int main(){int n,ans;cin >> n;for(int in/2;i<n;i){int ti;ans0;while(t>3){t-3;ans3;t;}if(anst>n){cout << i;return 0;}}return…

VMware安装Centos 9虚拟机+设置共享文件夹+远程登录

一、安装背景 工作需要安装一台CentOS-Stream-9的机器环境&#xff0c;所以一开始的安装准备工作有&#xff1a; vmware版本&#xff1a;VMware Workstation 16 镜像版本&#xff1a;CentOS-Stream-9-latest-x86_64-dvd1.iso &#xff08;kernel-5.14.0&#xff09; …

[ProtoBuf] 介绍 | 保姆级win/linux安装教程

目录 一、序列化概念 二、ProtoBuf 是什么 三、ProtoBuf 的使用特点 ProtoBuf 在不同操作系统下的安装 一、ProtoBuf 在 Windows 下的安装 二、ProtoBuf 在 Linux 下的安装 三、检查是否安装成功 安装教程 可以直接目录跳转到后面 笔记参考&#xff1a;官方文档 一、序…

element ui的select选择框

我们首先先试一下&#xff0c;这个东西怎么玩的 <el-select v-model"select" change"changeSelect"><el-option value"香蕉"></el-option><el-option value"菠萝"></el-option><el-option value&quo…

51单片机学习之旅——定时器

打开软件 1与其它等于其它&#xff0c;0与其它等于0 1或其它等于1&#xff0c;0或其它等于其它 TMODTMOD&0xF0;//0xF01111 0000进行与操作&#xff0c;高四位保持&#xff0c;低四位清零&#xff0c;高四位定时器1&#xff0c;低四位定时器0 TMODTMOD|0x01;//0x010000 0…