PHP + Laravel + RabbitMQ + Redis 实现消息队列 (三) 消费队列在RabbitMQ和redis中的发布和订阅

发布订阅(Pub/Sub)

对于消息队列传统的模式来说,一个消费者消费一条消息,这条消息被消费之后就不会再次被其它的消费者消费。但是在发布订阅模式中,一条消息是可以被多个消费者消费的,这些消费者其实相当于是订阅了这条队列的消息。当有新的消息出现在队列中,就会像广播一样让所有订阅者都获得这条消息。
来源于RabbitMQ官网

为什么要使用发布订阅模式

  • 解耦和异步通信: 发布订阅模式允许发布者(发布消息的一方)和订阅者(接收消息的一方)之间解耦。发布者不需要知道哪些订阅者会接收消息,订阅者也不需要了解消息的来源。这种解耦使系统更加灵活和可扩展。
  • 实时数据处理和通知: 当需要实时传输数据并且多个接收者需要收到同一数据时,发布订阅模式特别有用。例如,即时聊天应用程序中的消息传输,或者实时数据分析系统中的数据处理和通知。
  • 事件驱动架构: 在事件驱动架构中,发布订阅模式是核心机制之一。系统中的各个组件可以通过发布和订阅事件来响应特定的业务事件,从而使系统更加响应式和可维护。
  • 分布式系统协调: 在分布式系统中,不同节点之间可能需要进行协调和通信。通过发布订阅模式,可以实现跨节点的消息传递和事件处理,促进系统间的解耦和灵活性。
  • 解决竞态条件: 在多线程或多进程环境中,使用发布订阅模式可以避免竞态条件的发生。订阅者能够按照自己的速度和时间处理接收到的消息,不会因为速度不同而导致数据不一致或丢失。

举个例子

比如现在有一个订单系统,在用户下单以后,我们需要同步给用户发送下单成功的通知,同时也需要给商家发送用户已经下单的通知;
如果使用传统的模式,我们大概需要一个事务隔离的环境执行如下逻辑

  1. 用户成功下单
  2. 给用户发送短信、站内消息等
  3. 给商家发送有用户下单短信、站内消息等

如果使用发布/订阅模式的话则可以拆成两个部分;

  • 发布者
  1. 用户成功下单
  2. 发布者发布消息 publish
  • 订阅者
  1. 订阅者一,发送消息
  2. 订阅者二,发送消息

RabbitMQ实现

在 RabbitMQ 中,交换机(Exchange)是消息的分发中心,它决定了消息应该被发送到哪些队列。RabbitMQ 提供了几种不同的交换机类型,每种类型都有不同的消息分发规则,其中包括了发布订阅模式的实现方式。
其中 Fanout Exchange (扇出交换机)
它会把所有发送到它的消息广播到所有与它绑定的队列中。这种模式实现了典型的发布订阅(Publish-Subscribe)模式,其中:

  • 发布者将消息发送到 Fanout Exchange。
  • Fanout Exchange 接收到消息后,会将消息复制并发送到所有与之绑定的队列。
  • 订阅者分别从各自的队列中接收消息。

发布者代码(创建订单)

    // 发布订单创建public function orderCreate(){$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');$channel = $connection->channel();// 定义交换机$channel->exchange_declare('orders', 'fanout', false, false, false);$data = '订单号:' . time();$msg = new AMQPMessage($data);// 注意,这里是指定的交换机,第三个参数还是队列名,之前普通队列我们指定的是第三个参数$channel->basic_publish($msg, 'orders');echo '[x] 发送消息 ', $data;$channel->close();$connection->close();}

订阅者(发送短信和邮件)

		// 发送短信$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');$channel = $connection->channel();$channel->exchange_declare('orders', 'fanout', false,false,false);// 使用空队列名,由 RabbitMQ 生成随机队列名// 这里使用了 解构赋值   PHP 版本在 7.1  // 以上可以这样使用 queue_declare 方法返回的数组中提取第一个和第二个元素,分别赋给 $queue_name 和一个没有具体变量名的占位符[$queue_name, ,] = $channel->queue_declare('',false, false, true,false);// 队列绑定到 orders 交换机$channel->queue_bind($queue_name, 'orders');echo "[x] 等待数据,退出请按 CTRL+C\n";$callback = function($msg) {echo '[x] 接收到 ', $msg->body, ",开始向相关方发送短信....\n";};$channel->basic_consume($queue_name, '', false, true, false, false, $callback);while($channel->is_open()){$channel->wait();}$channel->close();$connection->close();// 发送邮件$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');$channel = $connection->channel();$channel->exchange_declare('orders', 'fanout', false,false,false);// 使用空队列名,由 RabbitMQ 生成随机队列名// 这里使用了 解构赋值   PHP 版本在 7.1  // 以上可以这样使用 queue_declare 方法返回的数组中提取第一个和第二个元素,分别赋给 $queue_name 和一个没有具体变量名的占位符[$queue_name, ,] = $channel->queue_declare('',false, false, true,false);// 队列绑定到 orders 交换机$channel->queue_bind($queue_name, 'orders');echo "[x] 等待数据,退出请按 CTRL+C\n";$callback = function($msg) {echo '[x] 接收到 ', $msg->body, ",开始向相关方发送邮件....\n";};$channel->basic_consume($queue_name, '', false, true, false, false, $callback);while($channel->is_open()){$channel->wait();}$channel->close();$connection->close();

我们可以发现其实在这两个代码中并没有太大差距,唯一的差距就是在模拟发送消息的时候echo的数据不一致,现在开始测试

测试

  1. 订阅者订阅
  • 命令行1(SMS)
    [x] 等待数据,退出请按 CTRL+C
  • 命令行2 (Email)
    [x] 等待数据,退出请按 CTRL+C
  1. 创建订单
  • 发送消息 订单号:1723028165
  1. 订阅者接收
  • 命令行1(SMS)
    [x] 接收到 订单号:1723028165,开始向相关方发送短信…
  • 命令行2 (Email)
    [x] 接收到 订单号:1723028165,开始向相关方发送邮件…

Redis实现

	// 订单创建public function redisOrderCreate(){$data = '订单号:' . time();Redis::publish('orders', $data);echo '[x] 发送消息 ', $data;}// 发布者订阅// 发送短信echo "[x] 等待数据,退出请按 CTRL+C\n";// 订阅频道 'orders'Redis::subscribe(['orders'], function ($message, $channel) {echo "[x] 接收到 {$message},开始向相关方发送短信....\n";});// 发送邮件echo "[x] 等待数据,退出请按 CTRL+C\n";// 订阅频道 'orders'Redis::subscribe(['orders'], function ($message, $channel) {echo "[x] 接收到 {$message},开始向相关方发送邮件....\n";});

注意:

subscribe() 方法,而且这个方法是直接就会挂起当前应用程序的,不需要我们再使用 while 来做死循环挂起。一个 subscribe() 方法可以监听多个发布频道,所以它的第一个参数是数组。第二个参数就是一个回调函数,这个函数有三个参数,分别是 redis实例、频道名称、消息内容 。
在使用 subscribe() 挂起程序的时候,要设置一下连接超时时间,要不过一会超过默认的连接超时时间后就会断开连接了。
在 Laravel 中,对于 Redis 的操作,特别是在使用 illuminate/redis 组件时,并没有直接支持设置 OPT_READ_TIMEOUT 这样的常量选项。
如果你需要在 Laravel 中控制 Redis 的读取超时,你可以考虑通过 Redis 客户端的其他方式来实现,例如使用 Predis 库。

composer require predis/predis
use Predis\Client;// 创建 Predis 客户端实例
$redis = new Client();// 设置读取超时时间
$redis->getConnection()->setReadTimeout(-1);// 订阅频道 'orders'
$redis->pubSubLoop()->subscribe('orders', function ($message) {echo "[x] 接收到 {$message->payload},开始向相关方发送xx....\n";
});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/394463.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端构建工具|vite快速入门

认识vite vite组成部分 Vite是一种新型前端构建工具,能够显著提升前端开发体验。它主要由两部分组成: 一个开发服务器,它基于 原生 ES 模块 提供了 丰富的内建功能,如速度快到惊人的 模块热更新(HMR)。一…

C++——类模板经典案例——自定义通用数组类

案例:自定义数组类 需求: 1,对内置数据及自定义数据类型的数据存储 2,将数组中的数据存储到堆区 3,构造函数中可以存入数组的容量 4,提供对应的拷贝构造函数和运算符重载防止浅拷贝问题的发生 5&#xff0c…

基于Springboot + Vue的宿舍管理系统

前言 文末获取源码数据库 感兴趣的可以先收藏起来,需要学编程的可以给我留言咨询,希望帮助更多的人 精彩专栏推荐订阅 不然下次找不到哟 Java精品毕设原创实战项目 作者的B站地址:程序员云翼的个人空间-程序员云翼个人主页-哔哩哔哩视频 csd…

vue3+axios请求导出excel文件

在Vue 3中使用axios请求导出Excel文件,可以发送一个GET或POST请求,并设置响应类型为blob或arraybuffer,然后使用new Blob()构造函数创建一个二进制文件,最后使用URL.createObjectURL()生成一个可以下载的链接。 先看代码 import…

Stable Diffusion绘画 | 必备插件安装推荐

新手必备安装的插件推荐如下: 汉化语言包:汉化插件GitHub地址;双语对照插件GitHub地址无边图库:无边图库插件GitHub地址ControlNet:已默认安装 插件安装 最推荐的安装方式:通过「可下载」、「从网址安装…

Qt Modbus 寄存器读写实例

一.线圈状态寄存器读写 项目效果如下 1. 写单个寄存器 MODBUS_API int modbus_write_bit(modbus_t *ctx, int coil_addr, int status); int addrui->spinBoxwirte_addr->value();int dataui->spinBoxwirte_data->value();int ret modbus_write_bit(mb,addr,d…

学习c#-4语句 ,条件,循环

代码: string name "小赵"; //条件判断 if (name "小赵") { Console.WriteLine("我是小赵"); } else { Console.WriteLine("我不是小赵"); } // switch条件判断 switch (name) { case "小…

5.3 匿名函数:Python编程中的隐士大师

欢迎来到我的博客,很高兴能够在这里和您见面!欢迎订阅相关专栏: 工💗重💗hao💗:野老杂谈 ⭐️ 全网最全IT互联网公司面试宝典:收集整理全网各大IT互联网公司技术、项目、HR面试真题.…

嵌入式初学-C语言-十七

#接嵌入式初学-C语言-十六# 函数的递归调用 含义: 在一个函数中直接或者间接调用了函数本身,称之为函数的递归调用 // 直接调用a()->a(); // 间接调用a()->b()->a();a()->b()->..->a();递归调用的本质: 本是是一种循环…

【QT】Qt 音视频

Qt 音视频 Qt 音视频1. Qt 音频2. Qt 视频 Qt 音视频 在 Qt 中,音频主要是通过 QSound 类来实现。但是需要注意的是 QSound 类只支持播放 wav 格式的音频文件。也就是说如果想要添加音频效果,那么首先需要将非 wav 格式的音频文件转换为 wav 格式。 通…

JavaWeb之servlet关于Ajax实现前后端分离

一、什么是Ajax: AJAX Asynchronous JavaScript and XML(异步的 JavaScript 和 XML)。 AJAX 不是新的编程语言,而是一种使用现有标准的新方法。 AJAX 最大的优点是在不重新加载整个页面的情况下,可以与服务器交换数据并更新部…

Nuxt2:强制删除window.__NUXT__中的数据

一、问题描述 在以前的一篇文章《Nuxt3: 强制删除__NUXT_DATA__的一种方式》中曾介绍了在Nuxt3中如何删除存在于页面id为__NUXT_DATA__的script节点中的数据。 此次,Nuxt2与Nuxt3不同在于它的数据是存在于window.__NUXT__,那么该如何处理呢?…

2025深圳国际户外庭院营地用品博览会

2025深圳国际户外庭院营地用品博览会 2025 Shenzhen International Outdoor Courtyard Camping Supplies Expo 时间:2025年02月27-3月01日 地点:深圳会展中心(福田馆) 详询主办方陆先生 I38(前三位) …

如何用OceanBase与DataWorks,打造一站式的数据集成、开发和数据服务

导语:在OceanBase 2024年开发者大会的技术生态论坛上,阿里云DataWorks团队的高级技术专家罗海伟,详细阐述了一站式大数据开发治理平台DataWorks的能力,并对于如何基于OceanBase和Dataworks构建一站式数据集成、开发以及数据服务进…

Linux驱动开发—Linux内核定时器概念和使用详解,实现基于定时器的字符驱动

文章目录 内核定时器概念在Linux驱动模块中使用定时器软定时器(Soft Timers)jiffies 含义高精度定时器(High Resolution Timers) 实现倒计时字符设备驱动 内核定时器概念 在 Linux 内核中,定时器是用来管理和调度延迟…

8.7-主从数据库的配置+mysql的增删改查

一、mysql环境的配置 1.环境准备 (1)主数据库 #关闭防火墙 [rootmaster ~]# systemctl stop firewalld#关闭selinux [rootmaster ~]# setenforce 0#下载lrzsz工具 [rootmaster ~]# yum -y install lrzsz#安装rsync [rootmaster ~]# yum -y install rs…

低代码平台:效率利器还是质量妥协?

目录 低代码平台:效率利器还是质量妥协? 一、引言 二、低代码平台的定义和背景 1、什么是低代码平台? 2、低代码平台的兴起 三、低代码开发的机遇 1、提高开发效率 2、降低开发成本 3、赋能业务人员 四、低代码开发的挑战 1、质量…

pgbackrest备份方案(差异和增量备份的区别)

pgbackrest备份方案(差异和增量备份的区别) 一 备份 全量备份: 将数据库集群的全部内容复制到备份中。数据库集群的第一个备份始终是全量备份。始终能够直接还原全量备份。全量备份不依赖于完整备份之外的任何文件来保持一致性。 差异备份: 仅复制自…

3D展示的前景如何?

随着人类科技的不断进步,对未来的趋势也肯定是向高纬度发展。3D取代2D只是一个所需时间长短而已,题主既然这么问,说明肯定是意识到了3D是未来的趋势,那么就应该多接触和了解未来的3D平台及应用工具、应用领域等。 之前2G\3G时代&…

1.MongoDB入门指南之开篇

1. 写在前面 MongoDB大家可能听说过,但是要怎么学习?先学习哪个,很多人是不知道的,毕竟面对一个未知的事物,迷茫是很多人都会遇到的,从今天起我们就开始系统的介绍MongoDB的学习。 2. 课程介绍 课程主要分…