Springboot整合阿里云ONS RocketMq(4.0 http)

1. 引入依赖

<!--阿里云ons,方便的接入到云服务-->
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.4.Final</version>
</dependency>

2. 配置

配置注意事项:

  1. nameSrvAddr我这里是用的4.0版本的支持http,5.0不支持http
    image.png
  2. 一个 Group ID 代表一个 Consumer 实例群组。同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失。
  3. 订阅关系参考官方文档: 订阅关系一致
  4. 此处我配置了多个GroupId,Tag,Topic(order,market,vehicle)如果不需要配置一个即可,对应基本配置类需要增减对应属性
aliyun:rocketmq:accessKey: LTAI5txxxxxxxsecretKey: Afq06tBxrdBxxxxxxxxnameSrvAddr: http://MQ_INST_xxxxxxxxxx_BYkZuJCq.cn-beijing.mq.aliyuncs.com:80orderGroupId: GID_xxxxxx_testorderTag: 'order'orderTopic: vehicle-order-testmarketGroupId: GID_xxxxxx2_testmarketTag: 'market'marketTopic: vehicle-market-testvehicleGroupId: GID_xxxxxx3_testvehicleTag: 'vehicle'vehicleTopic: vehicle-order-test

3. 配置类

3.1 基本配置类

package com.vehicle.manager.core.config;import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;import java.util.Properties;/*** Rocket MQ 配置类* @author zr 2024/3/1*/
@Configuration
@ConfigurationProperties(prefix = "aliyun.rocketmq")
@Data
public class RocketMqConfig {private String accessKey;private String secretKey;private String nameSrvAddr;private String marketGroupId;private String marketTopic;private String marketTag;private String orderTopic;private String orderGroupId;private String orderTag;private String vehicleTopic;private String vehicleGroupId;private String vehicleTag;public Properties getMqPropertie() {Properties properties = new Properties();properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);return properties;}
}

3.2 生产者配置

package com.vehicle.manager.core.config;import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author zr 2024/3/1*/
@Configuration
public class ProducerConfig {@Autowiredprivate RocketMqConfig mqConfig;@Bean(initMethod = "start", destroyMethod = "shutdown")public ProducerBean buildProducer() {ProducerBean producer = new ProducerBean();producer.setProperties(mqConfig.getMqPropertie());return producer;}
}

3.3 消费者配置

package com.vehicle.manager.core.config;import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.vehicle.manager.core.listener.VehicleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;/*** RocketMq消费者* @author zr 2024/3/1*/
@Configuration
public class VehicleConsumerConfig {@Autowiredprivate RocketMqConfig mqConfig;@Autowiredprivate VehicleListener vehicleListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public ConsumerBean buildVehicleBuyerConsumer() {ConsumerBean consumerBean = new ConsumerBean();//配置文件Properties properties = mqConfig.getMqPropertie();properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getVehicleGroupId());//将消费者线程数固定为20个 20为默认值properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");consumerBean.setProperties(properties);//订阅关系Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();Subscription subscription = new Subscription();subscription.setTopic(mqConfig.getVehicleTopic());subscription.setExpression(mqConfig.getVehicleTag());subscriptionTable.put(subscription, vehicleListener);//订阅多个topic如上面设置consumerBean.setSubscriptionTable(subscriptionTable);return consumerBean;}
}

4. 生产者工具类

  • MessageRecord为记录消息发送的对象,可以自行根据字段进行设计调整
  • 参数说明:
    • topic – 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-“和下划线”_"构成.
    • tag – 消息标签, 请使用合法标识符, 尽量简短且见名知意
    • key – 业务主键
    • body – 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
package com.vehicle.manager.core.util;import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.vehicle.manager.core.config.RocketMqConfig;
import com.vehicle.manager.core.mapper.MessageRecordMapper;
import com.vehicle.manager.core.model.entity.MessageRecord;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.time.LocalDateTime;/*** RocketMessageProducer rocketMQ消息生产者* @author zr 2024/3/1*/
@Component
@Slf4j
public class RocketMessageProducer {private static ProducerBean producer;private static RocketMqConfig mqConfig;private  static MessageRecordMapper messageRecordMapper;@Autowiredprivate  MessageRecordMapper messageRecordMapperInstance;@PostConstructpublic void init() {RocketMessageProducer.messageRecordMapper = messageRecordMapperInstance;}public RocketMessageProducer(ProducerBean producer, RocketMqConfig mqConfig) {this.producer = producer;this.mqConfig = mqConfig;}/*** 生产车辆服务普通消息* @param tag* @param key* @param body*/public  static void producerVehicleMsg(String tag, String key, String body) {Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());long time = System.currentTimeMillis();try {SendResult sendResult = producer.send(msg);assert sendResult != null;log.info(time+ " Send mq message success.Topic is:" + msg.getTopic()+ " Tag is:" + msg.getTag() + " Key is:" + msg.getKey()+" body is:"+new String(msg.getBody())+ " msgId is:" + sendResult.getMessageId());MessageRecord messageRecord = new MessageRecord();messageRecord.setPlatformType("mq");messageRecord.setMessageType("order");messageRecord.setMqMessageTopic(msg.getTopic());messageRecord.setMqMessageTag(msg.getTag());messageRecord.setMqMessageKey(msg.getKey());messageRecord.setMqMessageId(sendResult.getMessageId());messageRecord.setCreatedTime(LocalDateTime.now());messageRecord.setMessageContent(new String(msg.getBody()));messageRecordMapper.insert(messageRecord);} catch (ONSClientException e) {e.printStackTrace();log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());}}/*** 生产车辆服务延时普通消息* @param tag  order:订单服务   vehicle:主要用于本服务的超时回应* @param key* @param body* @param delay 延迟秒*/public  static void producerVehicleDelayMsg(String tag, String key, String body,Integer delay) {Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());long time = System.currentTimeMillis();msg.setStartDeliverTime(time+ delay*1000);try {SendResult sendResult = producer.send(msg);assert sendResult != null;log.info(time+ " 发送消息成功.Topic is:" + msg.getTopic()+ " Tag 为:" + msg.getTag() + " Key 为:" + msg.getKey()+" body 为:"+new String(msg.getBody())+ " msgId 为:" + sendResult.getMessageId());} catch (ONSClientException e) {e.printStackTrace();log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());}}
}

5. 消费者监听

package com.vehicle.manager.core.listener;import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.vehicle.manager.core.model.dto.req.VehicleMQMessageDTO;
import com.vehicle.manager.core.service.HlCarService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @author zr 2024/3/1*/
@Component
@Slf4j
public class VehicleListener implements MessageListener {@Autowiredprivate HlCarService hlCarService;@Overridepublic Action consume(Message message, ConsumeContext context) {log.info("VehicleReceive 消息: " + message);try {byte[] body = message.getBody();String s = new String(body);log.info(s);// VehicleMQMessageDTO需要自行根据业务封装VehicleMQMessageDTO vehicleMQMessageDTO = JSON.parseObject(s, VehicleMQMessageDTO.class);log.info(vehicleMQMessageDTO.toString());// 以下做你的业务处理// .........return Action.CommitMessage;//进行消息的确认} catch (Exception e) {log.info(e.getMessage());//消费失败return Action.ReconsumeLater;}}
}

6. 测试

6.1 发送消息

package com.vehicle.manager.core;import com.alibaba.fastjson.JSON;
import com.vehicle.manager.api.StartApplication;
import com.vehicle.manager.core.util.RocketMessageProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** @author zr 2024/3/1*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StartApplication.class)
public class MqTest {@Testpublic void producerMsg() {RocketMessageProducer.producerVehicleMsg("vehicle","test", JSON.toJSONString(new String("testBody")));}
}

6.2 接收消息

image.png

7. 延时消息

如果需要使用延时消息可以参考RocketMessageProducer中有一个延时消息的方法producerVehicleDelayMsg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/359168.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

项目五 OpenStack镜像管理与制作

任务一 理解OpenStack镜像服务 1.1 •什么是镜像 • 镜像通常是指一系列文件或一个磁盘驱动器的精确副本。 • 虚拟机 所使用的 虚拟磁盘 &#xff0c; 实际上是 一种 特殊格式的镜像文件 。 • 云 环境下尤其需要 镜像。 • 镜像 就是一个模板&#xff0c;类似于 VMware 的虚…

Windwos +vs 2022 编译openssl 1.0.2 库

一 前言 先说 结论&#xff0c;编译64位报错&#xff0c;查了一圈没找到解决方案&#xff0c;最后换了32位的。 使用qt访问web接口&#xff0c;因为是https&#xff0c;没有openssl库会报错 QNetworkReply* reply qobject_cast<QNetworkReply*>(sender());if (reply){…

C语言入门系列:探秘二级指针与多级指针的奇妙世界

文章目录 一&#xff0c;指针的回忆杀1&#xff0c;指针的概念2&#xff0c;指针的声明和赋值3&#xff0c;指针的使用3.1 直接给指针变量赋值3.2 通过*运算符读写指针指向的内存3.2.1 读3.2.2 写 二&#xff0c;二级指针详解1&#xff0c;定义2&#xff0c;示例说明3&#xff…

Python爬虫-贝壳新房

前言 本文是该专栏的第32篇,后面会持续分享python爬虫干货知识,记得关注。 本文以某房网为例,如下图所示,采集对应城市的新房房源数据。具体实现思路和详细逻辑,笔者将在正文结合完整代码进行详细介绍。接下来,跟着笔者直接往下看正文详细内容。(附带完整代码) 正文 地…

北京智慧养老平台app打造,智慧养老,安心享老

目前&#xff0c;我国60岁以上老年人占人口比重已超过21%&#xff0c;我国老年人口数量快速增长&#xff0c;人口老龄化程度不断加深。与此同时&#xff0c;老年人的养老需求也在逐步上升。除了日常吃穿等生活需求外&#xff0c;他们在健康、精神方面也提出来新的要求。为了满足…

高职人工智能专业实训课之“自然语言处理”

一、前言 在人工智能领域&#xff0c;自然语言处理&#xff08;NLP&#xff09;技术日益成为研究和应用的热点。为了满足高职院校对NLP专业实训课程的需求&#xff0c;唯众人工智能教学实训凭借其前沿的教育技术平台&#xff0c;特别是GPU虚拟化技术&#xff0c;为学生提供了高…

【R语言】对一个Plot绘制多个图,并且每个图单元也包含多个图

以一个Plot绘制五行六列共30个图&#xff0c;然后每30个图单元包含两个图为例&#xff1a; 如下图所示&#xff1a; 代码如下&#xff1a; for (i in 1:(5*6)) {create_subplots <- function() {library(ggplot2)library(dplyr)library(tidyr)# 创建一个随机的数据框simula…

linux系统指令查漏补缺

目录 一.磁盘操作 二.lvm 三.top 4.nohup 一.磁盘操作 1. lsblk -f 显示磁盘和它的相关内容 2.tuen2fs -c -1 /dev/sdx 关闭某个磁盘的自检 3.修改配置&#xff0c;使文件系统不要开机自检 cat /etc/fstab 全0表示开机不自检 全1表示开机自检 同时在这个文件中可添加…

sql资料库

1、distinct(关键词distinct用于返回唯一不同的值)&#xff1a;查询结果中去除重复行的关键字 select distinct(university) from user_profile select distinct university from user_profile distinct是紧跟在select后面的&#xff0c;不能在其他位置&#xff0c;不然就…

4、SpringMVC 实战小项目【加法计算器、用户登录、留言板、图书管理系统】

SpringMVC 实战小项目 3.1 加法计算器3.1.1 准备⼯作前端 3.1.2 约定前后端交互接⼝需求分析接⼝定义请求参数:响应数据: 3.1.3 服务器代码 3.2 ⽤⼾登录3.2.1 准备⼯作3.2.2 约定前后端交互接⼝3.2.3 实现服务器端代码 3.3 留⾔板实现服务器端代码 3.4 图书管理系统准备后端 3…

轻量级日志系统——Loki

目录 一、loki简介 二、Loki 快速上手 第一步安装 Loki 第二步安装 Promtail 第三步安装granafa 三、LogQL 语法 四、Loki收集nginx日志 1、修改nginx配置 2、nginx服务器上安装promtail 3、添加doshbarod 一、loki简介 Loki是 Grafana Labs 团队最新的开源项目&am…

RERCS系统开发实战案例-Part05 FPM Application的Feeder Class搜索组件的实施

1、通过事务码 SE24对Feeder Class实施 1&#xff09;接口页签的简单说明&#xff1a; ① IF_FPM_GUIBB&#xff1a;通用UI构建块&#xff0c;整个UIBB模块的基础接口&#xff1b; ② IF_FPM_GUIBB_SEARCH&#xff1a;通用搜索UI构建块&#xff0c;搜索组件UIBB的基础接口&…

中东文明史

转自&#xff1a;想要了解完整的中东文明史&#xff1f;这篇文章成全你 - 知乎 (zhihu.com) 写在前面 中东文明是人类历史上最古老的文明。人类祖先从东非大裂谷走出之后&#xff0c;首先选择定居在中东地区的新月沃土上&#xff0c;并建立了人类历史上有文字记载的第一个文明…

Mybatis-映射文件中select标签resultType属性的使用

数据库的最最基本操作“增删改查”&#xff0c;“查”是最复杂的&#xff0c;有各种各样的查询&#xff0c;所以对应到Mybatis中的select标签也是这四个操作中最复杂的 resultType属性的使用 1.返回的结果是List集合的类型 select标签里的resultType类型设置为List集合里的元…

关于正点原子stm32f103精英板v1的stlink通信失败问题解决方案

由于最新的固件不适配&#xff0c;我们要想其工作要下载007的固件。 https://www.st.com/en/development-tools/stsw-link007.html?dlredirect 版本选择最低的。然后选择windows文件夹&#xff0c;更新程序 然后进keil就能正常识别到了

mediasoup源码分析(三)channel创建及信令交互

mediasoup源码分析--channel创建及信令交互 概述跨职能图业务流程图代码剖析tips 概述 在golang实现mediasoup的tcp服务及channel通道一文中&#xff0c;已经介绍过信令服务中tcp和channel的创建&#xff0c;本文主要讲解c中mediasoup的channel创建&#xff0c;以及信令服务和…

【C++高阶】掌握AVL树:构建与维护平衡二叉搜索树的艺术

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ ⏩收录专栏⏪&#xff1a;C “ 登神长阶 ” &#x1f921;往期回顾&#x1f921;&#xff1a;STL-> map与set &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀AVL树 &#x1f4d2;1. AVL树…

MySQL理解-下载-安装

MySQL理解: mysql:是一种关系型数据库管理系统。 下载&#xff1a; 进入官网MySQLhttps://www.mysql.com/ 找到download 滑动到最下方&#xff1a;有一个开源社区版的链接地址&#xff1a; 然后就下载完成了 安装&#xff1a; 双击&#xff1a; 一直next 一直next这一步&…

Spire.PDF for .NET【文档操作】演示:设置 PDF 文档的 XMP 元数据

XMP 是一种文件标签技术&#xff0c;可让您在内容创建过程中将元数据嵌入文件本身。借助支持 XMP 的应用程序&#xff0c;您的工作组可以以团队以及软件应用程序、硬件设备甚至文件格式易于理解的格式捕获有关项目的有意义的信息&#xff08;例如标题和说明、可搜索的关键字以及…

java基于ssm+jsp 美食推荐管理系统

1前台首页功能模块 美食推荐管理系统&#xff0c;在系统首页可以查看首页、热门美食、美食教程、美食店铺、美食社区、美食资讯、我的、跳转到后台等内容&#xff0c;如图1所示。 图1前台首页功能界面图 用户注册&#xff0c;在注册页面可以填写用户名、密码、姓名、联系电话等…