大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka (正在更新…)

章节内容

上节我们完成了:

  • topics.sh、producer.sh、consumer.sh 脚本的基本使用
  • pom.xml 配置
  • JavaAPI的使用:producer 和 consumer

在这里插入图片描述

架构图

上节已经出现过了,这里再放一次
在这里插入图片描述

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>springboot-kafka</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.2.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

配置文件

我们常见的配置文件如下图:

spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializertemplate:default-topic: my-topic

Producer

编写代码

编写了一个KafkaProducerController
里边写了两个方法,都是使用了 KafkaTemplate 的工具。

@RestController
public class KafkaProducerController {@Resourceprivate KafkaTemplate<Integer, String> kafkaTemplate;@RequestMapping("/sendSync/{message}")public String sendSync(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 1, message);ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);try {SendResult<Integer, String> result = future.get();System.out.println(result.getProducerRecord().key() + "->" +result.getProducerRecord().partition() + "->" +result.getProducerRecord().timestamp());} catch (Exception e) {e.printStackTrace();}return "Success";}@RequestMapping("/sendAsync/{message}")public String sendAsync(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 2, message);ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送失败!");ex.printStackTrace();}@Overridepublic void onSuccess(SendResult<Integer, String> result) {System.out.println("发送成功");System.out.println(result.getProducerRecord().key() + "->" +result.getProducerRecord().partition() + "->" +result.getProducerRecord().timestamp());}});return "Success";}}

测试结果

http://localhost:8085/sendSync/wzktest1
http://localhost:8085/sendAsync/wzktest2
http://localhost:8085/sendAsync/wzktest222222

我们观察控制台的效果如下:
在这里插入图片描述

Consumer

编写代码

编一个类来实现Consumer:

@Configuration
public class KafkaConsumer {@KafkaListener(topics = {"wzk_topic_test"})public void consume(ConsumerRecord<Integer, String> consumerRecord) {System.out.println(consumerRecord.topic() + "\t"+ consumerRecord.partition() + "\t"+ consumerRecord.offset() + "\t"+ consumerRecord.key() + "\t"+ consumerRecord.value());}}

测试运行

2024-07-12 13:48:46.831  INFO 15352 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=13, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=h121.wzk.icu:9092 (id: 0 rack: null), epoch=0}}
2024-07-12 13:48:46.926  INFO 15352 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : wzk-test: partitions assigned: [wzk_topic_test-0]
wzk_topic_test	0	13	1	wzktest
wzk_topic_test	0	14	2	wzktest222
wzk_topic_test	0	15	2	wzktest222222

控制台的截图如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/387340.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

支持AI的好用的编辑器aieditor

一、工具概述 AiEditor 是一个面向 AI 的下一代富文本编辑器&#xff0c;她基于 Web Component&#xff0c;因此支持 Layui、Vue、React、Angular 等几乎任何前端框架。她适配了 PC Web 端和手机端&#xff0c;并提供了 亮色 和 暗色 两个主题。除此之外&#xff0c;她还提供了…

【Django5】内置Admin系统

系列文章目录 第一章 Django使用的基础知识 第二章 setting.py文件的配置 第三章 路由的定义与使用 第四章 视图的定义与使用 第五章 二进制文件下载响应 第六章 Http请求&HttpRequest请求类 第七章 会话管理&#xff08;Cookies&Session&#xff09; 第八章 文件上传…

聚观早报 | 华为nova Flip官宣;苹果iOS 17.6正式版发布

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 7月31日消息 华为nova Flip官宣 苹果iOS 17.6正式版发布 方程豹豹5全系降价 vivo X200 Pro主摄参数 谷歌Pixel …

PHP经销商订货管理系统小程序源码

经销商订货管理系统&#xff1a;重塑供应链效率的利器 &#x1f680; 开篇&#xff1a;解锁供应链管理的新纪元 在竞争激烈的商业环境中&#xff0c;经销商作为供应链的关键一环&#xff0c;其订货效率直接影响到整个供应链的流畅度和响应速度。传统的订货方式往往繁琐、易出…

Android mLruProcesses的分布结构

AMS中的进程管理 final ArrayList<ProcessRecord> mLruProcesses new ArrayList<ProcessRecord>(); 在AMS的内部属性中使用mLruProcesses集合保存所有的进程信息&#xff0c;AMS将所有进程按照优先级从低到高的顺序保存着对应的ProcessRecord信息&#xff0c;即排…

day06 项目实践:router,axios

vue组件的生命周期钩子 今天几乎没有讲什么新内容&#xff0c;就是一起做项目&#xff0c;只有一个小小的知识点&#xff0c;就是关于vue组件的生命周期钩子&#xff0c;其中最重要的四个函数—— beforeCreate()&#xff1a;组件创建之间执行 created()&#xff1a;组件创建…

react.16+

1、函数式组件 在vite脚手架中执行&#xff1a; app.jsx: import { useState } from react import reactLogo from ./assets/react.svg import viteLogo from /vite.svg import ./App.cssfunction App() {console.log(this)return <h2>我是函数式组件</h2> }exp…

【自学深度学习梳理2】深度学习基础

一、优化方法 上一篇说到,使用梯度下降进行优化模型参数,可能会卡在局部最小值,或优化方法不合适永远找不到具有最优参数的函数。 1、局部最小值 梯度下降如何工作? 梯度下降是一种优化算法,用于最小化损失函数,即寻找一组模型参数,使得损失函数的值最小(局部最小值…

【JavaSE-线程安全问题-死锁详解】

&#x1f308;个人主页&#xff1a;努力学编程’ ⛅个人推荐&#xff1a; c语言从初阶到进阶 JavaEE详解 数据结构 ⚡学好数据结构&#xff0c;刷题刻不容缓&#xff1a;点击一起刷题 &#x1f319;心灵鸡汤&#xff1a;总有人要赢&#xff0c;为什么不能是我呢 &#x1f308;…

医疗器械网络安全 | 第三方组件安全检测怎么做?

医疗器械软件安全中的第三方组件安全检测是确保医疗器械软件整体安全性的重要环节。以下是如何进行第三方组件安全检测的详细步骤&#xff1a; 一、明确检测目标 首先&#xff0c;需要明确检测的目标和范围&#xff0c;即确定哪些第三方组件需要进行安全检测。这通常包括操作系…

【C#】 使用GDI+获取两个多边形区域相交、非相交区域

一、使用GDI获取两个多边形区域相交、非相交区域 在 C# 中使用 GDI&#xff08;Graphics Device Interface Plus&#xff09;处理图形时&#xff0c;你可以使用 System.Drawing 和 System.Drawing.Drawing2D 命名空间中的类来操作区域&#xff08;Region&#xff09;。下面是一…

JS中如何对数组或者数组对象中所有的元素进行快速判断(every、some)

every是判断数组中所有元素均满足某个条件&#xff0c;some是判断数组中任意一个元素满足条件 举个栗子&#xff1a; const arr1 [{name:谭,},{name:谭},{name:高}]; const arr2 [{name:谭,},{name:谭},{name:谭}];const result1 arr1.every(item > item.name 谭);cons…

7月29(信息差)

&#x1f30d;最强模型 Llama 3.1 如期而至&#xff01;扎克伯格最新访谈&#xff1a;Llama 会成为 AI 界的 Linux &#x1f384;谷歌AlphaProof攻克国际奥赛数学题 https://www.51cto.com/article/793632.html ✨SearchGPT第一波评测来了&#xff01;响应速度超快还没广告&…

基于bert的自动对对联系统

目录 概述 演示效果 核心逻辑 使用方式 1.裁剪数据集 根据自己的需要选择 2.用couplet数据集训练模型 模型存储在model文件夹中 3.将模型转换为ONNX格式 4.打开index.html就可以在前端使用此自动对对联系统了。 本文所涉及所有资源均在传知代码平台可获取。 概述 这个生成器利用…

学习c语言第十八天(指针笔试题)

一维数组 字符数组 char*p"abcdef" p里面放的是a元素的地址 二维数组 指针笔试题 第一题 2 5 第二题 第三题 第四题 第五题 第六题 10 5 第七题 at 第八题 POINT ER ST EW

迪文屏使用记录

项目中要使用到迪文屏&#xff0c;奈何该屏资料太琐碎&#xff0c;找的人头皮发麻&#xff0c;遂进行相关整理。 屏幕&#xff1a;2.4寸电容屏 型号&#xff1a;DWG32240C024_03WTC 软件&#xff1a;DGUS_V7.647 1.竖屏横显 打开软件左下方的配置文件生成工具&#…

AI绘画【stable diffusion 1.5 Lora模型】摄影级真人写真,逼真大片!唯美!看完被震撼了!

前言 今天是鲜花摄像方面推荐的第四款SD 1.5 Lora模型&#xff0c;也是近日鲜花方面最后一款推荐的模型——**NAL_花海与车_摄影系列。**该款模型灵感来自于一张坐在车里的艺术照&#xff0c;lora主要作用于添加了花植物之类的填充效果&#xff0c;还有车内的坐姿&#xff0c;…

网络安全等级保护:上下文中的API安全性

网络安全等级保护&#xff1a;什么是API安全&#xff1f; 上下文中的API安全性 应用程序编程接口安全性位于多个安全学科的交叉点&#xff0c;如图所示。其中最重要的是以下三个领域&#xff1a; 1.信息安全&#xff08;InfoSec&#xff09;涉及在信息的整个生命周期中保护信…

智能城市管理系统设计思路详解:集成InfluxDB、Grafana和MQTTx协议(代码示例)

引言 随着城市化进程的加快&#xff0c;城市管理面临越来越多的挑战。智能城市管理系统的出现&#xff0c;为城市的基础设施管理、资源优化和数据分析提供了现代化的解决方案。本文将详细介绍一个基于开源技术的智能城市管理系统&#xff0c;涵盖系统功能、技术实现、环境搭建…

【C++】选择结构- 嵌套if语句

嵌套if语句的语法格式&#xff1a; if(条件1) { if(条件1满足后判断是否满足此条件) {条件2满足后执行的操作} else {条件2不满足执行的操作} } 下面是一个实例 #include<iostream> using namespace std;int main4() {/*提示用户输入一个高考分数&#xff0c;根据分…