配置
创建网桥
# Create a user-defined Docker network named app-tier using the bridge driver.
docker network create app-tier --driver bridge
拉取并启动镜像
# Start a detached Kafka container named kafka-server on the app-tier network,
# publishing port 9092 to the host. The node takes both the controller and
# broker roles and is the only controller quorum voter (0@kafka-server:9093).
# Replace 192.168.66.1 in KAFKA_CFG_ADVERTISED_LISTENERS with your own host IP.
docker run -d --name kafka-server --hostname kafka-server \
  --network app-tier \
  -p 9092:9092 \
  -e ALLOW_PLAINTEXT_LISTENER=yes \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.66.1:9092 \
  -e KAFKA_CFG_NODE_ID=0 \
  -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
  -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  bitnami/kafka:latest
注意:KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.66.1:9092 中的 192.168.66.1 需要替换为自己服务器的 IP。
创建一个名为 first 的主题(topic)
# Run kafka-topics.sh in a temporary container (--rm) on the app-tier network
# to create the topic "first" via the broker at kafka-server:9092.
docker run -it --rm --network app-tier bitnami/kafka:latest kafka-topics.sh --topic first --create --bootstrap-server kafka-server:9092
查看主题列表
# Run kafka-topics.sh in a temporary container (--rm) on the app-tier network
# to list the topics reported by the broker at kafka-server:9092.
docker run -it --rm --network app-tier bitnami/kafka:latest kafka-topics.sh --list --bootstrap-server kafka-server:9092
Go生产与消费kafka中的消息
// Command main is a small demo that writes one message to the Kafka topic
// "first" and then consumes messages from partition 0 of that topic until
// the process is interrupted.
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/segmentio/kafka-go"
)

// prod writes a single "Hello, Kafka!" message to the "first" topic and then
// closes the writer. Any write or close failure terminates the process via
// log.Fatal.
func prod() {
	// Kafka broker address list.
	brokerList := []string{"192.168.66.1:9092"}

	// Create the Kafka producer.
	producer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  brokerList,
		Topic:    "first",
		Balancer: &kafka.LeastBytes{},
	})

	// Message to send.
	message := kafka.Message{
		Key:   []byte("key"),
		Value: []byte("Hello, Kafka!"),
	}

	// Send the message.
	if err := producer.WriteMessages(context.Background(), message); err != nil {
		log.Fatal("failed to write messages:", err)
	}

	// Close the producer.
	if err := producer.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}

	fmt.Println("Message sent successfully!")
}

// main starts prod in the background, then reads messages from partition 0
// of the "first" topic and prints each value until an interrupt signal
// (Ctrl+C) is received.
func main() {
	go prod()

	// Kafka broker address and topic name.
	brokerAddress := "192.168.66.1:9092"
	topic := "first"

	// Dial the partition leader up front. The connection is not used for
	// reading below, so it acts only as an early connectivity check.
	conn, err := kafka.DialLeader(context.Background(), "tcp", brokerAddress, topic, 0)
	if err != nil {
		log.Fatalf("Failed to connect to Kafka broker: %s", err)
	}
	defer conn.Close()

	// Resetting the start offset to the latest was left disabled:
	// conn.ResetOffsets()

	// Create the consumer.
	consumer := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{brokerAddress},
		Topic:     topic,
		Partition: 0,
		MinBytes:  10e3, // minimum fetch size in bytes
		MaxBytes:  10e6, // maximum fetch size in bytes
	})
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Printf("Failed to close reader: %s", err)
		}
	}()

	// ReadMessage blocks until a message arrives, so polling the signal
	// channel between reads would ignore Ctrl+C on an idle topic. Cancel a
	// context on interrupt instead so the blocked read is woken up.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	defer signal.Stop(signals)

	go func() {
		select {
		case <-signals:
			log.Println("Received interrupt signal, shutting down...")
			cancel()
		case <-ctx.Done():
		}
	}()

	// Consume messages until the context is cancelled.
	for {
		msg, err := consumer.ReadMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				// Shutdown was requested; the read error is just the
				// cancellation surfacing.
				return
			}
			log.Printf("Failed to read message: %s", err)
			continue
		}

		// Handle the message.
		fmt.Printf("Received message: %s\n", string(msg.Value))
	}
}
上图
Reference
https://hub.docker.com/r/bitnami/kafka