springboot整合rabbitmq

添加依赖

<!--RabbitMQ 依赖--> 
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>

添加配置

properties文件版本 

spring.rabbitmq.host=182.92.234.71
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

 yaml文件版本

spring:rabbitmq:host:182.92.234.71port:5672username:adminpassword:123

添加 Swagger 配置类  

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;@Configuration
@EnableSwagger2
public class SwaggerConfig {@Beanpublic Docket webApiConfig(){return new Docket(DocumentationType.SWAGGER_2).groupName("webApi").apiInfo(webApiInfo()).select().build();}private ApiInfo webApiInfo(){return new ApiInfoBuilder().title("rabbitmq 接口文档").description("本文档描述了 rabbitmq 微服务接口定义").version("1.0").build();}
}

 延时队列

队列架构 

  

配置类文件 

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE = "X";public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String DEAD_LETTER_QUEUE = "QD";// 声明 xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}// 声明 xExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列 A ttl 为 10s 并绑定到对应的死信交换机@Bean("queueA")public Queue queueA(){Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}// 声明队列 A 绑定 X 交换机@Beanpublic Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//声明队列 B ttl 为 40s 并绑定到对应的死信交换机@Bean("queueB")public Queue queueB(){Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}//声明队列 B 绑定 X 交换机@Beanpublic Binding queuebBindingX(@Qualifier("queueB") Queue queue1B, @Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queue1B).to(xExchange).with("XB");}//声明死信队列 QD@Bean("queueD")public Queue queueD(){return new Queue(DEAD_LETTER_QUEUE);}//声明死信队列 QD 绑定关系@Beanpublic Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

 消息生产者代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: "+message);rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message);} 
}

 消息消费者代码

import java.io.IOException;
import java.util.Date;@Slf4j
@Component
public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);}
}

 结果

不过,如果这样使用的话,岂不是 每增加一个新的时间需求,就要新增一个队列 ,这里只有 10S 40S
两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然
后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

 延时消息

在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间,设置TTL消息

  

配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;@Component
public class MsgTtlQueueConfig {public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String QUEUE_C = "QC";//声明队列 C 死信交换机@Bean("queueC")public Queue queueB(){Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//没有声明 TTL 属性return QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//声明队列 B 绑定 X 交换机@Beanpublic Binding queuecBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}
}

 生产者


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: "+message);rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message);}@GetMapping("sendExpirationMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{correlationData.getMessageProperties().setExpiration(ttlTime);return correlationData;});log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);}
}

 结果

缺点

如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期 ,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

延时队列插件

安装插件

通过GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delay Messaging for RabbitMQ下载对应的版本,我的是centos 7.9的,rabbit的版本为erlang-23.3.4.18-1.el7.x86_64.rpm,rabbitmq-server-3.8.30-1.el7.noarch.rpm,对应的rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez插件版本

  

将插件放入/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.30/plugins包下

 

 进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ

rabbitmq-plugins enable rabbitmq_delayed_message_exchange 

 

 重启rabbitmq

systemctl restart rabbitmq-server

 显示延迟插件选项

 架构

 配置类

 

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME = "delayed.queue";public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}//自定义交换机 我们在这里定义的是一个延迟交换机@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();//自定义交换机的类型args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);}@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: "+message);rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message);}@GetMapping("sendExpirationMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{correlationData.getMessageProperties().setExpiration(ttlTime);return correlationData;});log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);}public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";@GetMapping("sendDelayMsg/{message}/{delayTime}")public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,correlationData ->{correlationData.getMessageProperties().setDelay(delayTime);return correlationData;});log.info(" 当 前 时 间 : {}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", newDate(),delayTime, message);}
}

 消费者

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Date;@Slf4j
@Component
public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);}public static final String DELAYED_QUEUE_NAME = "delayed.queue";@RabbitListener(queues = DELAYED_QUEUE_NAME)public void receiveDelayedQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);}
}

 结果

frist先消费了 ,符合要求。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/102512.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DFX概述 | Design For X | Design For Excellent

Design for X (DFX) Methods 什么是“Design for X”&#xff1f; Design for eXcellence是一种在设计和制造领域中的不断发展的原则哲学。它采用了全面和系统的设计方法&#xff0c;关注产品的各个方面——从概念生成到最终交付。 它提供了良好的实践和设计指南&#xff0c…

【RHEL】硬盘分区与格式化

fdisk命令 在linux中&#xff0c;fdisk是基于菜单的命令。对硬盘分区时&#xff0c;可以在fdisk命令后面直接加上要分区的硬盘作为参数(分区工具) 利用如下所示命令&#xff0c;打开fdisk操作菜单。 输入p,查看当前分区表。从命令执行结果可以到&#xff0c;/dev/mapper/rhel…

Python入门【原生字符串、边界字符、search函数、re模块中其他常用的函数 、贪婪模式和非贪婪模式、择一匹配(|)的使用、分组】(三十)

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱敲代码的小王&#xff0c;CSDN博客博主,Python小白 &#x1f4d5;系列专栏&#xff1a;python入门到实战、Python爬虫开发、Python办公自动化、Python数据分析、Python前后端开发 &#x1f4e7;如果文章知识点有错误…

【算法系列篇】二分查找——这还是你所知道的二分查找算法吗?

文章目录 前言什么是二分查找算法1.二分查找1.1 题目要求1.2 做题思路1.3 Java代码实现 2.在排序数组中查找元素的第一个和最后一个位置2.1 题目要求2.2 做题思路2.3 Java代码实现 3.搜索插入位置3.1 题目要求3.2 做题思路3.3 Java代码实现 4.x的平方根4.1 题目要求4.2 做题思路…

锚定医学营养 健启星深耕不辍

在生命医学中&#xff0c;营养被称为维持患者生命的物质基础。医学营养&#xff0c;是结合了医学临床营养、营养素与疾病预防等方面&#xff0c;并根据患者的医疗记录、身体检查及心理情况&#xff0c;由医生及专业营养师给出配比完善的营养素&#xff0c;以此来增加患者身体的…

控制台实现家庭记账本功能

需求&#xff1a; 在控制台实现家庭记账本的小功能&#xff0c;功能如下 参考代码如下&#xff1a; import java.util.Scanner;/*** <p>家庭账本-综合案例</p>** author 波波老师(weixin : javabobo0513)*/ public class Demo24 {public static void main(Strin…

docker之Compose与DockerSwarm

目录 Compose 简介 概念 为什么需要&#xff1f; 配置字段 常用命令 安装 1.下载 2.授权 使用 1.创建文件 2.启动 docker Swarm 关键概念 调度策略 spread binpack random 特性 集群部署 1.准备 2.创建swarm并添加节点 在主服务器上创建swarm集群 节点…

OpenCV中QR二维码的生成与识别(CIS摄像头解析)

1、QR概述 QR(Quick Response)属于二维条码的一种&#xff0c;意思是快速响应的意思。QR码不仅信息容量大、可靠性高、成本低&#xff0c;还可表示汉字及图像等多种文字信息、其保密防伪性强而且使用非常方便。更重要的是QR码这项技术是开源的&#xff0c;在移动支付、电影票、…

分布式锁 总结

分布式锁 在应用开发中&#xff0c;特别是web工程开发&#xff0c;通常都是并发编程&#xff0c;不是多进程就是多线程。这种场景下极易出现线程并发性安全问题&#xff0c;此时不得不使用锁来解决问题。在多线程高并发场景下&#xff0c;为了保证资源的线程安全问题&#xff0…

深度学习基础

文章目录 1. 数学基础1.1 标量和向量1.2 向量运算1.3 矩阵1.4 张量1.5 导数 2. numpy常用操作3. 梯度下降算法4. 反向传播4.1 完整的反向传播过程4.2 代码演示 5. 网络结构 -- 全连接层6. 激活函数6.1 激活函数-Sigmoid6.2 激活函数-tanh6.3 激活函数-Relu6.4 激活函数-Softmax…

Linux权限

Linux中一切皆文件&#xff0c;那么文件就应该有相对于的类型&#xff0c;而在Linux当中&#xff0c;类型不是直接看后缀来决定的。 -普通文件、文本、可执行、归档文件等d目录b块设备、block、磁盘c字符设备、键盘、显示器p管道文件s网络socket文件l链接文件 link 然后后面的九…

2023年京东儿童智能手表行业数据分析(京东销售数据分析)

儿童消费市场向来火爆&#xff0c;儿童智能手表作为能够实现定位导航&#xff0c;信息通讯&#xff0c;SOS求救&#xff0c;远程监听&#xff0c;智能防丢等多功能的智能可穿戴设备&#xff0c;能够通过较为精准的定位功能和安全防护能力保障儿童的安全&#xff0c;因而广受消费…

Oracle字段长度不足位数补零

Oracle字段长度不足位数补零 有时候从数据库中取出的月份值是1&#xff0c;而不是01&#xff0c;该怎么办呢 SELECTLPAD( CODE_MONTH, 2, 0 ) FROMtb_cube_TY001 WHERECODE_BM_MEATYPE TY20 AND code_measure MYLX01 AND code_month <> ~ AND CODE_ENTITY 01A AND…

k8s 安装istio (一)

前置条件 已经完成 K8S安装过程十&#xff1a;Kubernetes CNI插件与CoreDNS服务部署 部署 istio 服务网格与 Ingress 服务用到了 helm 与 kubectl 这两个命令行工具&#xff0c;这个命令行工具依赖 ~/.kube/config 这个配置文件&#xff0c;目前只在 kubernetes master 节点中…

Centos 7 安装系列(8):openGauss 3.0.0

安装依赖包&#xff1a; yum -y install libaio-devel flex bison ncurses-devel glibc-devel patch redhat-lsb-core readline-devel openssl-devel sqlite-devel libnsl 安装插件&#xff1a; yum install -y bzip2 net-tools为什么要安装这两个&#xff1f; 安装bzip2 是…

Ansible 创建使用角色

使用 Ansible Galaxy 和要求文件 /ansible/roles/requirements.yml 。从以下 URL 下载角色并安装到 /ansible/roles &#xff1a; http://materials/haproxy.tar 此角色的名称应当为 balancer http://materials/phpinfo.tar 此角色的名称应当为 phpinfo #创建 vim /ansible/r…

Wlan——锐捷零漫游网络解决方案以及相关配置

目录 零漫游介绍 一代零漫游 二代单频率零漫游 二代双频率零漫游 锐捷零漫游方案总结 锐捷零漫游方案的配置 配置无线信号的信道 开启关闭5G零漫游 查看配置 零漫游介绍 普通的漫游和零漫游的区别 普通漫游 漫游是由一个AP到另一个AP或者一个射频卡到另一个射频卡的漫…

工程管理与工作流

1 统一开发环境/ 协作工具 你知道开发环境指的是什么吗&#xff1f; 开发环境&#xff1a; 工程运行环境、开发工具/ 编辑器 、开发依赖环境、 配置文件 软件环境&#xff1a; “仿真预演”环境 Staging 生产环境前最终验证、 这一环境尽可能的仿真了真实的生产环境 、另一个…

MinIO线上扩容实战

硬件投入肯定是随着业务的增长而增长&#xff0c;这就要求中间件平台必须提供水平伸缩机制&#xff0c;MinIO对象存储服务也不例外&#xff0c;本文就详细介绍MinIO的扩容。 Minio支持通过增加新的Server Pool来扩容老的集群。每个Server Pool都是一个相对独立的故障域&#x…

SpeedBI数据可视化工具:浏览器上做分析

SpeedBI数据分析云是一种在浏览器上进行数据可视化分析的工具&#xff0c;它能够将数据以可视化的形式呈现出来&#xff0c;并支持多种数据源和图表类型。 所有操作&#xff0c;均在浏览器上进行 在浏览器中打开SpeedBI数据分析云官网&#xff0c;点击【免费使用】进入&#…