029-从零搭建微服务-消息队列(一)

写在最前

如果这个项目让你有所收获,记得 Star 关注哦,这对我是非常不错的鼓励与支持。

源码地址(后端):mingyue: 🎉 基于 Spring Boot、Spring Cloud & Alibaba 的分布式微服务架构基础服务中心

源码地址(前端):mingyue-ui: 🎉 基于 Vue3 + TS + Vite + Element plus 等技术,适配 MingYue 后台微服务

文档地址:Wiki - Gitee.com

消息队列

消息队列(Message Queue)是一种用于在分布式系统中进行异步通信的通信模式和技术。它允许不同的组件或服务之间通过发送和接收消息来进行通信,而无需直接耦合它们的实现细节。消息队列通常用于解耦系统的不同部分,提高系统的可伸缩性、可靠性和灵活性。

以下是消息队列的一些关键特点和概念:

  1. 消息生产者(Producer): 这是向消息队列发送消息的组件或应用程序。生产者将消息发送到队列中,通常包括一些有关消息内容的元数据。

  2. 消息队列(Queue): 这是用于存储消息的中间件组件,消息在这里排队等待被处理。消息队列通常支持不同的消息传递模式,例如先进先出(FIFO)或发布/订阅模式。

  3. 消息消费者(Consumer): 这是从消息队列接收消息并进行处理的组件或应用程序。消费者订阅特定队列,并在有新消息可用时接收并处理它们。

  4. 消息代理(Message Broker): 这是协调消息的发送和接收的中间件服务。消息代理通常负责消息的路由、传递和确保消息的可靠性。

  5. 消息确认(Acknowledgment): 消费者在成功处理消息后,通常会向消息队列发送确认,以告知队列消息已被处理。这确保了消息不会被重复处理。

  6. 消息持久性(Message Durability): 消息队列通常支持消息的持久性,这意味着即使在消息被传递给消费者之后,消息仍然会在系统中存储,以确保不会丢失。

  7. 消息超时(Message Timeout): 有时候,消息队列会设置消息的超时时间,以确保消息在一定时间内被处理,否则可能会被认为是过期消息。

  8. 发布/订阅模式(Publish/Subscribe): 这是一种消息传递模式,其中生产者将消息发布到一个主题(topic),而不是特定的队列,然后多个消费者订阅该主题以接收消息。这种模式支持广播消息。

使用场景

  • 异步通信:允许不同的系统组件异步通信,提高系统的响应性能。

  • 解耦组件:降低系统中不同组件之间的耦合,使得系统更容易维护和扩展。

  • 负载均衡:通过分发消息给多个消费者来平衡工作负载。

  • 消息传递可靠性:确保消息的可靠传递,即使在系统中的故障情况下也能保证不丢失消息。

  • 日志和审计:用于记录和审计系统活动,以便后续分析和故障排除。

技术选型

一些常见的消息队列实现包括 RabbitMQ、RocketMQ、Kafka等,选择适合特定应用场景的消息队列是关键,因为它会影响系统的性能、可靠性和可扩展性。不同的场景可能更适合不同的消息队列系统。

基础对比
RabbitMQRocketMQKafka
推出时间2007年2012年2012年
所属Pivotal开源,Mozilla阿里开源,ApacheLinkin开源,Apache
社区活跃度
开发语言ErlangJavaScala、Java
支持的协议AMQP自己定义一套自行定义一套(基于TCP)
吞吐量万级(5.95w/s)十万级(11.6w/s)十万级(17.3w/s)
topic数量对吞吐量的影响topic达到几百,几千个时,吞吐量会有较小幅度的下降topic达到几十,几百个时,吞吐量会大幅度下降
时效性微秒级毫秒级毫秒级
可用性高(主从架构)非常高(分布式架构)非常高(分布式架构)
使用场景适用于各种规模的应用程序,尤其适合需要多语言支持的场景。适用于大规模的企业应用和互联网场景,尤其在阿里巴巴等大型公司中得到广泛应用。适用于大数据处理、实时数据流分析、事件溯源等高吞吐量场景。
功能对比
RabbitMQRocketMQKafka
延迟队列
死信队列
优先级队列
消息回溯
消焦持久化
消魚确认机制单条OffsetOffset
消息TTL
消息重复支持at least once、at most once支持at least once支持at least once、at most once
消息顺序性消费者加锁分区有序
消息事务
消息过滤
消息查询
消息重新消费
消费模式队列模式广播模式+集群模式流模式
消费推拉模式Pull、PushPull、PushPull
批量发送

选型总结

通过对RabbitMQ、RocketMQ、Kafka 基础与功能两个维度对比,本项目将采用 RocketMQ、Kafka 两个消息队列。

RocketMQ 适用场景

  • 高性能、高可用性的消息传递场景,例如实时数据分析、电商秒杀等。

  • 需要强大的消息过滤和消息追踪功能的场景,例如广告投放、用户推送等。

  • 需要分布式事务支持的场景,RocketMQ提供了分布式事务消息特性。

Kafka 适用场景

  • 需要高吞吐量和低延迟的实时数据处理场景,例如用户行为日志分析、实时监控等。

  • 需要保留大量历史数据并支持数据回溯的场景,例如大数据分析、数据仓库等。

  • 需要构建事件驱动架构的场景,Kafka可以作为事件源和消息总线。

Docker 安装 RocketMQ

创建目录结构

具体内容可以参考:mingyue/docker/rocketmq

rocketmq/broker1/confbroker.conf/logsREADME.md/storeREADME.md/namesrv/logsREADME.md
docker-compose.yml

编写 docker-compose rocketmq 服务

version: '3.8'
services:mingyue-mqnamesrv:image: apache/rocketmq:4.9.4container_name: mingyue-mqnamesrvports:- "9876:9876"environment:JAVA_OPT: -server -Xms512m -Xmx512mcommand: sh mqnamesrvvolumes:- ./rocketmq/namesrv/logs:/home/rocketmq/logs/rocketmqlogs
​mingyue-mqbroker1:image: apache/rocketmq:4.9.4container_name: mingyue-mqbroker1ports:- "10911:10911"- "10909:10909"- "10912:10912"environment:JAVA_OPT_EXT: -server -Xms512M -Xmx512M -Xmn256mcommand: sh mqbroker -c /home/rocketmq/rocketmq-4.9.4/conf/broker.confdepends_on:- mingyue-mqnamesrvvolumes:- ./rocketmq/broker1/conf/broker.conf:/home/rocketmq/rocketmq-4.9.4/conf/broker.conf- ./rocketmq/broker1/logs:/home/rocketmq/logs/rocketmqlogs- ./rocketmq/broker1/store:/home/rocketmq/store
​mingyue-mqconsole:image: styletang/rocketmq-console-ngcontainer_name: mingyue-mqconsoleports:- "19876:19876"links:- mingyue-mqnamesrv:mqnamesrv #可以用mqnamesrv这个域名访问rocketmq服务environment:JAVA_OPTS: -Dserver.port=19876 -Drocketmq.namesrv.addr=mqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=falsedepends_on:- mingyue-mqnamesrv

启动测试

启动前先执行部分目录赋予读写权限,例:chmod 777 /docker/rocketmq/broker1/logs

访问 mingyue-mqconsole 可以打开 Dashboard 页面即可:http://ip:19876/#/

Docker 安装 Kafka

创建目录结构

具体内容可以参考:mingyue/docker/kafka

kafka/dataREADME.md
docker-compose.yml

编写 docker-compose kafka 服务

version: '3.8'
services:mingyue-zookeeper:image: 'bitnami/zookeeper:3.8.0'container_name: mingyue-zookeeperports:- "2181:2181"environment:TZ: Asia/ShanghaiALLOW_ANONYMOUS_LOGIN: "yes"ZOO_SERVER_ID: 1ZOO_PORT_NUMBER: 2181# 自带的控制台 一般用不上可自行开启ZOO_ENABLE_ADMIN_SERVER: "no"# 自带控制台的端口ZOO_ADMIN_SERVER_PORT_NUMBER: 8080
​mingyue-kafka:image: 'bitnami/kafka:3.2.0'container_name: mingyue-kafkaports:- "9092:9092"environment:TZ: Asia/Shanghai# 更多变量 查看文档 https://github.com/bitnami/bitnami-docker-kafka/blob/master/README.mdKAFKA_BROKER_ID: 1# 监听端口KAFKA_CFG_LISTENERS: PLAINTEXT://:9092# 实际访问ip 本地用 127 内网用 192 外网用 外网ipKAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://宿主机IP:9092KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181ALLOW_PLAINTEXT_LISTENER: "yes"volumes:- /docker/kafka/data:/bitnami/kafka/datadepends_on:- mingyue-zookeeperlinks:- mingyue-zookeeper:zookeeper #可以用zookeeper这个域名访问zookeeper服务
​mingyue-kafka-manager:image: sheepkiller/kafka-manager:latestcontainer_name: mingyue-kafka-managerports:- "19092:19092"environment:ZK_HOSTS: mingyue-zookeeper:2181APPLICATION_SECRET: letmeinKAFKA_MANAGER_USERNAME: mingyueKAFKA_MANAGER_PASSWORD: mingyue123KM_ARGS: -Dhttp.port=19092depends_on:- mingyue-kafkalinks:- mingyue-zookeeper:zookeeper #可以用zookeeper这个域名访问zookeeper服务

启动测试

启动前先执行部分目录赋予读写权限,例:chmod 777 /docker/kafka/data`

访问 mingyue-kafka-manager 可以打开 Clusters 页面即可:http://mingyue-mq:19092/

Spring Cloud Stream

Spring Cloud Stream 是一个用于构建与共享消息系统连接的高度可扩展的事件驱动微服务的框架。该框架提供了一个基于已经建立和熟悉的 Spring 成语和最佳实践的灵活编程模型,包括支持持久的 pub/sub 语义、消费者组和有状态分区。

说人话:Spring Cloud Stream 是 Spring 用来整合各种 MQ 中间件的框架。

Spring Cloud Stream的核心构建块

  • Destination Binders(目标绑定器):目标指的是 Kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。

  • Destination Bindings(目标绑定):MQ 中间件与最终用户提供的应用程序代码(生产者/消费者)之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)

  • Message(消息):一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。

Spring Cloud Stream 架构图

Spring Cloud Stream 应用程序由中间件中立的核心组成。该应用程序通过在外部代理暴露的目的地和代码中的输入/输出参数之间建立绑定,与外部世界进行通信。建立绑定所需的经纪人特定细节由特定于中间件的 Binder 实现处理。

  • Middleware:消息中间件,如RabbitMQ、Kafka、RocketMQ等。

  • Binder:可以认为是适配器,用来将Stream与中间件连接起来,不同的Binder对应不同的中间件,需要我们配置。

  • Application:由Stream封装的消息机制,很少自定义开发。

  • Inputs:输入,可以自定义开发。

  • Outputs:输出,可以自定义开发。

小结

本节介绍了什么是消息队列、以及选择什么样的消息队列,如何对比,最终选择了 Kafka 与 RocketMQ。然后给出了 Docker 一件部署 Kafka 与 RocketMQ 的 docker-compose 脚本。阐述了什么是 Spring Cloud Stream,未来将会使用 Spring Cloud Stream 作为 MQ 中间价的框架。

下面我们就使用 Spring Cloud Stream 来搭建代码与 MQ 之间的桥梁~~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/146307.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为云云耀云服务器L实例评测|华为云云耀云服务器docker部署srs,可使用HLS协议

华为云云耀云服务器L实例评测|华为云云耀云服务器docker部署srs,可使用HLS协议 什么是华为云云耀云L实例 云耀云服务器L实例,面向初创企业和开发者打造的全新轻量应用云服务器。提供丰富严选的应用镜像,实现应用一键部署&#x…

云安全之HTTP协议介绍

HTTP的基本概念 什么是网络协议 网络协议是计算机之间为了实现网络通信而达成的一种“约定”或者”规则“,有了这种”约定不同厂商生产的设备,以及不同操作系统组成的计算机之间,就可以实现通信。 网络协议由三个要素构成:1、语…

Tomcat启动后的日志输出为乱码

天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。…

定时任务管理平台青龙 QingLong

一、关于 QingLong 1.1 QingLong 介绍 青龙面板是支持 Python3、JavaScript、Shell、Typescript 多语言的定时任务管理平台,支持在线管理脚本和日志等。其功能丰富,能够满足大部分需求场景,值得一试。 主要功能 支持多种脚本语言&#xf…

Mybatis学习

为什么会有mybatis?: 图片来自b站的黑马网课 截图 懒得自己打字了hh 帅气的人都要注明出处 相信在学习框架之前 都学习了JDBC 因为Mybatis可以解决旧的JDBC存在的一些问题 什么是mybatis?: ORM框架原理: Mybatis是一个ORM框架,即obje…

c++---I/o操作

5、文件操作 程序运行时产生的数据都属于临时数据&#xff0c;程序一旦运行结束都会被释放。 我们可以通过文件将数据持久化 C中对文件操作需要包含头文件 <fstream> 文件类型分为两种&#xff1a; 文本文件 - 文件以文本的ASCII码形式存储在计算机中二进制文件 - 文…

PADS9.5使用记录

目录 一、概述 二、PADS Logic IN4148二极管封装 SOD-123封装 SOD-323封装 SOD-523封装 2N3904 1AM 三极管封装 78L05 7533-1 一、概述 PADS Logic 原理图绘制PADS Layout PCB 封装设计PADS Router 布线 二、PADS Logic …

1.2.C++项目:仿muduo库实现并发服务器之时间轮的设计

文章目录 一、为什么要设计时间轮&#xff1f;&#xff08;一&#xff09;简单的秒级定时任务实现&#xff1a;&#xff08;二&#xff09;Linux提供给我们的定时器&#xff1a;1.原型2.例子 二、时间轮&#xff08;一&#xff09;思想&#xff08;一&#xff09;代码 一、为什…

重试机制-spring-retry、guava-retry

重试机制是什么&#xff1f; 网络重试机制是用于在网络通信中处理失败的请求。接口重试可以在一定的时间间隔内多次尝试发送相同的请求&#xff0c;直到请求成功或达到最大重试次数为止。 为什么要重试&#xff1f; 1. 提高请求的成功率&#xff1a;网络通信中可能会出现各种…

【Linux指令集】---git命令的基本使用

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【Linux专栏】&#x1f388; 本专栏旨在分享学习Linux的一点学习心得&#xff0c;欢迎大家在评论区讨论&#x1f48c; 演示环境&#xff1…

git使用,一点点

查看自己有没有安装git git --version 如果没有安装请执行sudo yum install -y git来安装 git 指令 git log 查看日志 git pull 同步远端和本地仓库 这就是冲突的报错&#xff1a; 所以这个时候你要同步一下git pull

网络-fetch

文章目录 前言一、fetch简介优点&#xff1a;缺点&#xff1a; 二、使用getpost进度实现取消请求超时实现 总结 前言 本文主要记录浏览器与服务端网络通讯 fetch 的介绍与使用&#xff0c;将完成get、post、进度、取消请求、和超时请求的功能实现。 一、fetch简介 fetch作为继…

国庆day2---select实现服务器并发

select.c&#xff1a; #include <myhead.h>#define ERR_MSG(msg) do{\fprintf(stderr,"__%d__:",__LINE__);\perror(msg);\ }while(0)#define IP "192.168.1.3" #define PORT 8888int main(int argc, const char *argv[]) {//创建报式套接字socketi…

3. 文档操作

1. 创建文档 1.1 创建一个文档 在相应的索引下面使用_doc创建文档&#xff0c;地址为&#xff1a;http://127.0.0.1:9200/students/_doc&#xff0c;创建一个姓名张三的学生信息&#xff1a; {"姓名":"张三","年级":5,"班级":2,&qu…

渐变色毛玻璃形态卡悬停效果

效果展示 页面结构组成 从上述的效果展示可以看出&#xff0c;页面的组成部分主要包含这几个部分&#xff1a; 渐变色的底层方块毛玻璃的内容层内容层上的两个小方块 CSS 知识点 transformlinear-gradient 实现页面结构布局 <div class"box"><span>…

竞赛 大数据疫情分析及可视化系统

文章目录 0 前言2 开发简介3 数据集4 实现技术4.1 系统架构4.2 开发环境4.3 疫情地图4.3.1 填充图(Choropleth maps)4.3.2 气泡图 4.4 全国疫情实时追踪4.6 其他页面 5 关键代码最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 大数据疫…

笔记本电脑查询连接wifi密码

笔记本电脑查询连接wifi密码 1、背景2、环境3、实操3.1、已连接wifi查看密码3.2、之前连接过的wifi密码查看 1、背景 在日常使用过程中遇到两个使用场景。网络管理员跳过一下步骤&#xff0c;针对wifi使用人员。 1、刚到一个新环境中需要连接wifi的场景 2、在一个场所连接过一…

.Net Core后端架构实战【介入IOC控制反转】

引言 Inversion of Control,简称IOC,即控制反转。记得当初刚实习的时候公司的带我的人和我提到过IOC这个概念,当初完全不知道是 啥东西。后来有幸写了半年Java,SpringBoot里面业务开发随处可见IOC。再后来我写.Net Core用到的第一个框架Blog.Core项目,它里 面IRepository与R…

MATLAB中d2d函数用法

目录 语法 说明 示例 重新采样离散时间模型 重新采样已识别的离散时间模型 d2d函数的功能是重新采样离散时间模型。 语法 sys1 d2d(sys, Ts) sys1 d2d(sys, Ts, method) sys1 d2d(sys, Ts, opts) 说明 sys1 d2d(sys, Ts)将离散时间动态系统模型 sys 重新采样&#…

集合-List集合

系列文章目录 1.集合-Collection-CSDN博客​​​​​​ 2.集合-List集合-CSDN博客 文章目录 目录 系列文章目录 文章目录 前言 一 . 什么是List? 二 . List集合的特点 三 . 常用方法 1.void add(int index, E element): 将指定的元素插入到列表的指定位置。 2.E remove(int in…