【MQ05】异常消息处理

异常消息处理

上节课我们已经学习到了消息的持久化和确认相关的内容。但是,光有这些还不行,如果我们的消费者出现问题了,无法确认,或者直接报错产生异常了,这些消息要怎么处理呢?直接丢弃?这就是丢消息了呀。再次处理?一直继续报错怎么办?这条消息就永远都在不停报错的死循环中了。

通常,消息队列系统都会提供一套对于异常消息的处理机制,比如 RabbitMQ 的死信队列。

RabbitMQ死信队列

死信队列,其实就是在满足一定规则的前提下,将消息发送到指定的一个交换机队列中。这些规则包括:

  • 使用者使用basic.reject或basic.nack(重新排队参数设置为false)对消息进行否定确认。

  • 消息过期,根据队列的消息 TTL 过期时间而定。

  • 消息被丢弃,因为队列超过了长度限制。

前面两个好测试,最后一个要修改 RabbitMQ 的配置稍微麻烦点,那么咱们就用前面两个来测。

首先,要定义一个用于接收死信消息的交换机和队列,我们顺便也直接做一个客户端消费者,专门读取死信队列里的消息。这个就相当于是正规队列消费者处理出现问题之后,再由这个消费者来做善后。

// 5.rq.c.deadletter.php
// ……………………
// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道// 死信队列及交换机
$channel->exchange_declare('dead_letter', 'direct', false,true,false); // 定义交换机
$channel->queue_declare('dead_letter_queue', false, true); // 定义队列
$channel->queue_bind('dead_letter_queue', 'dead_letter'); // 队列绑定交换机echo "等待死信队列消息,或者使用 Ctrl+C 退出程序。", PHP_EOL;// 定义接收数据的回调函数
$callback = function ($msg) {echo '死信队列接收到数据: ', $msg->body, PHP_EOL;
};// 消费队列,获取到数据将调用 callback 回调函数
$channel->basic_consume('dead_letter_queue', '', false, true, false, false, $callback);while ($channel->is_open()) {$channel->wait();
}

交换机名字和队列名字自己起就好了。前面已经说过了,这个消费者获取到的死信队列数据都是正常消费有问题的,那么善后工作咱们就可以将这些数据记录日志或者记录到数据库,顺便发邮件、发短信提醒,或者做任何你想做的通知及记录工作。出于测试的目的,咱们就是简单打印了一下。

> php 5.rq.c.deadletter.php
等待死信队列消息,或者使用 Ctrl+C 退出程序。

启动之后就等着死信数据的到来吧。接下来,我们要修改一下正常的队列,增加一些参数。

// 5.rq.c.php
// ……………………
$channel->queue_declare('hello', false, true, false, false, false, new AMQPTable(['x-message-ttl'=>10000, // 10秒过期'x-dead-letter-exchange'=>'dead_letter', // 死信到某个交换机'x-dead-letter-routing-key'=>'', // 死信路由
]));// ……………………$callback = function ($msg) {echo '接收到数据: ', $msg->body, PHP_EOL;$msg->nack();
};// ……………………

x-message-ttl 是消息的过期时间,我们后面要用它来测过期时间进入死信队列的情况。x-dead-letter-exchange 用于定义出现问题后,将这个队列的数据放到哪个死信队列交换机中。x-dead-letter-routing-key 这个是指定进入死信队列的哪个路由。关于 RabbitMQ 交换机和路由的内容,如果有不清楚的小伙伴,可以在深入地学习一下 RabbitMQ 的官方文档和示例哦。

然后,在回调函数中,我们直接调用 $msg->nack() 。这个表示的就是手动取消确认,还记得上节课我们学过的是 $msg->ack() 吧,这个表示的是确认处理完成,没问题了,而 nack() 就是反过来的,说明当前的队列处理有问题,没有正常 ACK 。这对应的就是死信队列的第一条规则。

现在,向普通的 hello 消息队列中发送消息,结果死信队列中会接收到数据。

> php 5.rq.c.deadletter.php
等待死信队列消息,或者使用 Ctrl+C 退出程序。
死信队列接收到数据: Hello World!

过期时间

好了,上面测试的结果就是死信队列的第一条规则。接下来我们测试第二条规则。

在 hello 队列的配置中,我们加上的 x-message-ttl 是 10 秒,也就是说,这条消息 10 秒不处理就会进入到死信队列。这个非常好测,直接关掉消费者,然后生产者发送数据,等到过期时间后,死信队列就会显示接收到数据了。

这个规则可以帮我们实现一个非常重要的功能,就是:延时队列。RabbitMQ 中没有直接的延时队列相关的功能,但可以通过死信的这个规则来实现,具体内容我们下节课再说。

Redis 队列在 Laravel 框架中处理异常消息

好了,看完 RabbitMQ 的相关异常处理功能之后,我们马上会联想到,Redis 有这样的功能吗?

抱歉,真的没有,但是,Laravel 和 TP 框架的队列功能都通过业务代码的形式实现了类似的功能。我们还是以 Laravel 为例进行学习。

在 Laravel 中,异常的消息队列数据最后会保存到 MySQL 数据库中,我们需要执行数据迁移来创建表,使用下面这两个命令。

php artisan queue:failed-table
php artisan migrate

操作成功之后,会在数据库中创建一个名为 failed_jbs 的表。接下来,还是继续拿上次课创建的那个最后会报异常的 Job 来进行测试,直接调用生产者的命令插入队列。

> php artisan q:p4

然后,我们不使用 --tries ,这样就不会进行重试了,一次失败就会进入到异常处理流程中,也就是插入到数据库中。

> php artisan queue:work
[2023-01-03 01:58:12][gAfPH03MvGWQXjjjk5nAFWtU7b3gkhqD] Processing: App\Jobs\Queue4
接收到了消息:测试 1672711092
[2023-01-03 01:58:22][gAfPH03MvGWQXjjjk5nAFWtU7b3gkhqD] Failed:     App\Jobs\Queue4

好了,去数据库看一眼吧,数据是不是插入进来了。

36aa3a7d22ca85b13e1dd118f614099f.png

从截图上可以看到,不仅有原始的队列信息,还有异常信息、队列使用的连接以及队列名、uuid 和失败时间这些字段。在结构上,还为 uuid 创建了一个唯一索引,这个 uuid 的作用我们后面马上就会看到。

接下来,使用命令行,我们还可以看到所有失败队列的信息。

> php artisan queue:failed+--------------------------------------+------------+---------+-----------------+---------------------+
| ID                                   | Connection | Queue   | Class           | Failed At           |
+--------------------------------------+------------+---------+-----------------+---------------------+
| c8774e27-1284-4657-a13a-7e5665789d74 | redis      | default | App\Jobs\Queue4 | 2023-01-03 01:58:22 |
| 4a38d37b-86e7-4755-a29a-f6843b7289cc | redis      | default | App\Jobs\Queue4 | 2022-12-31 03:57:23 |
| f4fa4cfd-cee2-4199-b048-c21dbbc188ca | redis      | default | App\Jobs\Queue4 | 2022-12-31 03:57:03 |
| 210c9716-89f1-438b-b252-32cc6552a68f | redis      | default | App\Jobs\Queue4 | 2022-12-31 03:52:57 |
| c971ddef-9098-4399-a404-bbce08bd97af | redis      | default | App\Jobs\Queue4 | 2022-12-31 03:52:41 |
+--------------------------------------+------------+---------+-----------------+---------------------+

然后,可以利用 uuid ,也就是上面命令中返回的 ID ,来让数据进行消息重试,使用 queue:retry 命令。

> php artisan queue:retry c8774e27-1284-4657-a13a-7e5665789d74 
The failed job [c8774e27-1284-4657-a13a-7e5665789d74] has been pushed back onto the queue!

执行之后,这条失败的数据又塞回之前的队列里了,消费者又会开始对它进行消费。这就是 uuid 的作用。另外,我们还可以批量执行重试,直接在命令后面写多个 uuid 就行。也可以一次性全部执行重试,只需要使用 all 参数即可,这个大家可以去官方文档再详细看一下。

我们还可以删除或者整个清除所有的失败任务数据,其实也就是删除 failed_jobs 中的数据。queue:forget 用于根据指定的 uuid 进行删除,而 queue:flush 则会清除所有数据。除了这两个命令之外,还有一个根据时间来清除失败任务的命令 queue:prune-failed 。它默认是默认 24 小时,可以用 --hours=xxx 来设置具体的时间。

> php artisan queue:prune-failed
4 entries deleted!

最后,我们还可以禁用失败存储。直接通过 .env 配置文件进行配置就行了,设置对应的属性值为 null 即可。

QUEUE_FAILED_DRIVER=null

任务错误处理

除了上面的失败处理之外,在 Laravel 中,还可以在出现错误的时候马上去执行一个方法,就像是失败事件后的回调函数一样。通过这个方法,我们可以在任务失败的时候马上就进行邮件、短信通知,或者也可以记录错误日志,甚至也可以不使用上面默认的异常处理功能以及相关的表,直接在这里用我们自己自定义的表来存储失败任务的信息。总之就是,任务失败后你想怎么处理都行。

只需要在任务类中实现 failed() 方法。

// /app/Jobs/Queue4.php
// ……………………
// ……………………
public function failed($exception = null)
{echo '如果发生错误就进入到这里了,错误信息是:'.$exception->getMessage(), PHP_EOL;
}

然后看一下效果。

> php artisan queue:work
[2023-01-03 02:19:00][d5MlKJkjnez3MBnqzrYCUkG1HSg1LJtC] Processing: App\Jobs\Queue4
接收到了消息:测试 1672712340
如果发生错误就进入到这里了,错误信息是:
[2023-01-03 02:19:10][d5MlKJkjnez3MBnqzrYCUkG1HSg1LJtC] Failed:     App\Jobs\Queue4

这个方法的执行是同步的,不是异步的,就像我们前面说的,任务失败了马上就会调用这个方法。因此,就可以安全有效地在这个方法中进行我们任何想要善后处理的操作了。

总结

好了,到现在为止,我们的队列系统其实已经相当安全可靠了。上一篇文章通过持久化和 ACK 机制解决了消息丢失的问题,这次即使是消费者出现了异常,我们也可以保证消息能够通过死信队列或者框架机制保存下来。从这里我们也可以看出,即使原生的系统不支持,也可以通过框架代码的形式来实现类似的功能,这也是队列这种系统灵活性的表现。补充一点,BLMOVE 这类 Redis 命令其实也可以实现消息备份,但和上面死信那种触发条件还是有区别,这是主动备份。再有,Redis 的 Stream 类型其实也已经是很完备的一套消息队列功能机制了,未应答 ACK 的数据是可以重复执行的,这也可以当成是一种异常处理的形式,只不过也一样需要我们自己编码干预进行转移,可以参考我们之前 Redis 系列中相关的内容:【Redis09】Redis基础:Stream操作 https://mp.weixin.qq.com/s/DsSnyU9xuJEijm9t-DVs2A 。

接下来,我们再看两种常见的队列形式,分别是延时队列和优先级队列,它们在 RabbitMQ 和 Laravel+Redis 中的实现又是怎样的呢?

测试代码:

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/5.rq.c.deadletter.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/5.rq.c.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/5.rq.p.php

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/265428.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

浅谈 Linux 网络编程 - 网络字节序

文章目录 前言核心知识关于 小端法关于 大端法网络字节序的转换 函数 前言 在进行 socket 网络编程时,会用到字节流的转换函数、例如 inet_pton、htons 等,那么为什么要用到这些函数呢,本篇主要就是对这部分进行介绍。 核心知识 重点需要记…

韩国突发:将批准比特币ETF

作者:秦晋 韩国两党宣布将批准比特币ETF。比特币也再次成为竞选的宠儿。 4月10日,韩国将迎来每隔4年而进行的一次立法大选。在大选之前,现执政党与反对党都承诺将批准比特币ETF。 我们知道,比特币的主要受众群体以年轻人居多。此前…

idea打包报错,clean、package报错

一、idea在打包时,点击clean或package报错如下: Error running ie [clean]: No valid Maven installation found. Either set the home directory in the configuration dialog or set the M2_HOME environment variable on your system. 示例图&#xf…

揭示预处理中的秘密!(二)

目录 ​编辑 1. #运算符 2. ##运算符 3. 命名约定 4. #undef 5. 命令行定义 6. 条件编译 7. 头文件的被包含的方式 8.嵌套文件包含 9. 其他预处理指令 10. 完结散花 悟已往之不谏,知来者犹可追 …

androidapp开发语言,已获千赞

初级 初级研发工程师的定义是掌握基础的Android知识,能够独立完成一个功能,工作年限大概在1-2年,这个层级大部分人通过看一些资料书籍再经过项目练习很快可以达到。这个级别的人往往需要掌握如下一些技能: 掌握Android 四大组件…

Nginx网络服务六-----IP透传、调度算法和负载均衡

1.实现反向代理客户端 IP 透传 就是在日志里面加上一个变量 Module ngx_http_proxy_module [rootcentos8 ~]# cat /apps/nginx/conf/conf.d/pc.conf server { listen 80; server_name www.kgc.org; location / { index index.html index.php; root /data/nginx/html/p…

等保2.0高风险项全解析:判定标准与应对方法

引言 所谓高风险项,就是等保测评时可以一票否决的整改项,如果不改,无论你多少分都会被定为不合格。全文共58页,写得比较细了,但是想到大家基本不会有耐心去仔细看的(凭直觉)。这几天挑里边相对…

【大数据】Flink 内存管理(一):设置 Flink 进程内存

《Flink 内存管理》系列(已完结),共包含以下 4 篇文章: Flink 内存管理(一):设置 Flink 进程内存Flink 内存管理(二):JobManager 内存分配(含实际…

智慧校园|智慧校园管理小程序|基于微信小程序的智慧校园管理系统设计与实现(源码+数据库+文档)

智慧校园管理小程序目录 目录 基于微信小程序的智慧校园管理系统设计与实现 一、前言 二、系统功能设计 三、系统实现 1、微信小程序前台 2、管理员后台 (1)学生信息管理 (2) 作业信息管理 (3)公告…

shader学习记录——融合、融球效果

融合、融球效果shader,重点在等势面公式上 Shader "Custom/MetaballsShader" {Properties{_MainTex ("Texture", 2D) "white" {}_Color("Color",Color) (1,1,1,1)}SubShader{Tags { "RenderType""Opaque…

什么是去中心化云计算?

去中心化云计算是一种新型的云计算方式,它与传统的中心化云计算不同,将数据和计算任务分布到多个节点上,而不是将数据集中存储在中心服务器上。这种云计算方式具有许多优势,包括提高数据安全性、降低运营成本、增强可扩展性和灵活…

【C语言】学生宿舍信息管理系统

目录 项目说明 1. 数据结构设计 2. 功能实现 3. 主菜单设计 4. 文件操作 5. 系统使用 项目展示 1.主菜单功能界面 ​编辑 2.添加信息 3.查询信息 4.修改信息 5.删除信息 6.退出程序 项目完整代码 结语 在这篇博客中,我们将探讨如何使用C语言来开发…

Java 反射机制

​ 更多内容,前往IT-BLOG ​ 反射Reflection被视为动态语言的关键,反射机制允许程序在执行期间借助于Reflection API取得任何类的内部信息,并能直接操作任意对象的内部属性及方法。反射是一种功能强大且复杂的机制。使用它的主要人员是工具构…

通过QScrollArea寻找最后一个弹簧并且设置弹簧大小

项目原因,最近需要通过QScrollArea寻找其中最后一个弹簧并且设置大小和策略,因为无法直接调用UI指针,所以只能用代码寻找。 直接上代码: if (m_scrollArea){int iScrollWidth m_labelSelectedTitle->width();m_scrollArea-&g…

C语言--- 指针(3)

一.字符指针变量 在指针的类型中&#xff0c;我们知道有一种指针类型为字符指针char * 一般使用&#xff1a; #include<stdio.h> int main() {char ch a;char* p &ch;*p b;printf("%c\n",ch);return 0; } 其实还有一种使用方式 &#xff1a; #inc…

【前端素材】推荐优质后台管理系统Salreo平台模板(附源码)

一、需求分析 当我们从多个层次来详细分析后台管理系统时&#xff0c;可以将其功能和定义进一步细分&#xff0c;以便更好地理解其在不同方面的作用和实际运作。 1. 结构层次 在结构层次上&#xff0c;后台管理系统可以分为以下几个部分&#xff1a; a. 辅助功能模块&#…

2024年2月国内如何快速注册OnlyFans最新小白教学

前言 onlyface软件是一个创立于2016年的订阅式社交媒体平台&#xff0c;创作者可以在自己的账号发布原创的照片或视频&#xff0c;并将其设置成付费模式&#xff0c;若用户想查看则需要每月交费订阅。 需要注意的是&#xff0c;网络上可能存在非法或不道德的应用程序&#xff…

【C++】树形关联式容器set、multiset、map和multimap的介绍与使用

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》《C》《Linux》《算法》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 目录 前言 1.关联式容器 2.键…

鸿蒙应用成企业布局新方向 鸿蒙人才成开年之后“香饽饽”

随着春节假期的结束&#xff0c;职场人也开始返工返岗。与此同时2024年春招季也已拉开帷幕。2月23日&#xff0c;据智联招聘发布的《2024年春招市场行情周报》&#xff08;第一期&#xff09;显示&#xff0c;2024年春节后第一周&#xff0c;依托消费需求释放与制造业返工复产&…

【JavaEE】_前端POST请求借助form表单向后端传参

目录 1. 前端POST请求借助form表单向后端传参 2. 关于parameter方法获取参数的优先性问题 前端向后端传参通常有三种方法&#xff1a; 第一种&#xff1a;使用GET请求的query string部分向后端传参&#xff1a; 本专栏中已经详述了前端使用GET请求的query string向后端传参…