根据源码,模拟实现 RabbitMQ - 虚拟主机 + Consume设计 (7)

目录

一、虚拟主机 + Consume设计

1.1、承接问题

1.2、具体实现

1.2.1、消费者订阅消息实现思路

1.2.2、消费者描述自己执行任务方式实现思路

1.2.3、消息推送给消费者实现思路

1.2.4、消息确认


一、虚拟主机 + Consume设计


1.1、承接问题

前面已经实现了虚拟主机大部分功能以及转发规则的判定,也就是说,现在消息已经可以通过 转换机 根据对应的转发规则发送给对应的 队列 了.

那么接下来要解决的问题就是,消费者该如何订阅消息(队列),如何把消息推送给消费者,以及消费者如何描述自己怎么执行任务~

1.2、具体实现

1.2.1、消费者订阅消息实现思路

消费者是以队列为维度订阅消息的,并且一个队列可以被多个消费者订阅,那么一旦队列中有消息,这个消息到底因该给谁呢?此处就约定,消费者之间按照 “轮询” 的方式来进行消费.

这里我们就需要定义一个类(ConsumerEnv),用来描述一个消费者,如下

public class ConsumerEnv {private String consumerTag;private String queueName;private boolean autoAck;//通过这个回调来处理收到的消息private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}public String getConsumerTag() {return consumerTag;}public void setConsumerTag(String consumerTag) {this.consumerTag = consumerTag;}public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public boolean isAutoAck() {return autoAck;}public void setAutoAck(boolean autoAck) {this.autoAck = autoAck;}public Consumer getConsumer() {return consumer;}public void setConsumer(Consumer consumer) {this.consumer = consumer;}
}

 

再给每个队列对象(MSGQueue 对象)添加一个属性 List,用来包含若干个上述消费者(有哪些消费者订阅了当前队列),如下图:

    //当前队列都有哪些消费者订阅了private List<ConsumerEnv> consumerEnvList = new ArrayList<>();//记录当取到了第几个消费者(AtomicInteger 是线程安全的)private AtomicInteger consumerSeq = new AtomicInteger(0);/*** 添加一个新的订阅者* @param consumerEnv*/public void addConsumerEnv(ConsumerEnv consumerEnv) {consumerEnvList.add(consumerEnv);}/*** 删除订阅者暂时先不考虑*//*** 挑选一个订阅者,来处理当前的消息(按照轮询的方式)* @return*/public ConsumerEnv chooseConsumer() {if(consumerEnvList.size() == 0) {//该队列暂时没有人订阅return null;}//计算当前要取的下标int index = consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndIncrement();// 自增return consumerEnvList.get(index);}

VirtualHost 中订阅消息实现

    /*** 订阅消息* 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者* @param consumerTag 消费者的身份标识* @param queueName* @param autoAck 消息被消费之后,应答的方式,true 标识自动应答,false 标识手动应答* @param consumer 是一个回调函数,此处设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 样子了* @return*/public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {//构造一个 ConsumerEnv 对象,把这个对应的队列找到,再把 Consumer 对象添加到队列中queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);return true;} catch (Exception e) {e.printStackTrace();System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);return false;}}

1.2.2、消费者描述自己执行任务方式实现思路

当执行订阅消息的时候,我们就让消费者自己去实现处理消息的操作(消息的内容通过参数传递,具体要干啥,取决于消费者自己的业务路基),最后再让线程池来执行回调函数.

这里我们使用函数式接口(回调函数)的方式(lambda 表达式),让消费者在订阅消息的时候,就可以实现未来收到消息后如何去处理消息的操作.

@FunctionalInterface
public interface Consumer {/*** Delivery 的意思是 ”投递“,这个方法预期是在服务器收到消息之后来调用* 通过这个方法,把消息推送给对应的消费者* (注意,这里的方法名和参数,也都是参考 RabbitMQ 来展开的)* @param consumerTag* @param basicProperties* @param body*/void handlerDelivery(String consumerTag, BasicProperties basicProperties, byte[] body);}

为什么要这样实现?

一方面,这种思路也是参考 RabbitMQ。

另一方面,这是由于Java 的函数是不能脱离类存在的,为了实现这种 lambda,java 曲线救国,引入 函数式接口.

对于函数式接口来说:

  1. 首先是 interface 类型
  2. 只能有一个方法
  3. 添加 @FunctionalInterface 注解.

实际上,这也是 lambda 的底层实现(本质)

1.2.3、消息推送给消费者实现思路

这里我们可以添加一个扫描线程,让他来去队列中拿任务.

为什么用了扫描线程还需要用线程池?

如果就一个扫描线程,既要获取消息,又要执行回调,这一个线程可能会忙不过来,因为消费者给出的回调,具体干什么的,咱们是不知道的.

扫描线程怎么知道哪个队列来了新的消息?

  1. 一个简单粗暴的办法,就是直接让扫描线程不停的循环遍历所有队列,发现有元素就立即处理。
  2. 另一个更优雅的办法(我采取的办法),就是用一个阻塞队列,队列中的元素就是接收消息的队列的名字,扫描线程只需要盯住这一个阻塞对垒即可,此时阻塞队列中传递的队列名,就相当于 “令牌”

每次拿到令牌,才能调动一次军队,也就是从对应的队列中取一个消息.

具体的,实现一个 ConsumerManager 类,用来管理消费者的上述行为.

public class ConsumerManager {// 持有上层的 VirtualHost 对象的引用,用来操作数据private VirtualHost parent;// 指定一个线程池,负责取执行具体的回调任务private ExecutorService workerPool = Executors.newFixedThreadPool(4);//存放令牌的队列private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();//扫描线程private Thread scannerThread = null;/*** 初始化* @param parent*/public ConsumerManager(VirtualHost parent) {this.parent = parent;//创建扫描线程,取队列中消费消息scannerThread = new Thread(() -> {while(true) {try {//1.拿到令牌String queueName = tokenQueue.take();//2.根据令牌,找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue == null) {throw new MqException("[ConsumerManager] 取到令牌后发现,该队列名不存在!queueName=" + queueName);}//3.从这个队列中消费一个消息synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {throw new RuntimeException(e);}}});//设置为后台线程scannerThread.setDaemon(true);scannerThread.start();}public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}/*** 添加消费者* 找到对应队列的 List 列表, 把消费者添加进去,最后判断,如果有消息,就立刻消费* @param consumerTag 消费者身份标识* @param queueName* @param autoAck 消息被消费之后,应答的方式,true 标识自动应答,false 标识手动应答* @param consumer 是一个回调函数,此处设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 样子了* @throws MqException*/public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {//找到对应的队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue == null) {throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);//如果当前队列中已经有一些消息了,需要立即消费掉int n = parent.getMemoryDataCenter().getMessageCount(queueName);for(int i = 0; i < n; i++) {//这个方法调用一次就消费一条消息consumeMessage(queue);}}}/*** 扫描线程:找到对应的队列后,消费者从队列中拿出消息并消费* @param queue*/private void consumeMessage(MSGQueue queue) {//1.按照轮询的方式,找个消费者出来ConsumerEnv luckDog = queue.chooseConsumer();if(luckDog == null) {//当前队列中没有消费者,暂时不用消费,等后面有消费者了再说return;}//2.从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if(message == null) {//当前队列中还没有消息,也不需要消费return;}//3.把消息带入到消费者的回调方法中,丢给线程池执行workerPool.submit(() -> {try {//1.把消息放到待确认的集合当中,这个操作一定要在执行回调之前(防止执行回调过程中出现异常,导致消息丢失)parent.getMemoryDataCenter().addMessageWaitAck(luckDog.getQueueName(), message);//2.真正执行回调操作luckDog.getConsumer().handlerDelivery(luckDog.getConsumerTag(), message.getBasicProperties(),message.getBody());//3.如果当前是 ”自动应答“ ,就可以直接把消息删除了//  如果当前是 ”手动应答“ ,则先不处理,交给后续消费者调用 basicAck 方法来处理if(luckDog.isAutoAck()) {//1) 删除硬盘上的消息if(message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}//2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());//3) 删除内存上的消息中心的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费!queueName=" + queue.getName());}} catch (Exception e) {e.printStackTrace();}});}
}

1.2.4、消息确认

消息确认,就是保证消息被正确消费~~ 

正确消费就是指消费者的回调方法顺利执行完了(没有抛异常之类的),这条消息的使命就完成了,此时就可以删除了。

为了达成消息不丢失这样的效果,具体步骤如下:

  1. 在真正执行回调之前,把消息放到 “待确认的集合” 中,避免应为回调失败,导致消息丢失.
  2. 执行回调
  3. 当去消费者采取的是 autoAck=true ,就认为回调执行完毕不抛异常,就算消费成功,然后就可以删除消息了
    1. 硬盘
    2. 内存中的消息中心
    3. 待确认的消息集合
  4. 当前消费者若采取的是 autoAck=false,手动应答,需要消费者这边,在自己的回调方法内部,显式调用 basicAck 这个核心 API 表示应答.

 basicAck 完成主动应答

    /*** 确认消息* 各个维度删除消息即可* @param queueName* @param messageId* @return*/public boolean basicAck(String queueName, String messageId) {queueName = virtualHostName + queueName;try {//1.获取消息和队列MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue == null) {throw new MqException("[VirtualHost] 要确认的队列不存在!queueName=" + queueName);}Message message = memoryDataCenter.getMessage(messageId);if(message == null) {throw new MqException("[VirtualHost] 要确认的消息不存在!messageId=" + messageId);}//2.各个维度删除消息if(message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queue, message);}memoryDataCenter.removeMessage(messageId);memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println("[VirtualHost] basicAck 成功,消息确认成功!queueName=" + queueName +", messageId=" + messageId);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicAck 失败,消息确认失败!queueName=" + queueName +", messageId=" + messageId);e.printStackTrace();return false;}}

扫描线程完成自动应答

    /*** 扫描线程:找到对应的队列后,消费者从队列中拿出消息并消费* @param queue*/private void consumeMessage(MSGQueue queue) {//1.按照轮询的方式,找个消费者出来ConsumerEnv luckDog = queue.chooseConsumer();if(luckDog == null) {//当前队列中没有消费者,暂时不用消费,等后面有消费者了再说return;}//2.从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if(message == null) {//当前队列中还没有消息,也不需要消费return;}//3.把消息带入到消费者的回调方法中,丢给线程池执行workerPool.submit(() -> {try {//1.把消息放到待确认的集合当中,这个操作一定要在执行回调之前(防止执行回调过程中出现异常,导致消息丢失)parent.getMemoryDataCenter().addMessageWaitAck(luckDog.getQueueName(), message);//2.真正执行回调操作luckDog.getConsumer().handlerDelivery(luckDog.getConsumerTag(), message.getBasicProperties(),message.getBody());//3.如果当前是 ”自动应答“ ,就可以直接把消息删除了//  如果当前是 ”手动应答“ ,则先不处理,交给后续消费者调用 basicAck 方法来处理if(luckDog.isAutoAck()) {//1) 删除硬盘上的消息if(message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}//2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());//3) 删除内存上的消息中心的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费!queueName=" + queue.getName());}} catch (Exception e) {e.printStackTrace();}});}

如果在回调方法中抛异常了?

回调方法中抛异常了,后续逻辑执行不到,这个消息就会始终呆在待确认的集合中, RabbitMQ 的做法是另外搞一个扫描线程(其实 RabbitMQ 中不叫线程,人家是叫进程,但是注意,这个进程不是操作系统中的进程,而是 erlang 中的概念),负责关注这个 待确认集合中,每个消息待了多久了,如果超出了一定的时间范围,就会把这个消息放到一个特定的队列 —— “死信队列”(这里就不展示了,需要的可以私聊我)

如果在执行回调过程中,broker server 崩了,内存数据全没了?

此时硬盘的数据还在,broker server 重启之后,这个消息就又被加载回内存了,就像从来没有被消费过一样,消费者就又机会重新拿到这个消息,重新消费(重复消费的问题,是由消费者的业务代码负责保证的,broker server 管不了).

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/101603.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【linux】2 Linux编译器-gcc/g++和Linux调试器-gdb

文章目录 一、Linux编译器-gcc/g使用1.1 背景知识1.2 gcc如何完成1.3 函数库1.4 gcc选项 二、linux调试器-gdb使用2.1 背景2.2 开始使用 总结 ヾ(๑╹◡╹)&#xff89;" 人总要为过去的懒惰而付出代价ヾ(๑╹◡╹)&#xff89;" 一、Linux编译器-gcc/g使用 1.1 背景…

JS加密的域名锁定功能,JShaman支持泛域名

JShaman的域名锁定功能&#xff0c;支持泛域名 JShaman的JS代码混淆加密中&#xff0c;有一项“域名锁定”功能。使用此功能后&#xff0c;代码运行时会检测浏览器地址中的域名信息&#xff0c;如是非指定域名&#xff0c;则不运行&#xff0c;以此防止自己网站的JS代码被复制…

python的文件操作

前言 打印内容到屏幕 最简单的输出方式是调用print函数&#xff0c;此函数会将你传递的表达式转化成字符串表达式&#xff0c;并将结果写道标准输出中。 读取键盘输入 python提供了两个raw_input和input内置函数从标准输入中读取一行文本&#xff0c;默认的标准输入是键盘。 …

Android NDK JNI与Java的相互调用

一、Jni调用Java代码 jni可以调用java中的方法和java中的成员变量,因此JNIEnv定义了一系列的方法来帮助我们调用java的方法和成员变量。 以上就是jni调用java类的大部分方法,如果是静态的成员变量和静态方法,可以使用***GetStaticMethodID、CallStaticObjectMethod等***。就…

docker安装fastDFS

一、docker安装 1、搜索镜像 2、拉取镜像 最新版本&#xff1a; docker pull delron/fastdfs3、使用镜像构建容器 3.1 创建tracker容器 docker run -dti --networkhost --name my-tracker -v /opt/zdxf/soft/fastdfs/tracker:/var/fdfs -v /etc/localtime:/etc/localtime d…

Nvidia Jetson 编解码开发(3)解决H265解码报错“PPS id out of range”

1.问题描述 基于之前的开发程序 Nvidia Jetson 编解码开发(2)Jetpack 4.x版本Multimedia API 硬件编码开发--集成encode模块_free-xx的博客-CSDN博客 通过Jetson Xavier NX 硬编码的H265发出后, 上位机断点播放发出来的H265码流, 会报“PPS id out of range” 错误 …

【C语言】喝汽水问题

大家好&#xff01;今天我们来学习C语言中的喝汽水问题&#xff01; 目录 1. 题目内容&#xff1a; 2. 思路分析 2.1 方法一 2.2 方法二 2.3 方法三 3. 代码实现 3.1 方法一 3.2 方法二 3.3 方法三 1. 题目内容 喝汽水&#xff0c;1瓶汽水1元&#xff0c;2个空瓶可以…

算法题面试实战收集

回文数字 2023-08-18 美团 一面 在不使用额外的内存空间的条件下判断一个整数是否是回文。 回文指逆序和正序完全相同。 数据范围&#xff1a; 进阶&#xff1a; 空间复杂度O(1) &#xff0c;时间复杂度 O(n) 提示&#xff1a; 负整数可以是回文吗&#xff1f;&#xff08;比如…

vue项目配置git提交规范

vue项目配置git提交规范 一、背景介绍二、husky、lint-staged、commitlint/cli1.husky2.lint-staged3.commitlint/cli 三、具体使用1.安装依赖2.运行初始化脚本3.在package.json中配置lint-staged4.根目录新增 commitlint.config.js 4.提交测试1.提示信息格式错误时2.eslint校验…

深度学习3:激活函数

一、激活函数的简介与由来 激活函数&#xff1a;是用来加入非线性因素的&#xff0c;解决线性模型所不能解决的问题。 线性函数的组合解决的问题太有限了&#xff0c;碰到非线性问题就束手无策了。如下图。 通过激活函数映射之后&#xff0c;可以输出非线性函数。 最后再通过…

【2023深圳杯数学建模A题思路模型与代码分享】

2023深圳杯数学建模A题 A题 影响城市居民身体健康的因素分析解题思路第一问第二问第三问第四问 技术文档第一问完整代码写在最后 A题 影响城市居民身体健康的因素分析 以心脑血管疾病、糖尿病、恶性肿瘤以及慢性阻塞性肺病为代表的慢性非传染性疾病&#xff08;以下简称慢性病…

【Terraform学习】使用 Terraform 托管 S3 静态网站(Terraform-AWS最佳实战学习)

使用 Terraform 托管 S3 静态网站 实验步骤 前提条件 安装 Terraform&#xff1a; 地址 下载仓库代码模版 本实验代码位于 task_s3 文件夹中。 变量文件 variables.tf 在上面的代码中&#xff0c;您将声明&#xff0c;aws_access_key&#xff0c;aws_secret_key和区域变量…

ubuntu 22.04 LTS 在 llvm release/17.x 分支上编译 cookbook llvm example Chapter 02

不错的资料&#xff1a; LLVMClang编译器链接器--保值【进阶之路二】 - 掘金 —————————————————————————————————————— 下载 llvm-cookbook example: $ git clone https://github.com/elongbug/llvm-cookbook.git 也可以参照llvm-pr…

【BASH】回顾与知识点梳理(三十九)

【BASH】回顾与知识点梳理 三十九 三十九. make、tarball、函数库及软件校验39.1 用 make 进行宏编译为什么要用 makemakefile 的基本语法与变量 39.2 Tarball 的管理与建议使用原始码管理软件所需要的基础软件Tarball 安装的基本步骤一般 Tarball 软件安装的建议事项 (如何移除…

【vue3+ts项目】配置eslint校验代码工具,eslint+prettier+stylelint

1、运行好后自动打开浏览器 package.json中 vite后面加上 --open 2、安装eslint npm i eslint -D3、运行 eslint --init 之后&#xff0c;回答一些问题&#xff0c; 自动创建 .eslintrc 配置文件。 npx eslint --init回答问题如下&#xff1a; 使用eslint仅检查语法&…

6-模板初步使用

官网: 中文版: 介绍-Jinja2中文文档 英文版: Template Designer Documentation — Jinja Documentation (2.11.x) 模板语法 1. 模板渲染 (1) app.py 准备数据 import jsonfrom flask import Flask,render_templateimport settingsapp Flask(__name__) app.config.from_obj…

VB.NET通过VB6 ActiveX DLL调用PowerBasic及FreeBasic动态库

前面说的Delphi通过Activex DLL同时调用PowerBasic和FreeBasic写的DLL&#xff0c;是在WINDOWS基础平台上完成的。 而 .NET平台是架在WINDOWS基础平台之上的&#xff0c;它的上面VB.NET或C#等开发的APP程序&#xff0c;下面写一下用VB.NET&#xff0c;通过VB6注册的Activex DLL…

SOA通信中间件常用的通信协议

摘要&#xff1a; SOA&#xff08;面向服务的架构&#xff09;的软件设计原则之一是模块化。 前言 SOA&#xff08;面向服务的架构&#xff09;的软件设计原则之一是模块化。模块化可以提高软件系统的可维护性和代码重用性&#xff0c;并且能够隔离故障。举例来说&#xff0c;…

(一)idea连接GitHub的全部流程(注册GitHub、idea集成GitHub、增加合作伙伴、跨团队合作、分支操作)

&#xff08;二&#xff09;Git在公司中团队内合作和跨团队合作和分支操作的全部流程&#xff08;一篇就够&#xff09;https://blog.csdn.net/m0_65992672/article/details/132336481 4.1、简介 Git是一个免费的、开源的*分布式**版本控制**系统*&#xff0c;可以快速高效地…

滑动窗口介绍

1.基本概念 利用单调性&#xff0c;使用同向双指针&#xff0c;两个指针之间形成一个窗口 子串与子数组都是连续的一段子序列时不连续的 2.为什么可以用滑动窗口&#xff1f; 暴力解决时发现两个指针不需要回退&#xff08;没必要回退&#xff0c;一定不会符合结果&#xf…