SpringBoot整合【RocketMQ】

目录

1.POM文件添加依赖及yml配置

2.RocketmqUtil

3.生产者(异步发送示例)

4.消费者

5.测试


1.POM文件添加依赖及yml配置

        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency>
rocketmq:name-server: 127.0.0.1:9876producer:group: My_Groupsend-message-timeout: 3000retry-times-when-send-failed: 3retry-times-when-send-async-failed: 3

2.RocketmqUtil

package com.kaying.marketing.platform.common.util.rocketMq;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** @Description: RocketMQ消息的生产者* @Author: hwk*/@Component
@Slf4j
public class RocketMqUtil {@Autowiredprivate RocketMQTemplate rocketMqTemplate;public void sendMsg(String topic,String data) {rocketMqTemplate.convertAndSend(topic,data);log.info("【RocketMQ】发送同步消息:{}", data);}public void asyncSend(String topic, String tag, String data,Integer messageDelayLevel) {rocketMqTemplate.asyncSend(topic + ":" + tag, MessageBuilder.withPayload(data).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 消息发送成功log.error("消息发送成功"+sendResult);}@Overridepublic void onException(Throwable throwable) {// 消息发送异常log.error("异步发送消息异常。topic:" + topic + ";tag:" + tag + ";mqMsg" + data, throwable);}},3000L,// messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d// messageDelayLevel = 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18 19messageDelayLevel);}/*** 发送同步消息:消息响应后发送下一条消息** @param topic 消息主题* @param tag   消息tag* @param key   业务号* @param data  消息内容*/public void sendSyncMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;SendResult sendResult = rocketMqTemplate.syncSend(destination, message);log.info("【RocketMQ】发送同步消息:{}", sendResult);}/*** 发送异步消息:异步回调通知消息发送的状况** @param topic 消息主题* @param tag   消息tag* @param key   业务号* @param data  消息内容*/public void sendAsyncMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;rocketMqTemplate.asyncSend(destination, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("【RocketMQ】发送异步消息:{}", sendResult);}@Overridepublic void onException(Throwable e) {log.info("【RocketMQ】发送异步消息异常:{}", e.getMessage());}});}/*** 发送单向消息:消息发送后无响应,可靠性差,效率高** @param topic 消息主题* @param tag   消息tag* @param key   业务号* @param data  消息内容*/public void sendOneWayMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;rocketMqTemplate.sendOneWay(destination, message);}/*** 同步延迟消息** @param topic      主题* @param tag        标签* @param key        业务号* @param data       消息体* @param timeout    发送消息的过期时间* @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h**/public void sendSyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {// messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d// messageDelayLevel = 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18 19//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;SendResult sendResult = rocketMqTemplate.syncSend(destination, message, timeout, delayLevel);log.info("【RocketMQ】发送同步延迟消息:{}", sendResult);}/*** 异步延迟消息** @param topic      主题* @param tag        标签* @param key        业务号* @param data       消息体* @param timeout    发送消息的过期时间* @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h*/public void sendAsyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;rocketMqTemplate.asyncSend(destination, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("【RocketMQ】发送异步延迟消息:{}", sendResult);}@Overridepublic void onException(Throwable e) {log.info("【RocketMQ】发送异步延迟消息异常:{}", e.getMessage());}}, timeout, delayLevel);}/*** 同步顺序消息** @param topic 主题* @param tag   标签* @param key   业务号* @param data  消息体*/public void sendSyncOrderlyMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;SendResult sendResult = rocketMqTemplate.syncSendOrderly(destination, message, key);log.info("【RocketMQ】发送同步顺序消息:{}", sendResult);}/*** 异步顺序消息** @param topic 主题* @param tag   标签* @param key   业务号* @param data  消息体*/public void sendAsyncOrderlyMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;rocketMqTemplate.asyncSendOrderly(destination, message, key, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("【RocketMQ】发送异步顺序消息:{}", sendResult);}@Overridepublic void onException(Throwable e) {log.info("【RocketMQ】发送异步顺序消息异常:{}", e.getMessage());}});}
}

3.生产者(异步发送示例)

//异步发送消息代码示例
rocketMqUtil.sendAsyncMsg(RocketConstant.TEST_TOPIC1, RocketConstant.TEST_TAG1, UUID.randomUUID().toString(), "测试消息一");

4.消费者

简单的负载均衡消费的示例(指定topic和tag,相同的组即为负载均衡消费)

也可以指定不同的topic和不同的tag进行消息区分

注意线上和本地连接同一个MQ也会导致负载均衡,导致线上消息丢失

    @RocketMQMessageListener(consumerGroup = "1",topic = RocketConstant.TEST_TOPIC1,selectorExpression = RocketConstant.TEST_TAG1)@Servicepublic class RocketConsumerTag1 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {String orderNo = message;log.info("tag1,接收:{}", orderNo);}}@RocketMQMessageListener(consumerGroup ="1",topic = RocketConstant.TEST_TOPIC1,selectorExpression = RocketConstant.TEST_TAG1)@Servicepublic class RocketConsumerTag2 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {String orderNo = message;log.info("tag2,接收:{}", orderNo);}}

5.测试

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/270707.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安全增强型 Linux

书接上篇 一查看selinux状态 SELinux的状态&#xff1a; enforcing&#xff1a;强制&#xff0c;每个受限的进程都必然受限 permissive&#xff1a;允许&#xff0c;每个受限的进程违规操作不会被禁止&#xff0c;但会被记录于审计日志 disabled&#xff1a;禁用 相关命令…

内含资料下载丨黄东旭:2024 现代应用开发关键趋势——降低成本、简化架构

作为一名工程师和创业者&#xff0c;创办 PingCAP 是我进入创新世界的一次深潜。这段旅程既有令人振奋的发现&#xff0c;也充满令人生畏的不确定性。作为这次探险之旅见证的 TiDB &#xff0c;现在已在全球服务超过 3000 家企业&#xff0c;其中有已经实现了商业成功的大公司&…

【设计模式 01】单例模式

单例模式&#xff0c;是一种创建型设计模式&#xff0c;他的核心思想是保证一个类只有一个实例&#xff08;即&#xff0c;在整个应用程序中&#xff0c;只存在该类的一个实例对象&#xff0c;而不是创建多个相同类型的对象&#xff09;&#xff0c;并提供一个全局访问点来访问…

vite项目修改node_modules

问题详情 在使用某个依赖的时候遇到了bug&#xff0c;提交issue后不想一直等待到作者更新版本&#xff0c;所以寻求临时自己解决 问题解决 在node_modules里找到需要修改的依赖&#xff0c;修改想要修改的代码 修改后记得保存 然后在node_modules里找到.vite文件夹&#x…

阿里云Linux系统MySQL8忘记密码修改密码

相关版本 操作系统&#xff1a;Alibaba Cloud Linux 3.2104 LTS 64位MySQL&#xff1a;mysql Ver 8.0.34 for Linux on x86_64 (Source distribution) MySQL版本可通过下方命令查询 mysql --version一、修改my.cnf文件 文件位置&#xff1a;etc/my.cnf进入远程连接后可以打…

MQTT控制报文介绍(2)

一、CONNECT – 连接 服务端 客户端到服务端的网络连接建立后&#xff0c;客户端发送给服务端的第一个报文 必须是 CONNECT 报文。在一个网络连接上&#xff0c;客户端只能发送一次 CONNECT 报文。服务端 必须将客户端发送的第二个 CONNECT报文当作协议违规处理并断开客户端的…

项目中spring security与jwt.腾讯面试分享

写这篇文章是为了记录我面试pcg时平时没有留意或者钻研的地方。 面试是根据项目问的问题&#xff1a; 为什么采用jwt存储token&#xff1f; 我的项目是微服务项目&#xff0c;里面部署了资源服务和认证服务&#xff0c;这里选择jwt作为token一方面是可以存储用户的信息&#…

Ultimaker Cura使用(具体材料具体分析!)

参考视频&#xff1a;Cura学习视频 1 软件下载地址 Ultimaker官网- 专业便捷的3D打印品牌 2 软件设置 &#xff08;1&#xff09;中文设置&#xff1a; 偏好设置->language->简体中文->关掉界面&#xff0c;重启 &#xff08;2&#xff09;添加打印机 Custom FF…

二叉树前序遍历函数 代码图解(先序遍历 深度优先遍历)

void PreOrder(BiTree p)//只是遍历 即只是读&#xff0c;不会改变树根 {//这个p的类型是 树的结构体 不是之前的p指针if(p!NULL){printf("%c", p->c);PreOrder(p->lchild);//函数嵌套 打印左子树PreOrder(p->rchild);//函数嵌套 打印右子树} } 同理可证 中…

Mysql运维篇(七) 部署MHA--完结

一路走来&#xff0c;所有遇到的人&#xff0c;帮助过我的、伤害过我的都是朋友&#xff0c;没有一个是敌人。如有侵权&#xff0c;请留言&#xff0c;我及时删除&#xff01; 一、MHA软件构成 Manager工具包主要包括以下几个工具&#xff1a; masterha_manger 启…

手撕指针第一页

1. 理解内存和地址 1.1 内存 内存&#xff0c;顾名思义就是电脑用来存储数据的&#xff0c;当CPU&#xff08;中央处理器&#xff09;在工作时&#xff0c;不仅需要从内存中拿取数据也需要将数据放入内存当中&#xff0c;当把内存引入到现实当中&#xff0c;就像学校里面的宿…

Leetcode : 506. 相对名次

思路 &#xff1a; 遍历计算每个元素比它大的元素个数&#xff0c;并判断做出对应结果标签&#xff1b; #include <iostream> #include <vector>using namespace std;class Solution { public:vector<string> findRelativeRanks(vector<int>& scor…

DataGrip(IDEA 内置)连接 SQL Server

原文&#xff1a;https://blog.iyatt.com/?p14265 测试环境&#xff1a; IDEA 2023.1SQL Server 2022 首先打开 SQL Server 配置管理工具 启用 TCP/IP 打开 Windows 服务管理 在服务列表中找到 SQL Server&#xff08;MSSQLSERVER&#xff09;&#xff0c;右键重新启…

开发一套pacs系统需要考虑哪些因素?

PACS全称Picture Archivingand Communication Systems。它是应用在医院影像科室的系统&#xff0c;主要的任务就是把日常产生的各种医学影像&#xff08;包括核磁&#xff0c;CT&#xff0c;超声&#xff0c;X光机&#xff0c;红外仪、显微仪等设备产生的图像&#xff09;通过各…

Unity 轮转图, 惯性, 自动回正, 点击选择

简单的实现 2D 以及 3D 的轮转图, 类似于 Web 中无限循环的轮播图那样. 文中所有代码均已同步至 github.com/SlimeNull/UnityTests 3D 轮转图: Assets/Scripts/Scenes/CarouselTestScene/Carousel.cs2D 轮转图: Assets/Scripts/Scenes/CarouselTestScene/UICarousel.cs 主要逻…

【Spring高级】第2讲:容器实现类

目录 BeanFactory实现BeanDefinition后置处理器单例bean创建后置处理器顺序总结 ApplicationContext实现ClassPathXmlApplicationContextFileSystemXmlApplicationContextAnnotationConfigApplicationContextAnnotationConfigServletWebServerApplicationContext BeanFactory实…

springboot3.x 以上,官方不建议使用spring.factories

springboot2.7.x 以上,官方不建议使用spring.factories 最近公司项目升级.需要将springcloud/springboot版本升级到2.7.x以上,再升级的过程中遇到了太多的问题.总结在了如下文章中: springboot艰难版本升级之路!! springboot 2.3.x版本升级到2.7.x版本 这篇文章就重点是梳理一…

如何让多个视频同时转GIF 2024全新款 高清无损转换

大家是否经常会遇到这样的问题&#xff0c;看到一些有趣的短视频片段&#xff0c;但却不知道如何将它们转换成GIF动图&#xff1f;今天&#xff0c;小编就给大家分享一个简单教程&#xff0c;教你如何批量将喜欢的短视频转换成GIF动图&#xff0c;让我们一起来学习吧&#xff0…

linux安装ngnix

一、将nginx-1.20.1.tar.gz上传至linux服务器目录下 二、将nginx安装包解压到/usr/local目录下 tar -zxvf /home/local/nginx-1.20.1.tar.gz -C /usr/local/三、预先安装依赖 yum -y install pcre-devel yum -y install openssl openssl-devel yum -y install gcc gcc-c auto…

【自然语言处理】【大模型】BitNet:用1-bit Transformer训练LLM

BitNet&#xff1a;用1-bit Transformer训练LLM 《BitNet: Scaling 1-bit Transformers for Large Language Models》 论文地址&#xff1a;https://arxiv.org/pdf/2310.11453.pdf 相关博客 【自然语言处理】【大模型】BitNet&#xff1a;用1-bit Transformer训练LLM 【自然语言…