RocketMQ5.0 生产者

生产者消息类型:

​​​​​​​

延迟队列的生产者

package mainimport ("context""fmt""github.com/apache/rocketmq-clients/golang/v5""github.com/apache/rocketmq-clients/golang/v5/credentials"errgroup2 "golang.org/x/sync/errgroup""log""os""strconv""time"
)const (Topic     = "DelayTopic"GroupName = "testG"Endpoint  = "localhost:8081"Region    = "xxxxxx"AccessKey = "xxxxxx"SecretKey = "xxxxxx"
)func main() {os.Setenv("mq.consoleAppender.enabled", "true")golang.ResetLogger()// new producer instanceproducer, err := golang.NewProducer(&golang.Config{Endpoint:    Endpoint,Credentials: &credentials.SessionCredentials{},},golang.WithTopics(Topic),)if err != nil {log.Fatal(err)}// start producererr = producer.Start()if err != nil {log.Fatal(err)}// gracefule stop producerdefer producer.GracefulStop()var wg = errgroup2.Group{}wg.SetLimit(10)for i := 0; i < 1000; i++ {wg.Go(func() error {msg := &golang.Message{Topic: Topic,Body:  []byte("this is a message : " + strconv.Itoa(i) + time.Now().Format(time.DateTime)),}// set keys and tagmsg.SetKeys("a", "b")msg.SetTag("ab")msg.SetDelayTimestamp(time.Now().Add(time.Second * 10))// send message in syncresp, err := producer.Send(context.TODO(), msg)if err != nil {log.Fatal(err)}for i := 0; i < len(resp); i++ {fmt.Printf("%#v\n", resp[i])}return nil})// wait a momenttime.Sleep(time.Second * 1)}wg.Wait()time.Sleep(time.Minute * 10)
}

设置topic的。message.type                                                                                                                                     docker exec -it rmqnamesrv /bin/bash       

sh mqadmin updateTopic -c DefaultCluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAYsh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=DELAY

   消费者

package mainimport ("context""fmt""log""os""time""github.com/apache/rocketmq-clients/golang""github.com/apache/rocketmq-clients/golang/credentials"
)const (Topic     = "DelayTopic"GroupName = "testG"Endpoint  = "localhost:8081"
)var (// maximum waiting time for receive funcawaitDuration = time.Second * 5// maximum number of messages received at one timemaxMessageNum int32 = 16// invisibleDuration should > 20sinvisibleDuration = time.Second * 20// receive messages in a loop
)func main() {// log to consoleos.Setenv("mq.consoleAppender.enabled", "true")golang.ResetLogger()// new simpleConsumer instancesimpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{Endpoint:      Endpoint,Credentials:   &credentials.SessionCredentials{},ConsumerGroup: "string",},golang.WithAwaitDuration(awaitDuration),golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{Topic: golang.SUB_ALL,}),)if err != nil {log.Fatal(err)}// start simpleConsumererr = simpleConsumer.Start()if err != nil {log.Fatal(err)}// gracefule stop simpleConsumerdefer simpleConsumer.GracefulStop()go func() {defer func() {if err := recover(); err != nil {fmt.Println(err)}}()for {fmt.Println("start recevie message")mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)if err != nil {fmt.Println(err)}// ack messagefor _, mv := range mvs {simpleConsumer.Ack(context.TODO(), mv)fmt.Println(string(mv.GetBody()) + "  " + time.Now().Format(time.DateTime))}fmt.Println("wait a moment")fmt.Println()time.Sleep(time.Second * 3)}}()// run for a whiletime.Sleep(time.Minute * 20)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/392218.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学习记录——day26 进程间的通信(IPC)无名管道 无名管道 信号通信 特殊的信号处理

目录 一、进程间通信引入 二、无名管道 1、无名管道相关概念 2、无名管道的API接口函数 pipe(int pipefd[2]); 3、管道通信的特点 4、管道的读写特点 三、无名管道 1、有名管道&#xff1a;有名字的管道文件&#xff0c;其他进程可以调用 2、可以用于亲缘进程间的通信&…

代码随想录训练营第五十二天 孤岛的总面积

第一题&#xff1a;孤岛的总面积 第二题&#xff1a;沉没孤岛 思路&#xff1a; 将所有在边界的岛屿所在的visited数组位置都置为true&#xff0c;剩下的visited[i][j] true && grid[i][j] 1的位置就是孤岛&#xff0c;将其置为1即可。 代码如下 #include <io…

【限免】通信信号与干扰信号【附MATLAB代码】

微信公众号&#xff1a;EW Frontier 关注可了解更多的雷达、通信、人工智能相关代码。问题或建议&#xff0c;请公众号留言; 个人博客&#xff1a;106.54.201.174 QQ交流群&#xff1a;949444104 摘要 本项目主要模拟仿真常见通信信号及干扰信号&#xff0c;高斯白噪声、噪声调…

NSF共享目录未授权访问

NSF共享目录未授权访问 Network File System(NFS)&#xff0c;是由SUN公司研制的UNIX表示层协议(pressentation layer protocol)&#xff0c;能使使用者访问网络上别处的文件就像在使用自己的计算机一样。服务器在启用nfs服务以后&#xff0c;由于fs服务未限制对外访问&#x…

无人机之导航系统篇

一、导航系统组成 包括惯性导航系统、卫星导航系统、视觉导航系统等。 二、导航原理 利用传感器感知无人机的位置、速度和姿态信息&#xff0c;结合地图数据和导航算法&#xff0c;计算出无人机当前的位置和航向&#xff0c;从而引导无人机按照预设的航线飞行。 三、导航精…

使用SpringBoot+Vue3开发项目(2)---- 设计文章分类的相关接口及页面

目录 一.所用技术栈&#xff1a; 二.后端开发&#xff1a; 1.文章分类列表渲染&#xff1a; 2.新增文章分类&#xff1a; 3.编辑文章分类&#xff1a; 4.删除文章分类 &#xff1a; 5.完整三层架构后端代码&#xff1a; &#xff08;1&#xff09;Controller层&#xff1a…

优化if-else的11种方案

优雅永不过时&#xff01; 1. 使用早返回&#xff08;Early Return&#xff09;&#xff1a;尽可能早地返回&#xff0c;避免嵌套的if-else。 优化前&#xff1a; public class NoEarlyReturnExample {public boolean hasPositiveNumber(int[] numbers) {boolean foundPositi…

学习大数据DAY31 Python基础语法4和基于Python中的MySQL 编程

目录 Python 库 模块 time&datetime 库 连接 MySQL 操作 结构操作 数据增删改操作 数据查询操作 上机练习 7 面向对象 OOP 封装 继承 三层架构---面向对象思想模型层 数据层 业务逻辑显示层 上机练习 8 三层架构开发豆瓣网 关于我对 AI 写代码的看法&#xf…

客户数据分析模型:RFM模型的深度解析与应用探索

RFM模型&#xff0c;作为客户数据分析中的经典工具&#xff0c;凭借其简单而强大的分析能力&#xff0c;被广泛应用于各行各业。本文旨在深入探讨RFM模型的核心原理、应用价值&#xff0c;并详细阐述其在2C&#xff08;面向消费者&#xff09;和2B&#xff08;面向企业&#xf…

使用ThreadLocal来存取单线程内的数据

一.什么是ThreadLocal&#xff1f; ThreadLocal&#xff0c;即线程本地变量。如果你创建了一个 ThreadLocal变量&#xff0c;那么访问这个变量的每个线程都会有这个变量的一个本地拷贝&#xff0c;多个线程操作这个变量的时候&#xff0c;实际是在操作自己本地内存里面的变量&…

自注意力和位置编码

一、自注意力 1、给定一个由词元组成的输入序列x1,…,xn&#xff0c; 其中任意xi∈R^d&#xff08;1≤i≤n&#xff09;。 该序列的自注意力输出为一个长度相同的序列 y1,…,yn&#xff0c;其中&#xff1a; 2、自注意力池化层将xi当作key&#xff0c;value&#xff0c;query来…

Ubuntu配置carla docker环境

前言: 本文只在以下设备成功运行, 其他设备不保证能成功, 可以参考在自己设备进行配置 环境 ubuntu 20.04carla 0.9.15gpu 3060(notebook) 安装显卡驱动&nvidia-container-toolkit 显卡驱动 安装完成系统后直接在’软件和更新->附加驱动’直接选择470(proprietary…

Leetcode3227. 字符串元音游戏

Every day a Leetcode 题目来源&#xff1a;3227. 字符串元音游戏 解法1&#xff1a;博弈论 分类讨论&#xff1a; 如果 s 不包含任何元音&#xff0c;小红输。如果 s 包含奇数个元音&#xff0c;小红可以直接把整个 s 移除&#xff0c;小红赢。如果 s 包含正偶数个元音&am…

R 语言学习教程,从入门到精通,R 基础运算(5)

1、R 基础运算 本章介绍 R 语言的简单运算。 1.1、赋值 一般语言的赋值是 号&#xff0c;但是 R 语言是数学语言&#xff0c;所以赋值符号与我们数学书上的伪代码很相似&#xff0c;是一个左箭头 <- &#xff1a; a <- 123 b <- 456 print(a b)以上代码执行结果…

最好用的掏耳勺是哪种?年度五款可视挖耳勺高分机型

在我们的日常生活中&#xff0c;掏耳朵似乎是一件再平常不过的小事&#xff0c;不过&#xff0c;传统挖耳勺在使用时完全依赖我们的感觉和经验&#xff0c;我们无法直接看到耳道内部的情况。这就如同在黑暗中摸索&#xff0c;极易造成意外伤害。稍有不慎&#xff0c;就可能刮伤…

使用gitea私有仓库作为依赖

实际问题 由于公司团队使用gitea搭建了git私有仓库&#xff0c;在开发Go程序的时候会有一些公共代码&#xff0c;比如插件和主程序之间要共享接口和数据结构&#xff0c;所以就需要在gitea私有仓库中创建依赖仓库&#xff0c;然后其他仓库引用这个私有仓库作为依赖。 解决方案…

如何实现pxe安装部署

此实验环境&#xff1a;rhel7主机 一、kickstart自动化安装脚本 1、安装可视化图形 [rootlocalhost ~]# yum group install "Server with GUI" 2、关闭vmware dhcp功能&#xff08;编辑-虚拟网络编辑器&#xff09; 3、httpd 1、安装httpd服务 [rootlocalhost …

网鼎杯比赛二次注入技巧

文章目录 前端的网页展示分析题目暴力破解寻找代码找到注入点进行注入查询想要的文件 前端的网页展示 分析题目 1.目前我们能看到的只有三个页面&#xff0c;但是我们可以看到三个*号。 2.考虑三个*的密码是什么&#xff0c;这里可以采用暴力破解&#xff08;我们先猜这是三个…

会声会影下载免费吗?会声会影2023中文旗舰版下载及配置最低要求

**会声会影2024&#xff1a;引领视频创作新时代的创新之旅** 在数字时代的浪潮中&#xff0c;视频创作已成为连接世界、表达创意的重要方式。随着技术的不断进步&#xff0c;一款名为“会声会影2024”的视频编辑软件横空出世&#xff0c;它不仅继承了前代产品的优秀传统&#…

C#MQTT协议应用

1 &#xff0c;MQTT介绍&#xff1a;MQTT详解以及实际操作_mqtt使用-CSDN博客 2&#xff0c;MQTT应用&#xff1a; C#MQTT编程06--MQTT服务器和客户端(winform版)_c# mqtt服务器-CSDN博客 3&#xff0c;MQTT实例&#xff1a; 效果 代码&#xff1a; 服务端 public parti…