【Kafka】2.在SpringBoot中使用官方原生java版Kafka客户端

目 录

    • 1. 新建一个消息生产者
    • 2. 新建一个消息消费者
    • 3. 测 试

在开始之前,需要先做点准备工作,用 IDEA 新建一个 Maven 项目,取名 kafka-study,然后删掉它的 src 目录,接着在 pom.xml 里面引入下面的依赖。这个项目的作用是作为整个工程的父项目,后面的项目都在此基础上新建 module 即可。

这里用到的环境信息如下:

  • JDK1.8
  • SpringBoot2.7.1
  • IDEA
  • Maven3.6.3
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.1</version><relativePath/></parent><groupId>org.yuhuofei</groupId><artifactId>kafka-study</artifactId><version>1.0-SNAPSHOT</version><name>kafka-study</name><description>Study project for Kafka</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

1. 新建一个消息生产者

使用 IDEA 在 kafka-study 的基础上新建一个 Maven 类型的 module ,命名为 producer-01,直至完成。接着,我们改造这个 module ,把它变成我们想要的消息生产者。

1、在 java 目录下,新建一个包 com.yuhuofei ,然后再创建一个启动类 ProducerApplication.java ,内容如下:

package com.yuhuofei;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}}

2、在 resources 目录下,新建配置文件 application.properties ,内容如下:

server.port=8080
server.servlet.context-path=/producer-01
#Kafka配置
topic.name=my-kafka-topic

3、在自己的 pom.xml 文件中,引入 Kafka 客户端的依赖。这里需要注意的是,由于我们在上一篇博客里安装的是 kafka_2.11-2.2.1 版本的服务端,所以引入的 Kafka 客户端的版本最好对应,也选择 2.2.1 版本的。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>kafka-study</artifactId><groupId>org.yuhuofei</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>producer-01</artifactId><dependencies><!--官方原生的java版kafka客户端依赖--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.1</version></dependency><!--fastjson依赖--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.11.graal</version></dependency></dependencies></project>

4、由于 Kafka 进行消息的发布或者消费是需要有 Topic 的,因此我们需要要创建一个 Topic。在 zookeeper 和 Kafka 服务端都启动的情况下,打开 D:\Kafka\kafka_2.11-2.2.1\bin\windows 文件夹,在地址栏输入 cmd,打开控制台,然后用下面的命令创建、查看、删除 topic 。

如下图所示,我已经创建了一个名字为 my-kafka-topic 的 topic ,这个 topic 的分区为 1,副本也是 1。

# 创建topic
kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic
# 查询topic列表
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list					
# 删除topic
kafka-topics.bat --delete --zookeeper 127.0.0.1:2181 --topic my-kafka-topic
# 查询某个topic信息
kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-kafka-topic
#kafka查看topic数据内容
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-kafka-topic --from-beginning

在这里插入图片描述

5、实现生产者发送消息

  • controller层
package com.yuhuofei.controller;import com.yuhuofei.service.ProducerInterface;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @Description* @ClassName ProducerController* @Author yuhuofei* @Date 2023/8/13 16:38* @Version 1.0*/
@RequestMapping("/info")
@RestController
public class ProducerController {@Resourceprivate ProducerInterface producerInterface;@GetMapping("/create-message")public void createTopic() {producerInterface.createMessage();}
}
  • service层
package com.yuhuofei.service;/*** @Description* @InterfaceName ProducerController* @Author yuhuofei* @Date 2023/8/13 16:39* @Version 1.0*/
public interface ProducerInterface {void createMessage();
}
  • serviceImpl层
package com.yuhuofei.service.impl;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuhuofei.service.ProducerInterface;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @Description* @ClassName ProducerInterfaceImpl* @Author yuhuofei* @Date 2023/8/13 16:39* @Version 1.0*/
@Service
public class ProducerInterfaceImpl implements ProducerInterface {@Value("${topic.name}")private String topicName;@Resourceprivate Producer producer;@Overridepublic void createMessage() {//创建测试数据JSONObject jsonObject = new JSONObject();jsonObject.put("word", "这是生产者生产的测试消息");String jsonString = JSON.toJSONString(jsonObject);//使用kafka发送消息ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, jsonString);producer.send(producerRecord);}
}
  • 客户端配置类
package com.yuhuofei.config;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Properties;/*** @Description 配置生产者客户端参数* @ClassName KafkaProperties* @Author yuhuofei* @Date 2023/8/13 22:18* @Version 1.0*/
@Configuration
public class KafkaConfig {@Bean("producer")public Producer<String, String> getKafkaProducer() {Properties properties = new Properties();//配置Kafka的服务端IP和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");//指定key使用的序列化类properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//指定value使用的序列化类properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return new KafkaProducer<>(properties);}
}

6、启动服务,调用接口 http://localhost:8080/producer-01/info/create-message ,然后在控制台用命令查看一下消息是否写入,如下图所见,虽然是乱码,但是已经可以看出消息成功写入到 Kafka

在这里插入图片描述
到这里,消息生产者的创建暂时告一段落。

2. 新建一个消息消费者

和前面建立生产者类似的步骤,只是文件信息和名字不一样,这里步骤就不重复了,直接给出相关的配置和类。这个消费者,取名 consumer-01 。

  • pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>kafka-study</artifactId><groupId>org.yuhuofei</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>consumer-01</artifactId><dependencies><!--官方原生的java版kafka客户端依赖--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.1</version></dependency><!--fastjson依赖--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.11.graal</version></dependency></dependencies>
</project>
  • properties文件
server.port=8081
server.servlet.context-path=/consumer-01
#Kafka配置
topic.name=my-kafka-topic
  • 启动类 ConsumerApplication.java
package com.yuhuofei;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}}
  • 控制层ConsumerController.java
package com.yuhuofei.controller;import com.yuhuofei.service.ConsumerInterface;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @Description* @ClassName ConsumerController* @Author yuhuofei* @Date 2023/8/14 0:19* @Version 1.0*/
@RestController
@RequestMapping("/consumer")
public class ConsumerController {@Resourceprivate ConsumerInterface consumerInterface;@GetMapping("/getData")public void getData() {consumerInterface.getData();}
}
  • service层
package com.yuhuofei.service;/*** @Description* @ClassName ConsumerInterface* @Author yuhuofei* @Date 2023/8/14 0:20* @Version 1.0*/
public interface ConsumerInterface {void getData();
}
  • serviceImpl层
package com.yuhuofei.service.impl;import com.yuhuofei.service.ConsumerInterface;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;/*** @Description* @ClassName ConsumerInterfaceImpl* @Author yuhuofei* @Date 2023/8/14 0:20* @Version 1.0*/
@Service
public class ConsumerInterfaceImpl implements ConsumerInterface {@Value("${topic.name}")private String topicName;@Resourceprivate Consumer consumer;@Overridepublic void getData() {Collection<String> topics = Collections.singletonList(topicName);consumer.subscribe(topics);List<String> result = new ArrayList<>();while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord consumerRecord : consumerRecords) {//从ConsumerRecord中获取消费数据String res = (String) consumerRecord.value();System.out.println("从Kafka中消费的原始数据: " + res);result.add(res);System.out.println(result);}}}
}
  • config配置类
package com.yuhuofei.config;import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Properties;/*** @Description 配置消费者参数* @ClassName KafkaConsumerConfig* @Author yuhuofei* @Date 2023/8/14 0:08* @Version 1.0*/
@Configuration
public class KafkaConsumerConfig {@Bean("consumer")public Consumer<String, String> getKafkaConsumer() {Properties properties = new Properties();//配置Kafka的服务端IP和端口properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");//指定key使用的序列化类properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//指定value使用的序列化类properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//指定消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-01-group");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,100);properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");return new KafkaConsumer<>(properties);}
}

最终整个项目目录如下图所示。

在这里插入图片描述

3. 测 试

1、检查 zookeeper 和 Kafka 服务端是否是启动的,没启动就先启动。

2、接着,启动消费者服务,然后调用一下接口 http://localhost:8081/consumer-01/consumer/getData ,这么做的目的启动一个线程,监听 topic 。

3、最后,启动生产者服务,调用接口 http://localhost:8080/producer-01/info/create-message ,向 Kafka 服务器写数据。

结果如下所示:

控制台看到的所有消息记录
这是所有发送的记录
生产者发送一条消息
在这里插入图片描述
消费者监听到一条消息并接收
在这里插入图片描述

在 SpringBoot 中,使用官方原生 java 版 Kafka 客户端的入门介绍到这里结束,还有更多使用方式,等待挖掘。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/93564.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

NanoPi NEO移植LVGL8.3.5到1.69寸ST7789V屏幕

移植前准备 移植好fbtft屏幕驱动 参考链接&#xff1a;友善之臂NanoPi NEO利用fbtft驱动点亮1.69寸ST7789V2屏幕 获取源码 名称地址描述lvglhttps://github.com/lvgl/lvgl.gitlvgl-8.3.5lv_drivershttps://github.com/lvgl/lv_drivers.gitlv_drivers-6.1.1 创建工程目录 创…

时序预测 | MATLAB实现基于GRU门控循环单元的时间序列预测-递归预测未来(多指标评价)

时序预测 | MATLAB实现基于GRU门控循环单元的时间序列预测-递归预测未来(多指标评价) 目录 时序预测 | MATLAB实现基于GRU门控循环单元的时间序列预测-递归预测未来(多指标评价)预测结果基本介绍程序设计参考资料 预测结果 基本介绍 1.Matlab实现GRU门控循环单元时间序列预测未…

计算机竞赛 python 爬虫与协同过滤的新闻推荐系统

1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; python 爬虫与协同过滤的新闻推荐系统 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;3分创新点&#xff1a;4分 该项目较为新颖&…

AUTOSAR NvM协议栈集成方法

一、涉及的模块 Bsw&#xff1a;NvM、MemIf、Fee、Crc Mcal&#xff1a;Fls 其中一些芯片厂商的MCAL也会提供Fee模块&#xff0c;本文选择使用ETAS提供的Fee模块&#xff0c;好处是Fee的Block不需要手动配&#xff0c;在NvM中配好了Block之后&#xff0c;生成Bsw代码的同时会…

山东布谷科技直播软件源码Nginx服务器横向扩展:搭建更稳定的平台服务

在直播软件源码平台中&#xff0c;服务器扮演着重要的角色&#xff0c;关系着视频传输、数据处理、用户管理等工作的顺利完成。随着互联网的迅猛发展&#xff0c;直播行业也随之崛起&#xff0c;全世界的人们都加入到了直播软件源码平台中&#xff0c;用户流量的增加让服务器的…

设计模式之构建器(Builder)C++实现

1、构建器提出 在软件功能开发中&#xff0c;有时面临“一个复杂对象”的创建工作&#xff0c;该对象的每个功能接口由于需求的变化&#xff0c;会使每个功能接口发生变化&#xff0c;但是该对象使用每个功能实现一个接口的流程是稳定的。构建器就是解决该类现象的。构建就是定…

爬虫逆向实战(八)--猿人学第十五题

一、数据接口分析 主页地址&#xff1a;猿人学第十五题 1、抓包 通过抓包可以发现数据接口是api/match/15 2、判断是否有加密参数 请求参数是否加密&#xff1f; 查看“载荷”模块可以发现有一个m加密参数 请求头是否加密&#xff1f; 无响应是否加密&#xff1f; 无cook…

[C++] string类的介绍与构造的模拟实现,进来看吧,里面有空调

文章目录 1、string类的出现1.1 C语言中的字符串 2、标准库中的string类2.1 string类 3、string类的常见接口说明及模拟实现3.1 string的常见构造3.2 string的构造函数3.3 string的拷贝构造3.4 string的赋值构造 4、完整代码 1、string类的出现 1.1 C语言中的字符串 C语言中&…

【数据分析入门】Numpy进阶

目录 一、数据重塑1.1 透视1.2 透视表1.3 堆栈/反堆栈1.3 融合 二、迭代三、高级索引3.1 基础选择3.2 通过isin选择3.3 通过Where选择3.4 通过Query选择3.5 设置/取消索引3.6 重置索引3.6.1 前向填充3.6.2 后向填充 3.7 多重索引 四、重复数据五、数据分组5.1 聚合5.2 转换 六、…

搭建Web服务器并用cpolar发布至公网访问

本地电脑搭建Web服务器并用cpolar发布至公网访问 文章目录 本地电脑搭建Web服务器并用cpolar发布至公网访问前言1. 首先在电脑安装PHPStudy、WordPress、cpolar2. 安装cpolar&#xff0c;进入Web-UI界面3. 安装wordpress4. 进入wordpress网页安装程序5. 利用cpolar建立的内网穿…

FL Studio 21最新for Windows-21.1.0.3267中文解锁版安装激活教程及更新日志

FL Studio 21最新版本for Windows 21.1.0.3267中文解锁版是最新强大的音乐制作工具。它可以与所有类型的音乐一起创作出令人惊叹的音乐。它提供了一个非常简单且用户友好的集成开发环境&#xff08;IDE&#xff09;来工作。这个完整的音乐工作站是由比利时公司 Image-Line 开发…

OpenAI全球招外包大军,手把手训练ChatGPT取代码农 ; 码农:我自己「杀」自己

目录 前言 OpenAI招了一千多名外包人员&#xff0c;训练AI学会像人类一样一步步思考。如果ChatGPT「学成归来」&#xff0c;码农恐怕真的危了&#xff1f; 码农真的危了&#xff01; 当时OpenAI也说&#xff0c;ChatGPT最合适的定位&#xff0c;应该是编码辅助工具。 用Cha…

设计模式之原型模式详解

前言 在设计模式的系列文章中&#xff0c;我们前面已经写了工厂模式、单列模式、建造者模式&#xff0c;在针对创建型模式中&#xff0c;今天想跟大家分享的是原型模式&#xff0c;我觉的这种模式叫克隆模式会更佳恰当。原型模式的目的就是通过复制一个现有的对象来生成一个新…

【Linux】生产者消费者模型

目录 什么是生产消费者模型 为什么要使用生产消费者模型 基于阻塞队列的生产消费者模型 什么是生产消费者模型 生产者消费者模型是一种常见的并发编程模型&#xff0c;用于解决生产者和消费者之间数据交换和同步的问题。在这个模型中&#xff0c;生产者会生成数据并将其放入…

Spring之AOP的特性

一. AOP简介 AOP是Aspect-Oriented Programming的缩写&#xff0c;即面向切面编程。利用oop思想&#xff0c;可以很好的处理业务流程&#xff0c;但是不能把系统中某些特定的重复性行为封装到模块中。例如&#xff0c;在很多业务中都需要记录操作日志&#xff0c;结果我们不得…

HTML5 游戏开发实战 | 五子棋

01、五子棋游戏设计的思路 在下棋过程中&#xff0c;为了保存下过的棋子的信息&#xff0c;使用数组 chessData。chessData&#xff3b;x&#xff3d;&#xff3b;y&#xff3d;存储棋盘(x&#xff0c;y)处棋子信息&#xff0c;1 代表黑子&#xff0c;2 代表白子&#xff0c;0…

【100天精通python】Day38:GUI界面编程_PyQT从入门到实战(中)

目录 专栏导读 4 数据库操作 4.1 连接数据库 4.2 执行 SQL 查询和更新&#xff1a; 4.3 使用模型和视图显示数据 5 多线程编程 5.1 多线程编程的概念和优势 5.2 在 PyQt 中使用多线程 5.3 处理多线程间的同步和通信问题 5.3.1 信号槽机制 5.3.2 线程安全的数据访问 Q…

高效数据传输:轻松上手将Kafka实时数据接入CnosDB

本篇我们将主要介绍如何在 Ubuntu 22.04.2 LTS 环境下&#xff0c;实现一个KafkaTelegrafCnosDB 同步实时获取流数据并存储的方案。在本次操作中&#xff0c;CnosDB 版本是2.3.0&#xff0c;Kafka 版本是2.5.1&#xff0c;Telegraf 版本是1.27.1 随着越来越多的应用程序架构转…

Linux驱动开发之点亮三盏小灯

头文件 #ifndef __HEAD_H__ #define __HEAD_H__//LED1和LED3的硬件地址 #define PHY_LED1_MODER 0x50006000 #define PHY_LED1_ODR 0x50006014 #define PHY_LED1_RCC 0x50000A28 //LED2的硬件地址 #define PHY_LED2_MODER 0x50007000 #define PHY_LED2_ODR 0x50007014 #define…

TiDB基础介绍、应用场景及架构

1. 什么是newsql NewSQL 是对各种新的可扩展/高性能数据库的简称&#xff0c;这类数据库不仅具有NoSQL对海量数据的存储管理能力&#xff0c;还保持了传统数据库支持ACID和SQL等特性。 NewSQL是指这样一类新式的关系型数据库管理系统&#xff0c;针对OLTP&#xff08;读-写&…