使用RabbitMQ实现微服务间的异步消息传递

使用RabbitMQ实现微服务间的异步消息传递

      • RabbitMQ简介
      • 安装RabbitMQ
        • 在Ubuntu上安装RabbitMQ
        • 在CentOS上安装RabbitMQ
      • 配置RabbitMQ
      • 创建微服务
        • 生产者服务
          • 安装依赖
          • 生产者代码
        • 消费者服务
          • 消费者代码
      • 运行微服务
      • 消息模式
        • 直接模式
          • 生产者代码
          • 消费者代码
        • 扇出模式
          • 生产者代码
          • 消费者代码
        • 主题模式
          • 生产者代码
          • 消费者代码
      • 高级特性
        • 持久化
          • 生产者代码
          • 消费者代码
        • 确认机制
          • 消费者代码
      • 监控和日志
        • 监控
        • 日志
      • 故障排除
      • 总结

在现代分布式系统中,微服务架构越来越受到欢迎。微服务之间需要进行高效、可靠的消息传递。RabbitMQ作为一个成熟的开源消息中间件,能够很好地满足这一需求。本文将详细介绍如何使用RabbitMQ实现微服务间的异步消息传递。

RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP(Advanced Message Queuing Protocol)协议。它支持多种消息模式,如发布/订阅、路由、主题等。

安装RabbitMQ

RabbitMQ可以在多种操作系统上安装,包括Linux、macOS和Windows。
在Ubuntu上安装RabbitMQ
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
在CentOS上安装RabbitMQ
sudo yum install epel-release
sudo yum install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

配置RabbitMQ

安装完成后,可以使用以下命令进行基本配置。
sudo rabbitmq-plugins enable rabbitmq_management
sudo systemctl restart rabbitmq-server
访问RabbitMQ管理界面:`http://localhost:15672`,默认用户名和密码都是`guest`。

创建微服务

我们将创建两个简单的微服务:生产者服务和消费者服务。
生产者服务
生产者服务负责发送消息到RabbitMQ。
安装依赖
pip install pika
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')message = 'Hello World!'
channel.basic_publish(exchange='', routing_key='hello', body=message)
print(f'Sent: {message}')
connection.close()
消费者服务
消费者服务负责从RabbitMQ接收消息。
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()

运行微服务

先启动消费者服务,然后启动生产者服务。
# 启动消费者服务
python consumer.py# 启动生产者服务
python producer.py

消息模式

RabbitMQ支持多种消息模式,包括直接模式、扇出模式、主题模式和头部模式。
直接模式
直接模式是最简单的模式,消息会被发送到指定的队列。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='direct_queue')message = 'Direct message'
channel.basic_publish(exchange='', routing_key='direct_queue', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='direct_queue')channel.basic_consume(queue='direct_queue', auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()
扇出模式
扇出模式将消息广播到所有绑定的队列。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')message = 'Fanout message'
channel.basic_publish(exchange='fanout_exchange', routing_key='', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queuechannel.queue_bind(exchange='fanout_exchange', queue=queue_name)channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()
主题模式
主题模式允许更复杂的路由规则。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')routing_key = 'kern.critical'
message = 'Critical kernel message'
channel.basic_publish(exchange='topic_exchange', routing_key=routing_key, body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queuebinding_keys = ['*.critical', 'kern.*']
for binding_key in binding_keys:channel.queue_bind(exchange='topic_exchange', queue=queue_name, routing_key=binding_key)channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()

高级特性

RabbitMQ还支持许多高级特性,如持久化、确认机制、死信队列等。
持久化
可以配置消息和队列的持久化,以确保消息不会因为RabbitMQ服务器重启而丢失。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='durable_queue', durable=True)message = 'Persistent message'
channel.basic_publish(exchange='', routing_key='durable_queue', body=message, properties=pika.BasicProperties(delivery_mode=2))
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')ch.basic_ack(delivery_tag=method.delivery_tag)connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='durable_queue', durable=True)channel.basic_consume(queue='durable_queue', on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()
确认机制
可以配置消费者在处理完消息后发送确认,以确保消息不会被重复处理。
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')ch.basic_ack(delivery_tag=method.delivery_tag)connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='ack_queue')channel.basic_consume(queue='ack_queue', on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()

监控和日志

RabbitMQ提供了丰富的监控和日志功能,可以用于监控和调试。

监控
可以通过RabbitMQ管理界面监控队列、交换机和连接等。

日志
可以通过配置文件调整日志级别和输出方式。

故障排除

如果RabbitMQ配置出现问题,可以使用以下命令进行故障排除。

sudo rabbitmqctl status
sudo journalctl -u rabbitmq-server

总结

通过本文,你已经学习了如何使用RabbitMQ实现微服务间的异步消息传递。我们介绍了RabbitMQ的基本概念、安装方法、配置RabbitMQ、创建微服务、消息模式(直接模式、扇出模式、主题模式)、高级特性(持久化、确认机制)、监控和日志、故障排除等内容。掌握了这些知识,将有助于你在实际工作中更好地利用RabbitMQ来构建高效、可靠的微服务架构。
RabbitMQ管理界面示例
RabbitMQ消息传递模式示例

使用RabbitMQ可以显著提高微服务间消息传递的可靠性和效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/462361.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MySQL】MySQL安装以及各种报错处理

前言: 本节内容讲述在Ubuntu环境下怎么进行MySQL的安装。 以及一些安装过程中遇到的报错如何处理的问题。 ps:注意, 本篇文章不是图形化界面的MySQL安装教程哦。想要安装图形化界面的MySQL的友友们可以另寻资源了。 目录 更新软件包列表 安装M…

Servlet 3.0 注解开发

文章目录 Servlet3.0注解开发修改idea创建注解的servlet模板内容讲解 关于servlet3.0注解开发的疑问_配置路径省略了属性urlPatterns内容讲解内容小结 Servlet3.0注解开发 【1】问题 说明:之前我们都是使用web.xml进行servlet映射路径的配置。这样配置的弊端&…

FPGA时序分析和约束学习笔记(3、Timequest时序路径详解和优化)

FPGA时序分析和约束学习笔记(3、Timequest时序路径详解和优化) Timequest中Data Path分析 Data Arrival Path clock path:时钟信号到达源寄存器时钟端口的时间 data path:数据从源寄存器Q端口出发到达目标寄存器D端口的时间 D…

windows与windows文件共享

目录 基础设置主机共享文件端设置从机接受文件端设置 基础设置 1、先确保两台电脑直接能够ping通,这是文件共享的前提,如果ping不通就去查找对应的原因,一般都是防火墙的原因。 在ping通的情况下: 2、先找到高级共享设置 3、对专…

前端页面整屏滚动fullpage.js简单使用

官网CSS,JS地址 fullPage.js/dist/fullpage.min.js at master alvarotrigo/fullPage.js GitHub fullPage.js/dist/fullpage.min.css at master alvarotrigo/fullPage.js GitHub <!DOCTYPE html> <html lang"en"><head><meta charset"…

Rust整合Elasticsearch

Elasticsearch是什么 Lucene&#xff1a;Java实现的搜索引擎类库 易扩展高性能仅限Java开发不支持水平扩展 Elasticsearch&#xff1a;基于Lucene开发的分布式搜索和分析引擎 支持分布式、水平扩展提高RestfulAPI&#xff0c;可被任何语言调用 Elastic Stack是什么 ELK&a…

opencv-day2-图像预处理1

图像预处理 在计算机视觉和图像处理领域&#xff0c;图像预处理能够提高后续处理&#xff08;如特征提取、目标检测等&#xff09;的准确性和效率。 常见的图像预处理操作&#xff1a; 图像色彩空间转换 图像大小调整 图像仿射变换 图像翻转 图像裁剪 图像二值化处理 图…

scrapy爬取名人名言

爬取名人名言&#xff1a;http://quotes.toscrape.com/ 1 创建爬虫项目&#xff0c;在终端中输入&#xff1a; scrapy startproject quotes2 创建之后&#xff0c;在spiders文件夹下面创建爬虫文件quotes.py&#xff0c;内容如下&#xff1a; import scrapy from scrapy.spi…

一文了解Linux内核I2C子系统,驱动苹果MFI加密芯片

版本 日期 作者 变更表述 1.0 2024/10/27 于忠军 文档创建 背景&#xff1a;由于苹果有一套MFI IAP2的蓝牙私有协议&#xff0c;这个协议是基于BR/EDR的RFCOMM自定义UUID来实现IAP2协议的通信&#xff0c;中间会牵扯到苹果加密芯片的I2C读取&#xff0c;所以我们借此机…

Spring之依赖注入(DI)和控制反转(IoC)——配置文件、纯注解

依赖注入 依赖注入(Dependency Injection&#xff0c;简称 DI)与控制反转(loC)的含义相同&#xff0c;只不过这两 个称呼是从两个角度描述的同一个概念。对于一个 Spring 初学者来说&#xff0c;这两种称呼很难理解, 下面我们将通过简单的语言来描述这两个概念。 当Java对象&…

Ubuntu 22.04安装部署

一、部署环境 表 1‑1 环境服务版本号系统Ubuntu22.04 server lts运行环境1JDK1.8前端WEBNginx1.8数据库postgresqlpostgresql13postgis3.1pgrouting3.1消息队列rabbitmq3.X(3.0以上)运行环境2erlang23.3.3.1 二、安装系统 2.1安装 1.安装方式&#xff0c;选第一条。 2.选择…

基于ResNet50模型的船型识别与分类系统研究

项目源码获取方式见文章末尾&#xff01; 600多个深度学习项目资料&#xff0c;快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【LSTM模型实现光伏发电功率的预测】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【GAN模型实现二次元头像生成】 4.【CNN模…

信息学科平台系统开发:基于Spring Boot的最佳实践

3系统分析 3.1可行性分析 通过对本基于保密信息学科平台系统实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本基于保密信息学科平台系统采用Spring Boot框架&a…

Cityscapes数据集:如何将像素级的多边形标注的分割数据标注转为目标检测的bbox标注

Cityscapes数据集官网下载地址&#xff1a; https://www.cityscapes-dataset.com/ 相关介绍&#xff1a;从官网下载这三个压缩包文件leftImg8bit_trainvaltest.zip、gtCoarse.zip、gtFine_trainvaltest.zip 1&#xff09;leftImg8bit_trainvaltest.zip分为train、val以及tes…

【周末推荐】Windows无缝连接iPhone

关注“ONE生产力”&#xff0c;获取更多精彩推荐&#xff01; 又到了周末推荐时间了&#xff0c;今天我们介绍一个Windows内置的功能&#xff0c;能够帮助大家将自己的电脑和iPhone连接在一起。 很多用Windows的小伙伴羡慕macOS可以和iPhone无缝连接&#xff0c;轻松阅读和回…

JDBC/ODBC—数据库连接API概述

JDBC/ODBC概述 在数据库连接领域&#xff0c;有两种广泛使用的技术&#xff1a;ODBC&#xff08;Open Database Connectivity - 开放数据库连接&#xff09;和 JDBC&#xff08;Java Database Connectivity - Java 数据库连接&#xff09;。 一、什么是 ODBC&#xff1f; Ope…

Vagrant使用教程:创建CentOS 8虚拟机

目录 简介准备工作下载配置Vagrant修改环境变量创建VAGRANT_HOME环境变量修改virturalBox新建虚拟机文件的默认生成路径修改Vagrant配置支持VirtualBox7.1.x版本创建Vagrant文件添加镜像 初始化并开机初始化开发环境开机 其他配置项宿主机的交换目录修改虚拟机内存修改 访问方式…

使用Django Channels实现WebSocket实时通信

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 使用Django Channels实现WebSocket实时通信 Django Channels 简介 环境搭建 安装 Django 和 Channels 创建 Django 项目 配置 A…

【JAVA 笔记】11 ch08_opp_intermediate 第8章 面向对象编程(中级部分)

第8章 面向对象编程(中级部分) IDEA 常用快捷键 包 包的三大作用 包基本语法 包的本质分析 包的命名 常用的包 如何引入包 注意事项和使用细节 访问修饰符 基本介绍 访问修饰符的访问范围! 使用的注意事项 面向对象编程三大特征 基本介绍 封装介绍 封装的理解和好处 封装的实现…

面试题:JVM(四)

new对象流程&#xff1f;&#xff08;龙湖地产&#xff09; 对象创建方法&#xff0c;对象的内存分配。&#xff08;360安全&#xff09; 1. 对象实例化 创建对象的方式有几种&#xff1f; 创建对象的步骤 指针碰撞&#xff1a;以指针为分界线&#xff0c;一边是已连续使用的…