RabbitMQ基础篇

文章目录

  • 1 RabbitMQ概述
    • 1.1 消息队列
    • 1.2 RabbitMQ体系结构
  • 2 RabbitMQ工作模式
    • 2.1 简单模式(Simple Queue)
    • 2.2 工作队列模式(Work Queues)
    • 2.3 发布/订阅模式(Publish/Subscribe)
    • 2.4 路由模式(Routing)
    • 2.5 通配符模式(Topics)
  • 3 RabbitMQ整合SpringBoot
    • 3.1 @RabbitListener注解属性
    • 3.2 消费者工程
    • 3.3 生产者工程
  • 4 消息可靠性投递
    • 4.1 什么是消息可靠投递?
    • 4.2 消息的可靠发送
      • 4.2.1 消息确认机制
        • ①模块准备
        • ②配置类说明
        • ③配置类示例
        • ④测试代码
      • 4.2.2 备用交换机
        • ①备用交换机配置
        • ②备用交换机测试
    • 4.3 消息的可靠存储
      • 4.3.1 非持久化交换机和队列
      • 4.3.2 持久化交换机和消息队列
        • ①@Queue注解分析
        • ②@Exchange注解分析
    • 4.4 消息的可靠消费
      • 4.4.1 模块准备
      • 4.4.2 手动确认思路
        • ①basicAck()方法
        • ②basicNack()方法
        • ③basicReject()方法
      • 4.4.3 可靠消费代码
    • 4.5 消息可靠性投递架构
  • 5 消费端限流
    • 5.1 未设置prefetch
    • 5.2 设置prefetch
  • 6 消息超时
    • 6.1 队列层面设置过期时间
    • 6.2 消息层面设置过期时间
  • 7 死信和死信队列
    • 7.1 准备工作
      • 7.1.1 正常交换机和正常消息队列
      • 7.1.2 死信交换机和死信队列
      • 7.1.3 常量声明
    • 7.2 死信--拒绝
    • 7.3 死信--超时和溢出
  • 8 延时队列
    • 8.2 延迟插件的使用
      • 8.2.1 生产者端
      • 8.2.2 消费者端
        • ① ui界面创建延迟交换机和队列
        • ②代码创建延迟交换机和队列
      • 8.2.3 效果展示
  • 9 事务消息
    • 9.1 什么是事务消息?
    • 9.2 Springboot发送事务消息
      • 9.2.1 准备工作
      • 9.2.2 没有事务消息
      • 9.2.3 使用事务消息
    • 9.3 Channel发送事务消息
  • 10 惰性队列
    • 10.1 队列策略设定
    • 10.3 `queue.declare`参数设定
  • 11 优先级队列
    • 11.1 准备工作
    • 11.2 使用优先级队列

1 RabbitMQ概述

RabbitMQ简易安装教程

# 拉取镜像
docker pull rabbitmq:3.13-management# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-managementdocker exec -it 5129c41ad3d8 /bin/bash # 数字是rabbitmq的id,可以通过docker ps查看rabbitmq-plugins enable rabbitmq_management #启用 RabbitMQ Management 插件,使得你可以轻松地监控和管理 RabbitMQ 服务器

访问登录:http://192.168.145.130:15672,账号密码就是上面指定的

1.1 消息队列

消息队列是实现应用程序之间通信的中间件

消息队列的好处

  • 消息的发送者和接收者进行异步通信
  • 流量高峰保证服务稳定,消息队列可以暂存大量消息,达到流量削峰
  • 扩展性高,可以水平扩展以支持更多的发送者和接收者,相应地增加或减少资源处理(功能处理)
  • 解耦:消息的发送者和接收者只专注于消息,无需关系彼此细节

主流MQ对比

image-20240526195043655

1.2 RabbitMQ体系结构

image-20240527160657626
  • Channel(信道):信道是生产者消费者和RabbitMQ服务器之间通信的桥梁。所有的消息发布和消费都由信道来完成的

    • 建立在TCP连接上的虚拟连接,允许在单个TCP连接上建立多个信道,从而实现多线程处理
    • 每个线程对应一个信道,信道在RabbitMQ中具有唯一的ID,保证了信道的私有性
    • 引入信道的概念是为了减少建立和销毁TCP连接的开销,提高系统性能
  • Exchange(交换机):负责接收消息并根据路由键将消息转发到绑定的队列

  • Queue(队列):队列是RabbitMQ中用于存储消息的容器,消息按照先进先出的顺序进行处理

  • Virtual Host(虚拟主机):是RabbitMQ中的命名空间(理解为分组),用于隔离不同的环境或应用程序。每个虚拟主机都有自己的队列、交换机和绑定关系

  • Broker(代理服务器):指RabbitMQ服务器本身,多个Broker组合成一个RabbitMQ集群

2 RabbitMQ工作模式

image-20240527195355740

  • 简单模式:生产者向默认交换机发送消息,默认交换机将消息放到队列中,消费者监听并消费消息
  • 工作队列模式:生产者向默认交换机发送消息,默认交换机将消息放到消息队列,多个消费者监听消息队列竞争消息(其实就是简单模式的升级版)
  • 发布/订阅模式:扇出交换机接收消息并将消息发送给所有订阅了该交换机的队列
  • 消息生产者将消息发送给Direct交换机,Direct交换机根据routing key路由键将消息发送到指定队列(用的最多)
  • 消息生产者将消息发送给Topic交换机,Topic交换机根据通配符形式的路由键将消息发送到指定队列

项目导入依赖:采用原生的方式,开发中都是集成框架的

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>
</dependencies>

封装连接工具类:

import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  public class ConnectionUtil {  public static final String HOST_ADDRESS = "192.168.145.160";public static Connection getConnection() throws Exception {  // 定义连接工厂  ConnectionFactory factory = new ConnectionFactory();// 设置服务地址  factory.setHost(HOST_ADDRESS);// 端口  factory.setPort(5672);//设置账号信息,用户名、密码、vhost  factory.setVirtualHost("/");  factory.setUsername("guest");  factory.setPassword("123456");// 通过工程获取连接  Connection connection = factory.newConnection();return connection;  }
}

2.1 简单模式(Simple Queue)

生产者向默认交换机发送消息,默认交换机将消息放到队列中,消费者监听并消费消息

image-20240527195355740

生产者:发送消息

public class Producer {  public static void main(String[] args) throws Exception {// 获取连接Connection connection = ConnectionUtil.getConnection();// 创建频道  Channel channel = connection.createChannel();  // 声明(创建)队列  // queue      参数1:队列名称  // durable    参数2:是否定义持久化队列,当 MQ 重启之后还在  // exclusive  参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列  // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除  // arguments  参数5:队列其它参数  channel.queueDeclare("simple_queue", true, false, false, null);  // 要发送的信息  String message = "你好;小兔子!";  // 参数1:交换机名称,如果没有指定则使用默认Default Exchange  // 参数2:路由key,简单模式可以传递队列名称  // 参数3:配置信息  // 参数4:消息内容  channel.basicPublish("", "simple_queue", null, message.getBytes());  System.out.println("已发送消息:" + message);  // 关闭资源  channel.close();  connection.close();}
}

运行效果:新增队列:simple_queue

image-20240527200148062

image-20240527200606545

消费者

public class Consumer {  public static void main(String[] args) throws Exception {// 获取连接Connection connection = ConnectionUtil.getConnection();// 创建ChannelChannel channel = connection.createChannel();  // 创建队列// 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建  // 参数1. queue:队列名称  // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在  // 参数3. exclusive:是否独占。  // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉  // 参数5. arguments:其它参数。  channel.queueDeclare("simple_queue",true,false,false,null);  // 接收消息  DefaultConsumer consumer = new DefaultConsumer(channel){// 回调方法,当收到消息后,会自动执行该方法  // 参数1. consumerTag:标识  // 参数2. envelope:获取一些信息,交换机,路由key...  // 参数3. properties:配置信息  // 参数4. body:数据  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);  System.out.println("Exchange:"+envelope.getExchange());  System.out.println("RoutingKey:"+envelope.getRoutingKey());  System.out.println("properties:"+properties);  System.out.println("body:"+new String(body));}};// 参数1. queue:队列名称  // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息  // 参数3. callback:回调对象  // 消费者类似一个监听程序,主要是用来监听消息  channel.basicConsume("simple_queue",true,consumer);}
}

控制台打印:

image-20240527202647372

消息被消费掉了,所以RabbitMQ服务器上没有了

image-20240527201201641

2.2 工作队列模式(Work Queues)

生产者向默认交换机发送消息,默认交换机将消息放到消息队列,多个消费者监听消息队列竞争消息(其实就是简单模式的升级版)

生产者

public class Producer {  public static final String QUEUE_NAME = "work_queue";  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();  channel.queueDeclare(QUEUE_NAME,true,false,false,null);  for (int i = 1; i <= 10; i++) {String body = i+"hello rabbitmq~~~";channel.basicPublish("",QUEUE_NAME,null,body.getBytes());}  channel.close();connection.close();}
}

发送消息:

image-20240527204259210

消费者1:

public class Consumer1 {  static final String QUEUE_NAME = "work_queue";  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer1 body:"+new String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}

消费者2:

public class Consumer2 {static final String QUEUE_NAME = "work_queue";  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer2 body:"+new String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}

注意:运行的时候先启动两个消费端程序,然后再启动生产者端程序

运行结果:两个消费者竞争消息队列中消息

image-20240527204854307

2.3 发布/订阅模式(Publish/Subscribe)

rabbitmq消息通讯过程:消息生产者将消息发送给交换机,由交换机处理消息。Exchange(交换机)只负责转发消息,不存储消息,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

常见的交换机类型

  • Fanout Exchange(扇出交换机),将消息发送给所有绑定到交换机的队列
  • Direct Exchange(直连交换机),把消息交给符合指定routing key的队列
  • Topic Exchange(主题交换机),把消息交给符合routing pattern(路由模式)的队列
  • Default Exchange(默认交换机),把消息发送给指定队列

发布/订阅模式:扇出交换机接收消息并将消息发送给所有订阅了该交换机的队列

生产者:

public class Producer {public static void main(String[] args) throws Exception {// 1、获取连接  Connection connection = ConnectionUtil.getConnection();// 2、创建频道  Channel channel = connection.createChannel();  // 参数1. exchange:交换机名称  // 参数2. type:交换机类型  //     DIRECT("direct"):定向  //     FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。  //     TOPIC("topic"):通配符的方式  //     HEADERS("headers"):参数匹配  // 参数3. durable:是否持久化  // 参数4. autoDelete:自动删除  // 参数5. internal:内部使用。一般false  // 参数6. arguments:其它参数  String exchangeName = "test_fanout";  // 3、创建交换机  channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);  // 4、创建队列  String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue1Name,true,false,false,null);  channel.queueDeclare(queue2Name,true,false,false,null);  // 5、绑定队列和交换机  // 参数1. queue:队列名称  // 参数2. exchange:交换机名称  // 参数3. routingKey:路由键,绑定规则  //     如果交换机的类型为fanout,routingKey设置为""  channel.queueBind(queue1Name,exchangeName,"");  channel.queueBind(queue2Name,exchangeName,"");  String body = "日志信息:张三调用了findAll方法...日志级别:info...";  // 6、发送消息  channel.basicPublish(exchangeName,"",null,body.getBytes());  // 7、释放资源  channel.close();  connection.close();}
}

消费者1:

public class Consumer1 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue1Name = "test_fanout_queue1";  channel.queueDeclare(queue1Name,true,false,false,null);Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));  System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");}  };channel.basicConsume(queue1Name,true,consumer);}
}

消费者2

public class Consumer2 {  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue2Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));  System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");}};channel.basicConsume(queue2Name,true,consumer);}
}

先启动两个消费者,再启动生产者发送消息

image-20240527211334592

交换机和队列的绑定关系如下图所示:

image-20240527211442292

发布订阅模式与工作队列模式的区别:

  • 工作队列模式消息由默认交换机处理,发布订阅模式消息由指定交换机处理
  • 监听同一个队列的消费端程序彼此之间是竞争关系
  • 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息

2.4 路由模式(Routing)

消息生产者将消息发送给Direct交换机,Direct交换机根据routing key路由键将消息发送到指定队列(用的最多)

当Direct交换机用相同的路由键routing key绑定多个队列,就会有广播效果(类似发布订阅)

生产者:

public class Producer {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangeName = "test_direct";// 创建交换机  channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);// 创建队列  String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";// 声明(创建)队列  channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);// 队列绑定交换机  // 队列1绑定error  channel.queueBind(queue1Name, exchangeName, "error");// 队列2绑定info error warning  channel.queueBind(queue2Name, exchangeName, "info");channel.queueBind(queue2Name, exchangeName, "error");channel.queueBind(queue2Name, exchangeName, "warning");String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";// 发送消息  channel.basicPublish(exchangeName, "warning", null, message.getBytes());System.out.println(message);// 释放资源  channel.close();connection.close();}
}

消费者1:

public class Consumer1 {  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue1Name = "test_direct_queue1";channel.queueDeclare(queue1Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));  System.out.println("Consumer1 将日志信息打印到控制台.....");}};channel.basicConsume(queue1Name,true,consumer);}
}

消费者2:

public class Consumer2 {  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue2Name = "test_direct_queue2";channel.queueDeclare(queue2Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("Consumer2 将日志信息存储到数据库.....");}};channel.basicConsume(queue2Name,true,consumer);}
}

先启动两个消费者,再启动生产者

绑定关系:

image-20240527214607379

消费者2接受到消息,消费者1没有消息

image-20240527214651865

2.5 通配符模式(Topics)

消息生产者将消息发送给Topic交换机,Topic交换机根据通配符形式的路由键将消息发送到指定队列

(通配符规则:#:匹配零个或多个词,*:匹配一个词)

生产者:

public class Producer {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangeName = "test_topic";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);// 绑定队列和交换机  // 参数1. queue:队列名称  // 参数2. exchange:交换机名称  // 参数3. routingKey:路由键,绑定规则  //      如果交换机的类型为fanout ,routingKey设置为""  // routing key 常用格式:系统的名称.日志的级别。  // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库  channel.queueBind(queue1Name, exchangeName, "#.error");channel.queueBind(queue1Name, exchangeName, "order.*");channel.queueBind(queue2Name, exchangeName, "*.*");// 分别发送消息到队列:order.info、goods.info、goods.error  String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";channel.basicPublish(exchangeName, "order.info", null, body.getBytes());body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";channel.basicPublish(exchangeName, "goods.info", null, body.getBytes());body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";channel.basicPublish(exchangeName, "goods.error", null, body.getBytes());channel.close();connection.close();}
}

消费者1监听消息队列1

public class Consumer1 {  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String QUEUE_NAME = "test_topic_queue1";channel.queueDeclare(QUEUE_NAME,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}

消费者2监听消息队列2

public class Consumer2 {  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String QUEUE_NAME = "test_topic_queue2";channel.queueDeclare(QUEUE_NAME,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}

先启动两个消费者,接着启动生产者发送消息

image-20240527215241465

3 RabbitMQ整合SpringBoot

项目基本四步骤基本步骤:建module,改POM,写YAML,主启动

3.1 @RabbitListener注解属性

  • bindings属性:指定交换机和队列之间的绑定关系,指定当前方法要监听的队列

    隐藏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们

  • queues属性

    @RabbitListener(queues = {QUEUE_LINZHUOWEI})
    
    • 作用:指定当前方法要监听的队列
    • 此时框架不会创建相关交换机和队列,必须提前创建好

3.2 消费者工程

  1. 建module:module06-boot-consumer

  2. 改POM

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
    </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--引入web模块为了保证项目一直运行,持久监听消息队列消息--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
    </dependencies>
    
  3. 写YAML

    spring:rabbitmq:host: 192.168.145.130port: 5672username: guestpassword: 123456virtual-host: /logging:level:com.linzhuowei.mq.listener.MyMessageListener: info
    
  4. 主启动:正常添加@SpringBootApplication

  5. 监听器:

    import lombok.extern.slf4j.Slf4j;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;@Component
    @Slf4j
    public class MyMessageListener {public static final String EXCHANGE_DIRECT = "exchange.direct.order";  public static final String ROUTING_KEY = "order";  public static final String QUEUE_NAME  = "queue.order";  @RabbitListener(bindings = @QueueBinding(//队列信息value = @Queue(value = QUEUE_NAME, durable = "true"),//交换机信息exchange = @Exchange(value = EXCHANGE_DIRECT),//路由键信息,赋值为字符串数组key = {ROUTING_KEY}))public void processMessage(//对应消息数据本身,形参类型需要和发送消息的数据类型对应String data,//对应发送的消息本身,可以通过message获取消息数据,包括dataMessage message,//频道对象Channel channel) {log.info(data);}}
    

运行查看后台管理:

image-20241021163148733

如图:交换机exchange.direct.order通过order路由键绑定消息队列

题外话:

  • 使用@RabbitListenerbindings属性能绑定交换机和队列的关系并监听队列消息,如果RabbitMQ服务中没有交换机和队列,则会自动创建该队列
  • 使用@RabbitListenerqueues属性,监听指定消息队列

所以如果只是单纯监听消息队列,不考虑交换机和队列的创建以及绑定(因为这些创建操作可以在后台页面点击完成嘿嘿),消费者代码也可以这样写:

@RabbitListener(queues = {QUEUE_LINZHUOWEI})
public void processMessage(//对应消息数据本身,形参类型需要和发送消息的数据类型对应String data,//对应发送的消息本身,可以通过message获取消息数据,包括dataMessage message,//频道对象Channel channel) {log.info(data);
}

但建议还是写第一种

3.3 生产者工程

  1. 新建模块:module05-boot-producer

  2. 改POM

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
    </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
    </dependencies>
    
  3. 写YAML

    spring: rabbitmq: host: 192.168.145.130port: 5672 username: guest password: 123456 virtual-host: /
    
  4. 主启动

    import org.springframework.boot.SpringApplication;  
    import org.springframework.boot.autoconfigure.SpringBootApplication;  @SpringBootApplication
    public class RabbitMQProducerMainType {public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args);  }
    }
    
  5. 测试程序

    @SpringBootTest  
    public class RabbitMQTest {  public static final String EXCHANGE_DIRECT = "exchange.direct.order";  public static final String ROUTING_KEY = "order";@Autowired  private RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage() {  rabbitTemplate.convertAndSend(  EXCHANGE_DIRECT,   ROUTING_KEY,   "Hello linzhuowei");  }  
    }
    

4 消息可靠性投递

4.1 什么是消息可靠投递?

消息可靠投递是确保消息从生产者发送到消息队列,再从消息队列消费到消费者的过程中,不丢失消息或重复处理消息

消息可靠投递主要三个方面:

  1. 消息的可靠发送(生产者 -> 消息队列)
  2. 消息的可靠存储(消息队列内部存储)
  3. 消息的可靠消费(消息队列 -> 消费者)

下面分别说这三个部分

4.2 消息的可靠发送

通过消息发送回调接口或备用交换机保证消息从生产者成功发送到消息队列中

4.2.1 消息确认机制

应答确认+ 失败重试

生产者发送消息后等待消息队列的响应,确保消息成功送达,如果发送失败可以尝试重新发送

①模块准备
  1. 新建模块:module07-confirm-producer

  2. 改POM

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
    </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
    </dependencies>
    
  3. 主启动

  4. 写YAML:启用消息确认机制

    spring:rabbitmq:host: 192.168.145.130port: 5672username: guestpassword: 123456virtual-host: /publisher-confirm-type: CORRELATED # 交换机的确认publisher-returns: true # 队列的确认
    logging:level:com.linzhuowei.mq.config.MQProducerAckConfig: info
    
②配置类说明

通过配置类设置RabbitTemplate的回调接口,通过回调方法获取RabbitMQ服务器返回的确认信息,实现消息确认机制

代码实现过程:配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中

两个接口两个回调方法,是否发送到交换机和是否发送到消息队列

方法名方法功能所属接口接口所属类
confirm()确认消息是否发送到交换机ConfirmCallbackRabbitTemplate
returnedMessage()确认消息是否发送到队列ReturnsCallbackRabbitTemplate
  1. ConfirmCallback接口(RabbitTemplate内部的接口)

    /*** A callback for publisher confirmations.**/
    @FunctionalInterface
    public interface ConfirmCallback {/*** Confirmation callback.* @param correlationData correlation data for the callback.* @param ack true for ack, false for nack* @param cause An optional cause, for nack, when available, otherwise null.*/void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);}
    

    生产者端发送消息之后,回调confirm()方法

    • ack参数值为true:表示消息成功发送到了交换机
    • ack参数值为false:表示消息没有发送到交换机
  2. ReturnCallback接口(RabbitTemplate内部的接口)

    /*** A callback for returned messages.** @since 2.3*/
    @FunctionalInterface
    public interface ReturnsCallback {/*** Returned message callback.* @param returned the returned message and metadata.*/void returnedMessage(ReturnedMessage returned);}
    

    接口中的returnedMessage()方法仅在消息没有发送到队列时调用

ReturnedMessage类中主要属性含义如下:

属性名类型含义
messageorg.springframework.amqp.core.Message消息以及消息相关数据
replyCodeint应答码,类似于HTTP响应状态码
replyTextString应答码说明
exchangeString交换机名称
routingKeyString路由键名称
③配置类示例

配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中

import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送到交换机成功!数据:" + correlationData);} else {log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);}}@Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("消息发送到消息队列失败...");log.info("消息主体: " + new String(returned.getMessage().getBody()));log.info("应答码: " + returned.getReplyCode());log.info("描述:" + returned.getReplyText());log.info("消息使用的交换器 exchange : " + returned.getExchange());log.info("消息使用的路由键 routing : " + returned.getRoutingKey());}
}
④测试代码
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest  
public class RabbitMQTest {  public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";@Autowired  private RabbitTemplate rabbitTemplate;@Test  public void testSendMessage() {  rabbitTemplate.convertAndSend(  EXCHANGE_DIRECT,   ROUTING_KEY,   "Hello atguigu");  }  }

通过调整代码,测试如下三种情况:

  • 交换机正确、路由键正确
  • 交换机正确、路由键不正确,无法发送到队列
  • 交换机不正确,无法发送到交换机

回顾交换机作用:接收消息并路由到消息队列

4.2.2 备用交换机

①备用交换机配置

当消息在队列中未被处理时(如消息过期、消息被拒绝或达到最大重试次数,无匹配队列等),这些消息就会转发到备用交换机

本次案例模拟交换机没有匹配的消息队列,消息转至备用交换机

  1. 首先创建备用交换机(扇出类型):

    exchange.direct.order.backup
    

    image-20241022174034510

    image-20241022174127199

  2. 创建备用消息队列

    queue.order.backup
    

    image-20241022174305118

    image-20241022174317509

  3. 将备用消息队列绑定备用交换机

    image-20241022174431672\

  4. 重新创建原交换机(置顶备用交换机)

    exchange.direct.order
    

    需要删除原来的直连交换机,重新创建直连交换机,并设置备用交换机

    exchange.direct.order.backup
    

    image-20241022175031949

    image-20241022175047422

  5. 原交换机绑定原队列

    image-20241022175155189

    image-20241022175207880

    image-20241022175218262

②备用交换机测试

消息发送端:

@SpringBootTest  
public class RabbitMQTest {  public static final String EXCHANGE_DIRECT = "exchange.direct.order";//exchange.direct.order交换机绑定的路由键为order,这里order路由键错误,会转到备用交换机public static final String ROUTING_KEY = "order1";@Autowired  private RabbitTemplate rabbitTemplate;@Test  public void testSendMessage() {  rabbitTemplate.convertAndSend(  EXCHANGE_DIRECT,   ROUTING_KEY+"11","Hello 备用交换机");}  }

结果:消息发送成功,消息先发往直连交换机exchange.direct.order,由于路由键无效,没有匹配的消息队列,所以消息发往备用的扇出交换机exchange.direct.order.backup,最终发送到消息队列queue.test.backup

4.3 消息的可靠存储

通过将消息持久化到硬盘上防止消息队列宕机导致内存中消息丢失(交换机默认持久化,消息队列有指定也是默认持久化)

4.3.1 非持久化交换机和队列

即消息在内存存储,重启消息丢失

  1. 创建非持久化交换机

    image-20241022180407671

  2. 创建非持久化消息队列

    image-20241022180520671

  3. 绑定交换机和消息队列的关系

    image-20241022180609220

测试:发送消息后,队列成功收到消息。

docker restart rabbitmq

重启rabbitmq,内存的消息丢失,内存掉电设备

4.3.2 持久化交换机和消息队列

先来看卡监听消息队列的写法

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}
))
public void processMessage(String dateString,Message message,Channel channel) {log.info(dateString);
}

关注@RabbitListener中,@QueueBinding中的value和exchange两个注解,分别是QueueExchange类型

①@Queue注解分析

@Queue注解抽出关注的部分

@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Queue {/*** Specifies if this queue should be durable.* By default if queue name is provided it is durable.* @return true if the queue is to be declared as durable.* @see org.springframework.amqp.core.Queue#isDurable()*/String durable() default "";/*** Specifies if this queue should be auto deleted when not used.* By default if queue name is provided it is not auto-deleted.* @return true if the queue is to be declared as auto-delete.* @see org.springframework.amqp.core.Queue#isAutoDelete()*/String autoDelete() default "";
}
  • durable属性:By default if queue name is provided it is durable
  • autoDelete属性:By default if queue name is provided it is not auto-deleted

翻译就是:只要消息队列指定,默认持久化且不自动删除

②@Exchange注解分析

@Exchange注解抽出有用的部分

@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {/*** @return false if the exchange is to be declared as non-durable.*/String durable() default TRUE;/*** @return true if the exchange is to be declared as auto-delete.*/String autoDelete() default FALSE;
}
  • durable属性默认true:false if the exchange is to be declared as non-durable
  • autoDelete属性默认false:true if the exchange is to be declared as auto-delete

交换机默认持久化

4.4 消息的可靠消费

消息确认机制

  • 自动确认:消费者接收消息后自动返回ACK确认,RabbitMQ删除消息。自动确认机制,消息处理失败会导致消息丢失(因为消息已删)
  • 手动确认:消费者处理消息成功后,显式发送ACK给消息队列,通知RabbitMQ消息成功消费删除消息,消费者处理消息失败后,显示发送NACK给消息队列,通知RabbitMQ消息消费失败,执行相应的失败策略。手动确认机制保证消息的可靠消费

4.4.1 模块准备

  1. POM

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
    </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
    </dependencies>
    
  2. YAML:开启手动确认机制

    spring:rabbitmq:host: 192.168.145.130port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual # 把消息确认模式改为手动确认
    
  3. 主启动

  4. 消息监听:其实durableautoDelete可以不设置,默认值就是这样的

    @Component
    public class MyMessageListener {public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME  = "queue.order";// 修饰监听方法@RabbitListener(// 设置绑定关系bindings = @QueueBinding(// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),// 配置路由键信息key = {ROUTING_KEY}))public void processMessage(String dataString, Message message, Channel channel) {}}
    

4.4.2 手动确认思路

未命名文件

  • 步骤1:YAML配置文件把消息确认模式改为手动确认
  • 步骤2:调用Channel对象的方法返回信息
    • ACK:Acknowledgement,表示消息处理成功
    • NACK:Negative Acknowledgement,表示消息处理失败
    • Reject:拒绝,同样表示消息处理失败
  • 步骤3:拒绝或者消息处理失败的后续操作
    • requeue为true:重新放回队列,重新投递,再次尝试
    • requeue为false:不放回队列,不重新投递
①basicAck()方法
  • 方法功能:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了
  • 参数列表:
参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean multiple取值为true:为小于、等于deliveryTag的消息批量返回ACK信息
取值为false:仅为指定的deliveryTag返回ACK信息
②basicNack()方法
  • 方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值
  • 参数列表:
参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean multiple取值为true:为小于、等于deliveryTag的消息批量返回ACK信息
取值为false:仅为指定的deliveryTag返回ACK信息
boolean requeue取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端
取值为false:Broker将消息标记为已消费,不会放回队列
③basicReject()方法
  • 方法功能:根据指定的deliveryTag,对该消息表示拒绝
  • 参数列表:
参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean requeue取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端
取值为false:Broker将消息标记为已消费,不会放回队列

basicNack()和basicReject()有啥区别?

  • basicNack()有批量操作
  • basicReject()没有批量操作

Fanout交换机,同一个消息广播到不同的队列,deliveryTag会重复吗?不会,deliveryTag在Broker范围内唯一

4.4.3 可靠消费代码

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class MyMessageListener {public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME  = "queue.order";// 修饰监听方法@RabbitListener(// 设置绑定关系bindings = @QueueBinding(// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),// 配置路由键信息key = {ROUTING_KEY}))public void processMessage(String dataString, Message message, Channel channel) throws IOException {// 1、获取当前消息的 deliveryTag 值备用long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 2、正常业务操作log.info("消费端接收到消息内容:" + dataString);// System.out.println(10 / 0);// 3、给 RabbitMQ 服务器返回 ACK 确认信息channel.basicAck(deliveryTag, false);} catch (Exception e) {// 4、获取信息,看当前消息是否曾经被投递过Boolean redelivered = message.getMessageProperties().getRedelivered();if (!redelivered) {// 5、如果没有被投递过,那就重新放回队列,重新投递,再试一次channel.basicNack(deliveryTag, false, true);} else {// 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列channel.basicReject(deliveryTag, false);}}}
}

4.5 消息可靠性投递架构

MQ是系统解耦利器,能很好解除消息发送者和消息接收者之间的耦合。如何保证消息可靠性投递?通过前面我们知道主要分消息可靠发送,消息可靠存储,消息可靠消费。这一小节我们用另一个角度分析

image-20250117153649131

要保证消息可靠性投递,我们分上下两个半场

  • 上半场123分别对应:1发送方调用主动API发送消息,2MQ服务端收到消息并将消息落库(持久化),3发送方收到回调ACK(确认消息成功投递到MQ服务器)

    timer起作用:步骤 3,如果发送方没有收到回调确认(比如服务端由于网络问题或者其他原因未能正确发送 ACK),则发送方会启动一个定时器,尝试重新发送消息。如果多次发送失败(超时),发送方会向业务方回调发送失败,这通常是在重试机制达到最大次数或超时后触发的

  • 下半场456分别对应:4消费端接收消息处理业务逻辑,5接收方(消费者)发送 ACK 回应消息处理成功,6MQ服务端收到ACK并将库中的消息删除

    timer起作用:步骤 5,消费者没有及时发送 ACK(比如消费者处理超时或发生了异常),MQ 服务端会启动定时器等待 ACK

    如果 MQ 服务端在规定时间内没有收到消费者的 ACK,timer 会触发重试机制,可能重新将消息投递到消费者,只到确认消息被处理并收到 ACK 后,消息才会从 MQ 服务端的持久化存储中删除,以确保消息的可靠性

上下半场均有重发,重发策略有定时重发(如每个10s重发直到超出次数)和指数退避(X秒重发,2X秒重发,4X秒重发)

综合来看关键点在于如何保证消息幂等

  • 上半场消息幂等:发送方没有收到回调ACK,会重新发送消息到MQ服务器。上半场的消息幂等性有MQ服务器完成,MQ会为每条消息生成全局唯一的message ID用作去重和幂等依据(上半场消息幂等由MQ服务器完成无需关注)
  • 下半场消息幂等:MQ服务端超时未收到ACK,导致MQ重复投递消息。业务方会收到重复消息,业务方需要保证消息幂等性。比如消息携带全局唯一id用于保证幂等,再处理消息前判断即可

5 消费端限流

利用消息队列的削峰限流,平滑流量避免大量请求涌入,限制请求数量,避免对后端服务造成过大的压力

常见的削峰限流策略有:

image-20241023191236100

通过prefetch来设置消费者**同时接收未确认的消息的数量**,每次预取的消息数量来实现流量削峰

5.1 未设置prefetch

首先向消息队列中发送100个消息

public void testSendMessage() {for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Hello atguigu" + i);}
}

image-20241024161821436

  • Ready100
  • Unack0
  • Total100

消息消费者监听对应的消息队列

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}
))
public void processMessage(String dataString,Message message,Channel channel) throws InterruptedException, IOException {// 正常业务操作log.info("消费端接收到消息内容:" + dataString);//如果不睡1秒,瞬间为0TimeUnit.SECONDS.sleep(1);//手动确认ACKchannel.basicAck(deliveryTag, false);
}

显示结果:Ready直接为0,Unack和Total逐渐减少直到0

image-20241024161947405

5.2 设置prefetch

修改YAML

spring:rabbitmq:host: 192.168.145.130port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manualprefetch: 1 # 设置每次最多从消息队列服务器取回多少消息(同时接收未确认的消息的数量)

首先发送消息:

public void testSendMessage() {for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Hello atguigu" + i);}
}

image-20241024161821436

消息消费者监听对应的消息队列

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}
))
public void processMessage(String dataString,Message message,Channel channel) throws InterruptedException, IOException {// 正常业务操作log.info("消费端接收到消息内容:" + dataString);//如果不睡1秒,瞬间为0TimeUnit.SECONDS.sleep(1);//手动确认ACKchannel.basicAck(deliveryTag, false);
}

效果,监听者每次只取一个消息消费,同时未确认消息只有prefetch

image-20241024164743749

6 消息超时

设置过期时间,消息超过过期时间自动删除(更准确的说超时消息会变成死信)

可通过两个层面设置过期时间

  • 队列层面:设置队列的消息过期时间,队列内的消息超出过期时间自动删除
  • 消息层面:设置具体某个消息的过期时间,消息超出过期时间自动删除

如果两个层面都有设置,以过期时间短的为准

6.1 队列层面设置过期时间

创建交换机

image-20241027013536936

创建消息队列,并设置过期时间10000毫秒

image-20241027013449844

绑定交换机

image-20241027013654535

发送消息,不启动消费端,等待消息过期

image-20241027153738779

6.2 消息层面设置过期时间

MessagePostProcessor 是 Spring Framework 的接口,在消息发送前对消息进行处理和修改。通过接口MessagePostProcessor接口在消息层面设置过期时间

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;@Test  
public void testSendMessageTTL() {  // 1、创建消息后置处理器对象  MessagePostProcessor messagePostProcessor = (Message message) -> {  // 设定 TTL 时间,以毫秒为单位message.getMessageProperties().setExpiration("5000");  return message;};// 2、发送消息  rabbitTemplate.convertAndSend(    EXCHANGE_DIRECT,     ROUTING_KEY,     "Hello linzhuowei", messagePostProcessor);    
}

原来消息队列queue.test.timeout过期时间10000毫秒,消息层面设置过期时间5000毫秒,以短的过期时间为标准,发送消息,等待消息过期

image-20241027154917714

7 死信和死信队列

无法正常被消费的消息就称为死信

死信的原因有三种(就是消息没有被正常消费):

  • 拒绝:消费者拒绝消息,basicNack()/basicReject(),并且不把消息重新放回原目标队列(requeue=false
  • 超时:消息达到超时时间未被消费
  • 溢出:队列中消息数量达到最大限制,根据队列先进先出原理,后来再进入一条消息,队列中最早的消息会变成死信

死信的处理方式大致三种:

  • 丢弃:不处理,死信直接丢弃
  • 入库:死信写入数据库,日后处理
  • 监听:死信进入死信队列,消费端监听死信队列,做后序处理(通常采用)

下面分别演示三种死信成因

7.1 准备工作

7.1.1 正常交换机和正常消息队列

  • 正常交换机:exchange.normal.video
  • 正常队列:queue.normal.video
  • 正常路由键:routing.key.normal.video
  1. 创建正常交换机

    image-20241027163756613

  2. 创建正常队列,写好死信队列和死信交换机

    image-20240318165821774

  3. 绑定正常消息队列和正常交换机

    image-20241027165215532

    完成设施后设置如下

    image-20241027165346613

7.1.2 死信交换机和死信队列

  • 死信交换机:exchange.dead.letter.video
  • 死信队列:queue.dead.letter.video
  • 死信路由键:routing.key.dead.letter.video
  1. 创建死信交换机

    image-20241027164612397

  2. 创建死信队列

    image-20241027164929494

  3. 绑定死信队列和死信交换机

    image-20241027172818810

7.1.3 常量声明

public static final String EXCHANGE_NORMAL = "exchange.normal.video";  
public static final String EXCHANGE_DEAD_LETTER = "exchange.dead.letter.video";  public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";  
public static final String ROUTING_KEY_DEAD_LETTER = "routing.key.dead.letter.video";  public static final String QUEUE_NORMAL = "queue.normal.video";  
public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";

7.2 死信–拒绝

  1. 发送端发送消息到正常交换机

    @Test  
    public void testSendMessageButReject() {  rabbitTemplate  .convertAndSend(  EXCHANGE_NORMAL,  ROUTING_KEY_NORMAL,  "★[normal]发送消息--正常交换机--正常消息队列...");  
    }
    
  2. 消费端监听正常消息队列和死信队列

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;import java.io.IOException;@Component
    @Slf4j
    public class DeadLetterListener {public static final String QUEUE_NORMAL = "queue.normal.video";public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";@RabbitListener(queues = {QUEUE_NORMAL})public void processMessageNormal(Message message, Channel channel) throws IOException {// 消费端监听正常消息队列,接收并拒绝消息log.info("★[normal]接收消息,但拒绝消息且不重新放入队列...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = {QUEUE_DEAD_LETTER})public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {// 消费端监听死信队列,接收并成功消费消息log.info("★[dead letter]监听死信队列,接收到死信消息...");log.info("★[dead letter]dataString = " + dataString);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
    

先启动消费端监听死信队列和正常队列,再向正常消息队列发送消息

过程:发送端将消息发送到正常消息队列,监听正常消息队列的消费者接收消息并拒绝,消息通过死信交换机路由到死信队列,监听死信队列的消费者接收并成功消费。

正常消息队列:queue.normal.video,由于消息刚到达就被消费者接收,所以Queued messages没有变化

image-20241027222519208

同一时间,死信队列也是刚接收消息就被消费端消费,所以Queued messages没有变化

image-20241027223034059

消费端控制台打印:

[normal]接收消息,但拒绝消息且不重新放入队列...[dead letter]监听死信队列,接收到死信消息...[dead letter]dataString =[normal]发送消息--正常交换机--正常消息队列...

7.3 死信–超时和溢出

image-20241027232428910

前面创建正常消息队列时就置顶了正常消息队列最大消息数为10(x-max-length=10)且最大生存时间为10s(x-message-ttl=10000

先关闭消费者,向正常消息队列发送20条消息

@Test
public void testSendMessageButReject() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(EXCHANGE_NORMAL,ROUTING_KEY_NORMAL,"★[normal]发送消息--正常交换机--正常消息队列...");}
}

image-20241027224927913

  1. 发送者发送20条消息(m1,m2,…,m19,m20
  2. 前十条消息(m1,m2,…,m9,m10)正常进入消息队列,到达最大消息数
  3. 后十条消息(m11,m12,…,m19,m20)进入消息队列,根据队列先进先出,前十条消息(m1,m2,…,m9,m10溢出
  4. 这十条消息(m1,m2,…,m9,m10)通过死信交换机进入死信队列(对应死信队列第一个上坡)
  5. 后十条消息(m11,m12,…,m19,m20)超过10s未被消费,超时,后十条消息(m11,m12,…,m19,m20)也进入死信队列(对应死信队列第二个上坡)

消费者端省略,就还是监听然后消费…

8 延时队列

延时队列有两种实现思路

  • 借助超时时间+死信队列来实现延时队列
  • 通过RabbitMQ插件来完成延时队列

插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

需要将插件放入rabbitmq中容器的?/plugins目录,我们来看看该目录映射到宿主机的哪个目录?

docker inspect rabbitmq

运行结果:

"Mounts": [{"Type": "volume","Name": "rabbitmq-plugin","Source": "/var/lib/docker/volumes/rabbitmq-plugin/_data","Destination": "/plugins","Driver": "local","Mode": "z","RW": true,"Propagation": ""},{"Type": "volume","Name": "0f14d68cf8aad70d33dfe59651cf0c05c0598d1e2223d0154cf6689a9dfb96f4","Source": "/var/lib/docker/volumes/0f14d68cf8aad70d33dfe59651cf0c05c0598d1e2223d0154cf6689a9dfb96f4/_data","Destination": "/var/lib/rabbitmq","Driver": "local","Mode": "","RW": true,"Propagation": ""}],

和容器内/plugins目录对应的宿主机目录是:/var/lib/docker/volumes/rabbitmq-plugin/_data

## 8.1 下载延迟插件

RabbitMQ社区插件:https://www.rabbitmq.com/community-plugins.html

延迟插件:

image-20241028000022313

下载插件安装文件,并移动到对应目录

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data

启用插件

# 登录进入容器内部
docker exec -it rabbitmq /bin/bash# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 退出Docker容器
exit# 重启Docker容器
docker restart rabbitmq

延迟插件启动成功:

image-20241028001239695

8.2 延迟插件的使用

8.2.1 生产者端

通过MessageProcessor来设置延迟时间

@Test
public void testSendDelayMessage() {rabbitTemplate.convertAndSend(EXCHANGE_DELAY,ROUTING_KEY_DELAY,"测试基于插件的延迟消息 [" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "]",messageProcessor -> {// 设置延迟时间:以毫秒为单位messageProcessor.getMessageProperties().setHeader("x-delay", "10000");return messageProcessor;});
}

8.2.2 消费者端

① ui界面创建延迟交换机和队列

使用插件创建交换机exchange.delay.happy

使用rabbitmq_delayed_message_exchange插件要求交换机type=x-delayed-message,并通过x-delayed-type设置交换机的类型(direct、fanout、topic),创建方式如下:

image-20240319163915574

创建消息队列queue.delay.video并绑定exchange.delay.happy交换机

@Component  
@Slf4j
public class MyDelayMessageListener {public static final String QUEUE_DELAY = "queue.delay.video";@RabbitListener(queues = {QUEUE_DELAY})public void process(String dataString, Message message, Channel channel) throws IOException {  log.info("[生产者]" + dataString);log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
②代码创建延迟交换机和队列
@Component  
@Slf4j
public class MyDelayMessageListener {  public static final String EXCHANGE_DELAY = "exchange.delay.video";public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";public static final String QUEUE_DELAY = "queue.delay.video";@RabbitListener(bindings = @QueueBinding(  value = @Queue(value = QUEUE_DELAY, durable = "true", autoDelete = "false"),  exchange = @Exchange(  value = EXCHANGE_DELAY,   durable = "true",   autoDelete = "false",   type = "x-delayed-message",   arguments = @Argument(name = "x-delayed-type", value = "direct")),  key = {ROUTING_KEY_DELAY}  ))  public void process(String dataString, Message message, Channel channel) throws IOException {  log.info("[生产者]" + dataString);  log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));  channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  }  }

8.2.3 效果展示

前面消息可靠投递中说过,消息发送后回调confirm(),而returnMessage()只有在消息发送失败才会回调,但是使用rabbitmq_delayed_message_exchange插件后,即使消息成功发送到队列上,也会导致returnedMessage()方法执行(问题不大嘛)

消费端效果:

[生产者]测试基于插件的延迟消息 [12:41:29]
[消费者]12:41:39

9 事务消息

9.1 什么是事务消息?

生产者发送消息的操作打包成一个原子操作,要么全部成功要么全部失败,通过事务消息保证消息发送的原子性

RabbitMQ 的事务消息有点类似 Spring 的事务,分为开始事务、提交事务、回滚事务。

  • txSelect():开始事务,使用 txSelect() 开启事务。
  • txCommit():提交事务,如果 txCommit() 提交事务成功了,则消息一定会发送到 RabbitMQ
  • txRollback():回滚事务,如果在执行 txCommit() 之前 RabbitMQ 发生了异常,txRollback() 会捕获异常进行回滚。

RabbitMQ 发送事务消息流程:txSelect开启事务,消息发送到 RabbitMQ 缓存,接着 txCommit 提交事务,txCommit成功后则消息一定发送到了 RabbitMQ。

如果在 txCommit 完成前出现任何异常,我们就捕获这个异常然后执行 txRollback 进行回滚操作,整个过程跟 Spring 的事务机制没太大的区别。因此,我们可以通过 RabbitMQ 事务机制保证消息一定可以发送成功

了解了 RabbitMQ 的事务消息机制,接下来我们就分享两种方式来实现 RabbitMQ 事务消息

9.2 Springboot发送事务消息

9.2.1 准备工作

  1. 改pom.xml

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
    </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
    </dependencies>
    
  2. 写yaml

    spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
    
  3. 主启动

  4. 事务配置

    @Configuration
    @Data
    public class RabbitConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}
    }
    

9.2.2 没有事务消息

没有事务消息,无法保证消息发送原子性

@SpringBootTest
@Slf4j
public class RabbitMQTest {public static final String EXCHANGE_NAME = "exchange.tx.dragon";public static final String ROUTING_KEY = "routing.key.tx.dragon";@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)");// 2、抛出异常log.info("do bad:" + 10 / 0);// 3、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)");}}

抛出异常前的消息发送了,抛异常后的消息没有发送:

image-20241028232807095

9.2.3 使用事务消息

因为在junit中给测试方法使用@Transactional注解默认就会回滚,所以回滚操作需要使用@RollBack注解操控

@Test
@Transactional
@Rollback(value = false)
public void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~01)");log.info("do bad:" + 10 / 0);// 2、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~02)");
}

image-20241028232918897

9.3 Channel发送事务消息

工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RabbitMqUtil {public static Channel getChannel() {// 创建一个连接工厂,并设置MQ的相关信息ConnectionFactory factory = new ConnectionFactory();factory.setHost("xxxxxx");factory.setUsername("xxx");factory.setPassword("xxx");factory.setVirtualHost("/xxx");Channel channel = null;try {// 创建连接Connection connection = factory.newConnection();// 获取信道channel = connection.createChannel();} catch (Exception e) {log.error("创建 RabbitMQ Channel 失败", e);e.printStackTrace();}return channel;}
}

Channel发送事务消息

import com.rabbitmq.client.Channel;
import com.user.service.util.RabbitMqUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.nio.charset.StandardCharsets;@Slf4j
@Component
public class RabbitTransactionChannelProducer {@Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")public void sendTransactionaChannelMessage(String message) {//获取 ChannelChannel channel = RabbitMqUtil.getChannel();try {//开启事务channel.txSelect();//发送消息channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "1").getBytes(StandardCharsets.UTF_8));channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "2").getBytes(StandardCharsets.UTF_8));//发送第三条消息之前模拟一个错误 我们看下前两条消息否回滚了//int a = 1 / 0;channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "3").getBytes(StandardCharsets.UTF_8));//提交事务channel.txCommit();} catch (Exception e) {//回滚事务try {channel.txRollback();} catch (IOException ex) {log.error("txRollback error", e);ex.printStackTrace();}e.printStackTrace();} finally {try {channel.close();} catch (Exception e) {log.error("channel close error", e);e.printStackTrace();}}}
}

10 惰性队列

创建队列分两种:

  • default默认消息队列:消息存储在内存,当队列内存限制触发才会将部分消息移到磁盘

  • lazy惰性消息队列:消息尽可能地保存在磁盘,内存中只保持必要的元数据

    惰性队列,将消息尽可能地保存在磁盘,减少内存的使用。有效防止由于队列消息过多导致的内存溢出,是处理需要处理大量消息但内存有限的场景。但是由于消息存于磁盘,生产者发送消息和消费者消费比普通队列慢,尤其在高吞吐场景

队列创建置顶模式方式有:使用队列策略(建议)和设置queue.declare参数

如果策略和队列参数同时指定,那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的,那么只能通过删除队列再重新创建来修改。

10.1 队列策略设定

# 登录Docker容器
docker exec -it rabbitmq /bin/bash# 运行rabbitmqctl命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读:

  • rabbitmqctl命令所在目录是:/opt/rabbitmq/sbin,该目录已配置到Path环境变量

  • set_policy是子命令,表示设置策略

  • Lazy是当前要设置的策略名称,是我们自己自定义的,不是系统定义的

  • "^lazy-queue$"是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置

  • '{“queue-mode”:“lazy”}'是一个JSON格式的参数设置指定了队列的模式为"lazy"

  • –-apply-to参数指定该策略将应用于队列(queues)级别

  • 命令执行后,所有名称符合正则表达式的队列都会应用指定策略,包括未来新创建的队列

如果需要修改队列模式可以执行如下命令(不必删除队列再重建):

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues

10.3 queue.declare参数设定

参数x-queue-mode设定队列创建模式,lazydefault(默认)

Java代码原生API设置方式:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

Java代码注解设置方式:

@Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false", arguments = {@Argument(name = "x-queue-mode", value = "lazy")
})

11 优先级队列

优先级队列允许你根据消息的优先级来处理消息。消息默认先进先出,通过设置不同的优先级值,消费者可以优先处理重要或紧急的消息,而延迟处理优先级较低的消息

11.1 准备工作

  1. 创建交换机:exchange.test.priority

  2. 创建消息队列:queue.test.priority

    image-20241029005751489

    RabbitMQ消息优先级范围 1到255 ,建议使用 1到5(数字越大优先级越高)

    通过设置 x-max-priority来指定消息队列的最大优先级,默认为0。而消息的优先级不能大于x-max-priority,所以使用优先级队列一定要指定x-max-priority,这里指定为x-max-priority=10

  3. 改POM

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
    </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
    </dependencies>
    
  4. YAML

    spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
    
  5. 主启动

11.2 使用优先级队列

不要启动消费者程序,让多条不同优先级的消息滞留在队列中

  • 第一次发送优先级为1的消息
  • 第二次发送优先级为2的消息
  • 第三次发送优先级为3的消息

先发送的消息优先级低,后发送的消息优先级高,将来看看消费端是不是先收到优先级高的消息

消息生产者:

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class RabbitMQTest {public static final String EXCHANGE_PRIORITY = "exchange.test.priority";public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage() {//第一次发送优先级为1的消息rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{message.getMessageProperties().setPriority(1);return message;});//第二次发送优先级为2的消息//rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 2.", message->{//    message.getMessageProperties().setPriority(2);//    return message;//});//第三次发送优先级为3的消息//rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 3.", message->{//    message.getMessageProperties().setPriority(3);//    return message;//});}}

image-20241029010516996

消费端:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MyMessageProcessor {public static final String QUEUE_PRIORITY = "queue.test.priority";@RabbitListener(queues = {QUEUE_PRIORITY})public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {log.info(data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}

效果:

I am a message with priority 3.
I am a message with priority 2.
I am a message with priority 1.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/3834.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

javaEE初阶————多线程初阶(2)

今天给大家带来第二期啦&#xff0c;保证给大家讲懂嗷&#xff1b; 1&#xff0c;线程状态 NEW安排了工作还未开始行动RUNNABLE可工作的&#xff0c;或者即将工作&#xff0c;正在工作BLOCKED排队等待WAITING排队等待其他事TIMED_WAITING排队等待其他事TERMINATED工作完成了 …

于灵动的变量变幻间:函数与计算逻辑的浪漫交织(下)

大家好啊&#xff0c;我是小象٩(๑ω๑)۶ 我的博客&#xff1a;Xiao Xiangζั͡ޓއއ 很高兴见到大家&#xff0c;希望能够和大家一起交流学习&#xff0c;共同进步。 这一节我们主要来学习单个函数的声明与定义&#xff0c;static和extern… 这里写目录标题 一、单个函数…

【数据分析】02- A/B 测试:玩转假设检验、t 检验与卡方检验

一、背景&#xff1a;当“审判”成为科学 1.1 虚拟场景——法庭审判 想象这样一个场景&#xff1a;有一天&#xff0c;你在王国里担任“首席审判官”。你面前站着一位嫌疑人&#xff0c;有人指控他说“偷了国王珍贵的金冠”。但究竟是他干的&#xff0c;还是他是被冤枉的&…

HTML应用指南:利用GET请求获取全国特斯拉充电桩位置

随着电动汽车的普及&#xff0c;充电基础设施的建设变得至关重要。作为电动汽车领域的先驱&#xff0c;特斯拉不仅在车辆技术创新上持续领先&#xff0c;还积极构建广泛的充电网络&#xff0c;以支持其不断增长的用户群体。为了提升用户体验和服务质量&#xff0c;开发人员和数…

【原创】大数据治理入门(2)《提升数据质量:质量评估与改进策略》入门必看 高赞实用

提升数据质量&#xff1a;质量评估与改进策略 引言&#xff1a;数据质量的概念 在大数据时代&#xff0c;数据的质量直接影响到数据分析的准确性和可靠性。数据质量是指数据在多大程度上能够满足其预定用途&#xff0c;确保数据的准确性、完整性、一致性和及时性是数据质量的…

OpenVela——专为AIoT领域打造的开源操作系统

目录 一、系统背景与开源 1.1. 起源 1.2. 开源 二、系统特点 2.1. 轻量化 2.2. 标准兼容性 2.3. 安全性 2.4. 高度可扩展性 三、技术支持与功能 3.1. 架构支持 3.2. 异构计算支持 3.3. 全面的连接套件 3.4. 开发者工具 四、应用场景与优势 4.1. 应用场景 4.2. …

寒假1.18

web&#xff08;报错注入&#xff09; [第一章 web入门]SQL注入-2 题解&#xff1a; 打开是个403界面&#xff0c;还以为没开成 通过题目提示&#xff0c;访问/login.php /user.php 合在一起访问屁都没有&#xff0c;分开访问&#xff0c;/login.php回显&#xff1a; /user.p…

如何通过 Apache Airflow 将数据导入 Elasticsearch

作者&#xff1a;来自 Elastic Andre Luiz 了解如何通过 Apache Airflow 将数据导入 Elasticsearch。 Apache Airflow Apache Airflow 是一个旨在创建、安排&#xff08;schedule&#xff09;和监控工作流的平台。它用于编排 ETL&#xff08;Extract-Transform-Load&#xff0…

企业分类相似度筛选实战:基于规则与向量方法的对比分析

文章目录 企业表相似类别筛选实战项目背景介绍效果展示基于规则的效果基于向量相似的效果 说明相关文章推荐 企业表相似类别筛选实战 项目背景 在当下RAG&#xff08;检索增强生成&#xff09;技术应用不断发展的背景下&#xff0c;掌握文本相似算法不仅能够助力信息检索&…

Jenkins-Pipeline简述

一. 什么是Jenkins pipeline&#xff1a; pipeline在jenkins中是一套插件&#xff0c;主要功能在于&#xff0c;将原本独立运行于单个或者多个节点的任务连接起来&#xff0c;实现单个任务难以完成的复杂发布流程。Pipeline的实现方式是一套Groovy DSL&#xff0c;任何发布流程…

基于海思soc的智能产品开发(高、中、低soc、以及和fpga的搭配)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 市场上关于图像、音频的soc其实非常多&#xff0c;这里面有高、中、低档&#xff0c;开发方式也不相同。之所以会这样&#xff0c;有价格的因素&am…

C# 修改项目类型 应用程序程序改类库

初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github&#xff1a;codetoys&#xff0c;所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的&#xff0c;可以在任何平台上使用。 源码指引&#xff1a;github源…

6、原来可以这样理解C语言_函数(1/8)函数的概念

目录 一、函数的概念 一、函数的概念 数学中我们其实就⻅过函数的概念&#xff0c;⽐如&#xff1a;⼀次函数 ykxb &#xff0c;k和b都是常数&#xff0c;给⼀个任意的x&#xff0c;就 得到⼀个y值。 其实在C语⾔也引⼊函数&#xff08;function&#xff09;的概念&#xff0c…

Excel 技巧10 - 如何检查输入重复数据(★★)

本文讲了如何在Excel中通过COUNTIF来检查输入重复数据。 当输入重复数据时&#xff0c;显示错误提示。 1&#xff0c;通过COUNTIF来检查输入重复数据 比如下面是想检查不要输入重复的学号。 选中C列&#xff0c;点 Menu > 数据 > 数据验证 在数据验证页面&#xff0c…

【Linux系统】Ext系列磁盘文件系统二:引入文件系统(续篇)

inode 和 block 的映射 该博文中有详细解释&#xff1a;【Linux系统】inode 和 block 的映射原理 目录与文件名 这里有几个问题&#xff1a; 问题一&#xff1a; 我们访问文件&#xff0c;都是用的文件名&#xff0c;没用过 inode 号啊&#xff1f; 之前总是说可以通过一个…

[计算机网络]一. 计算机网络概论第一部分

作者申明&#xff1a;作者所有文章借助了各个渠道的图片视频以及资料&#xff0c;在此致谢。作者所有文章不用于盈利&#xff0c;只是用于个人学习。 1.0推荐动画 【网络】半小时看懂<计算机网络>_哔哩哔哩_bilibili 1.1计算机网络在信息时代的作用 在当今信息时代&…

Python操作Excel——openpyxl使用笔记(2)

2. 操作工作表 前面提到一个工作簿中会有一个或者多个工作表&#xff0c;当前使用的工作表被称作活动工作表&#xff0c;这里展开介绍一下对于工作表的一些操作。 2.1 枚举所有工作表 使用for循环可以很方便的遍历每个工作表&#xff1a; import openpyxl wb openpyxl.o…

Social LSTM:Human Trajectory Prediction in Crowded Spaces | 文献翻译

概要 行人遵循不同轨迹以避免障碍物和容纳同行者。任何在这种场景中巡航的自动驾驶车辆都需要能够遇见行人的未来位置并相应地调整其路线以避免碰撞。轨迹预测问题能够被看作一个顺序生成任务&#xff0c;其中我们对基于行人过去的位置预测其未来的轨迹感兴趣。根据最近RNN&am…

从零开始:Gitee 仓库创建与 Git 配置指南

引言 Git 是一款广泛使用的版本控制工具&#xff0c;它能够帮助开发者在开发过程中高效地管理代码的版本。而 Gitee&#xff08;码云&#xff09;是国内知名的 Git 托管平台&#xff0c;它提供了强大的代码托管、团队协作和项目管理功能。如果你是 Git 和 Gitee 的新手&#x…

挖掘机检测数据集,准确识别率91.0%,4327张原始图片,支持YOLO,COCO JSON,PASICAL VOC XML等多种格式标注

挖掘机检测数据集&#xff0c;准确识别率91.0%&#xff0c;4327张图片&#xff0c;支持YOLO&#xff0c;COCO JSON&#xff0c;PASICAL VOC XML等多种格式标注 数据集详情 数据集分割 训练组70&#xff05; 3022图片 有效集20&#xff05; 870图片 测试集10&…