RabbitMQ实践体验

发布时间:2020-04-09 12:25:04 作者:yerikyu
来源:网络 阅读:3877

最近由于业务需要进行性能升级,将原来需要经过http进行数据交互的方式修改为消息队列的形式。于是原来的同步处理的方式变成了异步处理,在一定程度上提升我们系统的性能,不过debug的时候,不免哭了出来。因为每个环节都需要进行详细检查。
对于RabbitMQ,我们知道,其是AMQP的一种代理服服务器,具有一套严格的通信方式,即在核心产品进行通信的各个方面几乎都采用了RPC(Remote Procedure Call, 远程过程调用)模式。

AMQ与RabbitMQ进行交互

RabbitMQ通信时用到的类和方法与AMQP协议层面的类和方法一一对应。因此AMQP本质上是RPC的一种传输机制

高级消息队列模型

AMQ(Advanced Message Queuing)模型,这个模型是针对代理服务器软件例如(RabbitMQ)设计的,该模型在逻辑上定义了三种抽象组件用于指定消息的路由行为,分别是:

python使用AMQP

在将消息发布到队列之前,我们需要经历过以下若干个步骤。至少,必须要设置交换器和队列,然后将他们绑定再一起。接下来我们将通过python来实现AMQP机制。
我用到了pika这个库,需要的话,需要通过以下指令安装。该库实现了绝大部分rabbitmq的api以及提供了相关的调优参数,后续有机会不妨可以详谈。

pip install pika

1. 声明交换器

交换器在AMQ模型中是非常重要的角色存在。因此,在AMQP规范中都有自己的类。声明一个交换器,我们可以直接在控制台界面进行创建。
RabbitMQ实践体验
不过这样仅仅是在极少数的情况下才适合,动手调戏鼠标对开发工程师的来说实在是太蠢啦,能玩键盘就别玩鼠标啊,我们不妨通过以下代码来声明(创建)一个交换器。pika内置函数会事先通过get的方式来检查我们待声明的交换器是否存在,如果存在则不创建,否则创建一个新的交换器。

 self.channel.exchange_declare(
            exchange=exchange,
            exchange_type="direct",
            passive=False,
            durable=True,
            auto_delete=False)

2. 声明队列

一旦交换器创建成功,就可以通过发送类似queue.declare命令让rabbitmq创建一个队列。同样的,我们仍然可以在图形化界面里面创建队列。
RabbitMQ实践体验
还是那句话,动手调戏鼠标对开发工程师的来说实在是太蠢啦,能玩键盘就别玩鼠标啊,我们不妨通过以下代码来声明(创建)若干个队列。pika内置函数会事先通过get的方式来检查我们待声明的队列是否存在,如果存在则不创建,否则创建一个新的队列。

self.channel.queue_declare(queue=queue, durable=True)

当队列同名时,即如果我们多次发送同一个queue.declare命令并不会有任何副作用,因为RabbitMQ并不会处理后续的队列声明,究其原因,每次创建都会先通过get的方式调用消息队列引擎查询队列是否存在。如果需要返回队列相关的有用信息,则将会返回队列中待处理消息的数量以及该队列的消费者数量。当然了如果队列同名,而且新队列的属性与原有的队列不一样,那么RabbitMQ将关闭发出的RPC请求的信道,返回403错误

3. 绑定队列到交换器

一旦创建了交换器和队列,之后就可以将它们绑定在一起了,如同queue.declare命令,将队列绑定到交换器Queue.Bind每次只能指定一个队列。我们既可以通过图形化界面进行绑定,也可以通过代码实现这个效果
RabbitMQ实践体验

 self.channel.queue_bind(
            queue=queue, exchange=exchange, routing_key=rk)

4. 发布消息

发布消息到RabbitMQ时,多个帧封装了发送到服务器的消息数据。在实际的消息内容到达rabbitMQ之前,客户端应用程序会发送一个basic.publish方法帧、一个内容头帧和至少一个消息体帧。
默认情况下,只要没有消费者正在监听队列,消息就会被存储在队列中。当添加更多消息时,队列大小也会随之增加。RabbitMQ可以将这些消息保存在内存或者写入磁盘。

def produce(self, body):
        self.channel.basic_publish(exchange=self.exchange, routing_key=self.route_key, body=body,
                                   properties=pika.BasicProperties(content_type='text/plain', delivery_mode=1)
                                   )

5. 消费消息

一旦发布消息被路由并且保存在一个或者多个队列中,剩下的就是如何对其进行消费。注意到,发送和消费是异步的。 消费时,可以让RabbitMQ知道如何消费他们
Basic.Consume命令中
no_ack为true时,RabbitMQ将连续发送消息直到消费者发送一个Basic.Cancel命令或者断开连接为止
如果为false,则需要发送一个Basic.Ack来确认收到每条消息的请求

def on_message(chan, method_frame, _header_frame, body, userdata=None):
            """Called when a message is received. Log message and ack it."""
            # LOGGER.info('Userdata: %s Message body: %s', userdata, body)
            # print(" [x] Received %r" % body.decode())
            data = body.decode()
            result = alarmFun(data)
            publish = Publish(exchange='spider', queue='alarm', rk='rk-alarm')
            publish.produce(result)
            # chan.basic_ack(delivery_tag=method_frame.delivery_tag)

on_message_callback = functools.partial(on_message)
self.channel.basic_consume(on_message_callback=on_message_callback,
                                   queue=self.queue,
                                   auto_ack=True
                                   )

基于python开发

经过前面的描述,我们需要理论联系实践,让我们通过python开发消费者角色和发布者角色。

发布者

按照配置流程,我们需要初始化连接、配置交换器、队列、绑定,然后才能通过连接件信息推送(publish)到队列中。

import logging
from random import randint

import pika

BROKER_USER = os.environ.get('BROKER_USER', 'guest')
BROKER_PASSWD = os.environ.get('BROKER_PASSWD', 'guest')
BROKER_IP = os.environ.get('BROKER_IP', '127.0.0.1')
BROKER_PORT = os.environ.get('BROKER_PORT', '5672')
BROKER_VHOST = os.environ.get('BROKER_VHOST', 'my_vhost')
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
BROKER_URL = 'amqp://{}:{}@{}:{}/{}'.format(BROKER_USER, BROKER_PASSWD, BROKER_IP, BROKER_PORT, BROKER_VHOST)

# logging.basicConfig(level=logging.DEBUG)
# LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
#               '-35s %(lineno) -5d: %(message)s')
# LOGGER = logging.getLogger(__name__)

class Publish(object):
    def __init__(self, exchange, queue, rk):
        # LOGGER.info('Connecting to %s', BROKER_URL)
        # logging.basicConfig(level=logging.DEBUG)
        self.credentials = pika.PlainCredentials(BROKER_USER, BROKER_PASSWD)
        # 通过这个方式设置备用链路,保证connection稳定性
        self.parameters = (
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials),
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials, connection_attempts=5,
                                      retry_delay=1))
        self.connection = pika.BlockingConnection(self.parameters)
        self.channel = self.connection.channel()
        self.exchange = exchange
        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type="direct",
            passive=False,
            durable=True,
            auto_delete=False)
        self.channel.queue_declare(queue=queue, durable=True)
        self.route_key = rk

    def produce(self, body):
        self.channel.basic_publish(exchange=self.exchange, routing_key=self.route_key, body=body,
                                   properties=pika.BasicProperties(content_type='text/plain', delivery_mode=1)
                                   )

    def close(self):
        self.connection.close()

def test():
    publish = Publish(exchange='test_yerik', queue='test_test', rk='rk-test_test')
    for i in range(1, 10000):
        publish.produce(randint(1, 100).__str__())
    publish.close()

if __name__ == '__main__':
    test()

消费者

消费者的设计和生产者在初始化的时候设计大致相同,都是通过建立连接、开启channel、exange、queue、bind等过程,主要的区别在于commsum

import functools
import logging
import pika

BROKER_USER = os.environ.get('BROKER_USER', 'guest')
BROKER_PASSWD = os.environ.get('BROKER_PASSWD', 'guest')
BROKER_IP = os.environ.get('BROKER_IP', '127.0.0.1')
BROKER_PORT = os.environ.get('BROKER_PORT', '5672')
BROKER_VHOST = os.environ.get('BROKER_VHOST', 'my_vhost')
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
BROKER_URL = 'amqp://{}:{}@{}:{}/{}'.format(BROKER_USER, BROKER_PASSWD, BROKER_IP, BROKER_PORT, BROKER_VHOST)

# print('pika version: %s' % pika.__version__)

# logging.basicConfig(level=logging.DEBUG)
# LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
#               '-35s %(lineno) -5d: %(message)s')
# LOGGER = logging.getLogger(__name__)
from apps.alarm.alarmfun import alarmFun
from apps.utils.rabbitmq.publish import Publish

class Consummer(object):
    def __init__(self, exchange, queue, rk):
        # LOGGER.info('Connecting to %s', BROKER_URL)
        self.credentials = pika.PlainCredentials(BROKER_USER, BROKER_PASSWD)
        self.parameters = (
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials),
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials, connection_attempts=5,
                                      retry_delay=1))
        self.connection = pika.BlockingConnection(self.parameters)

        self.channel = self.connection.channel()
        self.exchange = exchange
        self.channel.basic_qos(prefetch_count=1)
        self.exchange = exchange
        self.queue = queue
        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type="direct",
            passive=False,
            durable=True,
            auto_delete=False)
        self.channel.queue_declare(queue=queue, durable=True)
        self.channel.queue_bind(
            queue=queue, exchange=exchange, routing_key=rk)
        self.channel.basic_qos(prefetch_count=1)

    def consum_message(self):
        # LOGGER.info('Comsummer by {}'.format(name))
        def on_message(chan, method_frame, _header_frame, body, userdata=None):
            """Called when a message is received. Log message and ack it."""
            # LOGGER.info('Userdata: %s Message body: %s', userdata, body)
            # print(" [x] Received %r" % body.decode())
            data = body.decode()
            result = alarmFun(data)
            publish = Publish(exchange='spider', queue='alarm', rk='rk-alarm')
            publish.produce(result)
            # chan.basic_ack(delivery_tag=method_frame.delivery_tag)

        on_message_callback = functools.partial(on_message)

        self.channel.basic_consume(on_message_callback=on_message_callback,
                                   queue=self.queue,
                                   auto_ack=True
                                   )
        try:
            self.channel.start_consuming()

        except KeyboardInterrupt:
            self.channel.stop_consuming()

    def cancel(self):
        self.connection.close()

def test():
    consummer = Consummer('test_yerik', 'test_test', 'rk-test_test')
    consummer.consum_message()
    print(consummer.receive)

if __name__ == '__main__':
    test()

参考文档:

  1. 深入RabbitMQ, Gavin M.Roy 著 汪佳南 郑天民 译
推荐阅读:
  1. 边缘计算实践体验
  2. OpenStack实践(七):RabbitMQ监控

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

rabbitmq 消费者 发布者

上一篇:Sqoop从Oracle导出数据出错:The Network

下一篇:平台记录部件获取输入控件(Field)

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》