安装kafka,创建 topic:
Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851
Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353
添加依赖包:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.1.10.RELEASE</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version></dependency>
kafka代码示例(一):
主要按照以下步骤:
-
设置 broker服务器的ip和端口, 设置 消费者群组id
-
初始化消费者
-
消费者订阅主题
-
消费者批量拉取消息
public class KafkaDemo1 {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {consumerRecord();}public static void consumerRecord() {//属性配置Properties properties = getProperties(BROKER_LIST, GROUP_ID);//消费者初始化KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//消息者订阅主题consumer.subscribe(Collections.singletonList(TOPIC));//循环while (true) {//每次拉取 1千条消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("=============> 消费kafka消息:"+ record.value());}}}public static Properties getProperties(String brokerList, String groupId) {Properties properties = new Properties();//序列化properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//broker服务器的ip和端口,多个用逗号隔开properties.put("bootstrap.servers", brokerList);//消费者群组idproperties.put("group.id", groupId);return properties;}}
使用文章开头安装好的 kafka,并按文章中的步骤,创建 topic ,打开一个 生产者 producer,并发送消息。
观察idea 控制台,可以看到 成功消费了消息:
=============> 消费kafka消息:hello kafka
参考资料:
《深入理解kafka 核心设计与实践原理》