Kafka 环境搭建请参考 moreCalm 的 CSDN 博客文章《Kafka:安装和配置》。
1、在 Spring Boot 项目中引入 Kafka 依赖
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Kafka: the kafka-clients artifact pulled in transitively by spring-kafka is
         excluded here and declared explicitly as its own dependency below. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- No version is given in this fragment; presumably it is supplied by the parent
         POM / dependency management (confirm against the full pom.xml). -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>

    <!-- JSON library used to convert objects to and from message strings. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>
2、配置application.yml
# HTTP port of the demo application.
server:
  port: 9991

spring:
  application:
    name: kafka-demo
  kafka:
    # Address (host:port) of the Kafka broker to connect to.
    bootstrap-servers: 192.168.200.130:9092
    producer:
      # Number of times the producer retries a failed send.
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Resolves to "kafka-demo-test" via the application name above.
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
传递String类型的消息
3、controller实现消息发送接口
package com.heima.kafkademo.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class HelloController {@AutowiredKafkaTemplate<String,String> kafkaTemplate;@GetMapping("/hello")public String addMessage(){kafkaTemplate.send("lakers-topic","湖人总冠军!");return "ok";}
}
4、component中实现接收类HelloListener
package com.heima.kafkademo.component;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Spring component that listens on the demo topic and echoes each message to standard output.
 */
@Component
public class HelloListener {

    /** Name of the topic this listener subscribes to. */
    private static final String TOPIC = "lakers-topic";

    /**
     * Receives one message from the topic and prints it to {@code System.out}.
     *
     * @param msg the message payload as a string
     */
    @KafkaListener(topics = TOPIC)
    public void onMessage(String msg) {
        System.out.println(msg);
    }
}
5、测试
浏览器访问该接口并查看控制台
接收成功
传递对象类型的消息
思路:发送消息时,先将对象序列化为 JSON 字符串;接收消息时,再将该字符串反序列化为对象。
1、controller实现发送
package com.heima.kafkademo.controller;import com.alibaba.fastjson.JSON;
import com.heima.kafkademo.model.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class HelloController {@AutowiredKafkaTemplate<String,String> kafkaTemplate;@GetMapping("/hello")public String addMessage(){User user = new User();user.setName("勒布朗");user.setAge(37);user.setGender("男");user.setJob("NBA球员");kafkaTemplate.send("lakers-topic", JSON.toJSONString(user));return "ok";}
}
2、component实现接收类
package com.heima.kafkademo.component;

import com.alibaba.fastjson.JSON;
import com.heima.kafkademo.model.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Spring component that listens on the demo topic, parses each message as a JSON
 * {@link User} and prints the result to standard output.
 */
@Component
public class HelloListener {

    /** Name of the topic this listener subscribes to. */
    private static final String TOPIC = "lakers-topic";

    /**
     * Receives one message from the topic, parses it into a {@link User} and prints it
     * to {@code System.out}.
     *
     * @param msg the message payload, parsed here as JSON describing a {@code User}
     */
    @KafkaListener(topics = TOPIC)
    public void onMessage(String msg) {
        User received = JSON.parseObject(msg, User.class);
        System.out.println(received);
    }
}
3、打印测试结果