您好,登录后才能下订单哦!
目录:
1. Python集成ActiveMQ
2. 封装服务mq_service.py
3. 接收处理消息mq_listener.py
4. 启动消息监听服务mq.py
5. 单元测试test_mq_serivce.py
6. 发送消息功能调用
7. 常见问题和解决方法
ActiveMQ是一个非常流行的消息队列服务中间件,实现JMS规范,基于STOMP协议(端口为61613)支持Python访问。
JMS:Java Message Service
STOMP:Simple(or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议
JMS规范定义了2类消息发送接收模型:点对点queue,发布订阅topic,区别是能够重复消费和是否保存。
1,点对点queue:不可重复消费,消息被消费前一直保存。
生产者发送消息到queue,一个消费者取出并消费消息。
消息被消费后,queue中不再保存,所有只有一个消费者能够取到消息。
queue支持多个消费者存在,但是一个消息只有一个消费者可以消费。
当前没有消费者时,消息一直保存,直到被消费者消费。
2,发布订阅topic:可重复消费,发布给所有订阅者。
生产者发布消息到topic中,多个订阅者收到并消费消息。
和queue不同,发布到topic中的消息会被所有订阅者消费。
当生产者发布消息时,不管是否有订阅者,都不保存消息。
JMS规范定义的2类消息传输模型queue和topic比较:
Queue | Topic | |
模型 | 点对点Point-to-Point | 发布订阅publish/subscribe |
有无状态 | queue消息在消费前被一直保存在mq服务器上的文件或者配置DB | topic数据默认不保存,是无状态的。 |
完整性保障 | queue保证每条消息都被消费者接收到 | topic不保证生产者发布的每条消息都被订阅者接收到 |
消息是否会丢失 | 生产者发送消息到queue,消费者接收到消息。如果没有消费者,将一直保存,不会丢失。 | 生产者发布消息到topic时,当前的订阅者都能够接收到消息。如果当前没有订阅者,该消息就丢失。 |
消息发布接收策略 | 一对一的消息发布接收策略,一个生产者发送的消息只被一个消费者接收。mq服务器收到回复后,将这个消息删除。 | 一对多的消息发布接收策略,同一个topic的多个订阅者都能收到生产者发布的消息。 |
Python集成ActiveMQ使用stomp.py,只需简单配置,本文在Django框架下进一步封装服务mq_service.py。典型系统架构示意图和消息队列:
时序图如下:
示例代码:https://github.com/rickding/HelloPython/tree/master/hello_activemq
├── settings.py
├── mq
│ └── mq_service.py
│ └── mq_listener.py
├── tests
│ └── test_mq_service.py
├── management
│ └── commands
│ └── mq.py
一,Python集成ActiveMQ
代码文件 | 功能要点 | |
Python集成ActiveMQ | requirements.txt | 安装stomp.py: stomp.py >= 5.0.1 |
封装服务 | mq_serivce.py | 封装ActiveMQ的消息发送和处理功能。在Django框架下,将地址等配置在settings.py中集中管理,注意端口为61613 |
接收处理消息 | mq_listener.py | 增加消息接收处理类,继承stomp.ConnectionListener |
启动消息监听服务 | mq.py | 在Django框架下,将启动服务代码封装成command,方便调用和维护。 |
单元测试 | test_mq_serivce.py | 测试封装的功能函数 |
功能调用 | views.py | 增加REST接口/chk/mq,调用mq_service发送消息 |
1. 新建Django项目,运行:django-admin startproject hello_activemq
2. 进到目录hello_activemq,增加应用:python manage.py startapp app
项目的目录文件结构如下:
3. 安装stomp.py,pip install stomp.py >= 5.0.1
二,封装服务mq_service.py,调用ActiveMQ发送消息
1. 增加mq_service.py:
import json
import logging
import stomp
from django.conf import settings
log = logging.getLogger(__name__)
def send_msg(msg_dict, queue_or_topic=settings.MQ_QUEUE):
conn = stomp.Connection10([(settings.MQ_URL, settings.MQ_PORT)])
conn.connect(settings.MQ_USER, settings.MQ_PASSWORD)
msg_str = json.dumps(msg_dict)
log.info('Send msg: %s, %s, %s' % (type(msg_dict), type(msg_str), msg_str))
conn.send(queue_or_topic, msg_str)
conn.disconnect()
2. 打开settings.py,配置ActiveMQ信息:
MQ_URL = '127.0.0.1'
MQ_PORT = 61613
MQ_USER = 'admin'
MQ_PASSWORD = 'admin'
MQ_QUEUE = '/queue/SampleQueue'
MQ_TOPIC = '/topic/SampleTopic'
3. 为了增加代码的兼容和容错能力,封装get_conn(), close_conn()等辅助函数,详见代码文件mq_service.py。
三,接收处理消息mq_listener.py
1. 增加mq_listener.py,声明消息处理类,继承stomp.ConnectionListener:
import json
import logging
import stomp
log = logging.getLogger(__name__)
class MqListener(stomp.ConnectionListener):
def on_message(self, headers, msg_str):
log.info('Receive msg: %s, %s, %s' % (type(msg_str), msg_str, headers))
msg_dict = None
try:
msg_dict = json.loads(msg_str)
except Exception as e:
log.warning('Exception when parse msg: %s' % str(e))
log.info('Parsed msg: {}, {}'.format(type(msg_dict), msg_dict))
def on_error(self, headers, msg_str):
log.info('Error msg: %s, %s, %s' % (type(msg_str), msg_str, headers))
2. 在on_message()函数中,将消息字符串解析为json,方便业务处理。
3. 声明on_error()函数处理错误信息。
四,启动消息监听服务mq.py
1. 将循环接收消息代码封装成函数consume_msg(),增加在服务中mq_serivce.py:
import logging
import time
import stomp
from django.conf import settings
log = logging.getLogger(__name__)
def consume_msg(listener, queue=settings.MQ_QUEUE, topic=settings.MQ_TOPIC):
conn = stomp.Connection10([(settings.MQ_URL, settings.MQ_PORT)])
conn.connect(settings.MQ_USER, settings.MQ_PASSWORD)
conn.set_listener('', listener)
conn.subscribe(queue)
conn.subscribe(topic)
while 1:
time.sleep(1000) # secs
conn.disconnect()
2. 调用set_listener()设置消息接收类实例,使用之前创建的MqListener
3. 调用subscribe()订阅消息,启动循环监听。
4. 我们将启动服务代码封装成command,在目录management/commands中增加mq.py
import logging
from django.core.management.base import BaseCommand
from hello_activemq.mq import mq_service as mq
from hello_activemq.mq.mq_listener import MqListener
log = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'mq starts listener'
def handle(self, *args, **options):
log.info("mq starts")
return mq.consume_msg(MqListener())
5. 运行命令python manage.py mq,看到消息提示,启动监听服务成功。
五,单元测试test_mq_service.py
增加测试函数,发送消息:
import logging
from django.test import TestCase
from hello_activemq.mq import mq_service as mq
log = logging.getLogger(__name__)
class MQServiceTest(TestCase):
def test_send_msg(self):
msg_dict = {'content': 'test msg dict', 'msg': 'msg from python'}
mq.send_msg_to_queue(msg_dict)
mq.send_msg_to_topic({'msg': "test msg from python"})
运行python manage.py test,同时看到监听服务收到并处理消息:
六,发送消息功能调用
1. 在views.py中发送消息,调用mq_servcie.py
import json
from django.http import HttpResponse
from hello_activemq.mq import mq_service as mq
def chk_mq(req):
msg_dict = {
'url': req.get_raw_uri(),
'path': req.get_full_path(),
'host': req.get_host(),
}
mq.send_msg_to_queue(msg_dict)
mq.send_msg_to_topic(msg_dict)
return HttpResponse(json.dumps(msg_dict))
2. 在urls.py中配置路由
from django.urls import path
from app.views import chk_mq
urlpatterns = [
path('', chk_mq, name='chk'),
]
3. 运行命令启动服务:python manage.py runserver 0.0.0.0:8001
4. REST接口发送消息
七,常见问题和解决方法
1. 启动服务错误:[transport.py: 787, attempt_connection] Could not connect to host 127.0.0.1, port 61613
解决:检查ActiveMQ是否正常启动,特别注意是否开启STOMP协议端口61613
原因:Python连接ActiveMQ使用STOMP协议,端口默认61613
2. 发送消息时错误:TypeError: message should be a string or bytes, found <class 'dict'>
解决:将消息内容序列化为JSON,发送时调用json.dumps(),接收时调用json.loads()
原因:Python连接ActiveMQ使用的是STOMP协议,消息格式为简单文本。
注:JMS规范定义的5类消息:
字符串TextMessage,
键值对MapMessage,
序列化对象ObjectMessage
字节流BytesMessage
数据流StreamMessage
ActiveMQ支持5类JMS消息,增加了二进制大文件消息BlobMessage:
3. 跨系统对接时接收到的消息类型不是TextMessage
Python开发的业务处理服务 -> Java开发的API服务,接收到的消息类型为BytesMessage,Python发送时设置conn.send('xx', msg_str, content_type="text/plain")仍然接收不到期望的类型TextMessage
解决:stomp建立连接时配置参数conn = stomp.Connection10([("localhost", 61613)], auto_content_length=False)
原因:Python连接ActiveMQ使用STOMP协议,消息格式为简单文本,不携带类型信息,只通过header中的content-length来判断TextMessage和BytesMessage,所以发送消息时不在header中添加content-length就可以了。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。