事务消息逻辑
docker部署容器,并且创建消息
dd首先我们来docker 部署rocketMQ与rocketMQDashBoard
docker ps查看rocketMQ 容器名称
docker ps
进入容器内部
docker exec -it rmqnamesrv /bin/bash
创建事务消息
MeessageType: TRANSACTION
sh mqadmin updateTopic -c DefaultCluster -t TRANSACTIONTopic -n 127.0.0.1:9876 -a +message.type=TRANSACTION
创建事务消息成功
rocktmq5.0 golang客户端支持事务消息
官方事务消息demo
https://github.com/apache/rocketmq-clients/blob/master/golang/example/producer/transaction/main.go
半事务触发时间限制
模拟半事务提交失败
func TRANSACTIONTopic() {// log to consoleos.Setenv("mq.consoleAppender.enabled", "true")golang.ResetLogger()topic := "TRANSACTIONTopic"producer, err := golang.NewProducer(&golang.Config{Endpoint: Endpoint,Credentials: &credentials.SessionCredentials{},},golang.WithTransactionChecker(&golang.TransactionChecker{Check: CheckHalf, //检查half消息}),golang.WithTopics(topic),)if err != nil {log.Fatal(err)}// start producererr = producer.Start()if err != nil {log.Fatal(err)}// graceful stop producerdefer producer.GracefulStop()msg := &golang.Message{Topic: topic, Body: []byte("this is a message TRANSACTION ")}msg.SetKeys("a", "b")msg.SetTag("ab")transaction := producer.BeginTransaction()resp, err := producer.SendWithTransaction(context.TODO(), msg, transaction)if err != nil {log.Fatal(err)}for i := 0; i < len(resp); i++ {fmt.Printf("%#v\n", resp[i])}//commit transaction message//TODO 本地事务执行成功TODOLocalTransaction()//提交事务err = transaction.Commit()if err != nil {log.Fatal(err)}time.Sleep(time.Minute * 10)
}// 检查本地事务
func CheckHalf(msg *golang.MessageView) golang.TransactionResolution {log.Printf("check transaction message: %v", msg)CheckLocalTransaction() //检查本地事务return golang.COMMIT
}
func TODOLocalTransaction() bool {return true
}
func CheckLocalTransaction() bool {return true
}
第一步:producer.SendWithTransaction 发送半事务(发送成功)
第二步:TODO 本地事务执行成功 TODOLocalTransaction(执行本地事务成功)
第三步:当我们注释掉第三步commit代码,模拟提交事务异常(模拟commit事务消息失败)
第四步:rocketmq触发CheckHalf代码
check事务状态类型
-
UNKNOWN // 暂时无法判断状态,等待固定时间以后Broker端根据回查规则向生产者进行消息回查,超过半事务最大超时时长,消息主动回滚 COMMIT //提交事务,允许消费者消费该消息 ROLLBACK //回滚事务,消息将被丢弃不允许消费。
消费者重试
当我们不给消息返回ack,触发消息重试,golang最新版本SimpleConsumer最大不可见时间为20秒。官方文档为10秒钟。超过最大不可见时间未返回ack,触发消费者重试!
消息发生重试
在消息重试的时,在dashboard中可以看到该ConsumeGroup下面的的重试队列信息
消费者重试达到最大次数
达到最大重试16次之后,按照消费者的最大重试周期之后,将被投向死信队列,在dashboard中可以查看(如下),死信队列概念:RocketMQ5.0死信队列-CSDN博客,超过16次之后,死信队列发生重试
rocketMQ5.0定时/延迟消息实战-CSDN博客