SpringBoot集成kafka-生产者发送消息

springboot集成kafka发送消息

  • 1、kafkaTemplate.send()方法
    • 1.1、springboot集成kafka发送消息Message对象消息
    • 1.2、springboot集成kafka发送ProducerRecord对象消息
    • 1.3、springboot集成kafka发送指定分区消息
  • 2、kafkaTemplate.sendDefault()方法
  • 3、kafkaTemplate.send(...)和kafkaTemplate.sendDefault(...)的区别
  • 4、发送对象消息

在这里插入图片描述在这里插入图片描述

1、kafkaTemplate.send()方法

1.1、springboot集成kafka发送消息Message对象消息

生产者代码

package com.power.producer;import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendMessage(){//通过构建器模式创建Message对象Message message = MessageBuilder.withPayload("hello-message").setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字.build();kafkaTemplate.send(message);}
}

2、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid testMessage(){eventProducer.sendMessage();}@Testvoid testMessage(){eventProducer.sendMessage();}
}

1.2、springboot集成kafka发送ProducerRecord对象消息

生产者

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendMessage(){//通过构建器模式创建Message对象Message message = MessageBuilder.withPayload("hello-message").setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字.build();kafkaTemplate.send(message);}public void sendProducerRecord(){//Headers里面放一些信息(信息是key-value键值对),到时候消费组接收到消息后,可以拿到这个Headers里面放的信息Headers headers = new RecordHeaders();headers.add("phone","17676767676".getBytes(StandardCharsets.UTF_8));headers.add("orderId","OD1456467576467".getBytes(StandardCharsets.UTF_8));//String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headersProducerRecord<String,String> record =new ProducerRecord("test-topic-02", 0, System.currentTimeMillis(), "key1", "value", headers);kafkaTemplate.send(record);}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid testProducerRecord(){eventProducer.sendProducerRecord();}}

1.3、springboot集成kafka发送指定分区消息

生产者:(方法sendEvent4)

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendEvent4(){//String topic, Integer partition, Long timestamp, K key, V datakafkaTemplate.send("test-topic-02",0,System.currentTimeMillis(),"k2","hello-kafka");}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendEvent4(){eventProducer.sendEvent4();}
}

2、kafkaTemplate.sendDefault()方法

生产者:

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendDefault(){//Integer partition, Long timestamp, K key, V datakafkaTemplate.sendDefault(0,System.currentTimeMillis(),"k3","hello-kafka");}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendDefault(){eventProducer.sendDefault();}
}

配置文件:设置默认topic
不设置topic运行生产者会报找不到topic的错。
在这里插入图片描述

3、kafkaTemplate.send(…)和kafkaTemplate.sendDefault(…)的区别

在这里插入图片描述

4、发送对象消息

在这里插入图片描述

生产者:

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void getResult3(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.sendDefault(null,System.currentTimeMillis(),"k3",user);}}

实体类:

package com.power.model;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {private Integer id;private String phone;private Date birthday;}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid getResult2(){eventProducer.getResult3();}
}

配置文件

spring:application:#应用名称name: spring-boot-01-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置生产者(24个配置)producer:value-serializer: org.springframework.kafka.support.serializer.JsonSerializer#配置消费者(24个配置)consumer:auto-offset-reset: earliesttemplate:default-topic: default-topic

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/409799.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于elementui table组件 —— 竖向表格

前端模拟数据方式&#xff1a; html代码&#x1f447;&#xff1a; <template><el-table :data"tableData" style"width: 60%;margin-top:20px" stripe :show-header"false" border :row-style"rowStyle"><el-table…

MyBatis如何自定义项目中SQL日志

说明&#xff1a;用过MyBatis框架的同学们都知道&#xff0c;打印SQL日志&#xff0c;可以通过在application.yml配置文件中加入下面配置来设置&#xff1a; mybatis:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl但打印出来的SQL如下&#xff0c;丑陋…

机器学习/数据分析--通俗语言带你入门决策树(结合分类和回归案例)

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 前言 机器学习是深度学习和数据分析的基础&#xff0c;接下来将更新常见的机器学习算法注意&#xff1a;在打数学建模比赛中&#xff0c;机器学习用的也很多&a…

NVIDIA将在Hot Chips 2024会议上展示Blackwell服务器装置

NVIDIA 将在 Hot Chips 2024 上展示其 Blackwell 技术堆栈&#xff0c;并在本周末和下周的主要活动中进行会前演示。对于 NVIDIA 发烧友来说&#xff0c;这是一个激动人心的时刻&#xff0c;他们将深入了解NVIDIA的一些最新技术。然而&#xff0c;Blackwell GPU 的潜在延迟可能…

企事业单位数据资料防外泄如何实现?这5个小技巧等你来掌握!

企事业单位的数据资料防外泄是一项重要的任务&#xff0c;它关乎企业的核心竞争力和信息安全。 以下是五个实用的小技巧&#xff0c;可以帮助企事业单位有效地防止数据外泄&#xff1a; 1. 数据加密 技巧说明&#xff1a;通过对敏感数据进行加密处理&#xff0c;即使数据被非…

外贸管理软件一般都有哪些功能

外贸管理软件通常被设计来帮助国际贸易企业高效管理其业务流程。这类软件的功能多样&#xff0c;这里以神卓外贸管理软件为例&#xff0c; 以下是一些常见的核心功能模块&#xff1a; 客户关系管理 (CRM) 客户信息管理询盘与报价管理销售机会跟踪 订单管理 订单生成与处理发货…

Sparse Kernel Canonical Correlation Analysis

论文链接&#xff1a;https://arxiv.org/pdf/1701.04207 看这篇论文终于看懂核函数了。。谢谢作者

Azure OpenAI citations with message correlation

题意&#xff1a;“Azure OpenAI 引用与消息关联” 问题背景&#xff1a; I am trying out Azure OpenAI with my own data. The data is uploaded to Azure Blob Storage and indexed for use with Azure AI search “我正在尝试使用自己的数据进行 Azure OpenAI。数据已上传…

中介者模式详解

中介者模式 简介通过引入一个中介者对象来封装和协调多个对象之间的交互&#xff0c;从而降低对象间的耦合度。 人话:就是两个类或者系统之间, 不要直接互相调用, 而是要中间的类来专门进行交互。 举个例子 比如两个国家之间(关系差, 没有大使馆), 需要联合国作为中介进行对话…

公园的客流统计意义何在,有哪些积极作用

随着城市化进程的加快&#xff0c;人们越来越重视休闲娱乐和亲近自然的机会。公园作为市民休闲放松的重要场所&#xff0c;其管理和维护的质量直接影响着市民的生活质量和城市的形象。客流统计在公园管理中扮演着重要角色&#xff0c;不仅可以帮助公园管理者更好地理解游客的行…

大模型网络安全能力和风险评估框架Cybench

大模型网络安全能力和风险评估框架Cybench 前言 语言模型在网络安全领域的双重应用&#xff0c;既可以用于攻击&#xff08;如识别并利用代码漏洞&#xff09;&#xff0c;也可以用于防御&#xff08;如渗透测试和漏洞检测&#xff09;。当前的研究包括对CTF挑战、代码片段中的…

LLM 培训

步骤 1 # 预训练 步骤 1 # 预训练 在预训练阶段,该模型被训练为互联网规模数据上的下一个单词预测器。 在预训练阶段 从互联网上收集大量多样化的数据集。此数据集包含来自各种来源的文本,以确保模型能够学习广泛的语言模式。清理和预处理数据以消除噪音、格式问题和不相关的…

CSS文本样式(一)

一、font-family 1、font-family属性 font-family​ &#xff1a;属性指定元素的​字体​&#xff0c;语法格式如下&#xff1a; ​font-family​: 字体1,字体2,...; 有两种字体系列名称&#xff1a; ​字体系列​&#xff1a;特定的字体系列&#xff08;如Times New Rom…

大型公司网络系统集成方案

一、前言 1.1.公司综合信息系统建设目标 -----------------------------------------------------3 1.2. 用户具体需求----------------------------------------------------------------------------4 1.3.公司综合信息系统建设原则 -------------------------------…

SpringBoot集成kafka接收对象消息

SpringBoot集成kafka接收对象消息 1、生产者2、消费者3、工具类4、消息实体对象5、配置文件6、启动类7、测试类8、测试结果 1、生产者 package com.power.producer;import com.power.model.User; import com.power.util.JSONUtils; import org.springframework.kafka.core.Kaf…

基于SSM的学生信息管理系统的设计与实现 (含源码+sql+视频导入教程+文档+VISIO图)

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1 、功能描述 基于SSM的学生信息管理系统12拥有三种角色&#xff1a;学生、教师、管理员 学生&#xff1a;选课、查看已选课程、查看成绩 教师&#xff1a;成绩管理 管理员&#xff1a;课程管理、学生…

两个实用的Python编程技巧

一、变量类型声明技巧 虽然在Python中可以不用声明变量的类型&#xff0c;但是为了加快程序的运算速度&#xff0c;减少不必要的bug&#xff0c;我们可以在定义变量之初就把它的类型确定&#xff0c;这样可以更好地传输变量值。如下面的例子。 我们定义了两个变量&#xff0c…

linux 系统备份与恢复方法及解决方案

&#x1f600;前言 本篇博文是关于 linux 系统备份与恢复&#xff0c;希望你能够喜欢 &#x1f3e0;个人主页&#xff1a;晨犀主页 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是晨犀&#xff0c;希望我的文章可以帮助到大家&#xff0c;您的满意是我的动力&#x…

Jmeter提取token并设置为全局变量

参考文章&#xff1a;Jmeter提取token并设置为全局变量&#xff08;最详细的步骤&#xff09;_jmeter提取token到全局变量-CSDN博客 一般来说&#xff0c;系统内大多数接口&#xff0c;都需要先获取登录后的token值&#xff0c;所以我们需要创建一个获取token的接口&#xff0c…

4款文章生成器,自动写作优质文章

在当今信息爆炸的时代&#xff0c;内容创作已经成为网络世界中不可或缺的一部分。然而&#xff0c;随着人们对高质量内容的需求不断增加&#xff0c;传统的手动创作已经无法满足市场的需求。因此&#xff0c;文章生成器应运而生&#xff0c;成为许多从业者和企业的利器。在本文…