主题模式
官方文档参考:https://www.rabbitmq.com/tutorials/tutorial-five-python.html
使用topic类型的交换器,队列绑定到交换器、bingingKey时使用通配符,交换器将消息路由转发到具体队列时,会根据消息routingKey模糊匹配,比较灵活。
在Direct类型的交换器做到了根据日志级别的不同,将消息发送给了不同队列的。
这里再加入一个需求,不仅想根据日志级别进行划分,还想根据日志的来源分日志,如何来做呢?
使用topic类型的交换器, routingKey就不能随便写了,它必须是点分单词,单词可以随便写,一般按消息的特征,该点分单词字符串最长255字节。
bindingKey也必须是这种形式。top类型的交换器背后原理跟direct类型类似只要队列的bingingkey的值与消息的routingKey的匹配,队列就可以收到该消息。有两个不同
- * (star)匹配一个单词。
- # 匹配0到多个单词。
上报的数据的RoutingKey,格式如下
地区.业务.日志级别 如shanghai.busi.INFO 、 hangzhou.line.ERROR
生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;public class Product {private static final String[] ADDRESS_ARRAYS = {"shanghai", "suzhou", "hangzhou"};private static final String[] BUSI_NAMES = {"product", "user", "schedule"};private static final String[] LOG_LEVEL = {"ERROR", "WARN", "INFO"};public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("ex.busi.topic",BuiltinExchangeType.TOPIC,// 持久化标识false,// 是否自动删除false,// 属性信息null);for (int i = 0; i < 50; i++) {String level = LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];String busiName = BUSI_NAMES[ThreadLocalRandom.current().nextInt(0, BUSI_NAMES.length)];String address =ADDRESS_ARRAYS[ThreadLocalRandom.current().nextInt(0, ADDRESS_ARRAYS.length)];String routingKey = address + "." + busiName + "." + level;String pushMsg = "地址[" + address + "],业务[" + busiName + "],级别[" + level + "],消息:" + i;channel.basicPublish("ex.busi.topic", routingKey, null, pushMsg.getBytes(StandardCharsets.UTF_8));}}
}
上海的消费者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;/*** 上海地区的消费都,获取所有的上海信息*/
public class ShangHaiConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("ex.busi.topic",BuiltinExchangeType.TOPIC,// 持久化标识false,// 是否自动删除false,// 属性信息null);// 定义队列channel.queueDeclare("shanghai.all.log",// 持久化存储true,// 排他false,// 自动删除true,// 属性null);// 将队列与交换机进行绑定channel.queueBind("shanghai.all.log", "ex.busi.topic", "shanghai.#", null);channel.basicConsume("shanghai.all.log",(consumerTag, message) -> {String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("shanghai consumer 收到数据:" + dataMsg);},consumerTag -> {});}
}
所有错误日志的消费者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class ErrorLogConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("ex.busi.topic",BuiltinExchangeType.TOPIC,// 持久化标识false,// 是否自动删除false,// 属性信息null);// 定义队列channel.queueDeclare("log.all.error",// 持久化存储true,// 排他false,// 自动删除true,// 属性null);// 将队列与交换机进行绑定channel.queueBind("log.all.error", "ex.busi.topic", "#.ERROR", null);channel.basicConsume("log.all.error",(consumerTag, message) -> {String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("错误日志 consumer 收到数据:" + dataMsg);},consumerTag -> {});}
}
苏州用户的消费者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class SuZhouUserConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("ex.busi.topic",BuiltinExchangeType.TOPIC,// 持久化标识false,// 是否自动删除false,// 属性信息null);// 定义队列channel.queueDeclare("suzhou.user.consumer",// 持久化存储true,// 排他false,// 自动删除true,// 属性null);// 将队列与交换机进行绑定channel.queueBind("suzhou.user.consumer", "ex.busi.topic", "suzhou.user.*", null);channel.basicConsume("suzhou.user.consumer",(consumerTag, message) -> {String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("suzhou consumer 收到数据:" + dataMsg);},consumerTag -> {});}
}
首先启动三个消费者,查看队列和交换器的信息
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ ex.busi.topic │ topic │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ ex.routing │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌───────────────┬─────────────┬──────────────────────┬──────────────────┬──────────────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ │ exchange │ suzhou.user.consumer │ queue │ suzhou.user.consumer │ │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ │ exchange │ shanghai.all.log │ queue │ shanghai.all.log │ │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ │ exchange │ log.all.error │ queue │ log.all.error │ │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange │ log.all.error │ queue │ #.ERROR │ │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange │ shanghai.all.log │ queue │ shanghai.# │ │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange │ suzhou.user.consumer │ queue │ suzhou.user.* │ │
└───────────────┴─────────────┴──────────────────────┴──────────────────┴──────────────────────┴───────────┘
[root@nullnull-os ~]#
观察可以发现,此队列与消息的绑定已经成功。接下使用生产者发送消息。观察控制台输出:
错误日志消费者
错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:6
错误日志 consumer 收到数据:地址[suzhou],业务[product],级别[ERROR],消息:8
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:10
错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:12
错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:15
错误日志 consumer 收到数据:地址[hangzhou],业务[user],级别[ERROR],消息:16
错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:17
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:18
错误日志 consumer 收到数据:地址[hangzhou],业务[user],级别[ERROR],消息:21
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:22
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:24
错误日志 consumer 收到数据:地址[hangzhou],业务[product],级别[ERROR],消息:28
错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:33
错误日志 consumer 收到数据:地址[hangzhou],业务[schedule],级别[ERROR],消息:39
错误日志 consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:40
错误日志 consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:43
错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:46
上海地区的消费者
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:0
shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:1
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[INFO],消息:2
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:5
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:10
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:12
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:17
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:18
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:22
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:24
shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:32
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[INFO],消息:35
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[INFO],消息:38
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:41
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:46
shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:48
苏州用户的消费者
suzhou consumer 收到数据:地址[suzhou],业务[user],级别[WARN],消息:37
suzhou consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:40
suzhou consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:43
suzhou consumer 收到数据:地址[suzhou],业务[user],级别[WARN],消息:45
至此topic模式操作成功。