SpringBoot 是一个在 JavaEE 开发中非常常用的组件。可以用于 Kafka 的生产者,也可以
用于 SpringBoot 的消费者。
1)在 IDEA 中安装 lombok 插件
在 Plugins 下搜索 lombok 然后在线安装即可,安装后注意重启
2)SpringBoot 环境准备
(1)创建一个 Spring Initializr
注意:有时候 SpringBoot 官方脚手架不稳定,我们切换国内地址 https://start.aliyun.com
(2)项目名称 springboot
(3)添加项目依赖
(4)检查自动生成的配置文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.1</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.atguigu</groupId><artifactId>springboot</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>
3.1 SpringBoot 生产者
(1)修改 SpringBoot 核心配置文件 application.propeties, 添加生产者相关信息
# 应用名称
spring.application.name=atguigu_springboot_kafka
# 指定 kafka 的地址
spring.kafka.bootstrapservers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#指定 key 和 value 的序列化器
spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer
(2)创建 controller 从浏览器接收数据, 并写入指定的 topic
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {// Kafka 模板用来向 kafka 发送数据@AutowiredKafkaTemplate<String, String> kafka;@RequestMapping("/prince")public String data(String msg) {kafka.send("first", msg);return "ok";}
}
3.2 SpringBoot 消费者
(1)修改 SpringBoot 核心配置文件 application.propeties
# =========消费者配置开始=========
# 指定 kafka 的地址
spring.kafka.bootstrapservers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# 指定 key 和 value 的反序列化器
spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.StringDeserial
izer
spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserial
izer
#指定消费者组的 group_id
spring.kafka.consumer.group-id=prince
# =========消费者配置结束=========
(2)创建类消费 Kafka 中指定 topic 的数据
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
@Configuration
public class KafkaConsumer {// 指定要监听的 topic@KafkaListener(topics = "first")public void consumeTopic(String msg) { // 参数: 收到的 valueSystem.out.println("收到的信息: " + msg);}
}
(3)向 first 主题发送数据
[hadoop102 kafka]$ bin/kafka-console-producer.sh --
bootstrap-server hadoop102:9092 --topic first
>