快速上手RabbitMQ

  1. 安装RabbitMQ
    1. 首先将镜像包上传到虚拟机,使用命令加载镜像
docker load -i mq.tar
    1. 运行MQ容器
docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management
  1. MQ的基本结构
    1. RabbitMQ的一些角色
      1. publisher:生产者
      2. consumer:消费者
      3. exchange:交换机,负责消息路由
      4. queue:队列,存储消息
      5. virtualHost:虚拟主机,隔离不同租户的exchange,queue,消息的隔离
    1. 快速入门

public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}
public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}
  1. SpringAMQP
    1. 功能
      1. 自动声明队列、交换机及其绑定关系
      2. 基于注解的监听器模式,异步接收消息
      3. 封装了RabbitTemplate工具,用于发送消息
    1. 简化模型 === producer->queue->consumer
      1. BasicQueue
        1. 首先在父工程中引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
        1. 配置MQ地址,在publisher服务的application.yml中添加配置
spring:rabbitmq:host: 192.168.137.138 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: itcast # 用户名password: 123321 # 密码
        1. 编写队列
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg){log.info("接受到的消息:{}",msg);
}
        1. 发送消息
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue(){String queueName = "simple.queue";String message = "hello,world";rabbitTemplate.convertAndSend(queueName,message);}}
      1. WorkQueue === 让多个消费者绑定到一个队列,共同消费队列中的消息
        1. 结构图

        2. 消息发送
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testWorkQueue() throws Exception{String queueName = "simple.queue";String message = "hello,world";for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName,"第"+i+"个"+message);Thread.sleep(20);}}
}
        1. prefetch能者多劳机制
          1. 原理:mq在收到consumer的ack之前,可以向consumer推送的消息的条数,默认250
          2. 修改consumer服务的application.yml文件
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
        1. 消息接受
@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void onWorkQueue1(String msg) throws Exception {log.info("work1接收到的消息,{}", msg);Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void onWorkQueue2(String msg) throws Exception {log.info("work2接收到的消息,{}", msg);Thread.sleep(200);}}
    1. 发布/订阅模型 === producer->exchange(只负责路由,不负责存储)->queue->consumer
      1. Fanout === 广播给所有的queue
        1. 结构图

        2. 消息发送流程
          1. 可以有多个队列
          2. 每个队列都要绑定到Exchange(交换机)
          3. 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
          4. 交换机把消息发送给绑定过的所有队列
          5. 订阅队列的消费者都能拿到消息
        1. 在消费者模块中创建一个类,声明队列和交换机
@Configuration
public class FanoutConfig {/** 创建一个交换机* */@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanout.exchange");}/** 创建队列1* */@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/** 创建队列2* */@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/** 将队列1绑定到交换机* */@Beanpublic Binding queue1Binding(){return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}/** 将队列2绑定到交换机* */@Beanpublic Binding queue2Binding(){return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}}
        1. 发送消息
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testFanoutExchange() {// 队列名称String exchangeName = "fanout.exchange";// 消息String message = "hello world!";rabbitTemplate.convertAndSend(exchangeName, "", message);}
}
        1. 消息接受
@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanout.queue1")public void fanoutQueue1(String msg){log.info("收到了来自fanout.queue1的消息,{}",msg);}@RabbitListener(queues = "fanout.queue2")public void fanoutQueue2(String msg){log.info("收到了来自fanout.queue2的消息,{}",msg);}
}
      1. Direct === 路由给exchange绑定的queue
        1. 结构图

        2. 消息发送流程
          1. queue与exchange绑定的时候需要设置bindingkey
          2. 可以设置多个bindingkey,key可以重复
          3. produce发送的时候需要设置routingkey
          4. exchange判断消息的routingkey与queue中的bindingkey是否完全一致,一致才会接受到消息
        1. 基于注解声明队列和交换机
@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "direct.exchange"),key = {"red","blue"}))public void directQueue1(String msg){log.info("收到了来自direct.queue1的消息,{}",msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "direct.exchange"),key = {"gary","blue"}))public void directQueue2(String msg){log.info("收到了来自direct.queue2的消息,{}",msg);}}
        1. 消息发送
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testDirectExchange(){String exchange = "direct.exchange";String routingKey = "gary";String message = "hello direct";rabbitTemplate.convertAndSend(exchange,routingKey,message);}}
        1. Direct交换机与Fanout交换机有什么区别?
          1. Fanout交换机将消息路由给每一个与之绑定的队列
          2. Direct交换机根据RoutingKey判断路由给哪个队列
      1. Topic
        1. 结构图

        2. 匹配支持通配符
          1. *:1个单词
          2. #:1个或者多个单词
        1. 基于注解声明队列和交换机
@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "topic.exchange"),key = "china.#"))public void topicQueue1(String msg){log.info("收到了来自topic.queue1的消息,{}",msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "topic.exchange"),key = "#.news"))public void topicQueue2(String msg){log.info("收到了来自topic.queue2的消息,{}",msg);}}
        1. 消息发送
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testTopicExchange(){String exchange = "topic.exchange";String routingKey = "china.123";String message = "so cool";rabbitTemplate.convertAndSend(exchange, routingKey, message);}}
    1. 消息转换器
      1. 默认发送String,byte[],Serializable
      2. 可以自定义序列化
        1. 在publisher和consumer两个服务中都引入依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
        1. 注入MessageConverter的实现类
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}
        1. 消息发送
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testObjectQueue(){String queue = "object.queue";User message = new User("蒋浩楠",80);rabbitTemplate.convertAndSend(queue,message);}
}
        1. 接收消息
@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "object.queue")public void objectQueue(UserDTO dto){log.info("收到了来自topic.queue2的消息,{}",dto.toString());}
}
  1. RabbitMQ集群

    1. 普通集群
      1. 结构图

      2. 特征
        1. 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
        2. 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
        3. 队列所在节点宕机,队列中的消息就会丢失
    1. 镜像集群
      1. 结构图

      2. 特征
        1. 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
        2. 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点
        3. 一个队列的主节点可能是另一个队列的镜像节点
        4. 所有操作都是主节点完成,然后同步给镜像节点
        5. 主宕机后,镜像节点会替代成新的主
    1. 仲裁队列
      1. 特征
        1. 与镜像队列一样,都是主从模式,支持主从数据同步
        2. 使用非常简单,没有复杂的配置
        3. 主从同步基于Raft协议,强一致
      1. java代码中创建仲裁队列
        1. 创建队列
@Bean
public Queue quorumQueue() {return QueueBuilder.durable("quorum.queue") // 持久化.quorum() // 仲裁队列.build();
}
        1. SpringAMQP连接MQ集群
spring:rabbitmq:addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073 #address来代替host、port方式username: itcastpassword: 123321virtual-host: /
  1. 部署集群
    1. 计划部署3节点的mq集群

    2. 获取cookie,每个集群节点必须具有相同的 cookie。实例之间也需要它来相互通信
docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookieUTQKOGHXAJPQFJREBLEL #cookiedocker rm -f mq #停止并删除当前的mq容器,我们重新搭建集群
    1. 准备集群配置
#在/tmp目录新建一个配置文件 rabbitmq.conf
cd /tmp# 创建文件
touch rabbitmq.conf#配置文件内容如下
loopback_users.guest = false
listeners.tcp.default = 5672
default_user = itcast 
default_pass = 123321
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3
    1. 再创建一个文件,记录cookie
cd /tmp# 创建cookie文件
touch .erlang.cookie# 写入cookie
echo "UTQKOGHXAJPQFJREBLEL" > .erlang.cookie
# 修改cookie文件的权限
# 修改cookie文件的权限
# 修改cookie文件的权限
chmod 600 .erlang.cookie
    1. 准备三个目录,mq1、mq2、mq3,然后拷贝rabbitmq.conf、cookie文件到mq1、mq2、mq3:
cd /tmp# 创建目录
mkdir mq1 mq2 mq3# 进入/tmp
cd /tmp# 拷贝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3# 或者
echo mq1 mq2 mq3 | xargs -t -n 1 cp rabbitmq.conf
echo mq1 mq2 mq3 | xargs -t -n 1 cp .erlang.cookie
    1. 启动集群
#创建一个网络
docker network create mq-net#运行命令
docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/mq1/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3-managementdocker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/mq2/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3-managementdocker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/mq3/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3-management
    1. 添加镜像模式
docker exec -it mq1 rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

    1. 添加仲裁队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/319418.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用 GPT API 从 PDF 出版物导出研究图表?

原文地址&#xff1a;how-to-use-gpt-api-to-export-a-research-graph-from-pdf-publications 揭示内部结构——提取研究实体和关系 2024 年 2 月 6 日 介绍 研究图是研究对象的结构化表示&#xff0c;它捕获有关实体的信息以及研究人员、组织、出版物、资助和研究数据之间的关…

brpc profiler

cpu profiler cpu profiler | bRPC MacOS的额外配置 在MacOS下&#xff0c;gperftools中的perl pprof脚本无法将函数地址转变成函数名&#xff0c;解决办法是&#xff1a; 安装standalone pprof&#xff0c;并把下载的pprof二进制文件路径写入环境变量GOOGLE_PPROF_BINARY_PA…

Microsoft 365 for Mac(Office 365)v16.84正式激活版

office 365 for mac包括Word、Excel、PowerPoint、Outlook、OneNote、OneDrive和Teams的更新。Office提供了跨应用程序的功能&#xff0c;帮助用户在更短的时间内创建令人惊叹的内容&#xff0c;您可以在这里创作、沟通、协作并完成重要工作。 Microsoft 365 for Mac(Office 36…

1. 深度学习笔记--神经网络中常见的激活函数

1. 介绍 每个激活函数的输入都是一个数字&#xff0c;然后对其进行某种固定的数学操作。激活函数给神经元引入了非线性因素&#xff0c;如果不用激活函数的话&#xff0c;无论神经网络有多少层&#xff0c;输出都是输入的线性组合。激活函数的意义在于它能够引入非线性特性&am…

【webrtc】MessageHandler 7: 基于线程的消息处理:切换main线程向observer发出通知

以当前线程作为main线程 RemoteAudioSource 作为一个handler 仅实现一个退出清理的功能 首先on message的处理会切换到main 线程 :main_thread_其次,这里在main 线程对sink_ 做清理再次,在main 线程做出状态改变,并能通知给所有的observer 做出on changed 行为。对接mediac…

clang:在 Win10 上编译 MIDI 音乐程序(二)

先从 Microsoft C Build Tools - Visual Studio 下载 1.73GB 安装 "Microsoft C Build Tools“ 访问 Swift.org - Download Swift 找到 Windows 10&#xff1a;x86_64 下载 swift-5.10-RELEASE-windows10.exe 大约490MB 建议安装在 D:\Swift\ &#xff0c;安装后大约占…

《金融研究》:普惠金融改革试验区DID工具变量数据(2012-2023年)

数据简介&#xff1a;本数据集包括普惠金融改革试验区和普惠金融服务乡村振兴改革试验区两类。 其中&#xff0c;河南兰考、浙江宁波、福建龙岩和宁德、江西赣州和吉安、陕西铜川五省七地为普惠金融改革试验区。山东临沂、浙江丽水、四川成都三地设立的是普惠金融服务乡村振兴…

手撸Mybatis(二)—— 配置项的获取

本专栏的源码&#xff1a;https://gitee.com/dhi-chen-xiaoyang/yang-mybatis。 配置项解析 在mybatis中&#xff0c;一般我们会定义一个mapper-config.xml文件&#xff0c;来配置数据库连接的相关信息&#xff0c;以及我们的mapperxml文件存放目录。在本章&#xff0c;我们会…

docker-compose启动mysql5.7报错

描述一下问题经过&#xff1a; 使用docker compose 部署mysql5.7 文件如下: 使用命名卷的情况下&#xff0c;匿名卷不存在该问题 services:mysql:restart: alwaysimage: mysql:5.7container_name: mysql-devports:- 3306:3306environment:- MYSQL_DATABASEdev- MYSQL_ROOT_PAS…

团队经理口才训练教案(3篇)

团队经理口才训练教案&#xff08;3篇&#xff09; **篇&#xff1a;基础口才训练 一、教学目标 让团队经理了解口才在团队管理中的重要性。 教授基础口才技巧&#xff0c;如发音、语速、语调等。 二、教学内容 口才的重要性 强调团队经理的口才能力对团队凝聚力、沟通…

详细介绍ARM-ORACLE Database 19c数据库下载

目录 1. 前言 2. 获取方式 2.1 ORACLE专栏 2.2 ORACLE下载站点 1. 前言 现有网络上已有非常多关于ORACLE数据库机下载的介绍&#xff0c;但对于ARM平台的介绍不多&#xff0c;借此机会我将该版的下载步骤做如下说明&#xff0c;希望能够一些不明之人提供帮助和参考 2. 获…

分享一篇关于AGI的短文:苦涩的教训

学习强化学习之父、加拿大计算机科学家理查德萨顿&#xff08; Richard S. Sutton &#xff09;2019年的经典文章《The Bitter Lesson&#xff08;苦涩的教训&#xff09;》。 文章指出&#xff0c;过去70年来AI研究走过的最大弯路&#xff0c;就是过于重视人类既有经验和知识&…

Flutter笔记:美工设计.导出视频到RIVE

Flutter笔记 美工设计.导出视频到RIVE - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:https://blog.csdn.net/qq_28…

目标跟踪—卡尔曼滤波

目标跟踪—卡尔曼滤波 卡尔曼滤波引入 滤波是将信号中特定波段频率滤除的操作&#xff0c;是抑制和防止干扰的一项重要措施。是根据观察某一随机过程的结果&#xff0c;对另一与之有关的随机过程进行估计的概率理论与方法。 历史上最早考虑的是维纳滤波&#xff0c;后来R.E.卡…

原生IP和住宅IP有什么区别?

原生IP和住宅IP在多个方面存在显著的区别。 从定义和来源来看&#xff0c;原生IP是指未经NAT&#xff08;网络地址转换&#xff09;处理的真实、公网可路由的IP地址&#xff0c;它直接从互联网服务提供商&#xff08;ISP&#xff09;获得&#xff0c;而不是通过代理服务器或VP…

【MATLAB】解决不同版本MATLAB出现中文乱码的问题

解决不同版本MATLAB出现中文乱码的问题 方法1&#xff1a;更改保存类型为GBK方法2&#xff1a;记事本打开方法3&#xff1a;Notepad参考 低版本matlab打开高版本Matlab的.m文件时&#xff0c;出现中文乱码问题。比如下图&#xff1a; 出现原因为&#xff1a; 编码格式不统一问…

批量抓取某电影网站的下载链接

思路&#xff1a; 进入电影天堂首页&#xff0c;提取到主页面中的每一个电影的背后的那个urL地址 a. 拿到“2024必看热片”那一块的HTML代码 b. 从刚才拿到的HTML代码中提取到href的值访问子页面&#xff0c;提取到电影的名称以及下载地址 a. 拿到子页面的页面源代码 b. 数据提…

Ansys Speos|进行智能手机镜头杂散光分析

本例的目的是研究智能手机Camera系统的杂散光。杂散光是指光向相机传感器不需要的散光光或镜面光&#xff0c;是在光学设计中无意产生的&#xff0c;会降低相机系统的光学性能。 在本例中&#xff0c;光学透镜系统使用Ansys Zemax OpticStudio (ZOS)进行设计&#xff0c;并使用…

A Bug‘s Life (并查集)

//新生训练 #include <iostream> #include <algorithm> using namespace std; const int N 5000; int p[N], sz[N]; int n, m; int find(int x) {if (p[x] ! x)p[x] find(p[x]);return p[x]; } int main() {int T;scanf("%d", &T);for (int k 1; …

RabbitMQ之顺序消费

什么是顺序消费 例如&#xff1a;业务上产生者发送三条消息&#xff0c; 分别是对同一条数据的增加、修改、删除操作&#xff0c; 如果没有保证顺序消费&#xff0c;执行顺序可能变成删除、修改、增加&#xff0c;这就乱了。 如何保证顺序性 一般我们讨论如何保证消息的顺序性&…