Go操作各大消息队列教程(RabbitMQ、Kafka)

Go操作各大消息队列教程

1 RabbitMQ

1.1 概念

①基本名词

当前市面上mq的产品很多,比如RabbitMQ、Kafka、ActiveMQ、ZeroMQ和阿里巴巴捐献给Apache的RocketMQ。甚至连redis这种NoSQL都支持MQ的功能。

在这里插入图片描述

  1. Broker:表示消息队列服务实体
  2. Virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。
    • AMQP(Advanced Message Queuing Protocol)高级消息队列协议
  3. Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  4. Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

②常见模式

1. simple简单模式

在这里插入图片描述

消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)

2. worker工作模式

在这里插入图片描述

多个消费者从一个队列中争抢消息

  • (隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)
  • 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)
3. publish/subscribe发布订阅(共享资源)

在这里插入图片描述

消费者订阅消息,然后从订阅的队列中获取消息进行消费。

  • X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
  • 相关场景:邮件群发,群聊天,广播(广告)
4. routing路由模式

在这里插入图片描述

  • 交换机根据路由规则,将消息路由到不同的队列中
  • 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
5. topic主题模式(路由模式的一种)

在这里插入图片描述

  • 星号井号代表通配符
  • 星号代表多个单词,井号代表一个单词
  • 路由功能添加模糊匹配
  • 消息产生者产生消息,把消息交给交换机
  • 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

1.2 搭建(docker方式)

①拉取镜像

# 拉取镜像
docker pull rabbitmq:3.7-management

②创建并启动容器

# 创建并运行容器
docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7-management
#5672是项目中连接rabbitmq的端口(我这里映射的是5672),15672是rabbitmq的web管理界面端口(我映射为15672)# 输入网址http://ip:15672即可进入rabbitmq的web管理页面,账户密码:guest / guest

③web界面创建用户和virtual host

在这里插入图片描述

下面为了我们后续的操作,首先我们新建一个Virtual Host并且给他分配一个用户名,用来隔离数据,根据自己需要自行创建

  1. 新增virtual host
    在这里插入图片描述
  2. 新增用户
    在这里插入图片描述
  3. 点击新建好的用户,设置其host
    在这里插入图片描述
    在这里插入图片描述
  4. 最终效果
    在这里插入图片描述

1.3 代码操作

①RabbitMQ struct:包含创建、消费、生产消息

package RabbitMQimport ("fmt""github.com/streadway/amqp""log"
)//amqp:// 账号 密码@地址:端口号/vhost
const MQURL = "amqp://ziyi:ziyi@10.253.50.145:5672/ziyi"type RabbitMQ struct {//连接conn *amqp.Connection//管道channel *amqp.Channel//队列名称QueueName string//交换机Exchange string//key Simple模式 几乎用不到Key string//连接信息Mqurl string
}//创建RabbitMQ结构体实例
func NewRabbitMQ(queuename string, exchange string, key string) *RabbitMQ {rabbitmq := &RabbitMQ{QueueName: queuename, Exchange: exchange, Key: key, Mqurl: MQURL}var err error//创建rabbitmq连接rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err, "创建连接错误!")rabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "获取channel失败")return rabbitmq
}//断开channel和connection
func (r *RabbitMQ) Destory() {r.channel.Close()r.conn.Close()
}//错误处理函数
func (r *RabbitMQ) failOnErr(err error, message string) {if err != nil {log.Fatalf("%s:%s", message, err)panic(fmt.Sprintf("%s:%s", message, err))}
}//简单模式step:1。创建简单模式下RabbitMQ实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {return NewRabbitMQ(queueName, "", "")
}//订阅模式创建rabbitmq实例
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {//创建rabbitmq实例rabbitmq := NewRabbitMQ("", exchangeName, "")var err error//获取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err, "failed to connecct rabbitmq!")//获取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel!")return rabbitmq
}//订阅模式生成
func (r *RabbitMQ) PublishPub(message string) {//尝试创建交换机,不存在创建err := r.channel.ExchangeDeclare(//交换机名称r.Exchange,//交换机类型 广播类型"fanout",//是否持久化true,//是否字段删除false,//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定false,//是否阻塞 true表示要等待服务器的响应false,nil,)r.failOnErr(err, "failed to declare an excha"+"nge")//2 发送消息err = r.channel.Publish(r.Exchange,"",false,false,amqp.Publishing{//类型ContentType: "text/plain",//消息Body: []byte(message),})
}//订阅模式消费端代码
func (r *RabbitMQ) RecieveSub() {//尝试创建交换机,不存在创建err := r.channel.ExchangeDeclare(//交换机名称r.Exchange,//交换机类型 广播类型"fanout",//是否持久化true,//是否字段删除false,//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定false,//是否阻塞 true表示要等待服务器的响应false,nil,)r.failOnErr(err, "failed to declare an excha"+"nge")//2试探性创建队列,创建队列q, err := r.channel.QueueDeclare("", //随机生产队列名称false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//绑定队列到exchange中err = r.channel.QueueBind(q.Name,//在pub/sub模式下,这里的key要为空"",r.Exchange,false,nil,)//消费消息message, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range message {log.Printf("Received a message:%s,", d.Body)}}()fmt.Println("退出请按 Ctrl+C")<-forever
}//话题模式 创建RabbitMQ实例
func NewRabbitMQTopic(exchagne string, routingKey string) *RabbitMQ {//创建rabbitmq实例rabbitmq := NewRabbitMQ("", exchagne, routingKey)var err errorrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err, "failed     to connect rabbingmq!")rabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq
}//话题模式发送信息
func (r *RabbitMQ) PublishTopic(message string) {//尝试创建交换机,不存在创建err := r.channel.ExchangeDeclare(//交换机名称r.Exchange,//交换机类型 话题模式"topic",//是否持久化true,//是否字段删除false,//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定false,//是否阻塞 true表示要等待服务器的响应false,nil,)r.failOnErr(err, "topic failed to declare an excha"+"nge")//2发送信息err = r.channel.Publish(r.Exchange,//要设置r.Key,false,false,amqp.Publishing{//类型ContentType: "text/plain",//消息Body: []byte(message),})
}//话题模式接收信息
//要注意key
//其中* 用于匹配一个单词,#用于匹配多个单词(可以是零个)
//匹配 表示匹配imooc.* 表示匹配imooc.hello,但是imooc.hello.one需要用imooc.#才能匹配到
func (r *RabbitMQ) RecieveTopic() {//尝试创建交换机,不存在创建err := r.channel.ExchangeDeclare(//交换机名称r.Exchange,//交换机类型 话题模式"topic",//是否持久化true,//是否字段删除false,//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定false,//是否阻塞 true表示要等待服务器的响应false,nil,)r.failOnErr(err, "failed to declare an exchange")//2试探性创建队列,创建队列q, err := r.channel.QueueDeclare("", //随机生产队列名称false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//绑定队列到exchange中err = r.channel.QueueBind(q.Name,//在pub/sub模式下,这里的key要为空r.Key,r.Exchange,false,nil,)//消费消息message, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range message {log.Printf("Received a message:%s,", d.Body)}}()fmt.Println("退出请按 Ctrl+C")<-forever
}//路由模式 创建RabbitMQ实例
func NewRabbitMQRouting(exchagne string, routingKey string) *RabbitMQ {//创建rabbitmq实例rabbitmq := NewRabbitMQ("", exchagne, routingKey)var err errorrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err, "failed     to connect rabbingmq!")rabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq
}//路由模式发送信息
func (r *RabbitMQ) PublishRouting(message string) {//尝试创建交换机,不存在创建err := r.channel.ExchangeDeclare(//交换机名称r.Exchange,//交换机类型 广播类型"direct",//是否持久化true,//是否字段删除false,//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定false,//是否阻塞 true表示要等待服务器的响应false,nil,)r.failOnErr(err, "failed to declare an excha"+"nge")//发送信息err = r.channel.Publish(r.Exchange,//要设置r.Key,false,false,amqp.Publishing{//类型ContentType: "text/plain",//消息Body: []byte(message),})
}//路由模式接收信息
func (r *RabbitMQ) RecieveRouting() {//尝试创建交换机,不存在创建err := r.channel.ExchangeDeclare(//交换机名称r.Exchange,//交换机类型 广播类型"direct",//是否持久化true,//是否字段删除false,//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定false,//是否阻塞 true表示要等待服务器的响应false,nil,)r.failOnErr(err, "failed to declare an excha"+"nge")//2试探性创建队列,创建队列q, err := r.channel.QueueDeclare("", //随机生产队列名称false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//绑定队列到exchange中err = r.channel.QueueBind(q.Name,//在pub/sub模式下,这里的key要为空r.Key,r.Exchange,false,nil,)//消费消息message, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range message {log.Printf("Received a message:%s,", d.Body)}}()fmt.Println("退出请按 Ctrl+C")<-forever
}//简单模式Step:2、简单模式下生产代码
func (r *RabbitMQ) PublishSimple(message string) {//1、申请队列,如果队列存在就跳过,不存在创建//优点:保证队列存在,消息能发送到队列中_, err := r.channel.QueueDeclare(//队列名称r.QueueName,//是否持久化false,//是否为自动删除 当最后一个消费者断开连接之后,是否把消息从队列中删除false,//是否具有排他性 true表示自己可见 其他用户不能访问false,//是否阻塞 true表示要等待服务器的响应false,//额外数据nil,)if err != nil {fmt.Println(err)}//2.发送消息到队列中r.channel.Publish(//默认的Exchange交换机是default,类型是direct直接类型r.Exchange,//要赋值的队列名称r.QueueName,//如果为true,根据exchange类型和routkey规则,如果无法找到符合条件的队列那么会把发送的消息返回给发送者false,//如果为true,当exchange发送消息到队列后发现队列上没有绑定消费者,则会把消息还给发送者false,//消息amqp.Publishing{//类型ContentType: "text/plain",//消息Body: []byte(message),})
}func (r *RabbitMQ) ConsumeSimple() {//1、申请队列,如果队列存在就跳过,不存在创建//优点:保证队列存在,消息能发送到队列中_, err := r.channel.QueueDeclare(//队列名称r.QueueName,//是否持久化false,//是否为自动删除 当最后一个消费者断开连接之后,是否把消息从队列中删除false,//是否具有排他性false,//是否阻塞false,//额外数据nil,)if err != nil {fmt.Println(err)}//接收消息msgs, err := r.channel.Consume(r.QueueName,//用来区分多个消费者"",//是否自动应答true,//是否具有排他性false,//如果设置为true,表示不能同一个connection中发送的消息传递给这个connection中的消费者false,//队列是否阻塞false,nil,)if err != nil {fmt.Println(err)}forever := make(chan bool)//启用协程处理go func() {for d := range msgs {//实现我们要处理的逻辑函数log.Printf("Received a message:%s", d.Body)//fmt.Println(d.Body)}}()log.Printf("【*】warting for messages, To exit press CCTRAL+C")<-forever
}func (r *RabbitMQ) ConsumeWorker(consumerName string) {//1、申请队列,如果队列存在就跳过,不存在创建//优点:保证队列存在,消息能发送到队列中_, err := r.channel.QueueDeclare(//队列名称r.QueueName,//是否持久化false,//是否为自动删除 当最后一个消费者断开连接之后,是否把消息从队列中删除false,//是否具有排他性false,//是否阻塞false,//额外数据nil,)if err != nil {fmt.Println(err)}//接收消息msgs, err := r.channel.Consume(r.QueueName,//用来区分多个消费者consumerName,//是否自动应答true,//是否具有排他性false,//如果设置为true,表示不能同一个connection中发送的消息传递给这个connection中的消费者false,//队列是否阻塞false,nil,)if err != nil {fmt.Println(err)}forever := make(chan bool)//启用协程处理go func() {for d := range msgs {//实现我们要处理的逻辑函数log.Printf("%s Received a message:%s", consumerName, d.Body)//fmt.Println(d.Body)}}()log.Printf("【*】warting for messages, To exit press CCTRAL+C")<-forever
}

②测试代码

1. simple简单模式

consumer.go

func main() {//消费者rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")rabbitmq.ConsumeSimple()
}

producer.go

func main() {//Simple模式 生产者rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")for i := 0; i < 5; i++ {time.Sleep(time.Second * 2)rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))}
}
2. worker模式

consumer.go

func main() {/*worker模式无非就是多个消费者去同一个队列中消费消息*///消费者1rabbitmq1 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")go rabbitmq1.ConsumeWorker("consumer1")//消费者2rabbitmq2 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")rabbitmq2.ConsumeWorker("consumer2")
}

producer.go

func main() {//Worker模式 生产者rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiWorker")for i := 0; i < 100; i++ {//time.Sleep(time.Second * 2)rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))}
}
3. publish/subscribe模式

consumer.go:

func main() {//消费者rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")rabbitmq.RecieveSub()
}

producer.go

func main() {//订阅模式发送者rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")for i := 0; i <= 20; i++ {rabbitmq.PublishPub("订阅模式生产第" + strconv.Itoa(i) + "条数据")fmt.Println(i)time.Sleep(1 * time.Second)}
}
4. router模式

consumer.go

func main() {//消费者rabbitmq := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")rabbitmq.RecieveRouting()
}

producer.go

func main() {//路由模式生产者imoocOne := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")imoocTwo := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_two")for i := 0; i <= 10; i++ {imoocOne.PublishRouting("hello imooc one!" + strconv.Itoa(i))imoocTwo.PublishRouting("hello imooc two!" + strconv.Itoa(i))time.Sleep(1 * time.Second)fmt.Println(i)}
}
5. topic模式

consumer.go

func main() {/*星号井号代表通配符星号代表多个单词,井号代表一个单词路由功能添加模糊匹配消息产生者产生消息,把消息交给交换机交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费*///Topic消费者//rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "#") //匹配所有的key:topic88和topic99rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three") //只匹配topic88的rabbitmq.RecieveTopic()
}

producer.go

func main() {//Topic模式生产者imoocOne := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three")imoocTwo := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic99.four")for i := 0; i <= 10; i++ {imoocOne.PublishTopic("hello imooc topic three!" + strconv.Itoa(i))imoocTwo.PublishTopic("hello imooc topic four!" + strconv.Itoa(i))time.Sleep(1 * time.Second)fmt.Println(i)}
}

2 Kafka

2.1 基本概念

在这里插入图片描述
Kafka是分布式的,其所有的构件borker(server服务端集群)、producer(消息生产)、consumer(消息消费者)都可以是分布式的。
producer给broker发送数据,这些消息会存到kafka server里,然后consumer再向kafka server发起请求去消费这些数据。
kafka server在这个过程中像是一个帮你保管数据的中间商。所以kafka服务器也可以叫做broker(broker直接翻译可以是中间人或者经纪人的意思)。

在消息的生产时可以使用一个标识topic来区分,且可以进行分区;每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。
同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡

参考:https://blog.csdn.net/lingfy1234/article/details/122900348

  • 应用场景
    • 监控
    • 消息队列
    • 流处理
    • 日志聚合
    • 持久性日志
  • 基础概念
    • topic:话题
    • broker:kafka服务集群,已发布的消息保存在一组服务器中,称之为kafka集群。集群中的每一个服务器都是一个代理(broker)
    • partition:分区,topic物理上的分组
    • message:消息,每个producer可以向一个topic主题发布一些消息

在这里插入图片描述
1.⽣产者从Kafka集群获取分区leader信息
2.⽣产者将消息发送给leader
3.leader将消息写入本地磁盘
4.follower从leader拉取消息数据
5.follower将消息写入本地磁盘后向leader发送ACK
6.leader收到所有的follower的ACK之后向生产者发送ACK

2.2 常见模式

①点对点模式:火车站出租车抢客

发送者将消息发送到消息队列中,消费者去消费,如果消费者有多个,他们会竞争地消费,也就是说对于某一条消息,只有一个消费者能“抢“到它。类似于火车站门口的出租车抢客的场景。

在这里插入图片描述

②发布订阅模式:组间无竞争,组内有竞争

消费者订阅对应的topic(主题),只有订阅了对应topic消费者的才会接收到消息。

例如:

  • 牛奶有很多种,光明牛奶,希望牛奶等,只有你订阅了光明牛奶,送奶工才会把光明牛奶送到对应位置,你也才会有机会消费这个牛奶

注意:为了提高消费者的消费能力,kafka中引入了消费者组的概念。相当于是:不同消费者组之间因为订阅的topic不同,不会有竞争关系。但是消费者组内是有竞争关系。

例如:

  • 成都、厦门的出租车司机分别组成各自的消费者组。
  • 成都的出租车司机只拉成都的人,厦门的只拉厦门的人。(因此他们两个消费者组不是竞争关系)
  • 成都市内的出租车司机之间是竞争关系。(消费者组内是竞争关系)

2.3 docker-compose部署

 vim docker-compose.yml
version: '3'
services:zookeeper:image: confluentinc/cp-zookeeper:6.2.0ports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: confluentinc/cp-kafka:6.2.0ports:- "9092:9092"environment:KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181#KAFKA_ADVERTISED_LISTENERS后面改为自己本地宿主机的ip,例如我本地mac的ip为192.168.0.101KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.101:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1depends_on:- zookeeper
# 进入到docker-compose.yml所在目录,执行下面命令
docker-compose up -d
# 查看部署结果,状态为up表明部署成功
docker-compose ps 

在这里插入图片描述

2.4 代码操作

# 1. 创建对应topic
docker-compose exec kafka kafka-topics --create --topic test-topic --partitions 1 --replication-factor 1 --bootstrap-server 192.168.0.101:9092# 2. 查看topic列表
docker-compose exec kafka kafka-topics --list --zookeeper zookeeper:2181

在这里插入图片描述

①producer.go

package mainimport ("fmt""github.com/IBM/sarama"
)// 基于sarama第三方库开发的kafka clientfunc main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partitionconfig.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回// 构造一个消息msg := &sarama.ProducerMessage{}msg.Topic = "web_log"msg.Value = sarama.StringEncoder("this is a test log")// 连接kafkaclient, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)if err != nil {fmt.Println("producer closed, err:", err)return}defer client.Close()// 发送消息pid, offset, err := client.SendMessage(msg)if err != nil {fmt.Println("send msg failed, err:", err)return}fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

②consumer.go

package mainimport ("fmt""github.com/IBM/sarama"
)// kafka consumerfunc main() {consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)if err != nil {fmt.Printf("fail to start consumer, err:%v\n", err)return}partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区if err != nil {fmt.Printf("fail to get list of partition:err%v\n", err)return}fmt.Println(partitionList)for partition := range partitionList { // 遍历所有的分区// 针对每个分区创建一个对应的分区消费者pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)if err != nil {fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)return}defer pc.AsyncClose()// 异步从每个分区消费信息go func(sarama.PartitionConsumer) {for msg := range pc.Messages() {fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value))}}(pc)}//演示时使用select {}
}

③运行效果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/116023.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于购买AirPods,现在是否为最佳时机?

我们不需要解释你为什么想要AirPods。苹果对真正的无线耳机的采用彻底改变了市场&#xff0c;并从那时起大量销售。你总是在记者、同事和名人的耳朵里看到它们——尤其是在我们这个远程工作和Zoom会议的时代。 真正的问题是&#xff0c;你应该现在就买一个&#xff0c;还是在几…

MySQL索引,事务和存储引擎

一、索引 1、索引的概念 ●索引是一个排序的列表&#xff0c;在这个列表中存储着索引的值和包含这个值的数据所在行的物理地址&#xff08;类似于C语言的链表通过指针指向数据记录的内存地址&#xff09;。 ●使用索引后可以不用扫描全表来定位某行的数据&#xff0c;而是先…

博流RISC-V芯片Eclipse环境搭建

文章目录 1、下载 Eclipse2、导入 bouffalo_sdk3、编译4、烧录5、使用ninja编译 之前编译是通过 VSCode 编译&#xff0c;通过手工输入 make 命令编译&#xff0c;我们也可以通过 Eclipse 可视化 IDE 来编译、烧录。 1、下载 Eclipse 至 Eclipse 官网 https://www.eclipse.org…

IntelliJ IDEA 2023.2.1 Android开发变化

IntelliJ IDEA 2023.2.1之前的版本&#xff0c;Empty Activity是指Empty View Activity&#xff0c;而现在Empty Activity是指Empty Compose Activity&#xff0c;另外多了一个Empty View Activity的选项 这表明官方推荐使用Compose这种声明式的编程方式来描述UI&#xff0c;命…

springboot上线打包+vuecli2部署在linux服务器上(打包上线)

这里也是记录一下springboot的上线打包流程,我这里前端使用的是vuecli2 springboot的依赖是2.7.9的版本 前端是使用的vue2 打包前,你的linux上必须要先安装,tomcat\java\nginx springboot打包 springboot打包点击一下,等maven编译打包成功在target文件下找到,jar包, 然后,把j…

C语言:指针的运算

一、指针 或 - 整数 指针 或 - 整数表示指针跳过几个字节&#xff08;具体跳过几个字节由指针类型决定&#xff09; 本文不做具体讲解&#xff0c;详解跳转链接&#xff1a; 《C语言&#xff1a;指针类型的意义》 二、指针 - 指针 前提条件&#xff1a;指针类型相同并且指向同…

[学习笔记]斜率优化dp 总结

前言&#xff1a; 我们学过不少优化类的算法了&#xff0c;大部分都是基于凸函数的性质给出的优化&#xff0c;比如Slope Trick&#xff0c;Wqs二分&#xff0c;又比如今天的斜率优化&#xff08;不知道什么时候会有空把Slope Trick写掉&#xff09; 正文&#xff1a; 我们考…

应用案例 | 基于三维机器视觉的机器人麻袋拆垛应用解决方案

​Part.1 项目背景 在现代物流和制造行业中&#xff0c;麻袋的拆垛操作是一个重要且频繁的任务。传统的麻袋拆垛工作通常由人工完成&#xff0c;分拣效率较低&#xff0c;人力成本较高&#xff0c;现场麻袋堆叠、变形严重&#xff0c;垛型不规则、不固定&#xff0c;严重影响分…

拓世科技集团 | “书剑人生”李步云学术思想研讨会暨李步云先生九十华诞志庆

2023年&#xff0c;中国改革开放迎来了45周年&#xff0c;改革春风浩荡&#xff0c;席卷神州大地&#xff0c;45年间&#xff0c;中国特色社会主义伟大事业大步迈入崭新境界&#xff0c;一路上结出了饶为丰硕的果实。中华民族在这45年间的砥砺前行&#xff0c;不仅使中国的经济…

解决uniapp下拉框 内容被覆盖的问题

1. 下拉框 内容被覆盖的问题 场景: 现在是下拉框被表格覆盖了 解决办法: 在表格上添加css 样式来解决这个问题 .add-table{display: static;overflow: visible; } display: static: 将元素会按照默认的布局方式进行显示&#xff0c;不会分为块状或行内元素。 overflow: vi…

【用unity实现100个游戏之7】从零开始制作一个仿杀戮尖塔卡牌回合制游戏

文章目录 前言素材资源开始一、UI框架二、挂载脚本三、事件监听&#xff0c;用于绑定按钮事件四、声音管理器五、excel转txt文本六、游戏配置七、用户信息表八、战斗管理器九、 敌人管理器十、玩家血量、能量、防御值、卡牌数十一、敌人血量 行动显示逻辑十二、UI提示效果实现十…

PHP旅游管理系统Dreamweaver开发mysql数据库web结构php编程计算机网页

一、源码特点 PHP 旅游管理系统是一套完善的web设计系统&#xff0c;对理解php编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 PHP 旅游管理系统 源码下载地址&#xff1a; https://download.csdn.net/download/qq_41…

Docker切换文件系统为VFS

一、介绍 Docker支持AUFS、Btrfs、Device Mapper、OverlayFS、VFS、ZFS六种不同的存储驱动。 1. AUFS AUFS是一种常见的存储驱动程序&#xff0c;它也使用了Linux内核的AUFS文件系统。它的优点是支持所有的Linux发行版&#xff0c;可以在不同的容器之间共享文件系统&#xf…

JVM 对象的内存布局

对象头 Mark word 标记字段 用于存储对象自身的运行时数据&#xff0c;如哈希码&#xff08;HashCode&#xff09;、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID、偏向时间戳等 ClassPoint 类型指针 对象指向它的类型元数据的指针&#xff0c;Java虚拟机通过这个指针 来…

【STM32】学习笔记(TIM定时器)

TIM&#xff08;Timer&#xff09;定时器 定时器可以对输入的时钟进行计数&#xff0c;并在计数值达到设定值时触发中断 16位计数器、预分频器、自动重装寄存器的时基单元&#xff0c;在72MHz计数时钟下可以实现最大59.65s的定时 不仅具备基本的定时中断功能&#xff0c;而且…

【MySQL】基础知识(二)

MySQL基础知识(二) 文章目录 MySQL基础知识(二)01 表操作1.1 创建表1.2 查看所有表1.3 查看指定表的结构1.4 删除表练习 02 CURD2.1 新增2.1.1 指定列插入2.1.2 datetime类型插入 2.2 查询2.2.1 全列查询2.2.2 指定列查询2.2.3 查询字段为表达式2.2.4 别名查询2.2.5 去重2.2.6 …

面试被打脸,数据结构底层都不知道么--回去等通知吧

数据结构之常见的8种数据结构&#xff1a; -数组Array -链表 Linked List -堆 heap -栈 stack -队列 Queue -树 Tree -散列表 Hash -图 Graph 数据结构-链表篇 Linklist定义&#xff1a; -是一种线性表&#xff0c;并不会按线性的顺序存储数据&#xff0c;即逻辑上相邻…

基于硬件隔离增强risc-v调试安全2_安全提议

安全之安全(security)博客目录导读 2023 RISC-V中国峰会 安全相关议题汇总 说明&#xff1a;本文参考RISC-V 2023中国峰会如下议题&#xff0c;版权归原作者所有。

pdf怎么编辑文字?了解一下这几种编辑方法

pdf怎么编辑文字&#xff1f;PDF文件的普及使得它成为了一个重要的文件格式。然而&#xff0c;由于PDF文件的特性&#xff0c;它们不可直接编辑&#xff0c;这就使得PDF文件的修改变得比较麻烦。但是&#xff0c;不用担心&#xff0c;接下来这篇文章就给大家介绍几种编辑pdf文字…

SpringCloudAlibaba Gateway(二)详解-内置Predicate、Filter及自定义Predicate、Filter

Predicate(断言) ​ Predicate(断言)&#xff0c;用于进行判断&#xff0c;如果返回为真&#xff0c;才会路由到具体服务。SpirnngCloudGateway由路由断言工厂实现&#xff0c;直接配置即生效&#xff0c;当然也支持自定义路由断言工厂。 内置路由断言工厂实现 ​ SpringClo…