RabbitMQ入门案例

RabbitMQ 是目前比较主流的MQ消息队列中间件,下面简单总结RabbitMQ入门时所做的一些笔记

1.RabbitMQ 入门案例

需求:用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者

1.1 添加依赖

<!--rabbitmq 依赖客户端-->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version>
</dependency>

1.2 消息生产者

创建一个类作为生产者,最终生产消息到 RabbitMQ 的队列里

步骤:

  1. 创建 RabbitMQ 连接工厂
  2. 进行 RabbitMQ 工厂配置信息
  3. 创建 RabbitMQ 连接
  4. 创建 RabbitMQ 信道
  5. 生成一个队列
  6. 发送一个消息到交换机,交换机发送到队列。“” 代表默认交换机
/*** <p>Class: Producer </p>* <p>Description: 生产者:发消息</p>** @author zhouyi* @version 1.0* @date 2023/7/9*/
public class Producer {//对列名称public static final String QUEUE_NAME = "hello";//发消息public static void main(String[] args) throws IOException, TimeoutException {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP 连接RabbitMQ对列factory.setHost("8.219.165.36");//用户名factory.setUsername("admin");//密码factory.setPassword("admin123");//创建连接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();/*** 生产一个对列* 1.对列名称* 2.对列里面的消息是否持久化,默认情况下,消息存储在内存中* 3.该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费 false:只能一个消费者消费* 4.是否自动删除,最后一个消费者端开链接以后,该队列是否自动删除,true表示自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//发消息String message = "Hello,world";/*** 发送一个消息* 1.发送到哪个交换机* 2.路由的key值是哪个本次是队列的名称* 3.其他参数信息* 4.发送消息的消息体*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("消息发送完毕");}
}

运行代码发现如下错误,即读写权限没设置好

Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - access to vhost '/' refused for user 'admin', class-id=10, method-id=40)

image-20230709180328444

再次运行看到消息队列中已存在消息

image-20230709180630502

image-20230709180852356

方法解释

  • 声明队列:
channel.queueDeclare(队列名/String, 持久化/boolean, 共享消费/boolean, 自动删除/boolean, 配置参数/Map);

配置参数现在是 null,后面死信队列、延迟队列等会用到,如:队列的优先级

队列里的消息如果没有被消费,何去何从?(死信队列)

Map<String, Object> params = new HashMap();
// 设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
params.put("x-max-priority", 10);
// 声明当前队列绑定的死信交换机
params.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由 key
params.put("x-dead-letter-routing-key", "YD");
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
  • 发布消息:

    channel.basicPublish(交换机名/String, 队列名/String, 配置参数/Map, 消息/String);
    

    配置参数现在是 null,后面死信队列、延迟队列等会用到,如:发布的消息优先级

    发布的消息标识符 id

// 给消息赋予 优先级 ID 属性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).messageId("1")build();
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());

1.3 消息消费者

创建一个类作为消费者,消费 RabbitMQ 队列的消息,消息消费是通过Channel来完成的

public class Consumer {//队列的名称public static final String QUEUE_NAME = "hello";//接受消息public static void main(String[] args) throws IOException, TimeoutException {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP 连接RabbitMQ对列factory.setHost("8.219.165.36");//用户名factory.setUsername("admin");//密码factory.setPassword("admin123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明接收消息DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println(new String(message.getBody()));};//取消消息时的回调CancelCallback cancelCallback = consumerTag -> {System.out.println("消息消费被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答true:代表自动应答false:代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}

运行结果如下

image-20230709182737671

此时队列里面的消息都被全部消费了

image-20230709182819693

说明消息已被消费掉了

2.Work Queues(工作队列)

Work Queues 是工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

2.1 轮询消费

轮询消费消息指的是轮流消费消息,即每个工作队列都会获取一个消息进行消费,并且获取的次数按照顺序依次往下轮流。

案例中生产者叫做 Task,一个消费者就是一个工作队列,启动两个工作队列消费消息,这个两个工作队列会以轮询的方式消费消息。

轮询案例

  • 首先把 RabbitMQ 的配置参数封装为一个工具类:`RabbitMQUtils,创建信道的工具类
public class RabbitMqUtils {//得到一个连接的 channelpublic static Channel getChannel() throws Exception{//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("182.92.234.71");factory.setUsername("admin");factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}

创建一个工作线程,相当于一个消费者

public class Work01 {//队列的名称public static final String QUEUE_NAME = "hello";//接收消息public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();//消息的接受DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收到的消息:" + new String(message.getBody()));};//消息接受被取消时,执行下面的内容CancelCallback cancelCallback = consumerTag -> {System.out.println(consumerTag + "消息被消费者取消消费接口回调逻辑");};//消息的接受channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}

为了演示多个工作线程,可以在IDEA中设置允许同时运行多次

image-20231215224904321

创建好一个工作队列,只需要以多线程方式启动两次该 main 函数即可,以 first、second 区别消息队列。

要开启多线程功能,首先启动该消息队列,然后如图开启多线程:

先运行第一个工作线程:

image-20230709200835846

在运行第二个工作线程,此时已经开了两个工作线程,如下

image-20230709201135491

  • 创建一个生产者,发送消息进程,为了方便消息直接从控制台输入

    public class Task01 {//队列名称public static final String QUEUE_NAME="hello";//发送大量消息public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();//队列的声明channel.queueDeclare(QUEUE_NAME,false,false,false,null);//发送消息//从控制台当中接受信息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送完成:"+message);}}
    }
    

结果验证

假设生产者生产 AA BB CC DD这四条消息,理论上工作线程C1和工作线程C2轮询接收到消息,期望测试结果如下

  • 生产者(Task01) 生产AA消息,工作线程C1接收到AA消息;
  • 生产者(Task01) 生产BB消息,工作线程C2接收到BB消息;
  • 生产者(Task01) 生产CC消息,工作线程C1接收到CC消息;
  • 生产者(Task01) 生产DD消息,工作线程C2接收到DD消息;

实际运行结果如下,和期望一致

image-20230709201946579

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/220505.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

美颜SDK是什么?视频美颜SDK在直播平台中的集成与接入教程详解

当下&#xff0c;主播们追求更加自然、精致的外观&#xff0c;而观众也期待在屏幕前欣赏到更为清晰、美丽的画面。为了满足这一需求&#xff0c;美颜SDK应运而生&#xff0c;成为直播平台的重要利器之一。 一、什么是美颜SDK&#xff1f; 通过美颜SDK&#xff0c;开发者可以…

三菱PLC FX3U滑动平均值滤波

三菱PLC滑动平均值滤波其它相关写法,请参考下面文章链接: https://rxxw-control.blog.csdn.net/article/details/125044013https://rxxw-control.blog.csdn.net/article/details/125044013滑动平均值滤波程序总共分为三部分,第一步为:滑动采样。 第二步为:队列求和,第三…

最强笔记生成AI —— NotionAI

NotionAI是Notion推出的一款革命性AI工具&#xff0c;它正通过利用其先进的AI技术来扩大用户群。这款强大的生成式AI工具能够帮助用户完成笔记总结、识别会议中的行动项&#xff0c;并创建和修改文本。NotionAI通过自动化枯燥的任务、为用户提供建议和模板&#xff0c;极大地简…

深度学习模型轻量化方法介绍

深度学习模型轻量化是指通过一系列技术手段减少模型的大小和计算需求&#xff0c;使其能够在资源有限的环境中&#xff08;如移动设备、嵌入式系统&#xff09;运行。下面是一些常见的模型轻量化方法&#xff1a; 模型剪枝&#xff08;Pruning&#xff09;: 描述: 模型剪枝涉及…

【Java异常】idea 报错:无效的目标发行版:17 的解决办法

【Java异常】idea 报错&#xff1a;无效的目标发行版&#xff1a;17 的解决办法 一&#xff0c;问题来源 springcloud的第一个demo项目就给我干趴了 二、原因分析 java: 无效的目标发行版: 17 原因就是 JDK 版本不对。从 IDEA 编辑器中可以找到问题的原因所在&#xff0c;…

Axure之交互与情节与一些实例

目录 一.交互与情节简介 二.ERP登录页到主页的跳转 三.ERP的菜单跳转到各个页面的跳转 四.省市联动 五.手机下拉加载 今天就到这里了&#xff0c;希望帮到你哦&#xff01;&#xff01;&#xff01; 一.交互与情节简介 "交互"通常指的是人与人、人与计算机或物体…

卷积层里的填充和步幅(padding和strides)

目录 一、填充和步幅相关概念 1、填充(padding) 2、步幅(strides) 3、总结 二、代码实现 1、填充(padding) 2、步幅(strides) 3、小结 一、填充和步幅相关概念 1、填充(padding) 当输入图片比较小的时候&#xff0c;我们一般会进行填充&#xff0c;填充是指在输入周围…

【TB作品】STM32 PWM之实现呼吸灯,STM32F103RCT6,晨启

文章目录 完整工程参考资料实验过程 实验任务&#xff1a; 1&#xff1a;实现PWM呼吸灯&#xff0c;定时器产生PWM&#xff0c;控制实验板上的LED灯亮灭&#xff1b; 2&#xff1a;通过任意两个按键切换PWM呼吸灯输出到两个不同的LED灯&#xff0c;实现亮灭效果&#xff1b; 3&…

Flink系列之:自定义函数

Flink系列之&#xff1a;自定义函数 一、自定义函数二、概述三、开发指南四、函数类五、求值方法六、类型推导七、自动类型推导八、定制类型推导九、确定性十、内置函数的确定性十一、运行时集成十二、标量函数十三、表值函数十四、聚合函数十五、表值聚合函数 一、自定义函数 …

【EI会议征稿通知】第三届区块链、信息技术与智慧金融国际学术会议 (ICBIS2024)

第三届区块链、信息技术与智慧金融国际学术会议 (ICBIS2024) The 3rd International Academic Conference on Blockchain, Information Technology and Smart Finance 第三届区块链、信息技术与智慧金融国际学术会议 (ICBIS2024) 将于2024年2月23-25日在马来西亚举行。本次会…

回归预测 | MATLAB实现GWO-DHKELM基于灰狼算法优化深度混合核极限学习机的数据回归预测 (多指标,多图)

回归预测 | MATLAB实现GWO-DHKELM基于灰狼算法优化深度混合核极限学习机的数据回归预测 &#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现GWO-DHKELM基于灰狼算法优化深度混合核极限学习机的数据回归预测 &#xff08;多指标&#xff0c;多图&#…

根据电脑硬件条件,确定Pytorch的版本?

根据CUDA确定Pytorch的版本 1 显卡型号&#xff1a;NVIDIA GeForce GTX 970 2 显卡算力&#xff1a;5.2 https://en.wikipedia.org/wiki/CUDA3 确定CUDA Runtime 4 看自己的驱动&#xff1a; CUDA Driver Version —— 12.2 nvidia-smi 5 确定使用的版本 前面3中runtime …

Zookeeper-快速开始

Zookeeper介绍 简介&#xff1a;ZooKeeper 是一个开源的分布式协调框架&#xff0c;是Apache Hadoop 的一个子项目&#xff0c;主要用来解决分布式集群中应用系统的一致性问题。 设计目标&#xff1a;将那些复杂且容易出错的分布式一致性服务封装起来&#xff0c;构成一个高效…

Ubuntu 虚拟机环境,编译AOSP源码

环境 : VMware虚拟机 Ubuntu 20.04.3 LTS 搭建配置开发环境 sudo apt-get install git-core gnupg flex bison build-essential zip curl zlib1g-dev gcc-multilib g-multilib libc6-dev-i386 libncurses5 lib32ncurses5-dev x11proto-core-dev libx11-dev lib32z1-dev libgl…

持续集成交付CICD:K8S 通过模板文件自动化完成前端项目应用发布

目录 一、实验 1.环境 2.GitLab 更新deployment文件 3.GitLab更新共享库前端项目CI与CD流水线 4.K8S查看前端项目版本 5.Jenkins 构建前端项目 6.Jenkins 再次构建前端项目 二、问题 1. Jenkins 构建CI 流水线报错 2. Jenkins 构建CI 流水线弹出脚本报错 3. Jenkins…

fiddler的下载、安装

在官网下载fiddler 点击Download For Windows 下载完成 安装fiddler 点击.exe文件&#xff0c;进行傻瓜式安装&#xff0c;即可安装成功 配置fiddler 点击OK后&#xff0c;重启fiddler, 即可抓包

JS基础之模块化

JS基础之模块化 JS模块化模块化前端发展 什么是模块&#xff1f;怎么定义模块化IIFE匿名函数自调用IIFE模式增强模块化的好处 JS模块化 模块化 JS DOM操作 代码规范管理的标准 不同模块间的管理模块内部自组织 标准bundler (模块构建工具) ESNext TS -> ES5 前端发展 生态 …

打开VScode时不打开上次使用的文件夹

是不是很烦VScode 打开新的文件夹&#xff0c;每次都打开上次使用过的文件夹&#xff0c;只需在设置里面改一个设置就可以避免了。 Ctrl &#xff0c;打开设置&#xff0c;搜索 window.restoreWindows 通过这种设置就可以让VScode 每次打开新的文件夹而不打开上次的文件夹。

“No.”竟然不是Number的缩写!92%的人不知道为什么!柯桥成人英语学习就来泓畅教育

今天给大家介绍一个很有意思的表达 不知道&#xff0c;同学们有没有发现 ↓ 英语中&#xff0c;数字经常和“No.”一起出现 大家有深究过“No.”是什么意思吗 有的同学会说&#xff0c;是不是“Number”的缩写 虽然很像&#xff0c;但是它俩还真不一样 接下来我们就来盘一…

Leetcode—12.整数转罗马数字【中等】

2023每日刷题&#xff08;六十四&#xff09; Leetcode—12.整数转罗马数字 实现代码 const pair<int, string> valueTable[] {{1000, "M"},{900, "CM"},{500, "D"},{400, "CD"},{100, "C"},{90, "XC"},…