您好,登录后才能下订单哦!
RabbitMQ是一种消息队列,与线程queue和进程QUEUE作用是一样的。
RabbitMQ是一个中间程序,可以实现不同进程之间的通信(比如python和Java之间,QQ和Word之间等);
普通情况下A进程与B进程之间通信,两者之间需要建立很多连接和单独写一些代码,但是使用RabbitMQ的话就可以实现帮助不同进程之间的数据通信。
A进程交给RabbitMQ,RabbitMQ在交给B,同样B交给RabbitMQ,RabbitMQ在交给A,RabbitMQ可以实现A与B进程之间的连接和信息转换。
使用RabbitMQ可以实现很多个独立进程之间的交互,所有其他独立进程都可以用RabbitMQ作为中间程序。
py 消息队列:
线程 queue(同一进程下线程之间进行交互)
进程 Queue(父子进程进行交互 或者 同属于同一进程下的多个子进程进行交互)
如果是两个完全独立的python程序,也是不能用上面两个queue进行交互的,或者和其他语言交互有哪些实现方式呢。
【Disk、Socket、其他中间件】这里中间件不仅可以支持两个程序之间交互,可以支持多个程序,可以维护好多个程序的队列。
虽然可以通过硬盘的方式实现多个独立进程交互,但是硬盘速度比较慢,而RabbitMQ则能够很好的、快速的帮助两个独立进程实现交互。
像这种公共的中间件有好多成熟的产品:
RabbitMQ
ZeroMQ
ActiveMQ
……
RabbitMQ:erlang语言 开发的。
Python中连接RabbitMQ的模块:pika 、Celery(分布式任务队列) 、haigha
可以维护很多的队列
其中pika是RabbitMQ常用的模块
RabbitMQ 教程官网:http://www.rabbitmq.com/getstarted.html
几个概念说明:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

RabbitMQ不像之前学的python Queue都在一个队列里实现交互,RabbitMQ有多个队列(图中红色部分代表队列),每个队列都可以将消息发给多个接收端(C是接收端,P是生产消息端)
1、Rabbitmq 安装
Windos系统
pip install pika
ubuntu系统
install  rabbitmq-server  # 直接搞定
以下centos系统
1)Install Erlang
# For EL5:
rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
# For EL6:
rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
# For EL7:
rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm
yum install erlang
2)Install RabbitMQ Server
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.5-1.noarch.rpm
3)use RabbitMQ Server
chkconfig rabbitmq-server on
service rabbitmq-server stop/start
或者
rabbitmq-server start

rabbitmq已经开启,等待传输
2、基本示例
发送端 producer
import pika
# 建立一个实例;相当于建立一个socket。
#通过ctrl+ConnectionParameters可以看到能传很多参数,如果远程还可以传用户名密码。
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  # 默认端口5672,可不写
    )
# 声明一个管道,在管道里发消息
channel = connection.channel()
# 在管道里声明一个叫hello的queue
channel.queue_declare(queue='hello')
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello',  # queue名字,将消息发给hello这个queue
                      body='Hello World!')  # 消息内容
print(" [x] Sent 'Hello World!'")
connection.close()  # 发完消息后关闭队列 
执行结果:
[x] Sent 'Hello World!'
注意一定要开启rabbitmq,否则会报错
接收端 consumer
import pika
import time
# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
# 声明管道
channel = connection.channel()
# 为什么又声明了一个‘hello’队列?
# 如果这个queue确定已经声明了,可以不声明。但是你不知道是生产者还是消费者先运行,所以要声明两次。如果消费者没声明,且消费者先运行的话,就会报错。
# 生产者先声明queue,消费者不声明,但是消费者后运行就不会报错。
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):  # 四个参数为标准格式
    print(ch, method, properties)  # 打印看一下是什么
    # ch是管道内存对象地址;method是内容相关信息  properties后面讲  body消息内容
    print(" [x] Received %r" % body)
    #time.sleep(15)
    #ch.basic_ack(delivery_tag = method.delivery_tag)  
channel.basic_consume(  # 消费消息
        'hello',  # 你要从哪个队列里收消息 
        callback,  # 如果收到消息,就调用callback函数来处理消息  # 注意调用的函数(callback)以前在basic_consume模块是放在形参第一个位置的,后面修改到第二个位置了,如果放错位置会报错
        # auto_ack=True  # 写的话,如果接收消息,机器宕机消息就丢了
        # 一般不写。宕机则生产者检测到发给其他消费者
        )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 开始消费消息(开始接收消息,一直收,如果没消息就卡主在这里了)
执行结果:
 [*] Waiting for messages. To exit press CTRL+C
<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f715d76f128> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.b728277178e746118699d5b4302a0314', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello'])> <BasicProperties>
 [x] Received b'Hello World!'
收到了bytes格式的 Hello World!

消费者(接收端)这边看到已经卡主了

如果此时单独在运行一下生产者(发送端),直接可以从消费者看到新收到的消息

重新开启rabbitmq

运行三个接收者(消费者)

运行发送者,可以看到被第一个接收者给收到信息了

第二次运行发送者,第二个接收者收到信息了

第三次运行发送者,第三个接收者收到信息了
上面几次运行说明了,依次的将信息发送每一个接收者
接收端 consumer
import pika
import time
# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
# 声明管道
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):  
    print(ch, method, properties) 
    print(" [x] Received %r" % body)
    # 正常回调函数(callback)执行完成就表示信息接收完成,如果在还没执行完成时就出现异常就表示信息没有正常接收,比如断网、断电等,会导致信息不能正常接收。
    # 下面sleep 60秒,在60秒之前就将该模块终止执行来模拟异常情况。
    time.sleep(60)  
    #ch.basic_ack(delivery_tag = method.delivery_tag)  
channel.basic_consume(  
        'hello',  
        callback, 
        # auto_ack=True 表示不管消息是否接收(处理)完成,都不会回复确认消息
        # 如果producer不关心 comsumer是否处理完,可以使用该参数
        # 但是一般都不会使用它
        )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 

在centos中重新执行rabbitmq-server start来清空队列里的消息
然后在pycharm开启三个comsumer,在去运行等待接收消息
再去执行producer来发送消息,执行producer后,立即关闭第一个comsumer,这样消息就会因为第一个comsumer没接收成功跑到第二个comsumer去,以此类推。

关闭第二个comsumer,第三个comsumer收到信息

这张图是将三个comsumer同时都关闭了,这样三个comsumer都收不到消息,说明producer的消息没有被接收,此时再去开启第一个comsumer,这时第一个comsumer会将消息给接收过来。
我们将sleep注释掉,也是这种现象,这是因为comsumer并没有发送确认消息给producer
import pika
import time
# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
# 声明管道
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):  
    print(ch, method, properties) 
    print(" [x] Received %r" % body)
    time.sleep(30)  
    ch.basic_ack(delivery_tag = method.delivery_tag)   # 告诉生成者,消息处理完成
channel.basic_consume(  
        'hello',  
        callback, 
        # auto_ack=True  
        )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 
此时的代码:当其中一个comsumer执行完成,并发送确认消息后再去中断,下一个comsumer就不会收到消息;反之,如果还没发送确认消息就中断了,那么消息就会被下一个comsumer接收到。
如果producer端宕机,那么队列的数据也会消失;这样就需要让队列消息持久化
# durable=True 该代码只是将生成的队列持久化(不是消息),如果producer宕机,队列会存在,单消息会丢
# 要注意需要在producer端和 comsumer端都要 写durable=True
channel.queue_declare(queue='hello',durable=True) 
在centos重新开启 rabbitmq-server start
在producer端
将producer代码执行三次,将三个消息放入队列
import pika
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  
    )
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
channel.basic_publish(exchange='',
                      routing_key='hello',  
                      body='Hello World!',
                      # 下面的代码是让消息持久化
                      properties = pika.BasicProperties(delivery_mode=2)
                      )  
print(" [x] Sent 'Hello World!'")
connection.close()  
将producer代码执行三次,将三个消息放入队列
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
def callback(ch, method, properties, body): 
    print(ch, method, properties) 
    print(" [x] Received %r" % body)
    # time.sleep(30) #注释掉
    ch.basic_ack(delivery_tag = method.delivery_tag)  
channel.basic_consume( 
        'hello', 
        callback
        )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 

可以看到因为producer执行了三次,所以运行comsumer端收到了三条消息
producer端没有改变
import pika
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  
    )
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
channel.basic_publish(exchange='',
                      routing_key='hello',  
                      body='Hello World!',
                      properties = pika.BasicProperties(delivery_mode=2)
                      )  
print(" [x] Sent 'Hello World!'")
connection.close()  
comsumer 1(消费者:1)
import pika 
import time 
connection = pika.BlockingConnection(pika.ConnectionParameters( 
               'localhost')) 
channel = connection.channel() 
channel.queue_declare(queue='hello',durable=True) 
def callback(ch, method, properties, body):  
    print(ch, method, properties)  
    print(" [x] Received %r" % body) 
    # time.sleep(30) #注释掉 
    ch.basic_ack(delivery_tag = method.delivery_tag)   
# channel.basic_qos可以使其消费者最多同时多少个消息;如果其中一个消费者处理慢(如:CPU处理性能低下),达到了最多处理的限制的话 生产者就不会再发送给该消费者。
channel.basic_qos(prefetch_count=1)  #这里限制最多同时只处理1个消息
channel.basic_consume(  
        'hello',  
        callback 
        ) 
print(' [*] Waiting for messages. To exit press CTRL+C') 
channel.start_consuming()  #  

此时有两个comsumer模块,comsumer2比comsumer1多用了sleep 30秒来模拟性能处理慢的情况
comsumer 2(消费者:2)
复制一个comsumer模块为comsumer2
import pika 
import time 
connection = pika.BlockingConnection(pika.ConnectionParameters( 
               'localhost')) 
channel = connection.channel() 
channel.queue_declare(queue='hello',durable=True) 
def callback(ch, method, properties, body):  
    print(ch, method, properties)  
    print(" [x] Received %r" % body) 
    time.sleep(30) #comsumer2这里sleep30秒
    ch.basic_ack(delivery_tag = method.delivery_tag)   
channel.basic_qos(prefetch_count=1)  
channel.basic_consume(  
        'hello',  
        callback 
        ) 
print(' [*] Waiting for messages. To exit press CTRL+C') 
channel.start_consuming()  #  


我们运行两个comsumer后,一直去运行producer。 可以看到comsumer 1接收到了3条信息,而comsumer 2只接收到了1条信息,这是因为comsumer 2 sleep了30秒来模拟信息处理慢的情况;
comsumer 1 和 comsumer 2都指定了同时只能处理1条信息,producer会与comsumer 2协商,因为comsumer2一直没有处理完限制的1条信息,所以信息都被comsumer 1处理了。

新建fanout_publiser模块,用于发送广播的producer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()
# 定义一个转发器叫logs,属于一个中间人的角色,用于将producer的消息转发给消费者(comsumer)
# 定义广播类型使用fanout
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# message = ''.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)   # routing_key为空即可,因为是广播没有定义队列,所以也不需要指定队列,但这里必须要定义为空
print(" [x] Send %r " % message)
connection.close()
新建fanout_consumer模块,用于接收广播的消费者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费断开后,自动将queue删除
# 就是这里会随机生成一个随机的唯一queue,用完之后会将生成的queue删除
# 这里要写queue='',如果不指定队列名字,但也要写一个空的字符串,不然会报错缺少参数
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue  # 拿到随机生成的queue名字
# producer绑定了logs转发器
# 消费者将随机生成的队列也绑定了logs转发器
# 这样producer将消息交给logs转发器,logs转发器将消息交给对应绑定的随机队列,消费者从队列里在拿消息
channel.queue_bind(exchange='logs',queue=queue_name)
print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print("[x] %r" % body)
channel.basic_consume(
    queue=queue_name, on_message_callback=callback
    # auto_ack=True  # 写的话,如果接收消息,机器宕机消息就丢了
                        )
channel.start_consuming()

因为是广播,所以两个consumer都收到了发送者发送的消息。
不过有一点要注意!!!!!!!!!
要先运行consumer(接收者),在运行发送者。就好比收音机一样,只有你先打开收音机,发送者才能将信息发给你。 如果发送者先发送,你却没有接收,之前发送的信息,你就不会再接收到了。

direct 可以区分广播,将指定的消息发送给指定的接收者;
图中显示了将error级别消息发送给C1,将info、error、warning级别消息发送给C2。
producer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# 定义消息级别
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ''.join(sys.argv[2:])  or "direct info: Hello World!"
# message = "direct info: Hello World!"
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)   # routing_key为空即可,因为是广播没有定义队列,所以也不需要指定队列,但这里必须要定义为空
print(" [x] Send %r " % message)
connection.close()
consumer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
# 获取参数列表
log_levels = sys.argv[1:]
if not log_levels: # 如果没有参数,就报错,提示要指定消息级别
    sys.stderr.write("Usage: %s [info] [warning] [error] \n" % sys.argv[0])
    sys.exit(1) # 没有参数就退出程序
# print(log_levels)
for severity in log_levels:  # 循环参数列表并绑定
    channel.queue_bind(
        exchange='direct_logs',
        queue=queue_name,
        routing_key=severity
    ) #所有发送到severity的参数,都接收
print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print('[x] %r:%r' % (method.routing_key, body))
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)
channel.start_consuming()
下面在centos中运行代码

运行C1,只接收error的消息

运行C2,接收 info、warning、error的消息


producer运行,指定发送消息给error,可以看到两个consumer都接收到了error的消息

只有C2接收到了warning的消息
producer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# 定义消息级别
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'  # 发送*.info的信息
message = ''.join(sys.argv[2:])  or "topic info: Hello World!"
# message = "direct info: Hello World!"
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)   
print(" [x] Send %r:%r " % (routing_key,message))
connection.close()
consumer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0])
    sys.exit(1)
for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key
    )
print('[*] Waiting for logs. To exit press CTRL+c')
def callback(ch, method, properties, body):
    print('[x %r:%r' % (method.routing_key, body))
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)
channel.start_consuming()

图中显示过滤中间有".orange."的数据,过滤以rabbit为结尾的数据,过滤以lazy开头的数据。


运行了两个consumer。C1接收.info为结尾的数据,C2接收.error为结尾和mysql为开头的数据。

在运行publisher(已经定义了发送anonymous.info,相当于以.info为结尾的信息)

C1接收到了信息

执行publisher代码时 后面加上 test.error,然后此时在去看C2

C2 看到test.error相关信息

执行publisher代码 加上 mysql.info,这样C1和C2都可以收到消息了



运行C3,代码后面加一个 '#' 符号,表示C3可以接收所有信息(注意#号要被引号括起来)

在publisher随意发送信息,C3都能收到
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。