【消息队列】RabbitMQ五种消息模式

RabbitMQ

  • RabbitMQ
    • RabbitMQ安装
  • 常见的消息模型
    • 基本消息队列
    • SpringAMQP
    • WorkQueue
    • 消息预取
    • 发布订阅模式
      • Fanout Exchange
      • DirectExchange
      • TopicExchange
    • 消息转换器

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件
官网地址:https://www.rabbitmq.com/

RabbitMQ安装

我们在Centos虚拟机中使用Docker来安装

  1. 下载镜像,在线拉取
    docker pull rabbitmq
  2. 安装MQ
docker run\
--env RABBITMQ_DEFAULT_USER=itcast \  # 设置环境变量用户名
--env RABBITMQ_DEFAULT_PASS= \  # 设置环境变量密码
--name mq \   # 队列名称
--hostname mq1 \  #配置主机名
-p 15672:15672 \  # MQ管理端口
-p 5672:5672 \   #MQ消息传输端口
-d \   # 后台运行
rabbitmq

在这里插入图片描述
在这里插入图片描述

交换机的创建与消息的发送由虚拟主机来完成,每个用户的虚拟主机是相互隔离的

在RabbitMQ中:
channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtual host:虚拟主机,是对queue,exchange等资源的逻辑分组

常见的消息模型

  1. 基本消息队列
  2. 工作消息队列

这两种并没有用到交换机,而是直接到达队列

  1. 发布订阅(Publish,Subscribe),根据交换机类型不同分为三种:
    Fanout Exchange:广播
    Direct Exchange:路由
    Topic Exchange:主题

基本消息队列

publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接收并缓存消息
consumer:订阅队列,处理队列中的消息

java模型(消息发布者)

@Test
public void test() throws IOException,TimeoutException{//1.建立连接,与消息队列进行连接ConnetionFactory factory =new ConnetionFactory();//设置连接参数,主机名,端口号,vhost,用户名,密码factory.setHost(192.168.75.136);factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("");//建立连接Connection connection =factory.newConnection();//创建通道Channel,就可以向队列发送消息了Channel channel =connection.createChannel();//创建队列String queuename="hlh";channel.queueDeclare(queuename,false,false,false,null);//发送消息String message="hello";channel.basicPublish("",queuename,null,message.getBytes());//关闭通道和连接channel.close();connection.close();
}

java模型(消息消费者)

    //1.建立连接,与消息队列进行连接ConnetionFactory factory =new ConnetionFactory();//设置连接参数,主机名,端口号,vhost,用户名,密码factory.setHost(192.168.75.136);factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("");//建立连接Connection connection =factory.newConnection();//创建通道Channel,就可以向队列发送消息了Channel channel =connection.createChannel();//创建队列String queuename="hlh";channel.queueDeclare(queuename,false,false,false,null);//订阅消息channel.basicConsume(queuename,true,new DefaultConsumer(channel){@Override//处理消息的代码,绑定函数,有了消息才执行public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{//处理消息String message=new String(body);             }})

注意:上边生产者消费者都创建了队列:

这是为了防止消息队列中的队列不存在,在进行消息队列初始化的时候不知道是先建立消费者,还是先建立生产者,所以都执行创建函数,但是创建的队列只有一个不会重复

SpringAMQP

  • AMQP

是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中的独立性的要求

  • Spring AMQP

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,其中Spring-amqp是基础抽象,spring-rabbit是底层的默认实现

  • 特征:
  1. 监听器容器,用于异步处理入站消息
  2. 用于发送和接收消息的RabbitTemplate
  3. Rabbitadmin用于自动声明队列,交换和绑定
  • 使用:
  1. 引入spring-amqp的依赖
    在这里插入图片描述
    在yml中配置mq连接信息:
spring: rabbitmq:host: 192.168.75.136 #主机名port: 5672 #端口virtual-host: / #虚拟主机username: itcast #用户名password:   #密码
  1. 在生产者服务中利用RabbitTemplate发送消息到hlh.queue这个队列
public class springamqptest{@Autowiredprivate RabbitTemplate rabbittemplate;@Testpublic void test(){String queuename="hlh.queue";String message="hello";rabbittemplate.convertAndSend(queuename,message);}
}
  1. 在消费者服务端编写消费逻辑,绑定到hlh.queue这个队列中
@Component
public class SpringrabbitListener {@RabbitListener(queues="hlh.queue")public void listenSimple(String msg) throws InterruptedException{//消费逻辑代码}
}

注意:消息一旦消费就会从队列中删除,rabbitmq没有消息回溯功能

WorkQueue

Work queue,工作队列。可以提高消息处理速度,避免队列消息堆积

一个消息队列绑定多个消费者

假设现在生产者每秒循环发送50条消息,此时的消费者怎么处理:

@Component
public class SpringrabbitListener {@RabbitListener(queues="hlh.queue")public void listenSimple(String msg) throws InterruptedException{//消费逻辑代码}@RabbitListener(queues="hlh.queue")public void listenSimple2(String msg) throws InterruptedException{//消费逻辑代码}
}

通过定义多个消费者进行消费,追上生产者生产的速度,同一个消息只能被一个消费者消费,一旦消费完就会在队列中删除

消息预取

指的每个消费者每次取多少条消息:
可以通过配置进行配置:

spring:rabbitmq:host: 192.168.75.136port: 5672virtual-host: /username: itcastpassword: listener:simple:prefecth: 1 #每次只能获取一条消息,处理完才能获得下一个消息

发布订阅模式

发布订阅可以使得同一个消息发送给多个消费者,实现方式是加入了exchange(交换机)
在这里插入图片描述

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失

交换机的作用:

  1. 接收生产者的消息,将消息按照规则路由到与之绑定的队列
  2. 不能缓存消息,路由失败,消息丢失
  3. FanoutExchange的会将消息路由到每个绑定的队列

SpringAMQP提过了声明交换机,队列,绑定关系的API:
在这里插入图片描述

Fanout Exchange

Fanout Exchange 会将所有的消息路由到每一个跟其绑定的queue
在创建配置类,在配置类中进行消息队列绑定交换机

@Configuration
public class FanoutConfig{// 声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}//声明一个队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}// 绑定队列跟交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}
}

此时的生产者如何发送消息:

public void test(){//给出交换机名称String exchangeName="itcast.fanout";String message="hello";//发送消息rabbitTemplate.convertAndSend(exchangeName,"",message);
}

监听者如何收到消息

@RabbitListener(queues="fanout.queue1")
public void listener(String msg){//处理得到的消息
}

DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此成为路由模式(routes)

每一个Queue都与Exchange设置一个BindingKey

发布者发送消息时,指定消息的RoutingKey,Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

一个队列可以指定多个Key

我们可以通过 @RabbitListener声明Exchange,Queue,RoutingKey
在消费者方法上注解

@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key={"red","blue"}))
public void Listener(String msg){//进行消息的处理
}

在生产者生产时:

public void test(){//给出交换机名称String exchangeName="itcast.fanout";String message="hello";//发送消息rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}

TopicExchange

TopicExchange与路由模式类似,区别在于routingKey必须是多个单词的列表,并且以.分隔
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
同样也是使用 @RabbitListener进行声明

@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key="hi.#"))
public void Listener(String msg){//进行消息的处理
}

生产者生产消息:

public void test(){//给出交换机名称String exchangeName="itcast.fanout";String message="hello";//发送消息rabbitTemplate.convertAndSend(exchangeName,"hi.now",message);
}

消息转换器

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是我们可以发送任意对象类型的消息,SpringAMQP会帮助我们序列化为字节后发送

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化

如果要修改只需定义一个MessageConverter 类型的Bean即可,推荐使用JSON方式完成序列化

  1. 引入jackson的依赖
    在这里插入图片描述
  2. 声明MessageConverter:
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}

这样发送的消息就会使用自定义的转换类型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/317811.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言趣味代码(四)

这一篇主要编写几个打字练习的小程序,然后通过这些小程序的实现来回顾复习我们之前学过的知识,然后通过这写打字练习的小程序来提升我们的打字技术和编程技术。 1. 打字练习 1.1 基本打字练习 1.1.1 基本实现 首先我们来制作一个用于计算并显示输入一…

嵌入式学习59-ARM7(自动设备号和混杂设备)

知识零碎: 头文件查找: /arm/路径下的头文件 linux驱动程序的编写,编译,运行过程 -------------------------------------------------------------------------------------------------------------------------------- 1.…

【C语言】深入了解文件:简明指南

🌈个人主页:是店小二呀 🌈C语言笔记专栏:C语言笔记 🌈C笔记专栏: C笔记 🌈喜欢的诗句:无人扶我青云志 我自踏雪至山巅 文章目录 一、文件的概念1.1 文件名:1.2 程序文件和数据文件 二、数据文…

手拉手springboot整合kafka

前期准备安装kafka 启动Kafka本地环境需Java 8以上 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 Kafka启动方式有Zookeeper和Kraft,两种方式只能选择其中一种启动,不能同时使用。 Kafka下载…

头歌:Spark的安装与使用

第1关:Scala语言开发环境的部署 相关知识 Scala是一种函数式面向对象语言,它融汇了许多前所未有的特性,而同时又运行于JVM之上。随着开发者对Scala的兴趣日增,以及越来越多的工具支持,无疑Scala语言将成为你手上一件…

第5篇:创建Nios II工程之Hello_World<四>

Q:最后我们在DE2-115开发板上演示运行Hello_World程序。 A:先烧录编译Quartus硬件工程时生成的.sof文件,在FPGA上成功配置Nios II系统;然后在Nios II Eclipse窗口右键点击工程名hello_world,选择Run As-->Nios II …

如何使用Go语言进行并发安全的数据访问?

文章目录 并发安全问题的原因解决方案1. 使用互斥锁(Mutex)示例代码: 2. 使用原子操作(Atomic Operations)示例代码: 3. 使用通道(Channels) 在Go语言中,进行并发编程是常…

SpringMVC整体工作流程

. 用户发起一个请求,请求首先到达前端控制器前端控制器接收到请求后会调用处理器映射器,由此得知,这个请求该由哪一个Controller来进行处理(并未调用Controller);前端控制器调用处理器适配器,告诉处理器适配器应该要…

搭建vue3组件库(三): CSS架构之BEM

文章目录 1. 通过 JS 生成 BEM 规范名称1.1 初始化 hooks 目录1.2 创建 BEM 命名空间函数1.3 通过 SCSS 生成 BEM 规范样式 2. 测试 BEM 规范 BEM 是由 Yandex 团队提出的一种 CSS 命名方法论,即 Block(块)、Element(元素&#xf…

qt-C++笔记之滑动条QSlider和QProgressBar进度条

qt-C笔记之滑动条QSlider和QProgressBar进度条 —— 2024-04-28 杭州 本例来自《Qt6 C开发指南》 文章目录 qt-C笔记之滑动条QSlider和QProgressBar进度条1.运行2.阅读笔记3.文件结构4.samp4_06.pro5.main.cpp6.widget.h7.widget.cpp8.widget.ui 1.运行 2.阅读笔记 3.文件结构…

安装VMware Tools报错处理(SP1)

一、添加共享文件 因为没有VMware Tools,所以补丁只能通过共享文件夹进行传输了。直接在虚拟机的浏览器下载的话,自带的IE浏览器太老了,网站打不开,共享文件夹会方便一点,大家也可以用自己的方法,能顺利上…

关于我转生从零开始学C++这件事:升级Lv.10

❀❀❀ 文章由不准备秃的大伟原创 ❀❀❀ ♪♪♪ 若有转载,请联系博主哦~ ♪♪♪ ❤❤❤ 致力学好编程的宝藏博主,代码兴国!❤❤❤ 盘古开天辟地,大伟五一更新。大家好哇,大伟今天继续给大家来更新我们的C:…

【Linux】进程终止

思维导图 学习内容 进程终止是进程控制里面的一个重要的知识,通过这一篇博客,我们可以学习到进程终止的概念,进程终止的三种情况,进程终止的退出码和退出信号,最后在来学习进程是如何进行终止的。 学习目标 进程终止…

CTFHub-Web-文件上传

CTFHub-Web-文件上传-WP 一、无验证 1.编写一段PHP木马脚本 2.将编写好的木马进行上传 3.显示上传成功了 4.使用文件上传工具进行尝试 5.连接成功进入文件管理 6.上翻目录找到flag文件 7.打开文件查看flag 二、前端验证 1.制作payload进行上传发现不允许这种类型的文件上传 …

3.8设计模式——State 状态模式(行为型)

意图 允许一个对象在其内部状态改变时改变它的行为。对象看起来似乎修改了它的类。 结构 Context(上下文)定义客户感兴趣的接口;维护一个ConcreteState子类的实例,这个实例定义当前状态。State(状态)定义…

【LangChain系列 12】Prompt模版——序列化

本文速读: PromptTemplate FewShotPromptTemplate 通常prompt以文件形式存储比python代码更好,一方面可以更容易共享、存储。本文将介绍在LangChain中如何对prompt以不同的方式序列化。 一般来说,对于序列化有以下两个设计原则&#xff1a…

特斯拉全自动驾驶系统Tesla‘s Full-Self Driving (FSD)

版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl Overview Tesla’s FSD is a suite of features that includes Autopilot, Navigate on Autopilot, Auto Lane Change, Autopark, Summon, and Traffic Light and Stop Sig…

基于Python的在线学习与推荐系统设计与实现(论文+源码)-kaic

题目:在线学习与推荐系统设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本在线学习与推荐系统就是在这样的大环境下诞生&#xff0…

芯启智行丨基于G32A1445的汽车音乐律动氛围灯解决方案

随着智能汽车技术的深度渗入,汽车照明作为汽车设计的重要组成部分,正在重塑驾驶员与汽车的互动方式,从简单的照明工具优化升级为承载更多丰富功能和不同应用场景的智能化安全装置。现代智能车型广泛配备了前照灯、车内环境氛围灯、尾灯等汽车…

【Flutter】极光推送配置流程(小米厂商通道) 章二

前言 继【Flutter】极光推送配置流程(极光通道/华为厂商/IOS) 章一 并且,我大概率不会去修改第一篇文章的内容。 随着我自己在配置公司的项目的同时,我希望一直更新这个推送系列文章。 在章一配置完后,也是出现了一些问题,所以本…