工作中常用的RabbitMQ实践

目录

1.前置

2.导入依赖

3.生产者

4.消费者

5.验证

验证Direct

验证Fanout

验证Topic


1.前置

安装了rabbitmq,并成功启动

2.导入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.生产者

生产端项目结构:
 

逻辑:生产者只对交换机进行生产,至于队列绑定等放在消费端进行执行

BusinessConfig

定义了三个不同类型的交换机

direct类型:(当生产者往该交换机发送消息时,他必须指定固定的routingkey,当routingkey值为空,他也会匹配routingkey为空的队列)

fanout类型:(当生产者往该交换机发送消息时,他所绑定的队列都会收到消息,routingkey即使写了也会忽略,一般为空字符串)

Topic类型:(当生产者往该交换机发送消息时,他并不像direct指定固定的routingkey,可以进行模糊匹配,当该routingkey为空时,他会匹配routingkey为空的队列)

package com.zsp.quartz.queue;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;/*** @Author: ZhangSP* @Date: 2023/12/7  14:05*/
@Slf4j
@Configuration
public class BusinessConfig {// 声明direct交换机public static final String EXCHANGE_DIRECT= "exchange_direct_inform";// 声明fanout交换机public static final String EXCHANGE_FANOUT= "exchange_fanout_inform";// 声明topic交换机public static final String EXCHANGE_TOPIC= "exchange_topic_inform";
}

TestProducer

生产消息

package com.zsp.quartz.queue;import com.alibaba.fastjson.JSON;
import com.zsp.quartz.entity.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest
@RunWith(SpringRunner.class)
public class TestProducer {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void Producer_topics_springbootTest() {//使用rabbitTemplate发送消息String message = "";User user = new User();user.setName("张三");user.setEmail("anjduahsd");message = JSON.toJSONString(user);// directrabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_DIRECT,"",message);// fanoutrabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_FANOUT,"",message);// topicrabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_TOPIC,"",message);}
}

4.消费者

消费者目录结构:

BusinessConfig:定义交换机类型,配置交换机与队列的绑定关系,通过容器工厂声明队列

package com.zsp.consumer.queue;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;/*** @Author: ZhangSP* @Date: 2023/12/7  14:05*/
@Slf4j
@Configuration
public class BusinessConfig {// 声明directpublic static final String EXCHANGE_DIRECT= "exchange_direct_inform";public static final String QUEUE_DIRECT_EMAIL = "queue_direct_inform_email";public static final String QUEUE_DIRECT_SMS = "queue_direct_inform_sms";public void BindDirectEmail(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);channel.queueDeclare(QUEUE_DIRECT_EMAIL, true, false, false, null);channel.queueBind(QUEUE_DIRECT_EMAIL, EXCHANGE_DIRECT, "");} catch (Exception e) {log.error("声明Direct->email队列时失败", e);}}public void BindDirectSms(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);channel.queueDeclare(QUEUE_DIRECT_SMS, true, false, false, null);channel.queueBind(QUEUE_DIRECT_SMS, EXCHANGE_DIRECT, "123");} catch (Exception e) {log.error("声明Direct->sms失败", e);}}// 声明fanoutpublic static final String EXCHANGE_FANOUT= "exchange_fanout_inform";public static final String QUEUE_FANOUT_EMAIL = "queue_fanout_inform_email";public static final String QUEUE_FANOUT_SMS = "queue_fanout_inform_sms";public void BindFanoutEmail(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);channel.queueDeclare(QUEUE_FANOUT_EMAIL, true, false, false, null);channel.queueBind(QUEUE_FANOUT_EMAIL, EXCHANGE_FANOUT, "");} catch (Exception e) {log.error("声明Fanout->email队列时失败", e);}}public void BindFanoutSms(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);channel.queueDeclare(QUEUE_FANOUT_SMS, true, false, false, null);channel.queueBind(QUEUE_FANOUT_SMS, EXCHANGE_FANOUT,"");} catch (Exception e) {log.error("声明Fanout->sms失败", e);}}// 声明topicpublic static final String EXCHANGE_TOPIC= "exchange_topic_inform";public static final String QUEUE_TOPIC_EMAIL = "queue_topic_inform_email";public static final String QUEUE_TOPIC_SMS = "queue_topic_inform_sms";public static final String ROUTINGKEY_EMAIL="inform.#.email.#";public static final String ROUTINGKEY_SMS="inform.#.sms.#";public void BindTopicEmail(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);channel.queueDeclare(QUEUE_TOPIC_EMAIL, true, false, false, null);channel.queueBind(QUEUE_TOPIC_EMAIL, EXCHANGE_TOPIC, ROUTINGKEY_EMAIL);} catch (Exception e) {log.error("声明Topic->email队列时失败", e);}}public void BindTopicSms(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);channel.queueDeclare(QUEUE_TOPIC_SMS, true, false, false, null);channel.queueBind(QUEUE_TOPIC_SMS, EXCHANGE_TOPIC,"");} catch (Exception e) {log.error("声明Topic->sms失败", e);}}// 声明队列@Autowired@Qualifier(value = "zspConnectionFactory")private ConnectionFactory connectionFactory;@PostConstructpublic void shengmingQueue() {try {Connection connection = connectionFactory.createConnection();Channel channel = connection.createChannel(false);BindDirectEmail(channel);BindDirectSms(channel);BindFanoutEmail(channel);BindFanoutSms(channel);BindTopicEmail(channel);BindTopicSms(channel);} catch (Exception e) {log.error("业务实例声明绑定队列报错:",e);}}
}

RabbitFactory:自定义容器工厂

package com.zsp.consumer.queue;import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@EnableRabbit
public class RabbitFactory {@Bean("zspConnectionFactory")public ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();// 设置RabbitMQ的连接信息,如主机名、端口号、用户名和密码等connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("root");connectionFactory.setPassword("root");return connectionFactory;}@Bean("rabbitListenerContainerFactory")public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("zspConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setConcurrentConsumers(5);factory.setMaxConcurrentConsumers(10);return factory;}
}

ReceiveHandler:队列监听

package com.zsp.consumer.queue;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ReceiveHandler {//监听自定义的Direct队列@RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_SMS, containerFactory = "rabbitListenerContainerFactory")public void directSMS(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Direct队列->sms队列" + jsonObject);}@RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_EMAIL, containerFactory = "rabbitListenerContainerFactory")public void directEmail(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Direct队列->email队列" + jsonObject);}//监听自定义的Fanout队列@RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_SMS, containerFactory = "rabbitListenerContainerFactory")public void FanoutSMS(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Fanout队列->sms队列" + jsonObject);}@RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_EMAIL, containerFactory = "rabbitListenerContainerFactory")public void FanoutEmail(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Fanout队列->email队列" + jsonObject);}//监听自定义的Topic队列@RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_SMS, containerFactory = "rabbitListenerContainerFactory")public void TopicSMS(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Topic队列->sms队列" + jsonObject);}@RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_EMAIL, containerFactory = "rabbitListenerContainerFactory")public void TopicEmail(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Topic队列->email队列" + jsonObject);}
}

5.验证

先启动消费者端,然后执行TestProducer

验证Direct

1.向routingkey为空的队列发消息

我们在消费者端配置了routingkey为空的队列,叫做 QUEUE_DIRECT_EMAIL

因此会打印出下面这条记录

2.向routingkey为123的队列发消息

我们在消费者端配置了routingkey为123的队列,叫做 QUEUE_DIRECT_SMS

因此会打出下面这条记录

验证Fanout

谁跟我绑定了,我都发

验证Topic

模糊匹配routingkey

匹配sms队列

会把下面这个打印出来

需要注意的是如果我们没有自定义容器工厂的话,这个containerFactory可以不写

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/212395.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

B树你需要了解一下

介绍B树的度数主要特点应用场景时间复杂度代码示例拓展 介绍 B树&#xff08;B-tree&#xff09;是一种自平衡的树&#xff0c;能够保持数据有序&#xff0c;常被用于数据库和文件系统的实现。 B树可以看作是一般化的二叉查找树&#xff0c;它允许拥有多于2个子节点。与自平衡…

Spring boot 使用Redis 消息发布订阅

Spring boot 使用Redis 消息发布订阅 文章目录 Spring boot 使用Redis 消息发布订阅Redis 消息发布订阅Redis 发布订阅 命令 Spring boot 实现消息发布订阅发布消息消息监听主题订阅 Spring boot 监听 Key 过期事件消息监听主题订阅 最近在做请求风控的时候&#xff0c;在网上搜…

ESP32-Web-Server编程- 在 Web 上开发动态纪念册

ESP32-Web-Server编程- 在 Web 上开发动态纪念册 概述 Web 有很多有趣的玩法&#xff0c;在打开网页的同时送她一个惊喜。 需求及功能解析 本节演示在 ESP32 上部署一个 Web&#xff0c;当打开对应的网页时&#xff0c;将运行动态的网页内容&#xff0c;显示炫酷的纪念贺词…

.NET 8 中 Android 资源生成的改进和变化

作者&#xff1a;Dean Ellis 排版&#xff1a;Alan Wang 随着 .NET 8 的发布&#xff0c;我们引入了一个新系统&#xff0c;用于生成访问 Android 资源的 C# 代码。 在 Xamarin.Android、.NET 6 和 .NET 7 中生成 Resource.designer.cs 文件的系统已经被弃用。 新系统生成一个名…

苍穹外卖+git开源

搁置了很久重新开始学 为了学习方便&#xff0c;苍穹外卖的前后端代码已放至git开源。前端源代码请看给i他-->sky-take-out: 苍穹外卖 git学习-->Git基础使用-CSDN博客 后端接口员工管理和分类管理模块 添加员工&#xff0c;添加的表单账号、手机号、身份证都…

Spring Boot的日志

打印日志 打印日志的步骤: • 在程序中得到日志对象. • 使用日志对象输出要打印的内容 在程序中得到日志对象 在程序中获取日志对象需要使用日志工厂LoggerFactory,代码如下: package com.example.demo;import org.slf4j.Logger; import org.slf4j.LoggerFactory;public c…

安装you-get(mac)

1、首先要有python环境 2、更新pip python -m pip install --upgrade pip 3、安装you-get pip install you-get;

T天池SQL训练营(五)-窗口函数等

–天池龙珠计划SQL训练营 5.1窗口函数 5.1.1窗口函数概念及基本的使用方法 窗口函数也称为OLAP函数。OLAP 是OnLine AnalyticalProcessing 的简称&#xff0c;意思是对数据库数据进行实时分析处理。 为了便于理解&#xff0c;称之为窗口函数。常规的SELECT语句都是对整张表进…

创建vue项目:node.js下载安装、配置环境变量,下载安装cnpm,配置npm的目录、镜像,安装vue、搭建vue项目开发环境(保姆级教程一)

今天讲解 Windows 如何创建 vue 项目&#xff0c;搭建 vue 开发环境&#xff0c;这是这个系列的第一章&#xff0c;有什么问题请留言&#xff0c;请点赞收藏&#xff01;&#xff01;&#xff01; 文章目录 一、Vue简单介绍二、开始搭建1、安装node.js环境2、配置npm下载时的默…

一文3000字从0到1用Python进行gRPC接口测试!

gRPC 是一个高性能、通用的开源RPC框架&#xff0c;其由 Google 主要面向移动应用开发并基于HTTP/2 协议标准而设计&#xff0c;基于 ProtoBuf(Protocol Buffers) 序列化协议开发&#xff0c;且支持众多开发语言。 自gRPC推出以来&#xff0c;已经广泛应用于各种服务之中。在测…

数据可视化免费化的双面影响探析

近年来数据可视化的免费化也越来越明显&#xff0c;今天就以我作为可视化设计师的经验来和大家分析一下&#xff0c;数据可视化工具免费化所带来的利与弊。 先从好处入手&#xff0c;最明显的就是免费化可以让数据可视化工具得到更广泛的使用。 免费数据可视化工具使得更多人可…

docker搭建nginx实现负载均衡

docker搭建nginx实现负载均衡 安装nginx 查询安装 [rootlocalhost ~]# docker search nginx [rootlocalhost ~]# docker pull nginx准备 创建一个空的nginx文件夹里面在创建一个nginx.conf文件和conf.d文件夹 运行映射之前创建的文件夹 端口&#xff1a;8075映射80 docker…

电脑版便签软件怎么设置在桌面上显示?

对于不少上班族来说&#xff0c;如果想要在使用电脑办公的时候&#xff0c;随手记录一些常用的工作资料、工作注意事项等内容&#xff0c;直接在电脑上使用便签软件记录是比较方便的。电脑桌面便签工具不仅方便我们随时记录各类工作事项&#xff0c;而且支持我们快速便捷使用这…

长城之上的无人机:文化遗产的守护者

长城之上的无人机&#xff1a;文化遗产的守护者 在八达岭长城景区&#xff0c;两架无人机分别部署在了长城的南、北楼两点。根据当前的保护焦点和需求&#xff0c;制定了5条无人机综合巡查航线&#xff0c;以确保长城景区的所有开放区域都能得到有效监管。每天&#xff0c;无人…

Elasticsearch 8.9 flush刷新缓存中的数据到磁盘源码

一、相关API的handler1、接收HTTP请求的hander2、每一个数据节点(node)执行分片刷新的action是TransportShardFlushAction 二、对indexShard执行刷新请求1、首先获取读锁&#xff0c;再获取刷新锁&#xff0c;如果获取不到根据参数决定是否直接返回还是等待2、在刷新之后transl…

Java的三种代理模式实现

代理模式的定义&#xff1a; Provide a surrogate or placeholder for another object to control access to it.&#xff08;为其他对象提供一种代理以控制对这个对象的访问。&#xff09; 简单说&#xff0c;就是设置一个中间代理来控制访问原目标对象&#xff0c;达到增强原…

ProEasy机器人案例:电池边包胶

如下图所示&#xff0c;对一个电池三边包边&#xff0c;因客户现场有很多规格电池的大小&#xff0c;所以就需要建立动态的工具坐标来实现适配所有种类的电池 程序如下&#xff1a;Ddome程序 function Speed(num) --速度设置 MaxSpdL(2000) --movl最大速度…

茄子科技张韶全:跨多云大数据平台DataCake在OceanBase的实践

11 月 16 日&#xff0c;OceanBase 在北京顺利举办 2023 年度发布会&#xff0c;正式宣布&#xff1a;将持续践行“一体化”产品战略&#xff0c;为关键业务负载打造一体化数据库。其中&#xff0c;在“数字化转型升级实践专场”&#xff0c;我们有幸邀请到了茄子科技大数据技术…

数据库:JDBC编程

专栏目录 MySQL基本操作-CSDN博客 MySQL基本操作-CSDN博客 数据库的增删查改&#xff08;CRUD&#xff09;基础版-CSDN博客 数据库增删改查&#xff08;CRUD&#xff09;进阶版-CSDN博客 数据库的索引-CSDN博客 基本概念 JDBC编程就是通过Java代码来操作数据库 api 数据库是…

Apache+mod_jk模块代理Tomcat容器

一、背景介绍 最近在看Tomcat运行架构原理, 正好遇到了AJP协议(Apache JServ Protocol). 顺道来研究下这个AJP协议和具体使用方法. 百度百科是这么描述AJP协议的: AJP&#xff08;Apache JServ Protocol&#xff09;是定向包协议。因为性能原因&#xff0c;使用二进制格式来传输…