springboot使用rabbitmq

使用springboot创建rabbitMQ的链接。

整个项目结构如下:

img

1.maven依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version>
</dependency>

application.yaml的配置如下

spring:application:name: rabbitMQrabbitmq:host: 192.168.142.128  #rabbitmq的主机名port: 5672			   #端口,默认5672username: itheima      #rabbitmq的账号password: 123321	   #密码
server:port: 8081               #项目启动端口

2.创建rabbitMQ配置类 – RabbitConfig。

@Configuration
@Slf4j
public class RabbitConfig {@Bean("directExchange")public DirectExchange directExchange() {return new DirectExchange(MQConstant.DIRECT_EXCHANGE);}@Bean("directQueue")public Queue directQueue() {return new Queue(MQConstant.DIRECT_QUEUE);}@Bean("bindingDirectExchange")public Binding bindingDirectExchange(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue") Queue directQueue) {return BindingBuilder.bind(directQueue).to(directExchange).with(MQConstant.ROUTING_KEY);}}

3.创建RabbitMQ客户端类,主要是用来发送消息用的。

@Component
@Slf4j
public class RabbitMqClient {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(MessageBody messageBody){try{String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);rabbitTemplate.convertAndSend(MQConstant.DIRECT_EXCHANGE, MQConstant.ROUTING_KEY, JSON.toJSONString(messageBody),new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);log.info("message send,{}", message);return message;}},correlationData);log.info("message send successful");}catch (Exception e){log.info("send message error:{}",e);}}
}

4.创建接收消息类 —RabbitMqServer

@Component
@Slf4j
public class RabbitMqServer {@RabbitListener(queues = MQConstant.DIRECT_QUEUE)public void receive(String message) {try {log.info("receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);switch (messageBody.getTopic()) {case QueueTopic.USER_LOGIN:User user = JSON.parseObject(messageBody.getData(), User.class);log.info("receive user:{}", user);break;default:log.info("no need hanndle message:{},topic:{}", message, messageBody.getTopic());break;}}catch (Exception e){log.error("rabbitmq receive message error:{}", e);}}
}

5.有了以上准备后就可以开始向mq里面发送消息了,在单元测试编写测试代码。

@SpringBootTest(classes = RabbitMqApplication.class)
class RabbitMqApplicationTests {@Autowiredprivate RabbitMqClient rabbitMqClient;@Testvoid testDirectSend() {//数据User user = new User();user.setId(123);user.setName("Lewin-jie2");user.setPassword("123");MessageBody messageBody = new MessageBody();messageBody.setData(JSON.toJSONString(user));long time = new Date().getTime();messageBody.setSendTime(time);//添加主题messageBody.setTopic(QueueTopic.USER_LOGIN);rabbitMqClient.send(messageBody);}}

6.运行后,可以看到后台的日志,证明我们消息发送已经成功了。

image-20241109151751035

我们打开rabbitmq的控制台(http://你的主机名:15672),可以开到队列里面也收到了消息,但是还没有被消费。

image-20241109152401671

以上出现结果就证明rabbit已经是配置好了。那么我们来了解一下啊rabbitmq

简介:rabbitmq是基于amqp协议,用elang语言开发的一个高级的消息队列,以高性能,高可靠,高吞吐量而被大量应用到应用系统作为第三方消息中间件使用,为应用系统实现应用解耦削峰减流,异步消息

rabbitmq主要构造有,producter,consumer,exchange,queue组成

1.直连交换机(direct_exchange)。

刚刚配置的时候就是演示的producter发消息到直连交换机,然后再发送到queue中的过程。

2.广播交换机(fanout_exchange).

顾名思义,就是绑定该交换机的所有队列都可以收到这个交换机的消息

	@Bean("fanoutExchange")public FanoutExchange fanoutExchange() {return new FanoutExchange(MQConstant.FANOUT_EXCHANGE);}@Bean("aQueue")public Queue aQueue(){return new Queue(MQConstant.FANOUT_QUEUE_A);}@Bean("bQueue")public Queue bQueue(){return new Queue(MQConstant.FANOUT_QUEUE_B);}@Bean("cQueue")public Queue cQueue(){return new Queue(MQConstant.FANOUT_QUEUE_C);}/*** 绑定队列aQueue bQueue cQueue*/@Bean("bindingFanoutExchange1")public Binding bindingFanoutExchange1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("aQueue") Queue aQueue){return BindingBuilder.bind(aQueue).to(fanoutExchange);}@Bean("bindingFanoutExchange2")public Binding bindingFanoutExchange2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("bQueue") Queue bQueue){return BindingBuilder.bind(bQueue).to(fanoutExchange);}@Bean("bindingFanoutExchange3")public Binding bindingFanoutExchange3(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("cQueue") Queue cQueue){return BindingBuilder.bind(cQueue).to(fanoutExchange);}

编写controller类,再postman上面测试[http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag](http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag)

@Controller
@RequestMapping("/mq")
@Slf4j
public class SendMessageController {@Autowiredprivate RabbitMqClient rabbitMqClient;@PostMapping("/sendFanoutMsg")public String sendFanoutMsg(@RequestParam("msg") String msg){try {MessageBody messageBody = new MessageBody();messageBody.setData(msg);rabbitMqClient.send1(messageBody);}catch (Exception e){log.error("sendFanoutMsg error{}", e);}return "send fanout msg success";}
}

结果:控制台收到消息了!!!

image-20241109213739205

3.主题交换机(topic_exchange)

topic_exchange和direct_exchange很像,topic有通配符。direct没有。

image-20241109214721827
  1. china.news 代表只关心中国新闻

  2. china.weather 代表只关心中国天气

  3. japan.news 代表只关心日本的新闻

  4. japan.weather 代表只关心日本的天气

    controller接口

    @PostMapping("/sendTopicMsg")public String sendTopicMsg(@RequestParam("msg") String msg,@RequestParam("type") String type){try {MessageBody messageBody = new MessageBody();messageBody.setData(msg);rabbitMqClient.send3(messageBody,type);}catch (Exception e){log.error("sendTopicMsg error{}", e);}return "send topic msg success";}
    

    利用postman测试。

    1.msg: “祖国75岁生日快乐”,type:“china.news”

    image-20241110102452558

    预测:queue1,queue4会接收到消息。

    image-20241110102659758

    2.msg: “日本大量排核废水,导致哥斯拉出现”,type:“japan.news”

    image-20241110102738974

    预测:queue2,queue4会接收到消息。

    image-20241110103638277

    3.msg: “今日日本出现大暴雨,怀疑是哥斯拉来了”,type:“Japan.weather”

    image-20241110103727452

    预测:queue2,queue3会接收到消息

    image-20241110103839382

topic-exchange在代码中如何使用。首先创建交换机,和队列,绑定交换机。

/*============================topic===========================*/@Bean("topicExchange")public TopicExchange topicExchange() {return new TopicExchange(MQConstant.TOPIC_EXCHANGE);}@Bean("queue1")public Queue queue1(){return new Queue(MQConstant.QUEUE1);}@Bean("queue2")public Queue queue2(){return new Queue(MQConstant.QUEUE2);}@Bean("queue3")public Queue queue3(){return new Queue(MQConstant.QUEUE3);}@Bean("queue4")public Queue queue4(){return new Queue(MQConstant.QUEUE4);}@Bean("bingTopicExchange1")public Binding bingTopicExchange1(@Qualifier("queue1") Queue queue1,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue1).to(topicExchange).with(MQConstant.CHINA_);}@Bean("bingTopicExchange2")public Binding bingTopicExchange2(@Qualifier("queue2") Queue queue2,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue2).to(topicExchange).with(MQConstant.JAPAN_);}@Bean("bingTopicExchange3")public Binding bingTopicExchange3(@Qualifier("queue3") Queue queue3,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue3).to(topicExchange).with(MQConstant._WEATHER);}@Bean("bingTopicExchange4")public Binding bingTopicExchange4(@Qualifier("queue4") Queue queue4,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue4).to(topicExchange).with(MQConstant._NEWS);}

消息发送

 public void send3(MessageBody messageBody,String routingKey) {try{String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);rabbitTemplate.convertAndSend(MQConstant.TOPIC_EXCHANGE, routingKey , JSON.toJSONString(messageBody),new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);log.info("message send,{}", message);return message;}},correlationData);log.info("message send successful");}catch (Exception e){log.info("send message error:{}",e);}}

消息接收

@RabbitListener(queues = MQConstant.QUEUE1)public void receive4(String message) {log.info("topic exchange");try {log.info("queue1 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE2)public void receive5(String message) {log.info("topic exchange");try {log.info("queue2 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE3)public void receive6(String message) {log.info("topic exchange");try {log.info("queue3 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE4)public void receive7(String message) {log.info("topic exchange");try {log.info("queue4 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/10035.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WordPress使用(1)

1. 概述 WordPress是一个开源博客框架&#xff0c;配合不同主题&#xff0c;可以有多种展现方式&#xff0c;博客、企业官网、CMS系统等&#xff0c;都可以很好的实现。 官网&#xff1a;博客工具、发布平台和内容管理系统 – WordPress.org China 简体中文&#xff0c;这里可…

hdfs:介绍三个脚本

1、jps-cluster.sh 如果我们想在Bigdata01 这台电脑上&#xff0c;查看整个集群的服务启动情况&#xff0c;可以使用这个脚本文件。 #!/bin/bash USAGE"使⽤⽅法&#xff1a;sh jps-cluster.sh" NODES("bigdata01" "bigdata02" "bigdata03…

智慧园区管理平台实现智能整合提升企业运营模式与管理效率

内容概要 在当今数字化的背景下&#xff0c;智慧园区管理平台正逐渐成为企业提升运营效率和管理模式的重要工具。这个平台汇聚了多种先进技术&#xff0c;旨在通过智能整合各类资源与信息&#xff0c;帮助企业实现全面的管理创新。 智慧园区管理平台不仅仅是一个数据处理工具…

大模型知识蒸馏技术(2)——蒸馏技术发展简史

版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl2006年模型压缩研究 知识蒸馏的早期思想可以追溯到2006年,当时Geoffrey Hinton等人在模型压缩领域进行了开创性研究。尽管当时深度学习尚未像今天这样广泛普及,但Hinton的研究已经为知识迁移和模…

python编程环境安装保姆级教程--python-3.7.2pycharm2021.2.3社区版

第1步安装解释器python-3.7.2&#xff0c;第2步安装pycharm编程软件 1、安装解释器 1.1 什么是解释器 就是将Python高级程序语言翻译成为计算机可以识别的0、1代码 1.2 安装解释器python-3.7.2&#xff08;根据自己的操作系统安装适配的解释器&#xff0c;以Windows为例&…

【仓颉】仓颉编程语言Windows安装指南 配置环境变量 最简单解决中文乱码问题和其他解决方案大全

适用于版本&#xff1a; 0.53.13 &#xff5c; 发布日期&#xff1a; 2024-10-24 &#xff08;以后的可能也适用&#xff09; 本机windows版本&#xff1a;24H2 内部版本号windows 10.0.26100 因为仓颉的官方文档一直没更新&#xff0c;所以在这里写一下如何在windows上完成这些…

VS2008 - debug版 - 由于应用程序配置不正确,应用程序未能启动。重新安装应用程序可能会纠正这个问题。

文章目录 VS2008 - debug版 - 由于应用程序配置不正确&#xff0c;应用程序未能启动。重新安装应用程序可能会纠正这个问题。概述笔记VS2008安装环境VS2008测试程序设置默认报错的情况措施1措施2备注 - exe清单文件的问题是否使用静态库?_BIND_TO_CURRENT_VCLIBS_VERSION的出处…

如何将DeepSeek部署到本地电脑

DeepSeek爆火&#xff0c;如何免费部署到你的电脑上&#xff1f;教程来了&#xff0c;先在你的本地电脑上安装Ollama&#xff0c;然后在Ollama搜索选择DeepSeek模型&#xff0c;即可成功在你的本地电脑上部署DeepSeek 一、安装Ollama 打开Ollama官网&#xff1a;https://ollam…

[Java]泛型(一)泛型类

1. 什么是泛型类&#xff1f; 泛型类是指类中使用了占位符类型&#xff08;类型参数&#xff09;的类。通过使用泛型类&#xff0c;你可以编写可以处理多种数据类型的代码&#xff0c;而无需为每种类型编写单独的类。泛型类使得代码更具通用性和可重用性&#xff0c;同时可以保…

模型I/O功能之模型包装器

文章目录 模型包装器分类LLM模型包装器、聊天模型包装器 截至2023年7月&#xff0c;LangChain支持的大语言模型已经超过了50种&#xff0c;这其中包括了来自OpenAI、Meta、Google等顶尖科技公司的大语言模型&#xff0c;以及各类优秀的开源大语言模型。对于这些大语言模型&…

【漫话机器学习系列】067.希腊字母(greek letters)-写法、名称、读法和常见用途

希腊字母&#xff08;Greek Letters&#xff09; 希腊字母在数学、科学、工程学和编程中广泛使用&#xff0c;常用于表示变量、常量、参数、角度等。以下是希腊字母的完整列表及其常见用途。 大写与小写希腊字母表 大写小写名称&#xff08;英文&#xff09;名称&#xff08;…

JxBrowser 7.41.7 版本发布啦!

JxBrowser 7.41.7 版本发布啦&#xff01; • 已更新 #Chromium 至更新版本 • 实施了多项质量改进 &#x1f517; 点击此处了解更多详情。 &#x1f193; 获取 30 天免费试用。

AI在自动化测试中的伦理挑战

在软件测试领域&#xff0c;人工智能&#xff08;AI&#xff09;已经不再是遥不可及的未来技术&#xff0c;而是正在深刻影响着测试过程的现实力量。尤其是在自动化测试领域&#xff0c;AI通过加速测试脚本生成、自动化缺陷检测、测试数据生成等功能&#xff0c;极大提升了测试…

单片机基础模块学习——超声波传感器

一、超声波原理 左边发射超声波信号&#xff0c;右边接收超声波信号 左边的芯片用来处理超声波发射信号&#xff0c;中间的芯片用来处理接收的超声波信号 二、超声波原理图 T——transmit 发送R——Recieve 接收 U18芯片对输入的N_A1信号进行放大&#xff0c;然后输入给超声…

蓝桥杯之c++入门(一)【C++入门】

目录 前言5. 算术操作符5.1 算术操作符5.2 浮点数的除法5.3 负数取模5.4 数值溢出5.5 练习练习1&#xff1a;计算 ( a b ) ⋆ c (ab)^{\star}c (ab)⋆c练习2&#xff1a;带余除法练习3&#xff1a;整数个位练习4&#xff1a;整数十位练习5&#xff1a;时间转换练习6&#xff…

Redis --- 分布式锁的使用

我们在上篇博客高并发处理 --- 超卖问题一人一单解决方案讲述了两种锁解决业务的使用方法&#xff0c;但是这样不能让锁跨JVM也就是跨进程去使用&#xff0c;只能适用在单体项目中如下图&#xff1a; 为了解决这种场景&#xff0c;我们就需要用一个锁监视器对全部集群进行监视…

房屋租赁系统在数字化时代中如何重塑租赁服务与提升市场竞争力

内容概要 在当今快速发展的数字化时代&#xff0c;房屋租赁系统的作用愈发重要。随着市场需求的变化&#xff0c;租赁服务正面临着新的挑战与机遇。房屋租赁系统不仅仅是一个简单的管理工具&#xff0c;更是一个能够提升用户体验和市场竞争力的重要平台。其核心功能包括合同管…

INCOSE需求编写指南-附录 D: 交叉引用矩阵

附录 Appendix D: 交叉引用矩阵 Cross Reference Matrices Rules to Characteristics Cross Reference Matrix NRM Concepts and Activities to Characteristics Cross Reference Matrix Part 1 NRM Concepts and Activities to Characteristics Cross Reference Matrix Part…

Java---入门基础篇(上)

前言 本片文章主要讲了刚学Java的一些基础内容,例如注释,标识符,数据类型和变量,运算符,还有逻辑控制等,记录的很详细,带你从简单的知识点再到练习题.如果学习了c语言的小伙伴会发现,这篇文章的内容和c语言大致相同. 而在下一篇文章里,我会讲解方法和数组的使用,也是Java中基础…

新版231普通阿里滑块 自动化和逆向实现 分析

声明: 本文章中所有内容仅供学习交流使用&#xff0c;不用于其他任何目的&#xff0c;抓包内容、敏感网址、数据接口等均已做脱敏处理&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的一切后果均与作者无关&#xff01; 逆向过程 补环境逆向 部分补环境 …