微服务技术 RabbitMQ SpringAMQP P61-P76

B站学习视频icon-default.png?t=N7T8https://www.bilibili.com/video/BV1LQ4y127n4?p=61&vd_source=8665d6da33d4e2277ca40f03210fe53a

文档资料:

链接:https://pan.baidu.com/s/1P_Ag1BYiPaF52EI19A0YRw?pwd=d03r 
提取码:d03r

一 初始MQ

1. 同步通讯

2. 异步通讯

3. MQ常见架构

二 RabbitMQ 快速入门

1. RabbitMQ概述和安装 --单机部署

我们在Centos7虚拟机中使用Docker来安装

1.1.下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

在课前资料已经提供了镜像包:

上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar

1.2.安装MQ

执行下面的命令来运行MQ容器:

docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123321 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management

1.2. RabbitMQ概述和安装 --集群部署

2.常见消息模型

3.快速入门 --Basic Queue 简单队列模型

三 SpringAMQP

1. Basic Queue 简单队列模型

  • 结构

1.1 消息发送 

  • 16572是UI端口  5672消息端口

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("123");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}

1.2 消息接收

  • 16572是UI端口  5672消息端口

import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("123");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

2. SpringAMQP入门案例

2.1 介绍

2.2 案例 -- 发送消息

spring:rabbitmq:host: 127.0.0.1 # rabbitMQ的ip地址port: 5672 # 端口username: adminpassword: 123virtual-host: /
@RunWith(SpringRunner.class) // @Autowired 可以注入
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName, message);}
}

2.3 案例 --接收消息

spring:rabbitmq:host: 127.0.0.1 # rabbitMQ的ip地址port: 5672 # 端口username: adminpassword: 123virtual-host: /listener:simple:prefetch: 1
@Component  //声明成 bean
public class SpringRabbitListener {// @RabbitListener(queues = "simple.queue")// public void listenSimpleQueue(String msg) {//     System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");// }@RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);}
}

3. Work Queue 工作队列模型

3.1 介绍

  • 接收信息
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;
import java.util.Map;@Component  //声明成 bean
public class SpringRabbitListener {/*  @RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");}
*/@RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);}
}
  • 发送信息

@RunWith(SpringRunner.class) // @Autowired 可以注入
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName, message);}@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__";for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}
}
  • application.yml

    logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
    spring:rabbitmq:host: 127.0.0.1 # rabbitMQ的ip地址port: 5672 # 端口username: adminpassword: 123virtual-host: / #虚拟主机listener:simple:prefetch: 1 # 预取 每次只取一条消息,处理完才能获取下一条消息

4. 发布模型介绍

5. 发布、订阅模型-Fanout

5.1 利用SpringAMQP演示FanoutExchange的使用 ( 发布、订阅模型-Direct)

5.2 步骤一 在consumer服务生命Exchange、QUEUE、Binding

6. 发布、订阅模型-Direct

6.1 案例

7. 发布、订阅模型-Topic

8. 消息转换器

推荐使用json

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/220346.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jenkins 添加节点报错

报错日志 Error: A JNI error has occurred, please check your installation and try again Exception in thread "main" java.lang.UnsupportedClassVersionError: hudson/remoting/Launcher has been compiled by a more recent version of the Java Runtime (cl…

如何搭建企业管理系统Odoo并远程访问管理界面【内网穿透】

文章目录 前言1. 下载安装Odoo&#xff1a;2. 实现公网访问Odoo本地系统&#xff1a;3. 固定域名访问Odoo本地系统 前言 Odoo是全球流行的开源企业管理套件&#xff0c;是一个一站式全功能ERP及电商平台。 开源性质&#xff1a;Odoo是一个开源的ERP软件&#xff0c;这意味着企…

短视频账号矩阵系统3年技术独立源头正规开发搭建

短视频账号矩阵3年技术独立开发打造是一个非常有挑战性和前景的项目。以下是一些建议&#xff0c;帮助你成功打造一个成功的短视频账号矩阵&#xff1a; 1. 确定目标受众&#xff1a;首先需要明确你的目标受众是谁&#xff0c;了解他们的兴趣爱好、年龄、性别等&#xff0c;以便…

ModuleNotFoundError: No module named ‘openai.error‘

ModuleNotFoundError: No module named ‘openai.error’ result self.fn(*self.args, **self.kwargs) File “H:\chatGPTWeb\chatgpt-on-wechat\channel\chat_channel.py”, line 168, in _handle reply self._generate_reply(context) File “H:\chatGPTWeb\chatgpt-on-wec…

【Unity动画】综合案例完结-控制角色动作播放+声音配套

这个案例实现的动作并不复杂&#xff0c;主要包含一个 跳跃动作、攻击动作、还有一个包含三个动画状态的动画混合树。然后设置三个参数来控制切换。 状态机结构如下&#xff1a; 完整代码 using System.Collections; using System.Collections.Generic; using UnityEngine;pu…

ShenYu网关注册中心之Zookeeper注册原理

文章目录 1、客户端注册流程1.1、读取配置1.1.1、用于注册的 ZookeeperClientRegisterRepository1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener 1.2、扫描注解&#xff0c;注册元数据和URI1.2.1、构建URI并写入Disruptor1.2.2、构建元数据并写入Disrupto…

win环境下启动kafka Port already in use: 6688; nested exception is

背景 zk启动成功后&#xff0c;接下来启动kafka&#xff0c;再启动kafka后一直说端口被占用。 端口占用解决办法: netstat -aon|findstr 9092 taskkill -f -pid 7780 杀掉后&#xff0c;再次启动kafka时&#xff0c;问题并未解决 后来修改了批处理文件kafka-run-class.bat中…

STM32_串口下载程序

目录标题 前言1、理论知识2、串口下载具体操作2.1、硬件准备2.2、软件准备2.3、设置单片机的启动模式为系统存储器启动2.4、软件配置2.5、下载程序 附:生成hex文件 前言 使用调试器下载程序又快有稳定还能使用调试功能&#xff0c;当然是下载调试的首选。但是拓展下串口下载程…

vscode使用remote ssh到server上 - Node进程吃满CPU

起因&#xff1a;Node进程吃满CPU 分析 我发现每次使用vscode的remote插件登陆到server后&#xff0c;就会出现node进程&#xff0c;不太清楚干什么用的&#xff0c;但是绝对和它有关。 查找原因 首先找到了这篇文章&#xff0c;解决了rg进程的问题&#xff1a; https://blo…

DBA面试题

Oracle体系结构 &#xff08;1&#xff09;、Oracle实例内存中包含哪些部分? 答: sga与pga sga:是一组共享的内存区域&#xff0c;包含数据字典缓存、库缓存、重做日志缓冲区 Pga:为每个服务器进程分配的非共享内存&#xff0c;存储会话状态和私有SOL工作区 在Oracle数据库中&…

JVM-9-Class类文件的结构

Java技术能够一直保持着非常良好的向后兼容性&#xff0c;Class文件结构的稳定功不可没。 Class文件是一组以8个字节为基础单位的二进制流&#xff0c;各个数据项目严格按照顺序紧凑地排列在文件之中。 Class文件格式采用一种类似于C语言结构体的伪结构来存储数据&#xff0c…

JDK bug:ciObjectFactory::create_new_metadata

文章目录 1、问题2.详细日志3.JDK&#xff1a;bug最终bug链接&#xff1a; 京东遇到过类似bug各位大佬如果有更详细的解答可以留言。 1、问题 Problematic frame: V [libjvm.so0x438067] ciObjectFactory::create_new_metadata(Metadata*)0x327 关键字还是ciObjectFactory::cr…

【HCIP学习记录】OSPF之DD报文

1.OSPF报文格式 24字节 字段长度含义Version1字节版本&#xff0c;OSPF的版本号。对于OSPFv2来说&#xff0c;其值为2。Type1字节类型&#xff0c;OSPF报文的类型&#xff0c;有下面几种类型&#xff1a; 1&#xff1a;Hello报文&#xff1b;● 2&#xff1a;DD报文&#xff1…

三、Spring IoC 容器和核心概念

本章概要 组件和组件管理概念 什么是组件&#xff1f;我们的期待Spring充当组件管理角色&#xff08;IoC&#xff09;组件交给Spring管理优势 Spring IoC 容器和容器实现 普通和复杂容器SpringIoC 容器介绍SpringIoC 容器具体接口和实现类SpringIoC 容器管理配置方式 Spring I…

13、Kafka副本机制详解

Kafka 副本机制详解 1、副本定义2、副本角色3、In-sync Replicas&#xff08;ISR&#xff09;4、Unclean 领导者选举&#xff08;Unclean Leader Election&#xff09; 所谓的副本机制&#xff08;Replication&#xff09;&#xff0c;也可以称之为备份机制&#xff0c;通常是指…

ASP.NET MVC+EntityFramework图片头像上传

1&#xff0c;先展示一下整体的效果 2&#xff0c;接下来展示用户添加以及上传头像代码、添加用户界面 前端代码如下&#xff1a; <div class"form-group">Html.LabelFor(model > model.img, "头像&#xff1a;", htmlAttributes: new { class &…

jmeter如何循环运行到csv文件最后一行后停止

1、首先在线程组中设置’循环次数‘–勾选永远 2、csv数据文件设置中设置&#xff1a; 遇到文件结束符再次循环?——改为&#xff1a;False 遇到文件结束符停止线程?——改为&#xff1a;True 3、再次运行就会根据文档的行数运行数据 &#xff08;如果需要在循环控制器中&…

多项目同时跑多个node版本-比nvm好用的volta

开发环境中多个项目需要node版本不同&#xff0c;且同时不止是一个项目在开发中&#xff0c;用了nvm进行node版本管理和切换&#xff0c;但是太麻烦了。新的解决方案volta可以比较好的处理这种情况 Volta 官网先挂出来&#xff1a;https://volta.sh/ 1、volta是什么&#xff…

亚信安慧AntDB数据库——助力5G计费核心替换,全面自主可控

数字经济时代&#xff0c;5G以更快、更丰富、更智能的连接方式服务于各行各业。AntDB数据库&#xff0c;源于亚信科技&#xff0c;自2008年起成功落地全国24个省份的中国移动、中国电信、中国联通和中国广电等运营商项目&#xff0c;为数字化服务和信息化基础建设提供支持。 在…

VSCode 常用的快捷键和技巧系列(2)

一、如何让VSCode工程树显示图标 第一步&#xff1a;安装 快捷键 CtrlP &#xff0c;输入 ext install vscode-icons &#xff0c;然后点击安装插件 第二步&#xff1a;配置 安装成功后&#xff0c;点击Reload重新加载。 然后配置&#xff0c;当前图标使用VsCode-Icons Go…