点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka (正在更新…)
章节内容
上节我们完成了:
- topics.sh、producer.sh、consumer.sh 脚本的基本使用
- pom.xml 配置
- JavaAPI的使用:producer 和 consumer
架构图
上节已经出现过了,这里再放一次
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>springboot-kafka</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.2.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
配置文件
我们常见的配置文件如下图:
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializertemplate:default-topic: my-topic
Producer
编写代码
编写了一个KafkaProducerController
里边写了两个方法,都是使用了 KafkaTemplate 的工具。
@RestController
public class KafkaProducerController {@Resourceprivate KafkaTemplate<Integer, String> kafkaTemplate;@RequestMapping("/sendSync/{message}")public String sendSync(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 1, message);ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);try {SendResult<Integer, String> result = future.get();System.out.println(result.getProducerRecord().key() + "->" +result.getProducerRecord().partition() + "->" +result.getProducerRecord().timestamp());} catch (Exception e) {e.printStackTrace();}return "Success";}@RequestMapping("/sendAsync/{message}")public String sendAsync(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 2, message);ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送失败!");ex.printStackTrace();}@Overridepublic void onSuccess(SendResult<Integer, String> result) {System.out.println("发送成功");System.out.println(result.getProducerRecord().key() + "->" +result.getProducerRecord().partition() + "->" +result.getProducerRecord().timestamp());}});return "Success";}}
测试结果
http://localhost:8085/sendSync/wzktest1
http://localhost:8085/sendAsync/wzktest2
http://localhost:8085/sendAsync/wzktest222222
我们观察控制台的效果如下:
Consumer
编写代码
编一个类来实现Consumer:
@Configuration
public class KafkaConsumer {@KafkaListener(topics = {"wzk_topic_test"})public void consume(ConsumerRecord<Integer, String> consumerRecord) {System.out.println(consumerRecord.topic() + "\t"+ consumerRecord.partition() + "\t"+ consumerRecord.offset() + "\t"+ consumerRecord.key() + "\t"+ consumerRecord.value());}}
测试运行
2024-07-12 13:48:46.831 INFO 15352 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=13, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=h121.wzk.icu:9092 (id: 0 rack: null), epoch=0}}
2024-07-12 13:48:46.926 INFO 15352 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : wzk-test: partitions assigned: [wzk_topic_test-0]
wzk_topic_test 0 13 1 wzktest
wzk_topic_test 0 14 2 wzktest222
wzk_topic_test 0 15 2 wzktest222222
控制台的截图如下: