手拉手springboot整合kafka发送消息

环境介绍
技术栈springboot+mybatis-plus+mysql+rocketmq
软件版本
mysql8
IDEAIntelliJ IDEA 2022.2.1
JDK17
Spring Boot3.1.7
kafka2.13-3.7.0

创建topic时,若不指定topic的分区(Partition主题分区数)数量使,则默认为1个分区(partition)

springboot加入依赖kafka

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean

application.yml配置连接kafka

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092

生产者

发送消息

@Resource
private KafkaTemplate<String,String> kafkaTemplate;@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

消费者

接收消息

@Component
public class KafkaConsumer {@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}}

若没有配置groupid

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

@Component
public class KafkaConsumer {@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}}

想从第一条消息开始读取(若同组的消费者已经消费过该主题,并且kafka已经保存了该消费者组的偏移量,则设置auto.offset.reset设置为earliest不生效,需要手动修改偏移量或使用新的消费者组)

application.yml需要将auto.offset.reset设置为earliest

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092
consumer:
auto-offset-reset: earliest

Earliest:将偏移量重置为最早的偏移量

Latest: 将偏移量重置为最新的偏移量

None: 没有为消费者组找到以前的偏移量,向消费者抛出异常

Exception: 向消费者抛出异常

重置消费者组偏移量

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group 123 --topic kafkamsg01 --reset-offsets --to-earliest –execute

重置完成

Spring-kafka生产者发送消息

.send与sendDefault()方法都返回CompletableFuture<String<k,v>>;

CompletableFuture类用于异步编程,表示异步计算结果。该特征使得调用者不必等待操作完成就可以继续执行其他任务,从而提高引用的响应速度和吞吐量

@Resource
private KafkaTemplate<String,String> kafkaTemplate;@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

发送Message

@Test
void kafkaSendMessageTest1(){
//通过构建器模式创建Message
Message<String> message = MessageBuilder.withPayload("hello kafka send message")
.setHeader(KafkaHeaders.TOPIC,"kafkamsg01")
.build();
kafkaTemplate.send(message);
}

SendProducerRecord

String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers

@Test
void kafkaSendProducerRecordTest1() {
//参数 String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers
Headers headers = new RecordHeaders();
headers.add("msg","123".getBytes(StandardCharsets.UTF_8));
ProducerRecord<String,String> record = new ProducerRecord(
"kafkaTopic01",
0,
System.currentTimeMillis(),
"key",
"hello kafka send message");
kafkaTemplate.send(record);
}

默认主题发送消息

yml配置默认主题

template:
default-topic: default-topic

@Test
void kafkaSendDefaultTest01(){
kafkaTemplate.sendDefault(0,System.currentTimeMillis(),"key01","hello ");
}

发送Object消息

序列化默认为String

@Resource
private KafkaTemplate<String,Object> kafkaTemplate1;
@Test
void kafkaSendObject(){
MessageM messageM =MessageM.builder().userID(123).sn("xo1111").desc("测试").build();
//分区是null,kafka自行决定消息发送到哪个分区
kafkaTemplate1.sendDefault(null,System.currentTimeMillis(),"key01",messageM);
}

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/336712.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【深度学习基础】使用Pytorch搭建DNN深度神经网络与手写数字识别

目录 写在开头 一、DNN的搭建 问题描述与数据集 神经网络搭建 模型训练 模型评估 模型复用 二、手写数字识别 任务描述 数据集 神经网络搭建 模型训练 模型评估 写在最后 写在开头 本文将介绍如何使用PyTorch框架搭建深度神经网络模型。实现模型的搭建、模…

Ps系统教程03

选区工具的组合使用 先用魔棒将大致区域点击圈主 会发现一些零散的小区域 使用套索工具进行区域的加减&#xff08;按住shift/alt键进行相关区域加减&#xff09; 可以放大查看 基本处理完细节之后 如果把不用的填充背景直接按delete删除&#xff0c;那么原版图案就会…

【贪心算法题目练习】

1. 分发饼干 这道题目和我们之前讲到的田忌赛马的问题很相似&#xff0c;只不过这这里不需要劣等马去抵消掉优等马&#xff0c;直接上贪心策略&#xff1a; 先将两个数组排序。针对胃口较小的孩子&#xff0c;从小到大挑选饼干: i. 如果当前饼干能满足&#xff0c;直接喂(最小…

大语言模型实战——最小化模型评测

1. 引言 现在国内外的主流模型&#xff0c;在新模型发布时都会给出很多评测数据&#xff0c;用以说明当前模型在不同数据集上的测评表现&#xff08;如下面llama3发布的评测数据&#xff09;。 这些评测数据是如何给出来的呢&#xff1f;这篇文章会用一个最小化的流程来还原下…

【限免】短时傅里叶变换时频分析【附MATLAB代码】

来源&#xff1a;微信公众号&#xff1a;EW Frontier 简介 一种能够同时对信号时域和频域分析的方法——短时傅里叶变换&#xff08;STFT&#xff09;&#xff0c;可以在时频二维角度准确地描述信号 的时间、频域的局部特性&#xff0c;与其他算法不同&#xff0c;通过该算法可…

Open3D(C++) OTSU点云二值化

目录 一、算法原理二、代码实现三、结果展示1、原始点云2、二值化本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT。 一、算法原理 最大类间方差法(Between-class scatter method)是一种用于分割的方法,它通过计算图…

【C++】命名空间

命名空间 为了解决C语言命名冲突问题而诞生 namespace 命名空间名 {...... }命名空间内函数作用域只在此命名空间内 错误 using std::cout; //为了保证正常输出先忽略此行 using std::endl; //为了保证正常输出先忽略此行 #include <iostream>namespace a {int n10…

git 代码提交规范,feat,fix,chore都是什么意思?

写到前面 经常看到别人提交的代码记录里面包含一些feat、fix、chore等等&#xff0c;而我在提交时也不会区分什么&#xff0c;直接写下提交信息&#xff0c;今天就来看一下怎么个事&#xff0c;就拿 element-plus 举例来看一下 其实这么写是一种代码提交规范&#xff0c;当然…

SpringBoot六种API请求参数读取方式

SpringBoot六种API请求参数读取方式 同步请求和异步请求 同步: 指单线程依次做几件事异步: 指多线程同时做几件事 同步请求: 指客户端浏览器只有一个主线程, 此线程负责页面的渲染和发出请求等操作, 如果此主线程发出请求的话则停止渲染而且会清空页面显示的内容 直到服务器响…

前端html-docx实现html转word,并导出文件,文字+图片

前端html-docx实现html转word&#xff0c;并导出文件 前端web页面 有文字&#xff0c;有图片&#xff0c;保存web的css效果 使用工具&#xff1a;html-docx 官方网址&#xff1a;http://docs.asprain.cn/html-docx/readme.html 步骤&#xff1a; 1 npm install html-docx-js…

输入3个字符串,要求将字母按由小到大顺序输出

对于将3个整数按由小到大顺序输出&#xff0c;是很容易处理的。可以按照同样的算法来处理将3个字符串按大小顺序输出。可以直接写出程序。 编写程序&#xff1a; 运行结果&#xff1a; 这个程序是很好理解的。在程序中对字符串变量用关系运算符进行比较&#xff0c;如同对数值…

GUI 01:GUI 编程概述,AWT 相关知识,Frame 窗口,Panel 面板,及监听事件的应用

一、前言 记录时间 [2024-05-30] 疑问导航 GUI 是什么&#xff1f;GUI 如何使用&#xff1f;GUI 有哪些应用&#xff1f; 学习目的 写一些自己心中的小工具&#xff1b;Swing 界面的维护&#xff1b;了解 MVC 架构&#xff0c;以及监听事件。 本文对图形用户界面&#xff08…

基于BP神经网络的64QAM解调算法matlab性能仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 MATLAB2022A 3.部分核心程序 ....................................................... % 第一部分&#xff1a;加载并可视…

CSS绘制圆弧

css绘制如图的圆弧&#xff1a; 这种矩形弧形的效果中&#xff0c;弧形的效果一般是由一条曲线拉伸出来的&#xff0c;这条曲线往往是属于一个椭圆的&#xff0c;所以可以绘制一个椭圆&#xff0c;截取部分可视区域实现效果。 <style> .wrapper{width: 400px;height: 60…

Minio篇:初识MinIO

1. MinIO快速入门 1.1.MinIO核心概念 下面介绍MinIO中的几个核心概念&#xff0c;这些概念在所有的对象存储服务中也都是通用的。 对象&#xff08;Object&#xff09; 对象是实际的数据单元&#xff0c;例如我们上传的一个图片。 存储桶&#xff08;Bucket&#xff09; 存储…

C语言分支和循环(2)

我的相关博客&#xff1a; C语言的分支与循环&#xff08;1&#xff09; 1.switch语句 除了 if 语句外&#xff0c;C语⾔还提供了 switch 语句来实现分⽀结构。 switch 语句是⼀种特殊形式的 的 if...else 结构&#xff0c;⽤于判断条件有多个结果的情况。它把多重 else if…

栈和队列题目练习

本节小编选了两道题来加深对栈和队列的认识理解&#xff01; 有效的括号 方法1&#xff1a;直接用栈的结构&#xff08;动态数组&#xff09; 本题可以用栈这个结构来解答&#xff0c;将(,{,[ 左括号压入栈中&#xff0c;然后取出栈顶元素与右括号),},]匹配。不匹配的话&…

单片机通信协议(1):SPI简介

关于SPI SPI&#xff08;串行外设接口&#xff09;是板载设备间通信接口之一。它是由摩托罗拉公司&#xff08;飞思卡尔半导体&#xff09;推出的。由于其简单性和通用性&#xff0c;它被纳入各种外围设备中&#xff0c;并与飞利浦I2C总线并列。 SPI的三线或四线信号数量比IIC…

OSPF状态机+SPF算法

OSPF状态机 1.点到点网络类型 down-->init-->(前提为可以建立邻接)exstart——>exchange-->若查看邻接的DBD 目录后发现不用进行LSA 直接进入ful。若查看后需要进行查询、应答先进入loading&#xff0c;在查询应答完后再进入 fuIl: 2.MA网络类型 down --&g…

【C++修行之道】类和对象(三)拷贝构造函数

目录 一、 概念 二、特征 正确的拷贝构造函数写法&#xff1a; 拷贝函数的另一种写法 三、若未显式定义&#xff0c;编译器会生成默认的拷贝构造函数。 四、编译器生成的默认拷贝构造函数已经可以完成字节序的值拷贝了&#xff0c;还需要自己显式实现吗&#xff1f; 深拷…