cent6.6安装rabbitmq

cent6.6安装rabbitmq

如果对运维课程感兴趣,可以在b站上、A站或csdn上搜索我的账号: 运维实战课程,可以关注我,学习更多免费的运维实战技术视频

1.在服务器192.168.231.128上安装rabbitmq

1)安装编译工具

[root@localhost ~]# yum -y install make gcc gcc-c++

2)安装相关依赖包

[root@localhost ~]# yum -y install kernel-devel m4 ncurses-devel openssl-devel

3)源码安装erlang(rabbimq依赖该包,rabbitmq是erlang语言开发的)

[root@localhost ~]# ls /erlang/

otp_src_R16B02.tar.gz

[root@localhost ~]# cd /erlang/

[root@localhost erlang]# ls

otp_src_R16B02.tar.gz

[root@localhost erlang]# tar -zxf otp_src_R16B02.tar.gz

[root@localhost erlang]# ls

otp_src_R16B02  otp_src_R16B02.tar.gz

[root@localhost erlang]# cd otp_src_R16B02

[root@localhost otp_src_R16B02]# ls

aclocal.m4  bootstrap     EPLICENCE               HOWTO  Makefile.in                         plt             system

AUTHORS     configure     erl-build-tool-vars.sh  lib    otp_build                           prebuilt.files  TAR.include

bin         configure.in  erts                    make   otp_client_build.temp_stderr.10373  README.md       xcomp

[root@localhost otp_src_R16B02]# ./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac

            #不用java编译,故去掉java避免错误

[root@localhost otp_src_R16B02]# make && make install

4)配置环境变量,让rabbitmq能找到安装的erlang软件

[root@localhost otp_src_R16B02]# vim /etc/profile

ERL_HOME=/usr/local/erlang

export PATH=$PATH:$ERL_HOME/bin

wq

[root@localhost otp_src_R16B02]# source  /etc/profile

[root@localhost otp_src_R16B02]# cd

5)源码安装rabbitmq

[root@localhost ~]# yum -y install xmlto    #安装rabbitmq的依赖包,否则编译不通过,报下面错误:

/bin/sh: line 1: xmlto: command not found

[root@localhost ~]# yum -y install zip unzip  #安装zip和unzip命令,否则编译时候会报zip和unzip命令找不到,可能内部需要该命令解压. 

[root@localhost ~]# ls /rabbitmq/

rabbitmq-server-3.1.5.tar.gz

[root@localhost ~]# cd /rabbitmq/

[root@localhost rabbitmq]# ls

rabbitmq-server-3.1.5.tar.gz

[root@localhost rabbitmq]# tar -zxf rabbitmq-server-3.1.5.tar.gz 

[root@localhost rabbitmq]# ls

rabbitmq-server-3.1.5  rabbitmq-server-3.1.5.tar.gz

[root@localhost rabbitmq]# cd rabbitmq-server-3.1.5

[root@localhost rabbitmq-server-3.1.5]# ls

calculate-relative  generate_app   LICENSE-APACHE2-ExplorerCanvas  LICENSE-MIT-EJS10      LICENSE-MIT-Sammy060  scripts

codegen             generate_deps  LICENSE-Apache-Basho            LICENSE-MIT-eldap      LICENSE-MPL-RabbitMQ  src

codegen.py          include        LICENSE-APL2-Stomp-Websocket    LICENSE-MIT-Flot       Makefile              version.mk

docs                INSTALL        LICENSE-BSD-base64js            LICENSE-MIT-jQuery164  plugins-src

ebin                LICENSE        LICENSE-BSD-glMatrix            LICENSE-MIT-Mochi      README

[root@localhost rabbitmq-server-3.1.5]# make          #直接编译

[root@localhost rabbitmq-server-3.1.5]# make install TARGET_DIR=/usr/local/rabbitmq SBIN_DIR=/usr/local/rabbitmq/sbin MAN_DIR=/usr/local/rabbitmq/man   #安装rabbitmq

6) 安装web插件管理界面

[root@localhost rabbitmq-server-3.1.5]# cd /usr/local/rabbitmq/sbin/

[root@localhost sbin]# ls

rabbitmqctl  rabbitmq-defaults  rabbitmq-env  rabbitmq-plugins  rabbitmq-server

[root@localhost sbin]# mkdir /etc/rabbitmq #创建一个目录,管理界面软件默认会安装在此目录下,不创建则安装不了

[root@localhost sbin]# ./rabbitmq-plugins enable rabbitmq_management

The following plugins have been enabled:

  mochiweb

  webmachine

  rabbitmq_web_dispatch

  amqp_client

  rabbitmq_management_agent

  rabbitmq_management

Plugin configuration has changed. Restart RabbitMQ for changes to take effect.

查看插件列表:

[root@localhost sbin]# ./rabbitmq-plugins list

[e] amqp_client                       3.1.5

[ ] cowboy                            0.5.0-rmq3.1.5-git4b93c2d

[ ] eldap                             3.1.5-gite309de4

[e] mochiweb                          2.7.0-rmq3.1.5-git680dba8

[ ] rabbitmq_amqp1_0                  3.1.5

……

7)启动rabbitmq (后台启动运行)

[root@localhost sbin]# ./rabbitmq-server --detached &

[1] 60071

[root@localhost sbin]#

              RabbitMQ 3.1.5. Copyright (C) 2007-2013 GoPivotal, Inc.

  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/

  ##  ##

  ##########  Logs: /var/log/rabbitmq/rabbit@localhost.log

  ######  ##        /var/log/rabbitmq/rabbit@localhost-sasl.log

  ##########

              Starting broker... completed with 6 plugins.

[root@localhost sbin]# netstat -anput |grep 15672

tcp        0      0 0.0.0.0:15672               0.0.0.0:*                   LISTEN      60071/beam      

注意:如果要用python操作rabbitmq,需要安装Python连接rabbitmq的模块:pika

用python3自带的pip3命令安装即可,安装过程如下:

[root@localhost sbin]# cd /usr/local/python3.5/bin

[root@localhost bin]# ./pip3 install pika

Collecting pika

  Downloading pika-0.11.2-py2.py3-none-any.whl (107kB)

    100% |████████████████████████████████| 110kB 931kB/s

Installing collected packages: pika

Successfully installed pika-0.11.2

You are using pip version 7.1.2, however version 9.0.1 is available.

You should consider upgrading via the 'pip install --upgrade pip' command.

[root@localhost bin]# python3

>>> import pika         能导入即可

>>> quit()

客户端浏览器访问rabbitmq的管理页面:

使用登录的名户名和密码默认都算guest,登录管理页面,如图:

点击login,如图:

8)停止rabbitmq

[root@localhost sbin]# ./rabbitmqctl stop

Stopping and halting node rabbit@localhost ...

...done.

2.在服务器192.168.231.128上用python操作rabbitmq,实现一个简单的队列通信(生产者生产消息,消费者消费消息)

1)安装python3.5.0

[root@localhost ~]# python3 -V

Python 3.5.0

2)安装rabbitmq并运行,如上面

[root@localhost ~]# netstat -anput |grep 15672

tcp        0      0 0.0.0.0:15672               0.0.0.0:*                   LISTEN      60179/beam  

3)安装python连接rabbitmq的模块pika  (用python3自带的pip3命令安装)

[root@localhost ~]# cd /usr/local/python3.5/bin

[root@localhost bin]# ./pip3 install pika

Collecting pika

  Downloading pika-0.11.2-py2.py3-none-any.whl (107kB)

    100% |████████████████████████████████| 110kB 931kB/s

Installing collected packages: pika

Successfully installed pika-0.11.2

You are using pip version 7.1.2, however version 9.0.1 is available.

You should consider upgrading via the 'pip install --upgrade pip' command.

[root@localhost bin]# python3

>>> import pika         能导入即可

>>> quit()

4)python连接rabbitmq并建立生产者生产消息,消费者消费消息的队列通信(队列和消息未持久化)

#当rabbitmq服务重启后,队列和消息会丢失

[root@localhost ~]# cat producer.py       #生产者生产消息

#!/usr/bin/env python3

import pika

connection = pika.BlockingConnection(  #通过创建一个实例建立一个socket

    pika.ConnectionParameters('localhost')    #python连接到本机的rabbitmq

    )

channel = connection.channel()   #声明一个管道,在管道里发消息

channel.queue_declare(queue='hello')  #在管道里声明一个queue队列,队列名叫hello,也可其他的,生产消费两者都声明,避免不知哪个先启动时未声明  在hello后没加参数durable=True,队列不会持久化,重启会队列丢失

channel.basic_publish(exchange='',   #通过管道的basic_publish命令向队列里发消息,exchange先不管

                    routing_key='hello', #要给哪个队列发消息,hello就是要给队列发消息的队列名

                    body='hello world!') #要给队列发送的消息内容,里面没加参数 properties=pika.BasicProperties(delivery_mode=2),消息也不持久化

print ("[x] send 'hello world!'")

connection.close()                       #发完消息后关闭队列,不用关闭管道

[root@localhost ~]# cat consumer.py     #消费者消费消息

#!/usr/bin/env python3

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  #创建一个实例建立一个socekt连接

channel = connection.channel()     #建立一个管道

channel.queue_declare(queue='hello') #从管道里,声明从哪个队列里收消息,hello是队列名,生产消费两者都声明,避免不知哪个先启动时未声明,在hello没加参数durable=True,队列不会持久化

def callback(ch,method,properties,body):  #ch指管道的内存对象,后面几个可以不知道,不常用

    print ('-->',ch,method,properties)

    #ch.basic_ack(delivery_tag=method.delivery_tag)  #默认不加下面的no_ack=True,需释放这个,手动去跟服务器确认消息

    print ('[x] Received %r' % body)

channel.basic_consume(       #通过管道的basic_consume方法消费消息

                    callback, #若果收到消息,就调用callback函数来处理消息

                    queue='hello', #从哪个队列里消费消息,hello是队列名

                    no_ack=True)   #不确认,True时,消费者无论是否消费完消息,都不给服务器发确认.False是确认

#上面是声明开始消费消息,还没消费消息,一般不加no_ack=True,默认要确认,这样消费者只有消费完消息后跟服务器缺认后,保证消息不丢失

print ('[*] waiting for messages, To exit press CTRL+C')

channel.start_consuming()       #这里才是真正消费消息,只要启动就一直运行了,不只消费一条,一直在消费

消费者消费消息先运行等待消费消息,如下:

[root@localhost ~]# python3 consumer.py

[*] waiting for messages, To exit press CTRL+C

……

生产者生产消息,运行,生产2条消息

[root@localhost ~]# python3 producer.py

[x] send 'hello world!'

[root@localhost ~]# python3 producer.py

[x] send 'hello world!'

查看消费者消费的消息

[root@localhost ~]# python3 consumer.py

[*] waiting for messages, To exit press CTRL+C

--> <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('::1', 48464, 0, 0)->('::1', 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.26fecdba11ed4c6e9b7089eeee1548ed', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello'])> <BasicProperties>

[x] Received b'hello world!'

--> <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('::1', 48464, 0, 0)->('::1', 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.26fecdba11ed4c6e9b7089eeee1548ed', 'delivery_tag=2', 'exchange=', 'redelivered=False', 'routing_key=hello'])> <BasicProperties>

[x] Received b'hello world!'

查看队列里的消息:  #因为消费者消费完了,所以队列里无消息

root@localhost ~]# cd /usr/local/rabbitmq/sbin/

[root@localhost sbin]# ./rabbitmqctl list_queues

Listing queues ...

hello 0

...done.

消费者ctrl+c停止消费消息,让生产者生产3条消息,测试rabbitmq服务重启后,队列丢失(未持久化)

生产者生产3条消息:

[root@localhost ~]# python3 producer.py

[x] send 'hello world!'

[root@localhost ~]# python3 producer.py

[x] send 'hello world!'

[root@localhost ~]# python3 producer.py

[x] send 'hello world!'

查看队列里的消息,有几条(3条)

[root@localhost ~]# cd /usr/local/rabbitmq/sbin/

[root@localhost sbin]# ./rabbitmqctl list_queues

Listing queues ...

hello  3        #有3条未消费的消息

...done.

重启rabbitmq服务

[root@localhost sbin]# ./rabbitmqctl stop

[root@localhost sbin]# ./rabbitmq-server --detached &

[root@localhost sbin]# ./rabbitmqctl list_queues

Listing queues ...

...done.                             #发现队列名都丢失了,消息更丢失了,未持久化。

注意:上面是生产者和消费者是一对一关系,一个生产者对应一个消费者。

当一个生产者对应多个消费者时: 生产者和消费者代码都不变,消费者用多个终端执行,就表示了启动了多个消费者。

5)python连接rabbitmq并建立生成者生产消息,消费者消费消息和(队列和消息持久化)

两个步骤:                                        #当rabbitmq服务重启后,队列和消息不丢失

1)在每个声明队列里都要加参数durable=True,但只是队列是持久化不能消息持久化。如:channel.queue_declare(queue='hello2',durable=True)

2)在生产者代码里的channel.basic_publish()中加下面参数:

                    properties=pika.BasicProperties(

                        delivery_mode=2)  #消息持久化

例子演示:

[root@localhost ~]# cat producer.py    #生产者生产消息

#!/usr/bin/env python3

import pika

connection = pika.BlockingConnection(  #通过创建一个实例建立一个socket

    pika.ConnectionParameters('localhost')    #python连接到本机的rabbitmq

    )

channel = connection.channel()   #声明一个管道,在管道里发消息

channel.queue_declare(queue='hello2',durable=True)  #在管道里声明一个queue队列,队列名叫hello,也可其他,生产消费两者都声明,避免不知哪个先启动时未声明,在hello后再加参数durable=True是队列持久化,消息不持久化

channel.basic_publish(exchange='',   #通过管道的basic_publish命令向队列里发消息,exchange先不管

                    routing_key='hello2', #要给哪个队列发消息,hello就是要给队列发消息的队列名

                    body='hello world!',  #要给队列发送的消息内容

                    properties=pika.BasicProperties(

                        delivery_mode=2  #消息持久化

                    )

                    )

print ("[x] send 'hello world!'")

connection.close()                       #发完消息后关闭队列,不用关闭管道

[root@localhost ~]# cat consumer.py      #消费者消费消息

#!/usr/bin/env python3

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  #创建一个实例建立一个socekt连接

channel = connection.channel()     #建立一个管道

channel.queue_declare(queue='hello2',durable=True) #从管道里,声明从哪个队列里收消息,hello是队列名,生产消费两者都声明,避免不知哪个先启动时未声明,在hello后再加参数durable=True是持久化,不加不会持久化

def callback(ch,method,properties,body):  #ch指管道的内存对象,后面几个可以不知道,不常用

    print ('-->',ch,method,properties)

    ch.basic_ack(delivery_tag=method.delivery_tag)  #默认不加下面的no_ack=True,需释放这个,手动去跟服务器确认消息

    print ('[x] Received %r' % body)

channel.basic_consume(       #通过管道的basic_consume方法消费消息

                    callback, #若果收到消息,就调用callback函数来处理消息

                    queue='hello2' #从哪个队列里消费消息,hello是队列名

                    )   #一般不加no_ack=True,默认要确认,这样消费者只有消费完消息后跟服务器缺认后,保证消息不丢失

print ('[*] waiting for messages, To exit press CTRL+C')

channel.start_consuming()       #这里才是真正消费消息,只要启动就一直运行了,不只消费一条,一直在消费

消费者先不启动,不消费消息,启动生产者生产2条消息:

[root@localhost ~]# python3 producer.py

[x] send 'hello world!'

[root@localhost ~]# python3 producer.py

[x] send 'hello world!'

[root@localhost sbin]# ./rabbitmqctl list_queues  #查看各队列里有多少条消息

[root@localhost sbin]#  ./rabbitmqctl list_queues

Listing queues ...

hello2 2        #队列里有两条消息

...done.

[root@localhost sbin]# ./rabbitmqctl stop              #停止rabbitmq

[root@localhost sbin]# ./rabbitmq-server --detached &  #启动rabbitmq

[root@localhost sbin]# ./rabbitmqctl list_queues    #查看各队列里数据,队列不会消失,但消息是丢失了

Listing queues ...

hello2 2       #重启后队列和消息仍然存在

...done.

[root@localhost sbin]# cd

启动消费者消费消息:消费了这2条消息

[root@localhost sbin]# cd

[root@localhost ~]# python3 consumer.py

[*] waiting for messages, To exit press CTRL+C

--> <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('::1', 48460, 0, 0)->('::1', 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.939e6959a2594d29a1f5365ff2fba76a', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello2'])> <BasicProperties(['delivery_mode=2'])>

[x] Received b'hello world!'

--> <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('::1', 48460, 0, 0)->('::1', 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.939e6959a2594d29a1f5365ff2fba76a', 'delivery_tag=2', 'exchange=', 'redelivered=False', 'routing_key=hello2'])> <BasicProperties(['delivery_mode=2'])>

[x] Received b'hello world!'

再启动一个终端查看队列里的消息

[root@localhost ~]# cd /usr/local/rabbitmq/sbin/

[root@localhost sbin]# ./rabbitmqctl list_queues

Listing queues ...

hello2 0

...done.

注意:上面是生产者和消费者是一对一关系,一个生产者对应一个消费者。

当一个生产者对应多个消费者时: 生产者和消费者代码都不变,消费者用多个终端执行,就表示了启动了多个消费者。

3.rabbitmq的生产者自动根据消费者消息快慢(处理能力)自动分配不同量的任务(上面的4)和5)都适用)(持久性)

上面4)或5)的生产者代码都不用变,只需在4)或5)的消费者代码channel.basic_consume()的上边添加一条:

channel.basic_qos(prefetch_count=1)即可,表示:让生产者知道,你等我消费完这一条再给我发下一条,相当于权重设置。

例子:

[root@localhost ~]# cat producer.py    #生产者生产消息

#!/usr/bin/env python3

import pika

connection = pika.BlockingConnection(  #通过创建一个实例建立一个socket

    pika.ConnectionParameters('localhost')    #python连接到本机的rabbitmq

    )

channel = connection.channel()   #声明一个管道,在管道里发消息

channel.queue_declare(queue='hello2',durable=True)  #在管道里声明一个queue队列,队列名叫hello,也可其他,生产消费两者都声明,避免不知哪个先启动时未声明,在hello后再加参数durable=True是队列持久化,消息不持久化

channel.basic_publish(exchange='',   #通过管道的basic_publish命令向队列里发消息,exchange先不管

                    routing_key='hello2', #要给哪个队列发消息,hello就是要给队列发消息的队列名

                    body='hello world!',  #要给队列发送的消息内容

                    properties=pika.BasicProperties(

                        delivery_mode=2  #消息持久化

                    )

                    )

print ("[x] send 'hello world!'")

connection.close()                       #发完消息后关闭队列,不用关闭管道

[root@localhost ~]# cat consumer.py  #消费者消费消息(处理快的,没加sleep)

#!/usr/bin/env python3

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  #创建一个实例建立一个socekt连接

channel = connection.channel()     #建立一个管道

channel.queue_declare(queue='hello2',durable=True) #从管道里,声明从哪个队列里收消息,hello是队列名,生产消费两者都声明,避免不知哪个先启动时未声明,在hello后再加参数durable=True是持久化,不加不会持久化

def callback(ch,method,properties,body):  #ch指管道的内存对象,后面几个可以不知道,不常用

    print ('-->',ch,method,properties)

    ch.basic_ack(delivery_tag=method.delivery_tag)  #默认不加下面的no_ack=True,需释放这个,手动跟服务器确认消息

    print ('[x] Received %r' % body)

channel.basic_qos(prefetch_count=1) #添加此内容是:让生产者知道,你等我消费完这一条再给我发下一条,相当于权重设置

channel.basic_consume(       #通过管道的basic_consume方法消费消息

                    callback, #若果收到消息,就调用callback函数来处理消息

                    queue='hello2' #从哪个队列里消费消息,hello是队列名

                    )   #一般不加no_ack=True,默认确认,消费者只有消费完消息后跟服务器缺认后,保证消息不丢失

print ('[*] waiting for messages, To exit press CTRL+C')

channel.start_consuming()       #这里才是真正消费消息,只要启动就一直运行了,不只消费一条,一直在消费

[root@localhost ~]# cat consumer2.py  #消费者消费消息(处理慢的,加了sleep)

#!/usr/bin/env python3

import pika

import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  #创建一个实例建立一个socekt连接

channel = connection.channel()     #建立一个管道

channel.queue_declare(queue='hello2',durable=True) #从管道里,声明从哪个队列里收消息,hello是队列名,生产消费两者都声明,避免不知哪个先启动时未声明,在hello后再加参数durable=True是持久化,不加不会持久化

def callback(ch,method,properties,body):  #ch指管道的内存对象,后面几个可以不知道,不常用

    print ('-->',ch,method,properties)

    time.sleep(50)   #睡觉50s,模拟处理消息慢,机器老

    ch.basic_ack(delivery_tag=method.delivery_tag)  #默认不加下面的no_ack=True,需释放这个,手动跟服务器确认消息

    print ('[x] Received %r' % body)

channel.basic_qos(prefetch_count=1) #添加此内容是:让生产者知道,你等我消费完这一条再给我发下一条,相当于权重设置

channel.basic_consume(       #通过管道的basic_consume方法消费消息

                    callback, #若果收到消息,就调用callback函数来处理消息

                    queue='hello2' #从哪个队列里消费消息,hello是队列名

                    )   #一般不加no_ack=True,默认确认,消费者只有消费完消息后跟服务器缺认后,保证消息不丢失

print ('[*] waiting for messages, To exit press CTRL+C')

channel.start_consuming()       #这里才是真正消费消息,只要启动就一直运行了,不只消费一条,一直在消费

[root@localhost ~]# python3 consumer.py            #启动消费者1消费消息(消费快的)

[*] waiting for messages, To exit press CTRL+C

……

[root@localhost ~]# python3 consumer2.py           #启动消费者2消费消息(消费慢的)

[*] waiting for messages, To exit press CTRL+C

……

[root@localhost ~]# python3 producer.py            #生产者生产消息

[x] send 'hello world!'

此处说明一下:生产者先生产消息,默认会轮询给两个消费者发消息,让消费者消费消息,但是,消费者的处理能力不同,消费者快的就多处理几条,消费者处理慢的,还没有处理完成时候,生产者就不会给该消费者发。

4.rabbitmq中生产者和消费者的队列两个机制:

1)生产者和消费者的队列轮询机制:

当生产者生产了数据时,发给多个消费者时候,默认是轮询的,发第一条数据会给第一个最先启动的消费者,生成第2条消息时候,会轮询到下一个消费者消费,依次类推。

2)生产者和消费者队列的确认机制:

默认情况下,生产者给消费者发消息,消费者消费消息后,会跟服务器确认是否消费了消息,消费者代码中一般不加参数:no_ack=True(表示不确认),即使加参数也是为False,表示:消费者只有消费完消息后跟服务器缺认后,才让服务器从队列里删除该消息,否则如果没有确认已经消费了消息就删除消息,可能会丢失消息,这样的机制能保证消息不丢失。

5.rabbitmq中消息队列的持久化设置 (在每次声明消息队列时候(每个队列都要加),加参数:durable=True)

两个步骤:

1)在每个声明队列里都要加参数durable=True,但只是队列是持久化不能消息持久化。如:channel.queue_declare(queue='hello2',durable=True)

2)在生产者代码里的channel.basic_publish()括号中除了之前的参数,再添加下面参数:

                    properties=pika.BasicProperties(

                        delivery_mode=2)  #消息持久化

6.rabbitmq的广播方式发消息(fanout模式)   

场景:类似于收音机收听广播,一个广播,多个接收.

[root@localhost ~]# cat producer.py    #生产者生产消息

#!/usr/bin/env python3

import pika

import sys

connection = pika.BlockingConnection(  #通过创建一个实例建立一个socket

    pika.ConnectionParameters('localhost')    #python连接到本机的rabbitmq

    )

channel = connection.channel()   #声明一个管道,在管道里发消息

channel.exchange_declare('logs','fanout')  #logs是随便起的一个名字,后面定义广播类型

message = ''.join(sys.argv[1:]) or 'info: hello world'

channel.basic_publish(exchange='logs',   #通过管道的basic_publish命令向队列里发消息,exchange先不管

                    routing_key='', #不写队列名,前面也没声明队列,因为是广播,是给所有队列发

                    body=message  #要给队列发送的消息内容

                    )

print ("[x] send %r" % message)

connection.close()                       #发完消息后关闭队列,不用关闭管道

[root@localhost ~]# cat consumer.py   #消费者消费消息

#!/usr/bin/env python3

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  #创建一个实例建立一个socekt连接

channel = connection.channel()     #建立一个管道

channel.exchange_declare('logs','fanout') #定义广播类型

result = channel.queue_declare(exclusive=True) #不指定队列名字,rabbit会随机分配一个名字,exclusive=True会在使用此队列的消费者断开后,自动将队列删除

queue_name = result.method.queue  #获取随机分配的队列名

print ('random queuename',queue_name)

channel.queue_bind(exchange='logs',queue=queue_name)  #绑定到转发器上,只收该转发器消息:logs,转发器会转到相应队列里,所以还需绑定队列名

print ('[*] waiting for messages, To exit press CTRL+C')

def callback(ch,method,properties,body):  #ch指管道的内存对象,后面几个可以不知道,收到消息后通过该函数处理

    ch.basic_ack(delivery_tag=method.delivery_tag)  #默认不加下面的no_ack=True,需释放这个,手动去跟服务器确认消息

    print ('[x] %r' % body)

channel.basic_qos(prefetch_count=1)   #添加此内容是:让生产者知道,你等我消费完这一条再给我发下一条,相当于权重设置

channel.basic_consume(       #通过管道的basic_consume方法消费消息

                    callback, #若果收到消息,就调用callback函数来处理消息

                    queue=queue_name #从哪个队列里消费消息,是随机分配的队列名

                    )   #一般不加no_ack=True,默认要确认,这样消费者只有消费完消息后跟服务器缺认后,保证消息不丢失

channel.start_consuming()       #这里才是真正消费消息,只要启动就一直运行了,不只消费一条,一直在消费

开启3个终端运行消费者代码,让消费者消费消息:

[root@localhost ~]# python3 consumer.py

random queuename amq.gen-q2upwaJLwMtxNEqD7iZ_eQ

[*] waiting for messages, To exit press CTRL+C

……

[root@localhost ~]# python3 consumer.py

random queuename amq.gen-519fqCe-Tx9sU--xTss5xw

[*] waiting for messages, To exit press CTRL+C

……

[root@localhost ~]# python3 consumer.py

random queuename amq.gen-BjmlqcjlG5vxEZMzuNxckQ

[*] waiting for messages, To exit press CTRL+C

……

运行生产者代码,生产者生产2条消息:(广播方式)

[root@localhost ~]# python3 producer.py

[x] send 'info: hello world'

[root@localhost ~]# python3 producer.py

[x] send 'info: hello world'

查看3个终端各个消费者消费的消息,3个终端都接收了2条消息消费了

[root@localhost ~]# python3 consumer.py

random queuename amq.gen-q2upwaJLwMtxNEqD7iZ_eQ

[*] waiting for messages, To exit press CTRL+C

[x] b'info: hello world'

[x] b'info: hello world'

[root@localhost ~]# python3 consumer.py

random queuename amq.gen-519fqCe-Tx9sU--xTss5xw

[*] waiting for messages, To exit press CTRL+C

[x] b'info: hello world'

[x] b'info: hello world'

[root@localhost ~]# python3 consumer.py

random queuename amq.gen-BjmlqcjlG5vxEZMzuNxckQ

[*] waiting for messages, To exit press CTRL+C

[x] b'info: hello world'

[x] b'info: hello world'

7.rabbitmq的广播过滤方式发消息(direct模式)   

生产者生产各种类型的消息和内容,消费者根据类型进行收取消息(过滤不细致)

[root@localhost ~]# cat producer.py     #生产者生产消息

#!/usr/bin/env python3

import pika

import sys

connection = pika.BlockingConnection(  #通过创建一个实例建立一个socket

    pika.ConnectionParameters('localhost')    #python连接到本机的rabbitmq

    )

channel = connection.channel()   #声明一个管道,在管道里发消息

channel.exchange_declare('direct_logs','direct')  #direct_logs是随便起的一个名字,定义广播类型,带过滤的广播

severity = sys.argv[1] if len(sys.argv) > 1 else 'info' #位置1自定义要过滤消息的类型,手动传递参数,若没传,就是只接收info类型的消息

message = ' '.join(sys.argv[2:]) or 'hello world'      #位置2定义该消息类型下的消息内容

channel.basic_publish(exchange='direct_logs',   #通过管道的basic_publish命令向队列里发消息,exchange先不管

                    routing_key=severity, #不写队列名,前面也没声明队列,因为是广播的过滤,是给所有队列发

                    body=message  #要给队列发送的消息内容

                    )

print ("[x] send %r:%r" % (severity,message))

connection.close()                       #发完消息后关闭队列,不用关闭管道

[root@localhost ~]# cat consumer.py    #消费者消费消息

#!/usr/bin/env python3

import pika

import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  #创建一个实例建立一个socekt连接

channel = connection.channel()     #建立一个管道

channel.exchange_declare('direct_logs','direct') #定义广播类型,是广播的过滤

result = channel.queue_declare(exclusive=True) #不指定队列名字,rabbit会随机分配一个名字,exclusive=True会在使用此队列的消费者断开后,自动将队列删除

queue_name = result.method.queue  #获取随机分配的队列名

print ('random queuename',queue_name)

severities = sys.argv[1:]   #获取用户位置参数传递的要过滤的条件,收什么类型的消息

if not severities:          #若用户没有在位置参数设置过滤条件,就报错并退出

    sys.stderr.write('Usage: %s [info] [warning] [error]\n' % sys.argv[0])

    sys.exit(1)

for severity in severities:    #循环绑定用户设置的过滤条件参数,所有发到severity参数的消息我都接收,实现过滤

    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity) #绑定到转发器上,只收过滤条件消息

print ('[*] waiting for messages, To exit press CTRL+C')

def callback(ch,method,properties,body):  #ch指管道的内存对象,后面几个可以不知道,不常用

    ch.basic_ack(delivery_tag=method.delivery_tag)  #默认不加下面的no_ack=True,需释放这个,手动去跟服务器确认消息

    print ('[x] %r:%r' % (method.routing_key,body))

channel.basic_qos(prefetch_count=1)   #添加此内容是:让生产者知道,你等我消费完这一条再给我发下一条,相当于权重设置

channel.basic_consume(       #通过管道的basic_consume方法消费消息

                    callback, #若果收到消息,就调用callback函数来处理消息

                    queue=queue_name #从哪个队列里消费消息,随机获取的队列名

                    )   #一般不加no_ack=True,默认要确认,这样消费者只有消费完消息后跟服务器缺认后,保证消息不丢失

channel.start_consuming()       #这里才是真正消费消息,只要启动就一直运行了,不只消费一条,一直在消费

各类消费者根据消息类型,收取不同的消息内容:

[root@localhost ~]# python3 consumer.py info              #消费者1仅收取info内容的消息

random queuename amq.gen-raLh9PEIVYaLGKYWLj8Tig

[*] waiting for messages, To exit press CTRL+C

[root@localhost ~]# python3 consumer.py error             #消费者2仅收取error内容的消息

random queuename amq.gen-AnCJ50AX--AlmJnsqWzpvw

[*] waiting for messages, To exit press CTRL+C

[root@localhost ~]# python3 consumer.py info error        #消费者3收取info和error内容的消息

random queuename amq.gen-kCyvuVapsjDIebC3nmd9eg

[*] waiting for messages, To exit press CTRL+C

[root@localhost ~]# python3 consumer.py warning           #消费者4仅收取warning内容的消息

random queuename amq.gen-wTkdfpdv2VF1qBhXYOtaTA

[*] waiting for messages, To exit press CTRL+C

生产者发送各种类型的消息类型和消息内容:

[root@localhost ~]# python3 producer.py info content1     #发送info类型的消息,内容是:content1

[x] send 'info':'content1'

[root@localhost ~]# python3 producer.py error content_2   #发送error类型的消息,内容是:content_2

[x] send 'error':'content_2'

[root@localhost ~]# python3 producer.py warning content_3   #发送warning类型的消息,内容是:content_3

[x] send 'warning':'content_3'

各不同类型消费者的收取消费者情况:

[root@localhost ~]# python3 consumer.py info        #消费者1仅收取info内容的消息

random queuename amq.gen-raLh9PEIVYaLGKYWLj8Tig

[*] waiting for messages, To exit press CTRL+C

[x] 'info':b'content1'

[root@localhost ~]# python3 consumer.py error      #消费者2仅收取error内容的消息

random queuename amq.gen-AnCJ50AX--AlmJnsqWzpvw

[*] waiting for messages, To exit press CTRL+C

[x] 'error':b'content_2'

[root@localhost ~]# python3 consumer.py info error  #消费者3收取info和error内容的消息

random queuename amq.gen-kCyvuVapsjDIebC3nmd9eg

[*] waiting for messages, To exit press CTRL+C

[x] 'info':b'content1'

[x] 'error':b'content_2'

[root@localhost ~]# python3 consumer.py warning   #消费者4仅收取warning内容的消息

random queuename amq.gen-wTkdfpdv2VF1qBhXYOtaTA

[*] waiting for messages, To exit press CTRL+C

[x] 'warning':b'content_3'

7.rabbitmq的广播细致过滤方式发消息(topic模式)  

生产者生产各种类型的消息和内容,消费者根据类型进行收取消息(过滤细致)

[root@localhost ~]# cat producer.py   #生产者生产消息

#!/usr/bin/env python3

import pika

import sys

connection = pika.BlockingConnection(  #通过创建一个实例建立一个socket

    pika.ConnectionParameters('localhost')    #python连接到本机的rabbitmq

    )

channel = connection.channel()   #声明一个管道,在管道里发消息

channel.exchange_declare('topic_logs','topic')  #topic_logs是随便起的一个名字,定义广播类型,细致过滤

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' #位置1自定义要过滤消息的类型,手动传递参数,若没传,就是只接收anonymous.info类型的消息

message = ' '.join(sys.argv[2:]) or 'hello world'      #位置2定义该消息类型下的消息内容

channel.basic_publish(exchange='topic_logs',   #通过管道的basic_publish命令向队列里发消息,exchange先不管

                    routing_key=routing_key, #不写队列名,前面也没声明队列,因为是广播的过滤,是给所有队列发

                    body=message  #要给队列发送的消息内容

                    )

print ("[x] send %r:%r" % (routing_key,message))

connection.close()                       #发完消息后关闭队列,不用关闭管道

[root@localhost ~]# cat consumer.py   #消费者消费消息

#!/usr/bin/env python3

import pika

import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  #创建一个实例建立一个socekt连接

channel = connection.channel()     #建立一个管道

channel.exchange_declare('topic_logs','topic') #定义广播类型,是广播的细致过滤

result = channel.queue_declare(exclusive=True) #不指定队列名字,rabbit会随机分配一个名字,exclusive=True会在使用此队列的消费者断开后,自动将队列删除

queue_name = result.method.queue  #获取随机分配的队列名

print ('random queuename',queue_name)

binding_keys = sys.argv[1:]   #获取用户位置参数传递的要过滤的条件,过滤的消息类型

if not binding_keys:          #若用户没有在位置参数设置过滤条件,就报错并退出

    sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0])

    sys.exit(1)

for binding_key in binding_keys:  #循环绑定用户设置过滤条件参数,所有发到severity参数的消息我都接收,实现过滤

    channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key) #绑定到转发器上,只收过滤条件消息

print ('[*] waiting for messages, To exit press CTRL+C')

def callback(ch,method,properties,body):  #ch指管道的内存对象,后面几个可以不知道,不常用

    ch.basic_ack(delivery_tag=method.delivery_tag)  #默认不加下面的no_ack=True,需释放这个,手动去跟服务器确认消息

    print ('[x] %r:%r' % (method.routing_key,body))

channel.basic_qos(prefetch_count=1)   #添加此内容是:让生产者知道,你等我消费完这一条再给我发下一条,相当于权重设置

channel.basic_consume(       #通过管道的basic_consume方法消费消息

                    callback, #若果收到消息,就调用callback函数来处理消息

                    queue=queue_name #从哪个队列里消费消息,hello是队列名

                    )   #一般不加no_ack=True,默认要确认,这样消费者只有消费完消息后跟服务器缺认后,保证消息不丢失

channel.start_consuming()       #这里才是真正消费消息,只要启动就一直运行了,不只消费一条,一直在消费

各类消费者根据消息类型细致过滤,收取不同的消息内容:

[root@localhost ~]# python3 consumer.py *.info          #消费者1仅收取以info结尾类型的消息

random queuename amq.gen-fbvaYf-zB0sQaU2_hflTfQ

[*] waiting for messages, To exit press CTRL+C

[root@localhost ~]# python3 consumer.py *.error mysql.*  #消费者2收取以error结尾和以mysql开头类型的消息

random queuename amq.gen-6ivXWiP1DToTkh0KqxRGWg

[*] waiting for messages, To exit press CTRL+C

[root@localhost ~]# python3 producer.py

[x] send 'anonymous.info':'hello world'

生产者生产各类型的消息和内容:

[root@localhost ~]# python3 producer.py           #生产的消息类型:info,内容是:hello world,不传参数时默认

[x] send 'anonymous.info':'hello world'

[root@localhost ~]# python3 producer.py *.error  error_message  #生产的消息类型:*.info,内容是: error_message

[x] send '*.error':'error_message'

[root@localhost ~]# python3 producer.py mysql.* mysql__message  #生产的消息类型:mysql.*,内容是: mysql_message

[x] send 'mysql.*':'mysql__message'

查看各类消费者收到的消息:

[root@localhost ~]# python3 consumer.py *.info     #消费者1仅收取以info结尾类型的消息

random queuename amq.gen-3KE-xS917CV-x1RhCVolMg

[*] waiting for messages, To exit press CTRL+C

[x] 'anonymous.info':b'hello world'

[root@localhost ~]# python3 consumer.py *.error mysql.*   #消费者2收取以error结尾和以mysql开头类型的消息

random queuename amq.gen-bcW0w9u3ymGD8kQH-xyDDg

[*] waiting for messages, To exit press CTRL+C

[x] '*.error':b'error_message'

[x] 'mysql.*':b'mysql__message'

如果对运维课程感兴趣,可以在b站上、A站或csdn上搜索我的账号: 运维实战课程,可以关注我,学习更多免费的运维实战技术视频

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/9171.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu 更新24LTS中断导致“系统出错且无法恢复,请联系系统管理员”

22LTS to 24LTS 更新过程中手jian把更新程序controlC导致的。 解决 目前企图完成更新来恢复&#xff0c;重启后有软件包冲突&#xff0c;sudo apt upgrade报冲突。无法进行。 将原来source.list重新 sudo dpkg --configure -a sudo apt install -f 这些都不管用。还是显示gno…

Databend x 沉浸式翻译 | 基于 Databend Cloud 构建高效低成本的业务数据分析体系

「沉浸式翻译」是一个非常流行的双语对照网页翻译扩展工具&#xff0c;用户可以用它来即时翻译外文网页、PDF 文档、ePub 电子书、字幕等。它不仅可以实现原文加译文实时双语对照显示&#xff0c;还支持 Google、OpenAI、DeepL、微软、Gemini、Claude 等数十家翻译平台服务的自…

LabVIEW纤维集合体微电流测试仪

LabVIEW开发纤维集合体微电流测试仪。该设备精确测量纤维材料在特定电压下的电流变化&#xff0c;以分析纤维的结构、老化及回潮率等属性&#xff0c;对于纤维材料的科学研究及质量控制具有重要意义。 ​ 项目背景 在纤维材料的研究与应用中&#xff0c;电学性能是评估其性能…

哈工大:LLM高质量嵌入模型KaLM-Embedding

&#x1f4d6;标题&#xff1a;KaLM-Embedding: Superior Training Data Brings A Stronger Embedding Mode &#x1f310;来源&#xff1a;arXiv, 2501.01028 &#x1f31f;摘要 &#x1f538;随着检索增强生成在大型语言模型中的盛行&#xff0c;嵌入模型变得越来越重要。尽…

Baklib打造高效内容管理平台提升协作与创作体验

内容概要 随着信息化时代的迅猛发展&#xff0c;高效的内容管理已成为各类团队日常工作中不可或缺的一部分。Baklib的内容管理平台正是应运而生&#xff0c;旨在为团队提供一个集协作、创作与管理于一体的解决方案。该平台通过其直观的界面和一系列创新功能&#xff0c;帮助用…

通过高效的侦察发现关键漏洞接管整个IT基础设施

视频教程在我主页简介或专栏里 在这篇文章中&#xff0c; 我将深入探讨我是如何通过详细分析和利用暴露的端点、硬编码的凭据以及配置错误的访问控制&#xff0c;成功获取目标组织关键IT基础设施和云服务访问权限的全过程。 我们先提到目标网站的名称 https://*sub.domain*.co…

【Linux笔记】Day4

关机重启 登录注销 注意用户名只能是小写 用户管理 pwd显示当前在哪个目录下&#xff1a; 删除用户 连带着用户文件夹都删掉 高权限用户切换到低权限用户无需输入密码 clear能把当前屏幕显示清掉 如果之前加的用户没有指定组&#xff0c;它会创建一个和用户同名的组&a…

A7. Jenkins Pipeline自动化构建过程,可灵活配置多项目、多模块服务实战

服务容器化构建的环境配置构建前需要解决什么下面我们带着问题分析构建的过程:1. 如何解决jenkins执行环境与shell脚本执行环境不一致问题?2. 构建之前动态修改项目的环境变量3. 在通过容器打包时避免不了会产生比较多的不可用的镜像资源,这些资源要是不及时删除掉时会导致服…

一个简单的自适应html5导航模板

一个简单的 HTML 导航模板示例&#xff0c;它包含基本的导航栏结构&#xff0c;同时使用了 CSS 进行样式美化&#xff0c;让导航栏看起来更美观。另外&#xff0c;还添加了一些 JavaScript 代码&#xff0c;用于在移动端实现导航菜单的展开和收起功能。 PHP <!DOCTYPE htm…

Winform如何取消叉号,减号和放大(两种)

方法一: 找到窗体属性 MaximizeBoxFalse; MinimizeBoxFalse; ControlBoxFALSE; 方法二: 点击Form 在From里面找到Form-Closing 这个事件 写入 if(e.CloseReasonCloseReason.UserClosing) { MessageBox.Show("对不起,你不能关闭") e.Cancel true; }

OpenCV:开运算

目录 1. 简述 2. 用腐蚀和膨胀实现开运算 2.1 代码示例 2.2 运行结果 3. 开运算接口 3.1 参数详解 3.2 代码示例 3.3 运行结果 4. 开运算应用场景 5. 注意事项 6. 总结 相关阅读 OpenCV&#xff1a;图像的腐蚀与膨胀-CSDN博客 OpenCV&#xff1a;闭运算-CSDN博客 …

MIMIC-IV数据部署(博主较忙,缓慢更新)

1. 用到的数据准备 在下面的网站&#xff0c;注册、申请、推荐人从邮箱里帮忙确认。 通过后&#xff0c;拉到页面的最下面。把那个将近10个G的文件给下载下来。 可以在晚上睡觉的时候下载&#xff0c;第二天早上起来“收数据”。 MIMIC-IV v3.1 2. 用到的软件准备 7-zip …

二叉树-堆(补充)

二叉树-堆 1.二叉树的基本特性2.堆2.1.堆的基本概念2.2.堆的实现2.2.1.基本结构2.2.2.堆的初始化2.2.3.堆的销毁2.2.4.堆的插入2.2.5.取出堆顶的数据2.2.6.堆的删除2.2.7.堆的判空2.2.8.堆的数据个数2.2.9.交换2.2.10.打印堆数据2.2.11.堆的创建2.2.12.堆排序2.2.13.完整代码 3…

省市区三级联动

引言 在网页中&#xff0c;经常会遇到需要用户选择地区的场景&#xff0c;如注册表单、地址填写等。为了提供更好的用户体验&#xff0c;我们可以实现一个三级联动的地区选择器&#xff0c;让用户依次选择省份、城市和地区。 效果展示&#xff1a; 只有先选择省份后才可以选择…

【数据结构】动态内存管理函数

动态内存管理 为什么存在动态内存管理动态内存函数的介绍&#x1f38a;malloc补充&#xff1a;perror函数&#x1f38a;free&#x1f38a;calloc&#x1f38a;realloc 常见动态内存错误对空指针的解引用操作对动态开辟空间的越界访问对非动态开辟内存使用free释放使用free释放一…

【xcode 16.2】升级xcode后mac端flutter版的sentry报错

sentry_flutter 7.11.0 报错 3 errors in SentryCrashMonitor_CPPException with the errors No type named terminate_handler in namespace std (line 60) and No member named set_terminate in namespace std 替换sentry_flutter版本为&#xff1a; 8.3.0 从而保证oc的…

Julius AI 人工智能数据分析工具介绍

Julius AI 是一款由 Casera Labs 开发的人工智能数据分析工具&#xff0c;旨在通过自然语言交互和强大的算法能力&#xff0c;帮助用户快速分析和可视化复杂数据。这款工具特别适合没有数据科学背景的用户&#xff0c;使数据分析变得简单高效。 核心功能 自然语言交互&#x…

智慧园区系统集成解决方案构建智能管理新模式与发展蓝图

内容概要 在当今快速发展的科技环境中&#xff0c;智慧园区系统集成解决方案为园区的管理和运营提供了一种全新的思路。这种解决方案通过集合先进的核心技术&#xff0c;帮助各种园区实现智能化管理&#xff0c;从而提高运营效率。对于工业园、产业园、物流园、写字楼乃至公寓…

AI软件外包需要注意什么 外包开发AI软件的关键因素是什么 如何选择AI外包开发语言

1. 定义目标与需求 首先&#xff0c;要明确你希望AI智能体做什么。是自动化任务、数据分析、自然语言处理&#xff0c;还是其他功能&#xff1f;明确目标可以帮助你选择合适的技术和方法。 2. 选择开发平台与工具 开发AI智能体的软件时&#xff0c;你需要选择适合的编程语言、…

设计模式面试题

一、工厂方法模式: 1.简单工厂模式: (1).抽象产品:定义了产品的规范&#xff0c;描述了产品的主要特性和功能 (2).具体产品:实现或继承抽象产品的子类 (3).具体工厂:提供了创建产品的方法&#xff0c;调用者通过该方法来获取产品 所有产品都共有一个工厂&#xff0c;如果新…