RabbitMQ学习整理————基于RabbitMQ实现RPC

基于RabbitMQ实现RPC

  • 前言
  • 什么是RPC
  • RabbitMQ如何实现RPC
  • RPC简单示例
  • 通过Spring AMQP实现RPC

前言

这边参考了RabbitMQ的官网,想整理一篇关于RabbitMQ实现RPC调用的博客,打算把两种实现RPC调用的都整理一下,一个是使用官方提供的一个Java client,还有一个是Spring AMQP的整合使用。
代码路径:https://github.com/yzh19961031/blogDemo/tree/master/rabbitmq

什么是RPC

RPC是远程过程调用(Remote Procedure Call)的缩写形式,简单说就是一个节点去请求另一个节点上面的服务并获得响应结果。
我们之前总结的工作模式都是发送消息到指定的队列,再由相关的消费者进行消费,如果存在这样的场景,比如消费者消费完消息需给生产者一个具体的响应,然后生产者再根据这个响应进行其他的业务逻辑,这样就需要使用到RabbitMQ提供的RPC能力。

RabbitMQ如何实现RPC

官方有很详细的介绍文档,这边贴一下地址:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
RabbitMQ实现RPC很简单,正常的流程就是请求以及响应,我们只需要在请求的消息的属性里面添加一个响应队列的地址,这边需要使用到一个BasicProperties这个类。具体配置如下:

// 指定一个回调队列
callbackQueueName = channel.queueDeclare().getQueue();
// 设置replyTo的属性为指定的回调队列
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();channel.basicPublish("", "rpc_queue", props, message.getBytes());

BasicProperties这个类中提供了很多的属性,有14个,很多基本上很少用到,常用的就是几个,我这边也贴一下,其实在我上一篇文章中基于RabbitMQ实现的一个RPC工具里面都有用到这些属性。

  1. contentType 这个属性用来表明消息的类型,默认是"application/octet-stream"这种流的类型,还有常用的比如"application/json","text/plain"等,这些在我的RPC工具里面都有用到。
  2. replyTo 这个就是上面指定的回调队列。
  3. correlationId 这个id可以用来进行消息的确认,将相应与请求相关联。主要是可以确认服务端收到的消息是不是指定客户端发过来的,用于确认。

首先先贴一张官方提供的图,这个是RabbitMQ实现RPC的主要工作流程:
在这里插入图片描述
实现RPC的具体工作流程:

  1. 首先客户端发送一个请求消息,这个请求消息里面有两个属性,一个是replyTo回调队列的地址,一个是correlationId用于标识当前消息唯一的id信息。
  2. 这个消息是发送到指定的rpc_queue这个队列上面。
  3. 对应我们的服务端Server就会等待rpc_queue上面的请求消息,当请求消息来得时候,服务端会进行处理,处理完成会将相应的消息再发送到请求消息属性中的replyTo回调的队列上面。
  4. 客户端发送消息之后,会等待replyTo队列中的消息。当有消息来得时候,会检查响应消息中correlationId属性和请求消息中correlationId是否一致,完成一次PRC调用。

RPC简单示例

我这边根据官网上面提供的例子简单修改整理了一下,这边提供一个大小写转换的功能,就是客户端发送一段小写的字符串,服务端将字符串转为大写再响应过来。详细逻辑可以看下代码中注释,具体代码如下:
首先服务端:

/*** RPC服务端** @author yuanzhihao* @since 2020/11/21*/
public class RPCServer {public static void main(String[] args) throws IOException, TimeoutException {// 首先还是正常获得connection以及channel对象ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.1.108");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 定义一个rpc的队列String queueName = "test_rpc";channel.queueDeclare(queueName, false, false, false, null);Object monitor = new Object();// 具体的消费代码里面实现DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消费者将请求消息中的correlationId信息再作为响应传回replyTo队列AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();String response = "";try {// 提供一个大小写转换的方法String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("toUpperCase(" + message + ")");response = toUpperCase(message);} catch (RuntimeException e) {System.out.println(e.toString());} finally {// 将响应传回replyTo队列channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));// 设置了手动应答 需要手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// 执行完成会释放主线程的锁// RabbitMq consumer worker thread notifies the RPC server owner threadsynchronized (monitor) {monitor.notify();}}};// 监听"test_rpc"队列channel.basicConsume(queueName, false, deliverCallback, (consumerTag -> { }));// 这个锁对象是确保我们server的调用逻辑执行完成 首先挂起主线程// Wait and be prepared to consume the message from RPC client.while (true) {synchronized (monitor) {try {monitor.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}// 提供一个大小写转换的方法private static String toUpperCase(String msg) {return msg.toUpperCase();}
}

客户端:

/*** RPC客户端** @author yuanzhihao* @since 2020/11/21*/public class RPCClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 创建connection以及channel对象ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.1.108");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");try ( Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel()) {// 声明一个队列String queueName = "test_rpc";// 请求消息中需要带一个唯一标识ID String corrId = UUID.randomUUID().toString();// 声明一个回调队列String replayQueueName = channel.queueDeclare().getQueue();// 将correlationId以及回调队列设置在消息的属性中AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replayQueueName).build();// 具体消息内容String msg = "hello rpc";// 发送请求消息channel.basicPublish("",queueName,properties,msg.getBytes());// 设置一个阻塞队列  等待服务端的响应final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);String ctag = channel.basicConsume(replayQueueName, true, (consumerTag, message) -> {// 注意 这边根据correlationId进行下判断if (message.getProperties().getCorrelationId().equals(corrId)) {response.offer(new String(message.getBody(), StandardCharsets.UTF_8));}}, consumerTag -> {});// 获取响应结果String take = response.take();System.out.println("rpc result is "+ take);channel.basicCancel(ctag);}}
}

执行代码,具体的客户端与服务端运行结果在这里插入图片描述 在这里插入图片描述

通过Spring AMQP实现RPC

通过Spring来实现RPC也很简单,主要通过spring提供的一个RabbitTemplate对象中sendAndReceive方法来实现,这个方法是发送消息然后一直等待响应。监听器里面实现的和之前的逻辑大致相同,都需要将response响应消息发送到对应的replyTo回调队列上。下面直接贴一下代码。
首先是服务端,我这边直接是使用配置类的形式,具体一些的配置项可以参考下我之前的那篇博客或者上网搜一下~

/*** 主配置类** @author yuanzhihao* @since 2021/1/9*/
@Configuration
public class RabbitMQConfig {private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);// 注入connectionFactory对象@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses("192.168.1.108:5672");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");return connectionFactory;}// 声明队列@Beanpublic Queue rpcQueue() {return new Queue("test_rpc",false);}@Beanpublic RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}// 创建初始化RabbitAdmin对象@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}// 消息监听器@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(RabbitTemplate rabbitTemplate) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());// 监听的队列container.setQueues(rpcQueue());MessageListener messageListener = message -> {String receiveMsg = new String(message.getBody(), StandardCharsets.UTF_8);log.info("Receive a message message is {}", receiveMsg);// 执行对应逻辑String responseMsg = toUpperCase(receiveMsg);MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(message.getMessageProperties().getCorrelationId()).build();// 响应消息 这边就是如果没有绑定交换机和队列的话 消息应该直接传到对应的队列上面rabbitTemplate.send("", message.getMessageProperties().getReplyTo(), new Message(responseMsg.getBytes(StandardCharsets.UTF_8), messageProperties));};// 设置监听器container.setMessageListener(messageListener);return container;}// 提供一个大小写转换的方法private String toUpperCase(String msg) {return msg.toUpperCase();}
}

客户端我采用test单元测试的形式

/*** spring amqp rpc 测试类** @author yuanzhihao* @since 2021/1/9*/
@ContextConfiguration(classes = {RabbitMQConfig.class})
@RunWith(SpringRunner.class)
public class RabbitMQRpcTest {private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);@Autowiredprivate RabbitTemplate rabbitTemplate;// 测试RPC客户端@Testpublic void testRpcClient() {// 设置correlationIdString corrId = UUID.randomUUID().toString();String msg = "hello rpc";MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(corrId).build();// 注意 这边如果使用sendAndReceive不指定replyTo回调队列 spring会默认帮我们添加一个回调队列// 格式默认 "amq.rabbitmq.reply-to" 前缀Message message = rabbitTemplate.sendAndReceive("", "test_rpc", new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties));log.info("The response is {}", new String(message.getBody(), StandardCharsets.UTF_8));}
}

具体实现可以看下代码的注释
代码执行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/262329.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java SpringBoot测试OceanBase

对上篇mysql导入到OceanBase中的数据库进行代码测试&#xff0c;写了个demo包含测试方法&#xff0c;在原mysql库中成功执行&#xff0c;迁移到OceanBase时看是否能不修改业务代码而成功执行测试方法&#xff1a; 代码基于SpringBoot MyBastis测试增删改查、批量新增、多表联…

Redis(十六)缓存预热+缓存雪崩+缓存击穿+缓存穿透

文章目录 面试题缓存预热缓存雪崩解决方案 缓存穿透解决方案 缓存击穿解决方案案例&#xff1a;高并发聚划算业务 总结表格 面试题 缓存预热、雪崩、穿透、击穿分别是什么?你遇到过那几个情况?缓存预热你是怎么做的?如何避免或者减少缓存雪崩?穿透和击穿有什么区别?他两是…

基于docker安装HDFS

1.docker一键安装见 docker一键安装 2.拉取镜像 sudo docker pull kiwenlau/hadoop:1.03.下载启动脚本 git clone https://github.com/kiwenlau/hadoop-cluster-docker4.创建网桥 由于 Hadoop 的 master 节点需要与 slave 节点通信&#xff0c;需要在各个主机节点配置节点…

【ctfshow—web】——信息搜集篇1(web1~20详解)

ctfshow—web题解 web1web2web3web4web5web6web7web8web9web10web11web12web13web14web15web16web17web18web19web20 web1 题目提示 开发注释未及时删除 那就找开发注释咯&#xff0c;可以用F12来查看&#xff0c;也可以CtrlU直接查看源代码呢 就拿到flag了 web2 题目提示 j…

常见消息中间件

ActiveMQ 我们先看ActiveMQ。其实一般早些的项目需要引入消息中间件&#xff0c;都是使用的这个MQ&#xff0c;但是现在用的确实不多了&#xff0c;说白了就是有些过时了。我们去它的官网看一看&#xff0c;你会发现官网已经不活跃了&#xff0c;好久才会更新一次。 它的单机吞…

mysql优化指南之优化篇

二、优化 现在的理解数据库优化有四个维度&#xff0c;分别是&#xff1a; 硬件升级、系统配置、表结构设计、SQL语句及索引。 那优化的成本和效果分别如下&#xff1a; 优化成本&#xff1a;硬件升级>系统配置>表结构设计>SQL语句及索引。 优化效果&#xff1a;…

Java项目:27 基于SSM+JSP实现的大学校园兼职平台

作者主页&#xff1a;舒克日记 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 系统介绍 基于SSMJSP实现的大学校园兼职平台分为前台与管理员两块 管理端分为8大模块&#xff0c;分别是用户管理、兼职管理、帖子管理、聊天管理、…

「递归算法」:求根节点到叶节点数字之和

一、题目 给你一个二叉树的根节点 root &#xff0c;树中每个节点都存放有一个 0 到 9 之间的数字。 每条从根节点到叶节点的路径都代表一个数字&#xff1a; 例如&#xff0c;从根节点到叶节点的路径 1 -> 2 -> 3 表示数字 123 。 计算从根节点到叶节点生成的 所有数…

如何构建企业专属GPT

大语言模型&#xff08;LLM&#xff09;具有令人印象深刻的自然语言理解和生成能力&#xff0c; 2022年11月底OpenAI发布了ChatGPT&#xff0c;一跃成为人工智能AI领域的现象级应用。但由于LLM的训练数据集主要来源于互联网数据&#xff0c;企业私域信息并未被LLM所训练&#x…

家用电器全球扩张:如何借助海外网红营销实现助力品牌成长?

随着全球化的深入发展&#xff0c;家用电器市场已不再局限于国内市场&#xff0c;众多品牌纷纷将目光投向海外&#xff0c;寻求更广阔的发展空间。在这个过程中&#xff0c;如何有效地进行海外推广成为品牌面临的一大挑战。近年来&#xff0c;海外网红营销逐渐崭露头角&#xf…

【Spring】IoC容器 控制反转 与 DI依赖注入 配置类实现版本 第四期

文章目录 基于 配置类 方式管理 Bean一、 配置类和扫描注解二、Bean定义组件三、高级特性&#xff1a;Bean注解细节四、高级特性&#xff1a;Import扩展五、基于注解配置类方式整合三层架构组件总结 基于 配置类 方式管理 Bean Spring 完全注解配置&#xff08;Fully Annotatio…

AndroidStudio 2024-2-21 Win10/11最新安装配置(Kotlin快速构建配置,gradle镜像源)

AndroidStudio 2024 Win10/11最新安装配置 教程目的&#xff1a; (从安装到卸载) &#xff0c;针对Kotlin开发配置&#xff0c;gradle-8.2-src/bin下载慢&#xff0c;以及Kotlin构建慢的解决 好久没玩AS了,下载发现装个AS很麻烦,就觉得有必要出个教程了(就是记录一下:嘻嘻) 因…

MariaDB落幕和思考

听过MySQL的基本也都知道 MariaDB。MariaDB由MySQL的创始人主导开发&#xff0c;他早前曾以10亿美元的价格&#xff0c;将自己创建的公司MySQL AB卖给了SUN&#xff0c;此后&#xff0c;随着SUN被甲骨文收购&#xff0c;MySQL的所有权也落入Oracle的手中。传闻MySQL的创始人担心…

vue封装el-table表格组件

先上效果图&#xff1a; 本文包含了具名插槽、作用域插槽、jsx语法三种&#xff1a; Render.vue&#xff08;很重要&#xff0c;必须有&#xff09;: <script> export default {name: "FreeRender",functional: true,props: {scope:Object,render: Functio…

我把springboot项目从Java 8 升级 到了Java 17 的过程总结,愿为君提前踩坑!

项目从jdk8升级到jdk17&#xff0c;我不是为了追求java 17的新特性&#xff08;准确来说也还没有去了解有什么新特性&#xff09;&#xff0c;也不是为了准确与时俱进&#xff0c;永远走在java行列的最前端&#xff0c;纯粹因为项目需要&#xff0c;因为我们都知道&#xff0c;…

【医学大模型 补全主诉】BioGPT + LSTM 自动补全医院紧急部门主诉

BioGPT LSTM 自动补全医院紧急部门主诉 问题&#xff1a;针对在紧急部门中自动补全主诉的问题子问题1: 提高主诉记录的准确性子问题2: 加快主诉记录的速度子问题3: 统一医疗术语的使用子问题4: 减少打字错误和误解子问题5: 提高非特定主诉的处理能力 解法数据预处理神经网络方…

微服务篇之限流

一、为什么要限流 1. 并发的确大&#xff08;突发流量&#xff09;。 2. 防止用户恶意刷接口。 二、限流的实现方式 1. Tomcat限流 可以设置最大连接数&#xff0c;但是每一个微服务都有一个tomcat&#xff0c;实现起来非常麻烦。 2. Nginx限流 &#xff08;1&#xff09;控…

十大基础排序算法

排序算法分类 排序&#xff1a;将一组对象按照某种逻辑顺序重新排列的过程。 按照待排序数据的规模分为&#xff1a; 内部排序&#xff1a;数据量不大&#xff0c;全部存在内存中&#xff1b;外部排序&#xff1a;数据量很大&#xff0c;无法一次性全部存在内存中&#xff0c;…

利用Ubuntu22.04启动U盘对电脑磁盘进行格式化

概要&#xff1a; 本篇演示利用Ubuntu22.04启动U盘的Try Ubuntu模式对电脑磁盘进行格式化 一、说明 1、电脑 笔者的电脑品牌是acer(宏碁/宏基) 开机按F2进入BIOS 开机按F12进入Boot Manager 2、Ubuntu22.04启动U盘 制作方法参考笔者的文章&#xff1a; Ubuntu制作Ubun…

每日学习总结20240222

每日总结 一旦停下来太久&#xff0c;就很难继续了 ——《一个人的朝圣》 20240222 1. 自定义逻辑 请设计一个函数single_track_logic,传入三个参数&#xff0c;第一个参数是int数组&#xff0c;第二个参数是一个int变量&#xff0c;第三个参数是一个以int为返回值&#xff0c…