1. Download the Kafka binary release (a .tgz archive, not a jar)
https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.12-3.1.0.tgz
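That link opens Apache's mirror chooser. If you prefer the command line, something like the following should work; the direct URL is an assumption based on the standard Apache archive layout, so adjust the mirror or path as needed:

# Download and unpack the Kafka 3.1.0 (Scala 2.12) release
curl -O https://archive.apache.org/dist/kafka/3.1.0/kafka_2.12-3.1.0.tgz
tar -xzf kafka_2.12-3.1.0.tgz
cd kafka_2.12-3.1.0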
2. Once the download finishes, start the services from the command line
1. Open a new terminal window, cd into Kafka's bin directory, and start ZooKeeper:
   ./zookeeper-server-start.sh ../config/zookeeper.properties
2. Open another terminal, cd into the bin directory, and start the Kafka broker:
   ./kafka-server-start.sh ../config/server.properties &
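If both services came up, jps should list a QuorumPeerMain process (ZooKeeper) and a Kafka process; a quick check:

jps
# expected output (PIDs will differ):
# 12345 QuorumPeerMain
# 12346 Kafka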
3. Test
1. In the bin directory, create a topic:
   ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kafkaTest
2. Start a console producer:
   ./kafka-console-producer.sh --broker-list localhost:9092 --topic kafkaTest
3. Start a console consumer:
   ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkaTest --from-beginning
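To double-check that the topic was created before producing to it, you can list or describe it (a quick sketch using the same broker address as above):

./kafka-topics.sh --list --bootstrap-server localhost:9092
./kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic kafkaTest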
4. Results
Producer:
Send a test message from the producer console; the consumer consumes it immediately.
5. Consuming from Kafka in Java
This requires the setup above: start ZooKeeper, start Kafka, and create the topic.
It also helps to download a GUI tool for inspecting Kafka.
macOS version:
https://www.kafkatool.com/download.html
5.1 Add the dependencies
<!-- Kafka client -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
<!-- utility classes -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-io</artifactId>
    <version>1.3.2</version>
</dependency>
5.2 Writing the producer
Call send to publish 100 messages (values 0 to 99) to the topic test:
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerTest {
    public static void main(String[] args) {
        // 1. Create the Properties used to connect to Kafka
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create a KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 3. Call send to publish messages 0-99 to the topic "test"
        for (int i = 0; i < 100; ++i) {
            try {
                // send() returns a Future wrapping the broker's acknowledgement
                Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test", null, i + ""));
                // Block until the broker responds
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        // 4. Close the producer
        producer.close();
    }
}
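Blocking on future.get() after every send turns each message into a full round trip. A common alternative is an asynchronous send with a callback; a minimal sketch under the same assumptions as above (local broker, topic test, String serializers; the class name is made up for illustration):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaAsyncProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; ++i) {
            // The callback runs when the broker acknowledges (or rejects) the record
            producer.send(new ProducerRecord<>("test", null, i + ""), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("wrote to partition " + metadata.partition() + ", offset " + metadata.offset());
                }
            });
        }
        // close() flushes any records still in flight before shutting down
        producer.close();
    }
}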
5.3 Example
5.4 Writing the consumer
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerTest {
    public static void main(String[] args) throws InterruptedException {
        // 1. Create the Properties used to connect to Kafka
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        // Commit offsets automatically
        props.setProperty("enable.auto.commit", "true");
        // Interval between automatic offset commits
        props.setProperty("auto.commit.interval.ms", "1000");
        // Deserializers for the fetched keys and values
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. Create the Kafka consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // 3. Subscribe to the topic to pull data from
        kafkaConsumer.subscribe(Arrays.asList("test"));

        // 4. Loop forever, pulling messages from the topic
        while (true) {
            // Each poll fetches a batch of records; poll(long) is deprecated, so pass a Duration
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
            // 5. Print each record's topic, offset, key, and value
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                String topic = consumerRecord.topic();
                // offset: the record's position within its Kafka partition
                long offset = consumerRecord.offset();
                String key = consumerRecord.key();
                String value = consumerRecord.value();
                System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
            }
            Thread.sleep(1000);
        }
    }
}
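With enable.auto.commit=true, offsets are committed on a timer, so a crash between commits can replay or skip messages. If you need explicit at-least-once control, disable auto-commit and commit after each batch is processed; a minimal sketch of that variant (hypothetical class name, same broker and topic as above):

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaManualCommitConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        // Disable auto-commit; offsets are committed explicitly below
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset:" + record.offset() + " value:" + record.value());
            }
            // Commit only after the whole batch has been processed,
            // so a crash mid-batch replays it rather than losing it
            consumer.commitSync();
        }
    }
}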