生产者消息类型:
延迟队列的生产者
package mainimport ("context""fmt""github.com/apache/rocketmq-clients/golang/v5""github.com/apache/rocketmq-clients/golang/v5/credentials"errgroup2 "golang.org/x/sync/errgroup""log""os""strconv""time"
)const (Topic = "DelayTopic"GroupName = "testG"Endpoint = "localhost:8081"Region = "xxxxxx"AccessKey = "xxxxxx"SecretKey = "xxxxxx"
)func main() {os.Setenv("mq.consoleAppender.enabled", "true")golang.ResetLogger()// new producer instanceproducer, err := golang.NewProducer(&golang.Config{Endpoint: Endpoint,Credentials: &credentials.SessionCredentials{},},golang.WithTopics(Topic),)if err != nil {log.Fatal(err)}// start producererr = producer.Start()if err != nil {log.Fatal(err)}// gracefule stop producerdefer producer.GracefulStop()var wg = errgroup2.Group{}wg.SetLimit(10)for i := 0; i < 1000; i++ {wg.Go(func() error {msg := &golang.Message{Topic: Topic,Body: []byte("this is a message : " + strconv.Itoa(i) + time.Now().Format(time.DateTime)),}// set keys and tagmsg.SetKeys("a", "b")msg.SetTag("ab")msg.SetDelayTimestamp(time.Now().Add(time.Second * 10))// send message in syncresp, err := producer.Send(context.TODO(), msg)if err != nil {log.Fatal(err)}for i := 0; i < len(resp); i++ {fmt.Printf("%#v\n", resp[i])}return nil})// wait a momenttime.Sleep(time.Second * 1)}wg.Wait()time.Sleep(time.Minute * 10)
}
设置topic的。message.type docker exec -it rmqnamesrv /bin/bash
sh mqadmin updateTopic -c DefaultCluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAYsh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=DELAY
消费者
package mainimport ("context""fmt""log""os""time""github.com/apache/rocketmq-clients/golang""github.com/apache/rocketmq-clients/golang/credentials"
)const (Topic = "DelayTopic"GroupName = "testG"Endpoint = "localhost:8081"
)var (// maximum waiting time for receive funcawaitDuration = time.Second * 5// maximum number of messages received at one timemaxMessageNum int32 = 16// invisibleDuration should > 20sinvisibleDuration = time.Second * 20// receive messages in a loop
)func main() {// log to consoleos.Setenv("mq.consoleAppender.enabled", "true")golang.ResetLogger()// new simpleConsumer instancesimpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{Endpoint: Endpoint,Credentials: &credentials.SessionCredentials{},ConsumerGroup: "string",},golang.WithAwaitDuration(awaitDuration),golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{Topic: golang.SUB_ALL,}),)if err != nil {log.Fatal(err)}// start simpleConsumererr = simpleConsumer.Start()if err != nil {log.Fatal(err)}// gracefule stop simpleConsumerdefer simpleConsumer.GracefulStop()go func() {defer func() {if err := recover(); err != nil {fmt.Println(err)}}()for {fmt.Println("start recevie message")mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)if err != nil {fmt.Println(err)}// ack messagefor _, mv := range mvs {simpleConsumer.Ack(context.TODO(), mv)fmt.Println(string(mv.GetBody()) + " " + time.Now().Format(time.DateTime))}fmt.Println("wait a moment")fmt.Println()time.Sleep(time.Second * 3)}}()// run for a whiletime.Sleep(time.Minute * 20)
}