springboot框架使用RabbitMQ举例代码

以前分享过一个理论有兴趣的小伙伴可以看下
https://blog.csdn.net/Drug_/article/details/138164180

不多说 还是直接上代码

第一步:引入依赖 可以不指定版本

     <!--        amqp --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

第二步 配置文件

  #配置rabbitMq 服务器rabbitmq:host: ${rabbitmq.rabbitmqHost}port: ${rabbitmq.rabbitmqPort}username: ${rabbitmq.rabbitmqUsername}password: ${rabbitmq.rabbitmqPassword}virtual-host: ${rabbitmq.rabbitmqVhost}connection-timeout: 5000#消费者配置listener:simple:
# 不建议使用 自带的重试配置  因为有几种情况会失效  在网上摘抄的 网友的测试
#  重试机制使用场景:
#  1. 如果是业务代码,比如空指针之类的异常那重试机制其实没什么用
#  2. 代码中不能使用try/catch捕获异常,否则重试机制失效
#  我在消费者 使用了 try 发现 确实失效了  所以 我觉得 需要手动在消费者里累计重试次数    自行处理异常
#        retry:
#          enabled: true  #开启消费者retry重试机制
#          max-attempts: 3  # 最大重试次数
#          multiplier: 2.0 # 重试间隔时间倍数
#          initial-interval: 1000 # 初始重试间隔时间(毫秒)
#          max-interval: 10000 # 最大重试间隔时间(毫秒)acknowledge-mode: manual # 手动确认消息,防止消息丢失 auto manual手动确认模式default-requeue-rejected: true #是否将拒绝的消息重新入队。默认是 true,即拒绝的消息会重新入队。 配合手动确认模式concurrency: 1 #: 消费者线程池的并发数。设置同时处理的消费者数量max-concurrency: 1 #最大并发消费者数量prefetch: 1 # 限制每个消费者一次可以获取的消息数量,防止消息在某个消费者身上发生阻塞#生产者配置
#    publisher-returns: true  # 启用发布者返回模式。设置为 true 启用,确保如果消息无法路由到目标队列,则会返回给生产者。# none: 不启用发布者确认。# correlated: 启用发布者确认并使用 CorrelationData 对象,可以在回调中进行处理。#: 启用简单的发布者确认模式,不带 CorrelationData。
#    publisher-confirm-type: none

第三步定义常量 :

package com.testweb.testweb.rabbitmq.web;/*** User:Json* Date: 2024/9/3**/
public class MqConstant {public static final String TestDirectRouting = "rabbitmq.TestDirectRouting";public static final String TestDirectQueue = "rabbitmq.TestDirectQueue";public static final String TestDirectExchange = "rabbitmq.TestDirectExchange";
}

第四步 消费者定义:

package com.testweb.testweb.rabbitmq.web.consumer;import com.testweb.testweb.rabbitmq.web.MqConstant;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;/*** User:Json* Date: 2024/9/3* 消费者**/
@Configuration
public class TestConsumer {//队列@Beanpublic Queue TestDirectQueue() {Map<String, Object> args = new HashMap<>();args.put("x-ha-policy", "all"); //将队列设置为在集群中的所有节点上都可用return new Queue(MqConstant.TestDirectQueue, true, false, false, args);}@Beanpublic DirectExchange TestDirectExchange() {return new DirectExchange(MqConstant.TestDirectExchange, true, false);}@Beanpublic Binding bindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(MqConstant.TestDirectRouting);}@RabbitListener(queues = MqConstant.TestDirectQueue)public void process1(Message testMessage, Channel channel) {// 消息的唯一标识idlong deliveryTag = testMessage.getMessageProperties().getDeliveryTag();//重试次数Integer retryCount =(Integer) testMessage.getMessageProperties().getHeaders().getOrDefault("retryCount", 0);try {// 处理消息的业务逻辑System.out.println("Received order message: " + new String(testMessage.getBody()));//假装异常int a=  1/0;// 手动确认消息// deliveryTag 唯一标识// multiple 是否批量拒绝,true 表示拒绝当前及之前的所有消息,false 表示仅拒绝当前消息channel.basicAck(deliveryTag, false);} catch (Exception e) {if (retryCount < 3) { // 设置最大重试次数try {System.out.println("处理失败,拒绝消息并重新入队 :" + testMessage);MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("retryCount", retryCount + 1);Message newMessage = new Message(testMessage.getBody(), messageProperties);// 重新入队  未写完channel.basicPublish(MqConstant.TestDirectExchange, MqConstant.TestDirectRouting, null, newMessage.getBody());// 手动确认原消息,防止死循环channel.basicAck(deliveryTag, false);// 处理失败,拒绝消息并重新入队 方式1// 消息标识 deliveryTag,// multiple 是否批量拒绝,true 表示拒绝当前及之前的所有消息,false 表示仅拒绝当前消息,,// requeue  是否将消息重新放回队列,true 表示重新放入队列,false 表示丢弃//    channel.basicNack(deliveryTag, false, true);// 处理失败,拒绝消息并重新入队 方式2// 消息标识 deliveryTag// requeue 是否将消息重新放回队列,true 表示重新放入队列,false 表示丢弃。//channel.basicReject(long deliveryTag, boolean requeue);
//                3. 使用场景
//                basicNack:
//                当你需要拒绝一批消息时,使用 basicNack 是更好的选择,尤其是当你想在消费失败时批量拒绝多条消息。
//                适用于更复杂的场景,比如一次性处理多个未确认的消息。
//                basicReject:
//                当你只想拒绝当前消息时,basicReject 是一个简化的选择。它通常用于更简单的场景,只需处理当前消息即可。
//                适合处理单个消息的拒绝。//如果你在消费者 里 只写了 消息确定 没有写 如果异常后 的处理 默认是不会把消息重新放回队列的} catch (Exception nackException) {System.out.println("重新入队失败!!!");// 处理 nack 失败的情况nackException.printStackTrace();}} else {System.out.println("达到最大重试次数 将消息发送到死信队列或进行其他处理!!!");try {channel.basicReject(deliveryTag, false); // 丢弃消息或转发到死信队列} catch (Exception rejectException) {rejectException.printStackTrace();}}}}}

第五步:生产者

package com.testweb.testweb.rabbitmq.web.producer;import com.testweb.testweb.rabbitmq.web.MqConstant;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** User:Json* Date: 2024/9/3* 生产者**/
@Component
public class TestProducer {@ResourceAmqpTemplate amqpTemplate;public <T> void produce(T payload){amqpTemplate.convertAndSend(MqConstant.TestDirectExchange,MqConstant.TestDirectRouting, payload);}
}

第六步 测试:

package com.testweb.testweb.rabbitmq.web.controller;import com.testweb.testweb.rabbitmq.web.MqConstant;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;/*** User:Json* Date: 2024/9/3**/
@RestController
@RequestMapping("/testMq")
public class TestMqController {@ResourceAmqpTemplate amqpTemplate;@GetMapping("test")@CrossOrigin(origins = "*")public void test(@RequestParam String msg){amqpTemplate.convertAndSend(MqConstant.TestDirectExchange,MqConstant.TestDirectRouting, msg);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/465215.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

R语言*号标识显著性差异判断组间差异是否具有统计意义

前言 该R代码用于对Iris数据集进行多组比较分析&#xff0c;探讨不同鸢尾花品种在不同测量变量&#xff08;花萼和花瓣长度与宽度&#xff09;上的显著性差异。通过将数据转换为长格式&#xff0c;并利用ANOVA和Tukey检验&#xff0c;代码生成了不同品种间的显著性标记&#x…

手边酒店多商户版V2源码独立部署_博纳软云

新版采用laraveluniapp开发&#xff0c;为更多平台小程序开发提供坚实可靠的底层架构基础。后台UI全部重写&#xff0c;兼容手机端管理。 全新架构、会员卡、钟点房、商城、点餐、商户独立管理

Multi Agents协作机制设计及实践

01 多智能体协作机制的背景概述 在前述博客中&#xff0c;我们利用LangChain、AutoGen等开发框架构建了一个数据多智能体的平台&#xff0c;并使用了LangChain的Multi-Agents框架。然而&#xff0c;在实施过程中&#xff0c;我们发现现有的框架存在一些局限性&#xff0c;这些…

ReactPress—基于React的免费开源博客CMS内容管理系统

ReactPress Github项目地址&#xff1a;https://github.com/fecommunity/reactpress 欢迎提出宝贵的建议&#xff0c;感谢Star。 ![ReactPress](https://i-blog.csdnimg.cn/direct/0720f155edaa4eadba796f4d96d394d7.png#pic_center ReactPress 是使用React开发的开源发布平台&…

如何在一个 Docker 容器中运行多个进程 ?

在容器化的世界里&#xff0c;Docker 彻底改变了开发人员构建、发布和运行应用程序的方式。Docker 容器封装了运行应用程序所需的所有依赖项&#xff0c;使其易于跨不同环境一致地部署。然而&#xff0c;在单个 Docker 容器中管理多个进程可能具有挑战性&#xff0c;这就是 Sup…

【JavaEE初阶 — 多线程】线程安全问题 & synchronized

目录 1. 什么是线程安全问题 (1) 观察线程不安全 (2) 线程安全的概念 2. 造成线程安全的原因 (1)线程调度的随机性 问题描述 解决方案 (2)修改共享数据&#xff06;原子性问题 问题描述 解决方案 3.synchronized 关键字 1. synchronized 的特性 (1) …

产品经理的重要性

一直觉得产品经理很重要&#xff0c;这几年写了好几篇和产品经理相关的思考。2020年写过对产品经理的一些思考的文章&#xff0c;2021年&#xff0c;写了一篇对如何分析项目的思考&#xff0c;2024年写了如何与PM探讨项目。 今天还想再写一篇&#xff0c;主要是最近很有感慨。…

Hunyuan-Large:推动AI技术进步的下一代语言模型

腾讯近期推出了基于Transformer架构的混合专家&#xff08;MoE&#xff09;模型——Hunyuan-Large&#xff08;Hunyuan-MoE-A52B&#xff09;。该模型目前是业界开源的最大MoE模型之一&#xff0c;拥有3890亿总参数和520亿激活参数&#xff0c;展示了极强的计算能力和资源优化优…

【Linux系列】利用 CURL 发送 POST 请求

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

通义灵码实操—飞机大战游戏

通义灵码实操—飞机大战游戏 有没有想象过自己独立编写一个有趣的小游戏。在本实践课程中&#xff0c;你不仅可以实现这个想法&#xff0c;而且还将得到通义灵码智能编程助手的支持与指导。我们将携手步入编程的神奇世界&#xff0c;以一种简洁、高效且具有创造性的方式&#…

lora训练模型 打造个人IP

准备工作 下载秋叶炼丹器整理自己的照片下载底膜 https://rentry.org/lycoris-experiments 实操步骤 解压整合包 lora-scripts,先点击“更新” 训练图片收集 比如要训练一个自己头像的模型&#xff0c;就可以拍一些自己的照片&#xff08;20-50张&#xff0c;最少15张&…

Caffeine 手动策略缓存 put() 方法源码解析

BoundedLocalManualCache put() 方法源码解析 先看一下BoundedLocalManualCache的类图 com.github.benmanes.caffeine.cache.BoundedLocalCache中定义的BoundedLocalManualCache静态内部类。 static class BoundedLocalManualCache<K, V> implements LocalManualCache&…

Spring Boot框架下的教育导师匹配系统

第一章 绪论 1.1 选题背景 如今的信息时代&#xff0c;对信息的共享性&#xff0c;信息的流通性有着较高要求&#xff0c;尽管身边每时每刻都在产生大量信息&#xff0c;这些信息也都会在短时间内得到处理&#xff0c;并迅速传播。因为很多时候&#xff0c;管理层决策需要大量信…

Unity SRP学习笔记(二)

Unity SRP学习笔记&#xff08;二&#xff09; 主要参考&#xff1a; https://catlikecoding.com/unity/tutorials/custom-srp/ https://docs.unity.cn/cn/2022.3/ScriptReference/index.html 中文教程部分参考&#xff08;可选&#xff09;&#xff1a; https://tuncle.blog/c…

2024年10款超好用的企业防泄密软件|企业文件加密防泄密必备!

随着信息技术的迅速发展&#xff0c;企业面临的数据泄露风险越来越高。为了保护企业的敏感信息&#xff0c;防止数据泄露&#xff0c;企业防泄密软件应运而生。以下是2024年值得关注的10款企业防泄密软件&#xff0c;帮助企业有效保障数据安全。 1.安秉网盾 安秉网盾防泄密是一…

K8S flannel网络模式对比

K8S flannel网络模式对比 VXLAN 模式Host-GW 模式如何查看 Flannel 的网络模式?如何修改 Flannel 的网络模式?如何修改flannel vxlan端口?Flannel 是一个 Kubernetes 中常用的网络插件,用于在集群中的节点之间提供网络连接。Flannel 提供了多种后端实现方式,vxlan 和 host…

计算机网络:网络层 —— 移动 IP 技术

文章目录 IPv6IPv6 的诞生背景主要优势IPv6引进的主要变化 IPv6数据报的基本首部IPv6数据报首部与IPv4数据报首部的对比 IPv6数据报的拓展首部IPv6地址IPv6地址空间大小IPv6地址的表示方法 IPv6地址的分类从IPv4向IPv6过渡使用双协议栈使用隧道技术 网际控制报文协议 ICMPv6ICM…

大客户营销数字销售实战讲师培训讲师唐兴通专家人工智能大模型销售客户开发AI大数据挑战式销售顾问式销售专业销售向高层销售业绩增长创新

唐兴通 销售增长策略专家、数字销售实战导师 专注帮助企业构建面向AI数字时代新销售体系&#xff0c;擅长运用数字化工具重塑销售流程&#xff0c;提升销售业绩。作为《挑战式销售》译者&#xff0c;将全球顶尖销售理论大师马修狄克逊等理论导入中国销售业界。 核心专长&…

【dvwa靶场:XSS系列】XSS (Stored)低-中-高级别,通关啦

更改name的文本数量限制大小&#xff0c; 其他我们只在name中进行操作 【除了低级可以在message中进行操作】 一、低级low <script>alert("假客套")</script> 二、中级middle 过滤了小写&#xff0c;咱们可以大写 <Script>alert("假客套…

css中pointer-events:none属性对div里面元素的鼠标事件的影响

文章目录 前倾提要当没有设置属性pointer-events时候结果 当子元素设置了pointer-events: none修改后的代码结果如下所示 当父元素设置了pointer-events: none若两个div同级也就是兄弟级 前倾提要 在gis三维开发的地图组件上放一个背景图片&#xff0c;左右两侧的颜色渐变等&a…