SpringBoot集成Kafka
- 一、前言
- 二、项目
- 1. pom
- 2. application.properties
- 4. 消息生产者-测试
- 5. 消息消费者
- 三、启动测试
- 四、有总结的不对的地方/或者问题 请指正, 我在努力中
一、前言
该文章中主要对SpringBoot 集成Kafka 主要是 application.properties 与 pom坐标就算集成完成,剩下的就是一些 消费者 / 发布者的操作了详细的请看项目代码。简简单单 快速集成
二、项目
1. pom
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
2. application.properties
spring.kafka.bootstrap-servers=192.168.0.113:9092
# 生产者配置
spring.kafka.producer.retries=2
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=-1# 消费者配置
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval.ms=1000
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.max-poll-records=500
spring.kafka.consumer.fetch-max-wait.ms=500
spring.kafka.consumer.fetch-min-size=1
spring.kafka.consumer.heartbeat-interval.ms=3000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
4. 消息生产者-测试
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ZoheVideoStructApplication.class)
public class ZoheVideoStructApplicationTest {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@Testpublic void test(){kafkaTemplate.send("sb_topic","123132132132132132123132132");}
}
5. 消息消费者
@Component
public class KafkaConsumer {//监听消费@KafkaListener(topics = {"sb_topic"},groupId = "test")public void onNormalMessage(ConsumerRecord<String, Object> record) {System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "="+ record.value());}//监听消费@KafkaListener(topics = {"sb_topic"},groupId = "test2")public void onNormalMessage2(ConsumerRecord<String, Object> record) {System.out.println("简单消费2:" + record.topic() + "-" + record.partition() + "=" + record.value());}}