SpringBoot整合Kafka (二)

📑前言

本文主要讲了SpringBoot整合Kafka文章,如果有什么需要改进的地方还请大佬指出⛺️
上文链接:SpringBoot整合Kafka (一)

🎬作者简介:大家好,我是青衿🥇
☁️博客首页:CSDN主页放风讲故事
🌄每日一句:努力一点,优秀一点

在这里插入图片描述

目录

文章目录

  • 📑前言
  • **目录**
    • 一、介绍
    • 二、主要功能
    • 三、Kafka基本概念
    • 四、Spring Boot整合Kafka的demo
      • 1、构建项目
        • 1.1、引入依赖
        • 1.2、YML配置
        • 1.3、生产者简单生产
        • 1.4、消费者简单消费
      • 2、消费者
        • 2.1、Kafka应答机制
          • ACK应答级别
        • 2.2、Kafka消息消费确认机制
          • 自动提交
          • 手动提交
        • 2.3、指定消费
          • 监听一个主题,指定分区消费消息
  • 📑文章末尾


一、介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目

二、主要功能

1.消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
2.存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
3.日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种\nconsumer,例如hadoop、Hbase、Solr等。

三、Kafka基本概念

kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。首先,让我们来看一下基础的消息(Message)相关术语:
Broker
消息中间件处理节点,一个Kafka节点就是一个broker,一 个或者多个Broker可以组成一个Kafka集群
Topic
Kafka根据topic对消息进行归类,发布到Kafka集群的每条 消息都需要指定一个topic
Producer
消息生产者,向Broker发送消息的客户端 Consumer 消息消费者,从Broker读取消息的客户端
ConsumerGroup
每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息
Partition
物理上的概念,一个topic可以分为多个partition,每个 partition内部消息是有序的在这里插入图片描述

四、Spring Boot整合Kafka的demo

1、构建项目

1.1、引入依赖
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
1.2、YML配置
spring:kafka:bootstrap-servers: 192.168.147.200:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。producer: # 消息提供者key和value序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 3 # 生产者发送失败时,重试发送的次数consumer: # 消费端反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: demo # 用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:""
1.3、生产者简单生产
@Autowired
private KafkaTemplate kafkaTemplate;@Test
void contextLoads() {ListenableFuture listenableFuture = kafkaTemplate.send("test01-topic", "Hello Wolrd test");System.out.println("发送完成");
}
1.4、消费者简单消费
@Component
public class TopicConsumer {@KafkaListener(topics = "test01-topic")public void readMsg(String msg){System.out.println("msg = " + msg);}
}

2、消费者

2.1、Kafka应答机制

在生产者(producer)往Kafka发送数据的进程中,为了确保数据能够发送到指定的topic中,topic中的每一个partition在收到数据后,都需要向生产者发送 ack(ackacknowledgement)。

假设 producer 在必定的时间内收不到应对,那么producer会再次向Kafka发送此条数据。这就类似于写信,假定我们写一封信给或人,然后我们会在一段时间后收到一封回信,但假设超过了一个月我们还没有收到回信,就会猜想是不是信件丢掉了,会将这封信进行从头发送,直到收到回信中止。

ACK应答级别

一、0
介绍:生产者发送过来的数据,不需要等数据落盘应答
数据可靠性分析:容易丢数据
丢失数据原因:生产者发送完成后,Leader没有接收到数据,但是生产者认为已经发送成功了

二、1
介绍:生产者发送过来的数据,Leader收到数据后应答
数据可靠性分析:容易丢数据
丢失数据原因:应答完成后,还没开始同步副本,Leader挂了,新的Leader不会收到同步的消息,因为生产者已经认为发送成功了

三、-1(all)
介绍:生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答
数据可靠性分析:可靠

spring:kafka:bootstrap-servers: 192.168.***.***:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。producer: # 消息提供者key和value序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 3 # 生产者发送失败时,重试发送的次数properties:linger.ms: 0 # spring.kafka.producer.properties.linger.ms=0,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafkaacks: 1 # 修改ACK应答级别,默认是1
2.2、Kafka消息消费确认机制

Kafka消费消息确认机制分为两种:自动确认和手动确认。
(1)自动确认:在自动确认模式下,Kafka消费者消费一条消息后,会自动将消息偏移量提交到服务器端,不需要手动进行确认,从而确保消息被有效处理。此种确认机制的优点是操作简单,但是可能会导致消息重复消费,即当消费者处理消息的过程中出现异常,导致偏移量提交失败,下一次启动时就会重新消费之前已经处理过的消息。
(2)手动确认:在手动确认模式下,消费者需要显式地调用commit()方法,将消息的偏移量提交到服务器端,才会被标记为已处理。手动确认模式下,可以避免重复消费的问题,但是需要开发者自己实现确认逻辑,增加了一定的开发复杂度。
总的来说,自动确认适用于对消息的可靠性要求不高、实时性较高的场景;手动确认适用于对消息的可靠性要求较高、不要求实时性的场景

自动提交

这种提交方式有两个很重要的参数:
enable.auto.commit=true(是否开启自动提交,true or false)
auto.commit.interval.ms=5000(提交偏移量的时间间隔,默认5000ms)
每隔5秒,消费者会自动把从poll方法接收到的最大偏移量提交上去。自动提交是在轮询中进行,消费者每次轮询时都会检查是否提交该偏移量。可是这种情况会发生重复消费和丢失消息的情况。

server:port: 18082
spring:kafka:bootstrap-servers: 192.168.***.***:9092,192.168.***.***:9093,192.168.***.***:9094consumer: # consumer消费者group-id: consumergroup # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 120000auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

设置enable-auto-commit: true,开启自动提交,也就是偏移量不需要我们手动提交,程序会自己提交。
设置auto.commit.interval.ms=120000,也就是消费后,不会立即提交,会在2分钟后提交,只要在这期间服务异常终止,偏移量就无法提交到Broker,再次启动,会重复消费。

手动提交

手动提交模式可以有效确保消息不丢失以及不重复消费

MANUAL:poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交。
我们可以先测试一下MANUAL模式,只需要需改配置application.yml即可:

spring:kafka:bootstrap-servers: 192.168.***.***:9092,192.***.***.130:9093,192.***.***.130:9094consumer: # consumer消费者group-id: consumergroup # 默认的消费组IDenable-auto-commit: false # 是否自动提交offsetauto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:ack-mode: manual # 手动添加偏移量

消费者代码

@KafkaListener(topics = {"itmentu"},groupId = "itmentuGroup")
public void listener(ConsumerRecord<String,String> record, Acknowledgment ack){//获取消息String message = record.value();//消息偏移量long offset = record.offset();System.out.println("读取的消息:"+message+"\n当前偏移量:"+offset);//手动提交偏移量ack.acknowledge();
}
2.3、指定消费

属性解释:
id:消费者ID
groupId:消费组ID
topics:监听的topic,可监听多个
topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区。

监听一个主题,指定分区消费消息
    /*** 监听一个主题,且指定消费主题的哪些分区。* 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2* @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})},concurrency = "2")public void consumeByPattern(ConsumerRecord<String, String> record) {System.out.println("consumeByPattern");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());

以上是简单的Spring Boot整合kafka的示例,可以根据自己的实际需求进行调整。

📑文章末尾

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/184379.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PTL货位指引标签为仓储管理打开新思路

PTL货位指引标签是一种新型的仓储管理技术&#xff0c;它通过LED灯光指引和数字显示&#xff0c;为仓库管理带来了全新的管理思路和效率提升&#xff0c;成为现代物流仓库管理中的重要工具。 首先&#xff0c;PTL货位指引标签为仓储管理业务带来了管理新思路。传统的仓库管理中…

iOS Crash 治理:淘宝VisionKitCore 问题修复

本文通过逆向系统&#xff0c;阅读汇编指令&#xff0c;逐步找到源码&#xff0c;定位到了 iOS 16.0.<iOS 16.2 WKWebView 的系统bug 。同时苹果已经在新版本修复了 Bug&#xff0c;对于巨大的存量用户&#xff0c;仍旧会造成日均 Crash pv 1200 uv 1000&#xff0c; 最终通…

串口中断(10)自定义通讯协议-协议带数据长度及接收应答处理

本文为博主 日月同辉&#xff0c;与我共生&#xff0c;csdn原创首发。希望看完后能对你有所帮助&#xff0c;不足之处请指正&#xff01;一起交流学习&#xff0c;共同进步&#xff01; > 发布人&#xff1a;日月同辉,与我共生_单片机-CSDN博客 > 欢迎你为独创博主日月同…

【无标题】【教3妹学编程-算法题】2918. 数组的最小相等和

3妹&#xff1a;呜呜&#xff0c;烦死了&#xff0c; 脸上长了一个痘 2哥 : 不要在意这些细节嘛&#xff0c;不用管它&#xff0c;过两天自然不就好了。 3妹&#xff1a;切&#xff0c;你不懂&#xff0c;影响这两天的心情哇。 2哥 : 我看你是不急着找工作了啊&#xff0c; 工作…

帷幄内容管理系统:从立人设、做内容到定向投流,品牌 KOS 体系打造「百万导购」

随着公域流量越来越贵&#xff0c;获客成本越来越高&#xff0c;品牌们已经越来越不满足于高曝光&#xff0c;而是更多地关注起销售转化率。继 KOL、KOC&#xff08;关键意见消费者&#xff09; 之后&#xff0c;KOS&#xff08;关键意见销售&#xff09;营销模式走入品牌的视野…

排序算法之-冒泡

顺序排序算法原理 从头开始遍历未排序数列&#xff0c;遍历时比较相邻的两个元素&#xff0c;前面的大于后面的&#xff0c;则双方交换位置&#xff0c;一直比较到末尾&#xff0c;这样最大的元素会出现在末尾&#xff0c;接着再依次从头开始遍历剩余未排序的元素&#xff0c;…

MSQL系列(十四) Mysql实战-SQL语句 left join inner join On和Where语句的区别

Mysql实战-SQL语句On和Where语句的区别 前面我们讲解了Join的底层驱动表 选择原理&#xff0c;也知道了基本的内连接外连接两种SQL查询表连接方式 但是我们再查询多表的时候on和where语句到底有什么区别? where是过滤条件 ,不满足where的一定不会出现在结果中on是连接条件, …

网上3D虚拟数字展厅进一步增强营销效果

数字化营销已经成为了企业推广产品和服务的重要手段。由于制作成本及周期限制&#xff0c;企业或个人难以拥有个性化的3D云展厅&#xff0c;顺应市场需求和时代发展&#xff0c;3D云展数字平台作为一种新型的数字化营销工具&#xff0c;具有许多传统营销方式无法比拟的优势。 3…

怎么调整excel表里面所有单元格中,某个相同字体大小,单元格中其他文字大小不变?

环境: excel 2021 python3.8 问题描述: 怎么调整excel表里面所有单元格里面1这个字体大小,单元格里面其他文字不变? excel表里面。很多单元格都有1,1和文字都是10号字体,现在想把全部1字字体调整为16号其他字大小都不变 解决方案: 一、使用python来实现,经过测…

iOS加固原理与常见措施:保护移动应用程序安全的利器

目录 iOS加固原理与常见措施&#xff1a;保护移动应用程序安全的利器 前言 一、iOS加固的原理 1. 代码混淆 2. 加密算法 3. 防调试技术 4. 签名校验 二、iOS加固的常见措施 1. 代码混淆 2. 加密算法 3. 防调试技术 4. 签名校验 三、iOS加固的效果和注意事项 参考…

如何在macbook上删除文件?Mac删除文件的多种方法

在使用MacBook电脑时&#xff0c;桌面上经常会积累大量的文件&#xff0c;而这些文件可能已经不再需要或已经过时。为了保持桌面的整洁和提高电脑性能&#xff0c;我们需要及时删除这些文件。本文将介绍MacBook怎么删除桌面文件&#xff0c;以及macbook删除桌面文件快捷键。 一…

为什么说亚马逊、Lazada、虾皮等跨境平台测评很重要?

在亚马逊、shopee、Lazada的生态系统中&#xff0c;给店铺测评是一个重要的环节&#xff0c;优质的评论可以给潜在的买家对于产品质量更加信任&#xff0c;其次对于提高产品的销售跟排名也可以起到关键的作用 为什么测评重要&#xff1f; 1. 提高页面权重 一般页面有三个部分…

【大数据】Apache NiFi 数据同步流程实践

Apache NiFi 数据同步流程实践 1.环境2.Apache NIFI 部署2.1 获取安装包2.2 部署 Apache NIFI 3.NIFI 在手&#xff0c;跟我走&#xff01;3.1 准备表结构和数据3.2 新建一个 Process Group3.3 新建一个 GenerateTableFetch 组件3.4 配置 GenerateTableFetch 组件3.5 配置 DBCP…

Linux学习第36天:Linux RTC 驱动实验:时间是一条流淌的河

Linux版本号4.1.15 芯片I.MX6ULL 大叔学Linux 品人间百味 思文短情长 RTC就是实时时钟。 本笔记主要学习Linux RTC驱动试验&#xff0c;主要内容包括Linux内核RTC驱动简介、I.MX6U内部RTC分析、RTC时间查看与设置。因为Linux内核已经…

matplotlib.pyplot学习笔记

import matplotlib.pyplot as plt import numpy as np # 画单条线 plot([x], y, [fmt], *, dataNone, **kwargs) # 画多条线 plot([x], y, [fmt], [x2], y2, [fmt2], ..., **kwargs) >>> plot(x, y) # 创建 y 中数据与 x 中对应值的二维线图&#xff0c;使用…

【GEE】8、Google 地球引擎中的时间序列分析【时间序列】

1简介 在本模块中&#xff0c;我们将讨论以下概念&#xff1a; 处理海洋的遥感图像。 从图像时间序列创建视频。 GEE 中的时间序列分析。 向图形用户界面添加基本元素。 2背景 深水地平线漏油事件被认为是有史以来最大的海上意外漏油事件。该井释放了超过 490 万桶石油&am…

JAVA自己写什么功能可以提升技术?

JAVA自己写什么功能可以提升技术&#xff1f; 对于技术提升这个话题&#xff0c;勤于练习&#xff0c;多敲多积累这是一个必经的过程。那我们展开来详细的说一说&#xff0c;比如&#xff1a; 实现各种数据结构和算法&#xff0c;比如链表、树、图、排序、搜索等。这可以提高…

MAC设备(M1)环境下编译安装openCV for Java

最近发现一个需求&#xff0c;可以用openCV来实现&#xff0c;碰巧又新买了mac笔记本&#xff0c;就打算利用业余时间安装下openCV。这里将主要步骤记录下&#xff0c;希望能帮助有需要的人。 1、准备编译环境 #查询编译opencv相关依赖 brew info opencv查询结果如下图所示&a…

Blocking waiting for file lock on the registry index 问题解决

问题表现&#xff1a; cargo build时一直卡在Blocking waiting for file lock on the registry index。 解决方法&#xff1a; 1、之前在linux下出现过一次&#xff0c;采用这种方法解决了&#xff1a;rust - Cargo build hangs with " Blocking waiting for file lock…

虚拟展厅如何在艺术领域应用,虚拟展厅对艺术展有什么帮助

引言&#xff1a; 随着科技的不断发展&#xff0c;虚拟展厅作为一种新的展示方式&#xff0c;在艺术领域逐渐受到重视和应用。虚拟展厅利用虚拟现实技术&#xff0c;将艺术品展示于虚拟空间中&#xff0c;为观众带来更加身临其境的艺术体验。 一、虚拟展厅在艺术领域的应用 1…