RabbitMQ练习(Topics)

 1、RabbitMQ教程

《RabbitMQ Tutorials》icon-default.png?t=N7T8https://www.rabbitmq.com/tutorials

2、环境准备

参考:《RabbitMQ练习(Hello World)》和《RabbitMQ练习(Work Queues)》。

确保RabbitMQ、Sender、Receiver、Receiver2容器正常安装和启动。

root@k0test1:~# docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
root@k0test1:~# docker start sender
root@k0test1:~# docker start receiver
root@k0test1:~# docker start receiver2
root@k0test1:~# docker network inspect bridge

网络拓扑:

3、Topics练习

3.1 概述

在前面的Routing练习中,不再使用仅能进行简单广播的fanout exchange,而是使用了direct exchange,从而实现选择性地接收日志。但direct exchange仍然有局限性——它不能基于多个条件进行路由。

在常见的日志系统中,不仅可以根据严重性订阅日志,还可以根据发出日志的来源进行订阅。比如Unix工具syslog,它根据严重性(信息/警告/关键...)和设施(认证/定时任务/内核...)来发出日志(原文:You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).)。

这将提供很大的灵活性——比如只希望接收来自'cron'的关键错误日志,以及来自'kern'的所有日志。

要在日志系统中实现这一点,需要学习更复杂的主题交换机(topic exchange)

3.2 Topic exchange

1、路由键格式要求

发送到主题交换机(topic exchange)的消息所携带的路由键(routing_key)不能是任意形式——它必须是用分隔的单词列表。这些单词可以是任何内容,但通常它们和消息的某些特征相关。一些有效的路由键示例包括:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。路由键中的单词数量可以随意,只要不超过255字节的限制。

2、绑定键格式要求

绑定键(binding key)也必须是相同的形式。主题交换机(topic exchange)背后的逻辑类似于直接交换机(direct exchange)——路由键需要和绑定键进行匹配,使用特定路由键发送的消息将被传递到所有与匹配绑定键绑定的队列。然而,对于绑定键有两个重要的特殊情况:

  • * 可以替代恰好一个单词。
  • # 可以替代0个或多个单词。

3、举例说明:

在这个例子中,我们将发送描述动物的消息。这些消息将使用由三个词组成的路由键发送(由两个点分隔)。路由键的第一个词将描述速度(celerity),第二个词描述颜色(colour),第三个词描述物种(species):"<celerity>.<colour>.<species>"。

我们创建了三个绑定(bindings):

  • Q1 绑定的键是 "*.orange.*",意味着它对所有橙色的动物感兴趣。
  • Q2 绑定的键有两个:"*.*.rabbit" 和 "lazy.#",意味着它对所有关于兔子的消息以及所有懒惰的动物感兴趣。

这些绑定可以总结为:

  • Q1 对所有橙色的动物感兴趣。
  • Q2 想要听到所有关于兔子的消息,以及所有关于懒惰动物的消息。

具有路由键 "quick.orange.rabbit" 的消息将被发送到两个队列。消息 "lazy.orange.elephant" 也会被发送到两个队列。另一方面,"quick.orange.fox" 只会被发送到第一个队列,而 "lazy.brown.fox" 只会被发送到第二个队列。"lazy.pink.rabbit" 将只被发送到第二个队列一次,尽管它匹配了两个绑定。"quick.brown.fox" 不匹配任何绑定,因此将被丢弃。

如果我们违反协议,发送了一个或四个词的消息,比如 "orange" 或 "quick.orange.new.rabbit",那么这些消息将不会匹配任何绑定,并将丢失。

另一方面,尽管 "lazy.orange.new.rabbit" 有四个词,但它将匹配最后一个绑定,并将被发送到第二个队列。

主题交换机(Topic Exchange)

主题交换机(Topic Exchange)是一个非常强大的消息交换机制,它可以像其他类型的交换机一样工作。以下是主题交换机的两种主要行为:

  1. 当一个队列使用"#"(井号)作为绑定键与主题交换机绑定时,它将接收到所有的消息,而不管路由键是什么。这与扇出交换机(fanout exchange)的行为类似,即所有发送到交换机的消息都会被分发到所有绑定的队列。

  2. 如果在绑定中没有使用特殊字符"*"(星号)和"#"(井号),主题交换机的行为就会和直接交换机(direct exchange)一样。这意味着消息只会被分发到那些绑定键与消息的路由键完全匹配的队列。

主题交换机提供了一种灵活的方式来根据消息的路由键进行消息的路由和分发。通过使用星号(*)和井号(#)作为通配符,可以创建复杂的路由规则,从而实现对消息的精确控制。

3.3 代码说明

日志系统中使用主题交换机,日志的路由键将有两个单词组成:"<facility>.<severity>",每个日志消息的路由键将由两个部分组成,第一部分是设施或系统组件的名称,第二部分是日志消息的严重性级别。例如,一个路由键可能是"auth.error",表示与认证相关的错误消息。使用主题交换机,可以基于这些路由键的不同组合来灵活地路由和分发日志消息。 

3.3.1 Sending

进入sender容器,vi编写emit_log_topic.py:

root@sender:/# vi emit_log_topic.py
root@sender:/# cat emit_log_topic.py 
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs', exchange_type='topic')routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(f" [x] Sent {routing_key}:{message}")
connection.close()
root@sender:/#

这段代码是一个使用 Python 编写的简单 RabbitMQ 发送程序,它使用了 pika 库来与 RabbitMQ 服务器进行交互。下面是代码的详细说明:

  1. 导入所需库:

    • import pika: 导入 pika 库,这是 Python 中用于与 RabbitMQ 交互的库。
    • import sys: 导入 sys 模块,用于访问由命令行参数提供的值。
  2. 建立连接:

    • connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2')): 创建一个到 RabbitMQ 服务器的连接。这里指定了服务器的 IP 地址为 172.17.0.2
  3. 创建通道:

    • channel = connection.channel(): 在建立的连接上创建一个新的通道。
  4. 声明交换机:

    • channel.exchange_declare(exchange='topic_logs', exchange_type='topic'): 声明一个名为 topic_logs 的主题交换机。exchange_type='topic' 指定了交换机的类型为 topic,这意味着它可以根据路由键的模式来路由消息。
  5. 设置路由键和消息:

    • routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info': 从命令行参数中获取路由键。如果有提供第一个参数(sys.argv[1]),则使用它作为路由键;如果没有提供,则默认使用 'anonymous.info'
    • message = ' '.join(sys.argv[2:]) or 'Hello World!': 从命令行参数中获取消息内容。如果有提供第二个及后续参数,将它们连接成一个字符串作为消息;如果没有提供,则默认消息为 'Hello World!'
  6. 发布消息:

    • channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message): 使用指定的交换机和路由键发布消息。消息的内容是之前设置的 message
  7. 打印消息确认:

    • print(f" [x] Sent {routing_key}:{message}"): 打印一条消息到控制台,确认消息已经发送,并显示路由键和消息内容。
  8. 关闭连接:

    • connection.close(): 关闭与 RabbitMQ 服务器的连接。

这个脚本可以作为命令行工具使用,通过指定不同的路由键和消息内容来发送消息到 RabbitMQ。使用示例:

python emit_log_topic.py auth.error "User authentication failed"

这将向 RabbitMQ 发送一个路由键为 auth.error 的消息,内容为 "User authentication failed"。如果没有提供路由键,消息将使用默认的 'anonymous.info' 路由键发送。

3.3.2 Receiving

进入receiver/receiver2容器,vi编写receive_logs_topic.py:

root@receiver:/# vi receive_logs_topic.py 
root@receiver:/# cat receive_logs_topic.py 
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs', exchange_type='topic')result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queuebinding_keys = sys.argv[1:]
if not binding_keys:sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(f" [x] {method.routing_key}:{body}")channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()
root@receiver:/# 

这段代码是一个使用 Python 编写的 RabbitMQ 接收程序,它同样使用了 pika 库来与 RabbitMQ 服务器进行交互。下面是代码的详细说明:

  1. 导入所需库:

    • import pika: 导入 pika 库,用于与 RabbitMQ 交互。
    • import sys: 导入 sys 模块,用于访问命令行参数。
  2. 建立连接:

    • connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2')): 创建一个到 RabbitMQ 服务器的阻塞模式连接,指定服务器的 IP 地址。
  3. 创建通道:

    • channel = connection.channel(): 在连接上创建一个新的通道。
  4. 声明交换机:

    • channel.exchange_declare(exchange='topic_logs', exchange_type='topic'): 声明一个名为 topic_logs 的主题交换机。
  5. 声明队列:

    • result = channel.queue_declare('', exclusive=True): 声明一个非持久的、独占的队列。exclusive=True 意味着这个队列只对连接它的客户端可见,并且当连接关闭时,队列将被删除。
    • queue_name = result.method.queue: 从结果中获取队列名称。
  6. 检查绑定键:

    • binding_keys = sys.argv[1:]: 从命令行参数中获取所有的绑定键。
    • if not binding_keys: ...: 如果没有提供绑定键,打印用法信息并退出程序。
  7. 绑定队列到交换机:

    • 循环遍历所有的 binding_keys,使用 channel.queue_bind() 方法将队列绑定到 topic_logs 交换机,并为每个绑定键设置相应的路由键。
  8. 等待日志消息:

    • 打印提示信息,告知用户程序正在等待日志消息,并说明如何退出程序。
  9. 定义消息回调函数:

    • def callback(ch, method, properties, body): ...: 定义一个回调函数,当接收到消息时会被调用。它打印出消息的路由键和消息体。
  10. 设置消息消费:

    • channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True): 设置消息消费,指定队列名称、回调函数和自动确认消息。
  11. 开始接收消息:

    • channel.start_consuming(): 启动消息接收循环,直到用户中断(例如使用 CTRL+C)。

这个脚本可以作为命令行工具使用,通过指定一个或多个绑定键来接收匹配这些键的消息。例如,如果你想要接收所有与 auth 相关的日志消息,你可以使用以下命令行:

python receive_logs_topic.py auth.*

这将使得脚本接收所有路由键以 auth 开头的日志消息。使用星号(*)作为通配符,可以匹配任意数量的字符。

 3.4、开始测试

3.4.1 接收所有日志

1、在 receiver 容器中打开命令行界面,运行以下命令以接收所有日志:

root@receiver:/# python3 receive_logs_topic.py "#"[*] Waiting for logs. To exit press CTRL+C

2、rabbitmq容器上查看exchange, queue,bindings:

root@30acfada6737:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
topic_logs      topic <--创建的topic exchange
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topicdirectroot@30acfada6737:/#  rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-eiFT38g9RD-iuc-RCn0FMw  0 <--创建的临时队列
root@30acfada6737:/#  rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     argumentsexchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   amq.gen-eiFT38g9RD-iuc-RCn0FMw  []
topic_logs      exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   #       []
root@30acfada6737:/# 

根据 rabbitmqctl list_bindings 命令的输出,第一个绑定的路由键实际上与目的地的名称相同。这里是具体的解释:

  1. 源(source): 没有显示名称,这通常指的是默认的交换机(default exchange)。每个队列在创建时都会自动绑定到这个默认交换机上,使用队列的名称作为路由键。

  2. 源类型(source_kind): exchange 表示源是一个交换机。

  3. 目的地(destination): amq.gen-eiFT38g9RD-iuc-RCn0FMw 是一个自动生成的队列名称。

  4. 路由键(routing_key): 与目的地名称相同,即 amq.gen-eiFT38g9RD-iuc-RCn0FMw。这意味着只有当消息的路由键与队列名称完全匹配时,消息才会被路由到这个队列。

  5. 参数(arguments): 空列表 [] 表示这个绑定没有使用任何额外的参数。

  6. 第二个绑定:

    • 源(source): topic_logs,这是一个命名的主题交换机。
    • 目的地(destination): amq.gen-eiFT38g9RD-iuc-RCn0FMw,与第一个绑定中的队列相同。
    • 路由键(routing_key): #,这是一个特殊字符,表示这个队列将接收所有通过 topic_logs 交换机的消息。

这意味着:

  • 第一个绑定是队列与其默认交换机之间的绑定,当消息不指定交换机名字(此时使用默认交换机),同时路由键和队列名称相同时,才会被这个队列接收。
  • 第二个绑定是队列与 topic_logs 交换机之间的绑定,由于使用了 # 作为路由键,这个队列将接收所有发送到 topic_logs 交换机的消息,无论它们的实际路由键是什么。

如果你想要测试这些绑定关系,你可以:

  • 使用 emit_log_topic.py脚本发送消息到 topic_logs 交换机,不指定路由键或使用 任意信息 作为路由键,然后观察这些消息是否被自动生成的队列接收。
  • 如果你有 receive_logs_topic.py 脚本正在监听这个队列,你应该能够看到所有发送的消息被打印出来。

 3、在 sender 容器中打开命令行界面,运行以下命令以发送日志:

root@sender:/# python3 emit_log_topic.py "kern.critical" "A critical kernel error"[x] Sent kern.critical:A critical kernel error
root@sender:/# python3 emit_log_topic.py "test" "This is a test"                  [x] Sent test:This is a test
root@sender:/# 

4、观察 receiver 容器的命令行输出,检查是否收到了发送的日志。

root@receiver:/# python3 receive_logs_topic.py "#"[*] Waiting for logs. To exit press CTRL+C[x] kern.critical:b'A critical kernel error'[x] test:b'This is a test'

5、receiver容器继续准备接受所有日志。 

3.4.2 接收特定设施的日志

1、在 receiver2 容器中打开命令行界面,运行以下命令以接收日志:

root@receiver2:/# python3 receive_logs_topic.py "kern.*"[*] Waiting for logs. To exit press CTRL+C

2、rabbitmq容器上查看exchange, queue,bindings: 

root@30acfada6737:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
topic_logs      topic 
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topicdirect
root@30acfada6737:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-eiFT38g9RD-iuc-RCn0FMw  0
amq.gen-XmrA_fE3vyX-j4pFSbpqww  0  <---receiver2创建的临时队列
root@30acfada6737:/# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     argumentsexchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   amq.gen-eiFT38g9RD-iuc-RCn0FMw  []exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   amq.gen-XmrA_fE3vyX-j4pFSbpqww  []
topic_logs      exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   #       []
topic_logs      exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   kern.*  []
root@30acfada6737:/# 

根据rabbitmqctl list_bindings 命令的输出:

topic_logs      exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   kern.*  []

我们可以看到以下信息:

  1. 虚拟主机(vhost): 绑定关系是在默认的虚拟主机 / 下列出的。

  2. 源(source_name): topic_logs,这是一个主题交换机(exchange)的名称。

  3. 源类型(source_kind): exchange 表示源是一个交换机。

  4. 目的地(destination_name): amq.gen-XmrA_fE3vyX-j4pFSbpqww,这是一个自动生成的队列名称。

  5. 目的地类型(destination_kind): queue 表示目的地是一个队列。

  6. 路由键(routing_key): kern.*,这表明这个绑定将匹配所有以 kern. 开头的路由键。这是一个主题路由键,使用星号(*)作为通配符,代表任意数量的字符。

  7. 参数(arguments): 空列表 [] 表示这个绑定没有使用任何额外的参数。

这个绑定表明,任何发送到 topic_logs 交换机并且路由键以 kern. 开头的消息都将被路由到名为 amq.gen-XmrA_fE3vyX-j4pFSbpqww 的队列。例如,消息使用路由键 kern.warningkern.critical 都会被这个队列接收。

如果你想测试这个特定的绑定,你可以按照以下步骤操作:

  1. 使用 emit_log_topic.py 脚本发送消息到 topic_logs 交换机,使用一个以 kern. 开头的路由键,例如 kern.critical

  2. 确保 receive_logs_topic.py 脚本正在监听 amq.gen-XmrA_fE3vyX-j4pFSbpqww 队列。

  3. 观察 receive_logs_topic.py 脚本的输出,检查是否收到了使用 kern.critical 路由键发送的消息。

  4. 如果没有收到消息,请检查脚本的 RabbitMQ 连接设置和队列监听设置是否正确。

  5. 你还可以测试发送使用不同路由键的消息,以验证只有匹配 kern.* 模式的消息才会被接收。

 3、在 sender 容器中打开命令行界面,继续运行以下命令以发送日志:

root@sender:/# python3 emit_log_topic.py "kern.critical" "third: A critical kernel error" [x] Sent kern.critical:third: A critical kernel error
root@sender:/# python3 emit_log_topic.py "test" "fourth: This is a test"     [x] Sent test:fourth: This is a test
root@sender:/#   

4、观察 receiver 容器的命令行输出,检查是否收到了发送的日志。

root@receiver:/# python receive_logs_topic.py "#"
bash: python: command not found
root@receiver:/# python3 receive_logs_topic.py "#"[*] Waiting for logs. To exit press CTRL+C[x] kern.critical:b'A critical kernel error'[x] test:b'This is a test'[x] kern.critical:b'third: A critical kernel error'[x] test:b'fourth: This is a test'

5、观察 receiver2容器的命令行输出,检查是否收到了发送的日志。

root@receiver2:/# python3 receive_logs_topic.py "kern.*"[*] Waiting for logs. To exit press CTRL+C[x] kern.critical:b'third: A critical kernel error'

3.4.3 接收特定严重性的日志

1、在 receiver2 容器中新开命令行界面,运行以下命令以接收日志:

root@receiver2:/# python3 receive_logs_topic.py "*.critical"[*] Waiting for logs. To exit press CTRL+C

2、rabbitmq容器上查看exchange, queue,bindings: 

root@30acfada6737:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
topic_logs      topic
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topicdirect
root@30acfada6737:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-eiFT38g9RD-iuc-RCn0FMw  0
amq.gen-dZZcREH6GManw2OusGgU2g  0
amq.gen-XmrA_fE3vyX-j4pFSbpqww  0
root@30acfada6737:/# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     argumentsexchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   amq.gen-eiFT38g9RD-iuc-RCn0FMw  []exchange        amq.gen-dZZcREH6GManw2OusGgU2g  queue   amq.gen-dZZcREH6GManw2OusGgU2g  []exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   amq.gen-XmrA_fE3vyX-j4pFSbpqww  []
topic_logs      exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   #       []
topic_logs      exchange        amq.gen-dZZcREH6GManw2OusGgU2g  queue   *.critical      []
topic_logs      exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   kern.*  []
root@30acfada6737:/# 

根据rabbitmqctl list_bindings 命令的输出:

topic_logs      exchange        amq.gen-dZZcREH6GManw2OusGgU2g  queue   *.critical      []

可以了解到以下信息:

  1. 虚拟主机(vhost): 绑定关系是在默认的虚拟主机 / 下列出的。

  2. 源(source_name): topic_logs,这是一个主题交换机的名称。

  3. 源类型(source_kind): exchange 表示源是一个交换机。

  4. 目的地(destination_name): amq.gen-dZZcREH6GManw2OusGgU2g,这是一个自动生成的队列名称。

  5. 目的地类型(destination_kind): queue 表示目的地是一个队列。

  6. 路由键(routing_key): *.critical,这表明这个绑定将匹配所有以 *.critical 结尾的路由键。在这个上下文中,星号(*)作为通配符,代表任意数量的字符,但在这个特定的路由键中,它实际上匹配任何以点(.)后跟 critical 结尾的字符串。

  7. 参数(arguments): 空列表 [] 表示这个绑定没有使用任何额外的参数。

这个绑定表明,任何发送到 topic_logs 交换机并且路由键以 *.critical 结尾的消息都将被路由到名为 amq.gen-dZZcREH6GManw2OusGgU2g 的队列。例如,消息使用路由键 auth.criticalcron.critical 都会被这个队列接收。

如果你想测试这个特定的绑定,你可以按照以下步骤操作:

  1. 使用 emit_log_topic.py 脚本发送消息到 topic_logs 交换机,使用一个以 .critical 结尾的路由键,例如 auth.critical

  2. 确保 receive_logs_topic.py 脚本正在监听 amq.gen-dZZcREH6GManw2OusGgU2g 队列。

  3. 观察 receive_logs_topic.py 脚本的输出,检查是否收到了使用 auth.critical 路由键发送的消息。

  4. 如果没有收到消息,请检查脚本的 RabbitMQ 连接设置和队列监听设置是否正确。

  5. 你还可以测试发送使用不同路由键的消息,以验证只有匹配 *.critical 模式的消息才会被接收。例如,发送一个路由键为 info.critical 的消息,它应该被接收;而发送一个路由键为 info.normal 的消息,则不应该被这个特定的队列接收。

3、在 sender 容器中打开命令行界面,继续运行以下命令以发送第5个和第6个日志:

root@sender:/# python3 emit_log_topic.py "auth.critical" "fifth log: This is fifth log"          [x] Sent auth.critical:fifth log: This is fifth log
root@sender:/# python3 emit_log_topic.py "kern.critical" "sixth log: This is sixth log"              [x] Sent kern.critical:sixth log: This is sixth log
root@sender:/# 

4、观察日志接收情况:

receiver(收到5、6)

root@receiver:/# python receive_logs_topic.py "#"
bash: python: command not found
root@receiver:/# python3 receive_logs_topic.py "#"[*] Waiting for logs. To exit press CTRL+C[x] kern.critical:b'A critical kernel error'[x] test:b'This is a test'[x] kern.critical:b'third: A critical kernel error'[x] test:b'fourth: This is a test'[x] auth.critical:b'fifth log: This is fifth log'[x] kern.critical:b'sixth log: This is sixth log'

 receiver2 第一个终端(只收到6):

root@receiver2:/# python3 receive_logs_topic.py "kern.*"[*] Waiting for logs. To exit press CTRL+C[x] kern.critical:b'third: A critical kernel error'[x] kern.critical:b'sixth log: This is sixth log'

receiver2第二个终端(收到5、6):

root@receiver2:/# python3 receive_logs_topic.py "*.critical"[*] Waiting for logs. To exit press CTRL+C[x] auth.critical:b'fifth log: This is fifth log'[x] kern.critical:b'sixth log: This is sixth log'

3.4.4 创建多个绑定的测试

1、在 receiver2 容器中新开第三个终端界面,运行以下命令以接收日志:

root@receiver2:/# python3 receive_logs_topic.py "kern.*" "*.critical"[*] Waiting for logs. To exit press CTRL+C

2、rabbitmq容器上查看exchange, queue,bindings: 

root@30acfada6737:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
topic_logs      topic
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topicdirect
root@30acfada6737:/# rabbitmqctl list_queues   
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-zR4Pv3875mquqMTLIkMCUQ  0
amq.gen-eiFT38g9RD-iuc-RCn0FMw  0
amq.gen-HS_61KsFpuerwoA17IMUmw  0
amq.gen-XmrA_fE3vyX-j4pFSbpqww  0
root@30acfada6737:/# rabbitmqctl list_bindings 
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     argumentsexchange        amq.gen-zR4Pv3875mquqMTLIkMCUQ  queue   amq.gen-zR4Pv3875mquqMTLIkMCUQ  []exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   amq.gen-eiFT38g9RD-iuc-RCn0FMw  []exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   amq.gen-HS_61KsFpuerwoA17IMUmw  []exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   amq.gen-XmrA_fE3vyX-j4pFSbpqww  []
topic_logs      exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   #       []
topic_logs      exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   *.critical      []
topic_logs      exchange        amq.gen-zR4Pv3875mquqMTLIkMCUQ  queue   *.critical      []
topic_logs      exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   kern.*  []
topic_logs      exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   kern.*  []
root@30acfada6737:/# 

rabbitmqctl list_bindings 命令输出中:

topic_logs      exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   *.critical      []
topic_logs      exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   kern.*  []

可以看到以下信息:

  1. 虚拟主机(vhost): 绑定关系是在默认的虚拟主机 / 下列出的。

  2. 源(source_name): topic_logs,这是一个主题交换机的名称。

  3. 源类型(source_kind): exchange 表示源是一个交换机。

  4. 目的地(destination_name): amq.gen-HS_61KsFpuerwoA17IMUmw,这是一个自动生成的队列名称。

  5. 目的地类型(destination_kind): queue 表示目的地是一个队列。

  6. 路由键(routing_key): 存在两条绑定,每条绑定有不同的路由键:

    • *.critical:这个绑定将匹配所有以 .critical 结尾的路由键,例如 auth.critical 或 cron.critical
    • kern.*:这个绑定将匹配所有以 kern. 开头的路由键,例如 kern.warning 或 kern.emerg
  7. 参数(arguments): 两个绑定的参数列表都是空的 [],表示这些绑定没有使用任何额外的参数。

这个输出显示了 topic_logs 交换机绑定到同一个队列 amq.gen-HS_61KsFpuerwoA17IMUmw 上的两种不同的路由模式。这意味着这个队列将接收符合这两种模式的任何消息。

测试这些绑定的步骤:

  1. 发送消息:

    • 使用 emit_log_topic.py 脚本发送消息到 topic_logs 交换机,使用不同的路由键来测试绑定。例如:
      • 发送一个路由键为 auth.critical 的消息,应该被队列接收。
      • 发送一个路由键为 kern.warning 的消息,也应该被队列接收。
  2. 监听队列:

    • 确保 receive_logs_topic.py 脚本正在监听 amq.gen-HS_61KsFpuerwoA17IMUmw 队列。
  3. 观察输出:

    • 观察 receive_logs_topic.py 脚本的输出,检查是否收到了使用上述路由键发送的消息。
  4. 验证绑定:

    • 如果消息没有被接收,检查脚本的 RabbitMQ 连接设置和队列监听设置是否正确。
    • 验证发送的消息路由键是否符合任一绑定的模式。
  5. 探索其他路由键:

    • 发送不符合上述两种模式的消息,例如使用路由键 info.normal,以验证消息不会被接收。

通过这些步骤,你可以验证 topic_logs 交换机的绑定是否按预期工作,并且消息是否能够正确地根据路由键模式被路由到指定的队列。

3、在 sender 容器中打开命令行界面,继续运行以下命令以发送第7个和第8个日志:

root@sender:/# python3 emit_log_topic.py "auth.critical" "seventh log: This is 7th log"       [x] Sent auth.critical:seventh log: This is 7th log
root@sender:/# python3 emit_log_topic.py "kern.warning" "eighth log: This is 8th log"                      [x] Sent kern.warning:eighth log: This is 8th log
root@sender:/# python3 emit_log_topic.py "info.normal" "ninth log: This is 9th log"                   [x] Sent info.normal:ninth log: This is 9th log
root@sender:/# 

4、观察日志接收情况:

receiver: (收到7、8、9)

root@receiver:/# python3 receive_logs_topic.py "#"[*] Waiting for logs. To exit press CTRL+C[x] kern.critical:b'A critical kernel error'[x] test:b'This is a test'[x] kern.critical:b'third: A critical kernel error'[x] test:b'fourth: This is a test'[x] auth.critical:b'fifth log: This is fifth log'[x] kern.critical:b'sixth log: This is sixth log'[x] auth.critical:b'seventh log: This is 7th log'[x] kern.warning:b'eighth log: This is 8th log'[x] info.normal:b'ninth log: This is 9th log'

receiver2第一个终端:(收到8,匹配kern.*)

root@receiver2:/# python3 receive_logs_topic.py "kern.*"[*] Waiting for logs. To exit press CTRL+C[x] kern.critical:b'third: A critical kernel error'[x] kern.critical:b'sixth log: This is sixth log'[x] kern.warning:b'eighth log: This is 8th log'

receiver2第二个终端:(收到7,匹配*.critical)

(receiver2误操作中断了之前的连接,重建了和rabbitmq的连接)

root@receiver2:/# python3 receive_logs_topic.py "*.critical"[*] Waiting for logs. To exit press CTRL+C[x] auth.critical:b'fifth log: This is fifth log'[x] kern.critical:b'sixth log: This is sixth log'
^CTraceback (most recent call last):File "//receive_logs_topic.py", line 32, in <module>channel.start_consuming()File "/usr/local/lib/python3.10/dist-packages/pika/adapters/blocking_connection.py", line 1883, in start_consumingself._process_data_events(time_limit=None)File "/usr/local/lib/python3.10/dist-packages/pika/adapters/blocking_connection.py", line 2044, in _process_data_eventsself.connection.process_data_events(time_limit=time_limit)File "/usr/local/lib/python3.10/dist-packages/pika/adapters/blocking_connection.py", line 842, in process_data_eventsself._flush_output(common_terminator)File "/usr/local/lib/python3.10/dist-packages/pika/adapters/blocking_connection.py", line 514, in _flush_outputself._impl.ioloop.poll()File "/usr/local/lib/python3.10/dist-packages/pika/adapters/select_connection.py", line 579, in pollself._poller.poll()File "/usr/local/lib/python3.10/dist-packages/pika/adapters/select_connection.py", line 1184, in pollevents = self._poll.poll(self._get_max_wait())
KeyboardInterruptroot@receiver2:/# python3 receive_logs_topic.py "*.critical"[*] Waiting for logs. To exit press CTRL+C[x] auth.critical:b'seventh log: This is 7th log'

receiver2第三个终端:(收到7、8,匹配kern.*或者*.critical)

root@receiver2:/# python3 receive_logs_topic.py "kern.*" "*.critical"[*] Waiting for logs. To exit press CTRL+C[x] auth.critical:b'seventh log: This is 7th log'[x] kern.warning:b'eighth log: This is 8th log'

4、小结

这篇文章是 RabbitMQ 官方教程的第五部分,使用 Python 客户端 Pika 来演示如何使用主题交换机(Topic Exchange)。以下是对文章内容的总结:

1、预备条件

  • RabbitMQ 需要安装并运行在标准端口(5672)上。
  • 如果使用不同的主机、端口或凭据,则需要调整连接设置。
  • 需要使用 Pika RabbitMQ 客户端版本 1.0.0。

2、教程重点

  • 教程介绍了如何使用主题交换机来改善日志系统,允许基于多个标准进行消息路由。
  • 通过使用主题交换机,可以实现类似于 Unix 系统中 syslog 工具的功能,即根据消息的严重性和来源进行路由。

3、主题交换

  • 主题交换机的消息路由键(routing_key)必须是由点分隔的单词列表。
  • 绑定键(binding_key)也采用相同的格式,可以使用特殊字符:
    • *(星号):匹配正好一个单词。
    • #(井号):匹配零个或多个单词。

4、示例

  • 假设发送的消息描述动物,路由键由三个单词组成,分别表示速度、颜色和物种。
  • 创建了三个绑定:
    • Q1 绑定到 *.orange.*:对所有橙色动物感兴趣。
    • Q2 绑定到 *.*.rabbit 和 lazy.#:对所有兔子和所有懒惰动物感兴趣。

5、消息路由示例

  • 消息 quick.orange.rabbit 将被 Q1 和 Q2 接收。
  • 消息 lazy.orange.elephant 也将被两者接收。
  • 消息 quick.orange.fox 只匹配 Q1,而 lazy.brown.fox 只匹配 Q2。
  • 如果消息不符合绑定键的模式,如 orange 或 quick.orange.new.rabbit,它们将不会被任何队列接收。

6、代码示例

  • 提供了两个 Python 脚本示例:
    • emit_log_topic.py:用于发送消息到主题交换。
    • receive_logs_topic.py:用于接收主题交换的消息。

7、使用方法

  • 接收所有日志:python receive_logs_topic.py "#"
  • 接收特定设施的日志(如 kern):python receive_logs_topic.py "kern.*"
  • 只接收特定严重性的日志(如 critical):python receive_logs_topic.py "*.critical"
  • 创建多个绑定:python receive_logs_topic.py "kern.*" "*.critical"
  • 发送特定路由键的日志:python emit_log_topic.py "kern.critical" "A critical kernel error"

8、注意事项

  • 代码没有对路由或绑定键做任何假设,可以根据需要使用多个路由键参数。

9、结语

  • 教程鼓励用户尝试和玩耍这些程序,以更好地理解主题交换机(topic exchange)的工作方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/412735.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

“重启就能解决一切问题”,iPhone重启方法大揭秘

随着iPhone不断更新换代&#xff0c;其设计与操作方式也在不断进化。从最初的实体Home键到如今的全面屏设计&#xff0c;iPhone的操作逻辑也随之发生了改变。 对于那些习惯了传统安卓手机操作的用户来说&#xff0c;iPhone的重启方式可能会显得有些不同寻常。下面我们就来一起…

SQL血缘解析

Druid 作为使用率特别高的的数据库连接池工具,在具备完善的连接池管理功能外,同时Druid 的 SQL解析功能可以用来防止 SQL注入等安全风险。通过对 SQL 语句进行解析和检查,Druid 可以识别并阻止潜在的恶意 SQL 语句执行,黑名单(阻止特定的 SQL 语句执行)、白名单(仅允许特…

★ 算法OJ题 ★ 力扣11 - 盛水最多的容器

Ciallo&#xff5e;(∠・ω< )⌒☆ ~ 今天&#xff0c;我将和大家一起做一道双指针算法题--盛水最多的容器~ 目录 一 题目 二 算法解析 三 编写算法 一 题目 11. 盛最多水的容器 - 力扣&#xff08;LeetCode&#xff09; 二 算法解析 解法1&#xff1a;暴力枚举 …

文本数据分析-(TF-IDF)(1)

文章目录 一、TF-IDF简介1.意义2.TF与IDF1).TF&#xff08;Term Frequency&#xff09;2).IDF&#xff08;Inverse Document Frequency&#xff09;3).TF-IDF 二、应用三、代码实现1.文件读取2.数据预处理3.排序和输出4.全部代码 一、TF-IDF简介 1.意义 TF-IDF&#xff08;Te…

28 TreeView组件

Tkinter ttk.Treeview 组件使用指南 ttk.Treeview 是 Tkinter 的一个高级控件&#xff0c;用于显示和管理层次化数据。它类似于电子表格或列表视图&#xff0c;但提供了更丰富的功能&#xff0c;如可展开的节点、多列显示等。ttk 模块是 Tkinter 的一个扩展&#xff0c;提供了…

Golang | Leetcode Golang题解之第382题链表随机节点

题目&#xff1a; 题解&#xff1a; type Solution struct {head *ListNode }func Constructor(head *ListNode) Solution {return Solution{head} }func (s *Solution) GetRandom() (ans int) {for node, i : s.head, 1; node ! nil; node node.Next {if rand.Intn(i) 0 { …

《机器学习》数据分析之关键词提取、TF-IDF、项目实现 <下>

目录 一、内容回顾 1、核心算法 2、算法公式 3、拆分文本 二、再次操作 1、取出每一卷的地址和内容 得到下列结果&#xff1a;&#xff08;此为DF类型&#xff09; 2、对每一篇文章进行分词 3、计算TF-IDF值 得到以下数据&#xff1a; 三、总结 1、关键词提取 1&a…

数据挖掘之分类算法

分类算法是数据挖掘中常用的一类算法&#xff0c;其主要任务是根据已知的训练数据&#xff08;即带有标签的数据&#xff09;构建模型&#xff0c;然后利用该模型对新的数据进行分类。分类算法广泛应用于金融、医疗、市场营销等领域&#xff0c;用于预测、决策支持等任务。以下…

STM32G474采用“多个单通道ADC转换”读取3个ADC引脚的电压

STM32G474采用“多个单通道ADC转换”读取3个ADC引脚的电压&#xff1a;PC0、PA1和PA2。本测试将ADC1_IN6映射到PC0引脚&#xff0c;ADC12_IN2映射到PA1引脚&#xff0c;ADC1_IN3映射到PA2引脚。 1、ADC输入 ADC输入电压范围&#xff1a;Vref– ≤ VIN ≤ Vref ADC支持“单端输入…

Java 集合Collection(List、Set)Map

集合的理解和优点 1)可以动态保存任意多个对象&#xff0c;使用比较方便!2)提供了一系列方便的操作对象的方法: add、remove、 set、 get等3)使用集合添加,删除新元素的示意代码- Java集合的分类 Java的集合类很多&#xff0c;主要分为两大类&#xff0c;如图&#xff1a; 1…

iPhone备忘录不小心删除了怎么办?

在日常使用iPhone的过程中&#xff0c;备忘录作为我们记录重要信息、灵感闪现和日常琐事的小帮手&#xff0c;其重要性不言而喻。然而&#xff0c;有时候因为操作失误或是不小心点击&#xff0c;我们可能会将珍贵的备忘录内容删除&#xff0c;这无疑会让人感到焦虑与不安。但请…

深入垃圾回收:理解GC的核心算法与实现

垃圾回收&#xff08;Garbage Collection&#xff0c;GC&#xff09;是现代编程语言中一项关键技术。它不仅解决了内存管理中的诸多问题&#xff0c;还为开发者提供了一个更高效、更安全的编程环境。本文将深入探讨GC的起源、主要算法以及这些算法在不同编程语言中的具体实现。…

考试:计算机网络(01)

网络功能和分类 计算机网络是计算机技术与通信技术相结合的产物&#xff0c;它实现了远程通信、远程信息处理和资源共享。 计算机网络的功能&#xff1a;数据通信、资源共享、管理集中化、实现分布式处理、负载均衡。 网络性能指标&#xff1a;速率、带宽(频带宽度或传送线路…

嵌入式数据库

概述 1.作用&#xff1a;存储大量数据&#xff0c;专业存储数据 存储在内存&#xff08;数组&#xff0c;变量&#xff0c;链表&#xff09;上的特点&#xff1a;程序运行结束&#xff0c;或者掉电&#xff0c;数据会丢失。 存储在硬盘&#xff08;文件&#xff09;上的特点…

vue3+ts+vite项目代码检查报错(vue-tsc)

报错原因&#xff1a;vue-tsc与typescrip版本不兼容 排查流程&#xff1a; 1、开始以为vue-tsc或者typescript版本太低&#xff0c;通过npm update更新&#xff0c;更新后还是报错 2、项目中package.json文件中typescript、vue-tsc版本并无兼容问题 3、控制台执行npm list发…

【HarmonyOS】模仿个人中心头像图片,调用系统相机拍照,从系统相册选择图片和圆形裁剪显示 (一)

【HarmonyOS】头像图片&#xff0c;调用系统相机拍照&#xff0c;从系统相册选择图片和圆形裁剪显示 &#xff08;一&#xff09; Demo效果展示&#xff1a; 方案思路&#xff1a; 使用photoAccessHelper实现系统相册选择图片的功能。此API可在无需用户授权的情况下&#xff…

万亿生成式AI市场,商汤迎来“长坡厚雪”

AI掀起了全球科技玩家的军备竞赛&#xff0c;然而声浪越强噪音越多&#xff0c;这个领域的混乱程度也变得远超以往。就连刚刚公布财报的英伟达&#xff0c;市场也没有买账&#xff0c;因为担心AI驱动的增长高峰已过&#xff0c;接下来&#xff0c;下游会更看重实际成果。 “囤…

javaee、ssm(maven)、springboot(maven)项目目录结构以及编译后文件目录存放路径

javaee项目目录结构&#xff1a; src下的文件或者是源码编译后都会放在WebRoot&#xff08;项目根目录&#xff09;文件夹\WebRoot\WEB-INF\classes目录中。 编译后的文件夹目录如下&#xff1a; 以上为普通的javaee项目目录结构&#xff0c;同maven工程目录结构是不一样的。…

07-图5 Saving James Bond - Hard Version(C)

哈哈&#xff0c;我是真的服了&#xff0c;写了好几天结果给我个这&#xff0c;气死我了&#xff0c;果然还有很大的进步空间。如果有c测试点4&#xff0c;就好了。 又写了一天&#xff0c;是真解决不了了&#xff0c;这个问题等我明白一定来解答 哈哈&#xff0c; 测试点提示内…

【SQL】餐馆营业额七日均线数据

目录 题目 分析 代码 题目 表: Customer ------------------------ | Column Name | Type | ------------------------ | customer_id | int | | name | varchar | | visited_on | date | | amount | int | -----------------------…