200、使用默认 Exchange 实现 P2P 消息 之 消息生产者(发送消息) 和 消息消费者(消费消息)

RabbitMQ 工作机制图:

Connection: 代表客户端(包括消息生产者和消费者)与RabbitMQ之间的连接。
Channel: 连接内部的Channel。channel:通道
Exchange: 充当消息交换机的组件。
Queue: 消息队列。
在这里插入图片描述

★ 消费消息

使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:

(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。

(2)通过Connection获取Channel。

(3)根据需要,调用Channel的queueDeclare()方法声明队列,如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。

(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,
该参数相当于JMS中的消息监听器。

这个 basicConsume()方法 相当于是异步消费。
而同步消费会出现阻塞情况,这就失去消息中间件存在的意义,所以先讲异步消费。

★ 发送消息

使用RabbitMQ Java Client依赖库开发消息生产者的大致步骤如下:

(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。

(2)通过Connection获取Channel。

(3)根据需要调用exchangeDeclare()、queueDeclare()方法声明Exchange和队列、并完成队列与Exchange的绑定。
如果声明的Exchange还不存在,则创建该Exchange;否则直接使用已有的Exchange。
Declare:声明、宣布

(4)调用Channel的basicPublish()方法发送消息,调用该方法的第一个参数是exchange,
第二个参数为路由key,最后两个参数依次是消息属性和消息数据体。

【注意】:虽然消息生产者与队列是完全隔离的, 但如果消息生产者不声明消息队列,那系统中就可能暂时还没有任何消息队列。

在这种情况下,消息生产者向Exchange发送的消息将不会分发给任何队列,这些消息直接就被丢弃了。

【备注】:为了保证消息生产者能将消息发送到指定队列,消息生产者需要声明消息队列,保证消息队列的存在。

**问题:**消息生产者 和 消息队列 是完全隔离的,但是生产者为什么还要声明消息队列?
**原因:**因为程序如果先运行消息生产者,后运行消费者,而声明消息队列的方法又只存在消费者那边,那么在先运行消息生产者时,就会因为还没有生成消息队列,所以生产者发送到exchange的消息,会因为没有对应的消息队列而被丢弃。

代码演示:

先创建一个普通的 maven 项目。
在这里插入图片描述
添加一些属性 和 RabbitMQ的依赖
在这里插入图片描述

在这里插入图片描述

创建消息消费者

把创建连接的代码封装到一个方法里去。
在这里插入图片描述

消费者的代码
在这里插入图片描述
在这里插入图片描述

注意:channel.basicConsume 的第二个参数 autoAck:true,就是表示自动确认消息已经被消费完成了。就是当消费者接收到消息之后,就立马返回一个已经确认消费的消息回去给消息队列。
这样容易出现问题,就是消费者这边因为一收到消息就会自动确认消息被消费了并返回已经消费消息的结果回去给消息队列,但是可能消费者其实还没有把消息消费掉,而消息队列那边又以为消费者已经把消息消费了,所以就继续发消息给那个消费者。
而消费者一收到消息又自动确认消费并返回,就会导致这个消息队列的消息越来越多,然后消费者消费不完。
在这里插入图片描述
这个演示已消费未确认的演示放最后



执行消费者
在这里插入图片描述
控制台查看
原本没有这个消息队列,通过调用Channel的queueDeclare()方法声明队列,如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列

一开始消费者声明的这个消息队列,这个是否独占的exclusive 参数我是写true,
所以下图的 myQueue01的 Features 就是 Excl

在这里插入图片描述
在这里插入图片描述

这个就是创建的消费者。
用于消费这个 myQueue01 消息队列的消费者。
在这里插入图片描述

后面把 exclusive 改成了 false,是因为后面的生产者,需要也声明这个 myQueue01 消息队列,而如果这个消息队列是 独占的,就没法声明了,所以改成 false
在这里插入图片描述
在这里插入图片描述

创建消息生产者

生产者发送完消息就会关闭资源
消费者则是一直启动着
在这里插入图片描述

测试

先启动消费者或者启动生产者都一样,因为生产者和消费者都有调用queueDeclare() 方法声明消息队列,所以不存在发送消息后没找到对应的消息队列而导致消息被丢弃的情况。

启动消费者
在这里插入图片描述

然后启动生产者
生产者发送消息
在这里插入图片描述
再看消费者,已经消费了一条消息了。
因为先启动消费者,所以生产者发送的消息马上被消费了,在控制台的队列就看不到了。
在这里插入图片描述

再测试:

先启动生产者
关闭消费者,然后启动生产者发送消息
可以看出消息已经生产发送到消息队列了
在这里插入图片描述
在这里插入图片描述

这一步的流程图
在这里插入图片描述

启动消费者消费消息
在这里插入图片描述

流程图:
在这里插入图片描述

已消费未确认

注意:channel.basicConsume 的第二个参数 autoAck:true,就是表示自动确认消息已经被消费完成了。就是当消费者接收到消息之后,就立马返回一个已经确认消费的消息回去给消息队列。
这样容易出现问题,就是消费者这边因为一收到消息就会自动确认消息被消费了并返回已经消费消息的结果回去给消息队列,但是可能消费者其实还没有把消息消费掉,而消息队列那边又以为消费者已经把消息消费了,所以就继续发消息给那个消费者。
而消费者一收到消息又自动确认消费并返回,就会导致这个消息队列的消息越来越多,然后消费者消费不完。
在这里插入图片描述
在这里插入图片描述

如图:因为 autoAck 为false , 所以消费者消费消息后没有进行确认。这里的 unacked 条数就为1.

如果改成 autoAck 为false ,那么消费者消费消息的代码,要加上确认消息的方法。
在这里插入图片描述
这个就是手动确认消息。
在这里插入图片描述

完整代码:

ConnectionUtil 连接工具类

在这里插入图片描述

package cn.ljh.app.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//连接工具
public class ConnectionUtil
{//获取连接的方法public static Connection getConnection() throws IOException, TimeoutException{//创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来ConnectionFactory connectionFactory =  new ConnectionFactory();//设置连接信息connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("ljh");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/"); //连接虚拟主机//从连接工厂获取连接Connection connection = connectionFactory.newConnection();//返回连接return connection;}
}

P2PProducer 生产者

package cn.ljh.app.rabbitmq.producer;import cn.ljh.app.rabbitmq.consumer.P2PConsumer;
import cn.ljh.app.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;//消息生产者--使用默认的exchange
public class P2PProducer
{//(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。//(2)通过Connection获取Channel。//(3)根据需要调用exchangeDeclare()、queueDeclare()方法声明Exchange和队列、并完成队列与Exchange的绑定。//    如果声明的Exchange还不存在,则创建该Exchange;否则直接使用已有的Exchange。//(4)调用Channel的basicPublish()方法发送消息,调用该方法的第一个参数是exchange,//    第二个参数为路由key,最后两个参数依次是消息属性和消息数据体。public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel。Channel channel = conn.createChannel();//3、调用exchangeDeclare()方法声明Exchange、调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定//此处打算直接使用默认的Exchange来分发消息,因此无需声明 Exchange,只需声明队列channel.queueDeclare(P2PConsumer.QUEUE_NAME, true, false, false, null);String message = "生产者发送的消息的内容";//4、调用Channel的basicPublish()方法发送消息channel.basicPublish(""/*默认的 Exchange 没有名字,所以用空的字符串*/,P2PConsumer.QUEUE_NAME/*使用队列名作为路由key,表明该消息将会被路由到该队列*/,null /*指定额外的消息的属性*/,message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/);//5、关闭资源//关闭通道channel.close();//关闭连接conn.close();}
}

P2PConsumer 消费者

package cn.ljh.app.rabbitmq.consumer;import cn.ljh.app.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消息消费者
public class P2PConsumer
{// 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下://(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。//(2)通过Connection获取Channel。//(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布//    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。//(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。//常量public final static String QUEUE_NAME = "myQueue01";public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列//如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列//参数1:声明的队列名; 参数2:消息队列是否持久化//参数3:是否只允许该消息消费者消费该队列的消息,为true,则其他消费者在这个myQueue01队列消息积堆过多的情况下,也无法帮忙消费。//参数4:是否自动删除(如果为true,在该队列没消息的情况下,会自动删除该队列) 参数5:填写额外的参数channel.queueDeclare(QUEUE_NAME, true, false, false, null);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(QUEUE_NAME/*消费这个名字的消费队列里面的消息*/,true/*消息的确认模式:是否自动确认*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数   %s:输出字符串  %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>rabbitmqtest</artifactId><version>1.0.0</version><name>rabbitmqtest</name><!--  属性  --><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>11</java.version></properties><!--  依赖  --><dependencies><!-- RabbitMQ 的依赖库 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency></dependencies></project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/157518.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

服务运营 |摘要:学术+业界-近期前沿运筹医疗合作精选

推文作者&#xff1a;李舒湉 编者按 本文归纳整理了近期INFORMS Journal on Applied Analytics中的相关业界合作研究。 这些研究成果体现了运筹学在医疗健康领域实践的效果。文中的学术业界合作使用了不同的研究工具。第一篇文章使用仿真模型帮助诊所进行不同拥挤程度下诊所使用…

【C++】继承 ③ ( 继承的一些重要特性 | 子类拥有父类的所有成员 | 多态性 | 子类可以拥有父类没有的成员 | 代码示例 )

文章目录 一、继承的一些重要特性1、子类拥有父类的所有成员2、子类可以拥有父类没有的成员3、多态性 二、代码示例 一、继承的一些重要特性 1、子类拥有父类的所有成员 子类 继承 父类 , 则 子类 拥有 父类的 所有 成员变量 和 成员函数 ; 这里要注意 : 子类 拥有 父类的 私有…

Python中使用IDLE调试程序

在IDLE中&#xff0c;使用菜单栏中的“Debug”对IDLE打开的python程序进行调试。 1 打开调试开关 选择IDLE菜单栏的“Debug->Debugger”&#xff0c;如图1①所示&#xff1b;此时在IDLE中会显示“[DEBUG ON]”&#xff0c;即“调试模式已打开”&#xff0c;如图1②所示&am…

【使用 TensorFlow 2】03/3 创建自定义损失函数

一、说明 TensorFlow 2发布已经接近5年时间&#xff0c;不仅继承了Keras快速上手和易于使用的特性&#xff0c;同时还扩展了原有Keras所不支持的分布式训练的特性。3大设计原则&#xff1a;简化概念&#xff0c;海纳百川&#xff0c;构建生态.这是本系列的第三部分&#xff0c;…

区块链加密虚拟货币交易平台安全解决方案

区块链机密货币交易锁遭入侵&#xff0c;安全存在隐患。使用泰雷兹Protect server HSM加密机&#xff0c;多方位保护您的数据&#xff0c;并通过集中化管理&#xff0c;安全的存储密钥。 引文部分&#xff1a; 损失7000万美元!黑客入侵香港区块链加密货币交易所 2023年9月&…

如何在Ubuntu 20.04.6 LTS系统上运行Playwright自动化测试

写在前面 这里以 Ubuntu 20.04.6 LTS为例。示例代码&#xff1a;自动化测试代码。 如果过程中遇到其他非文本中提到的错误&#xff0c;可以使用搜索引擎搜索错误&#xff0c;找出解决方案&#xff0c;再逐步往下进行。 一、 环境准备 1.1 安装python3 1.1.1 使用APT安装Py…

【Hello Algorithm】暴力递归到动态规划(二)

暴力递归到动态规划&#xff08;二&#xff09; 背包问题递归版本动态规划 数字字符串改字母字符串递归版本动态规划 字符串贴纸递归版本动态规划 **特别需要注意的是 我们使用数组之前一定要进行初始化 不然很有可能会遇到一些意想不到的错误 比如说在Linux平台上 new出来的in…

记一次生产大对象及GC时长优化经验

最近在做一次系统整体优化,发现系统存在GC时长过长及JVM内存溢出的问题,记录一下优化的过程 面试的时候我们都被问过如何处理生产问题&#xff0c;尤其是线上oom或者GC调优的问题更是必问&#xff0c;所以到底应该如何发现解决这些问题呢&#xff0c;用真实的场景实操&#xff…

2015架构案例(五十一)

第5题 【说明】某信息技术公司计划开发一套在线投票系统&#xff0c;用于为市场调研、信息调查和销售反馈等业务提供服务。该系统计划通过大量宣传和奖品鼓励的方式快速积累用户&#xff0c;当用户规模扩大到一定程度时&#xff0c;开始联系相关企业提供信息服务&#xff0c;并…

批量执行insert into 的脚本报2006 - MySQL server has gone away

数据库执行批量数据导入是报“2006 - MySQL server has gone away”错误&#xff0c;脚本并没有问题&#xff0c;只是insert into 的批量操作语句过长导致。 解决办法&#xff1a; Navicat ->工具 ->服务器监控->mysql ——》变量 修改max_allowed_packet大小为512…

TCP/IP(七)TCP的连接管理(四)全连接

一 全连接队列 nginx listen 参数backlog的意义 nginx配置文件中listen后面的backlog配置 ① TCP全连接队列概念 全连接队列: 也称 accept 队列 ② 查看应用程序的 TCP 全连接队列大小 实验1&#xff1a; ss 命令查看 LISTEN状态下 Recv-Q/Send-Q 含义附加&#xff1a;…

【Java学习之道】日期与时间处理类

引言 在前面的章节中&#xff0c;我们介绍了Java语言的基础知识和核心技能&#xff0c;现在我们将进一步探讨Java中的常用类库和工具。这些工具和类库将帮助我们更高效地进行Java程序开发。在本节中&#xff0c;我们将一起学习日期与时间处理类的使用。 一、为什么需要日期和…

vsCode 忽略 文件上传

1 无 .gitignore 文件时&#xff0c;在项目文件右键&#xff0c;Git Bash 进入命令行 输入 touch .gitignore 生成gitignore文件 2 、在文件.gitignore里输入 node_modules/ dist/ 来自于&#xff1a;vscode git提交代码忽略node_modules_老妖zZ的博客-CSDN博客

k8s - Flannel

1.Flannel概念剖析 Flannel是 CoreOS 团队针对 Kubernetes 设计的一个覆盖网络&#xff08;Overlay Network&#xff09;工具&#xff0c;其目的在于帮助每一个使用 Kuberentes 的 CoreOS 主机拥有一个完整的子网。这次的分享内容将从Flannel的介绍、工作原理及安装和配置三方…

④. GPT错误:导入import pandas as pd库,存储输入路径图片信息存储错误

꧂ 问题最初꧁ 用 import pandas as pd 可是你没有打印各种信息input输入图片路径 print图片尺寸 大小 长宽高 有颜色占比>0.001的按照大小排序将打印信息存储excel表格文件名 表格路径 图片大小 尺寸 颜色类型 占比信息input输入的是文件就处理文件 是文件夹&#x1f4c…

44.ES

一、ES。 &#xff08;1&#xff09;es概念。 &#xff08;1.1&#xff09;什么是es。 &#xff08;1.2&#xff09;es的发展。 es是基于lucene写的。 &#xff08;1.3&#xff09;总结。 es是基于lucene写的。 &#xff08;2&#xff09;倒排索引。 &#xff08;3&#xf…

flutter 开发中的问题与技巧

一、概述 刚开始上手 flutter 开发的时候&#xff0c;总会遇到这样那样的小问题&#xff0c;而官方文档又没有明确说明不能这样使用&#xff0c;本文总结了一些开发中经常会遇到的一些问题和一些开发小技巧。 二、常见问题 1、Expanded 组件只能在 Row、Column、Flex 中使用 C…

GEE:基于GLDAS数据集分析土壤湿度的时间序列变化

作者:CSDN @ _养乐多_ 本篇博客将介绍如何使用Google Earth Engine(GEE)进行土壤湿度数据的分析。我们将使用NASA GLDAS(Global Land Data Assimilation System)数据集,其中包括了关于土壤湿度的信息。通过该数据集,我们将了解土壤湿度在特定区域和时间段内的变化,并生…

springboot vue 部署至Rocky(Centos)并自启,本文部署是若依应用

概述 1、安装nohup&#xff08;后台进程运行java&#xff09; 2、安装中文字体&#xff08;防止中文乱码&#xff09; 3、安装chrony&#xff08;保证分布式部署时间的一致性&#xff09; 5、安装mysql数据&#xff0c;迁移目录&#xff0c;并授权自启动&#xff1b; 6、安…

SpringBoot注解篇之@Validated

目录 前言Validated作用NotNull与NotBlank区别总结 前言 大家好&#xff0c;我是AK&#xff0c;在做新项目顺便整理SpringBoot相关内容&#xff0c;这里主要介绍下Validated注解的应用&#xff0c;减少核心业务逻辑中一些参数判断的代码。 Validated作用 Validated 是 Spring…