RabbitMQ如何实现Publish和Subscribe

发布时间:2021-12-24 09:16:34 作者:小新
来源:亿速云 阅读:108

RabbitMQ如何实现Publish和Subscribe

目录

  1. 引言
  2. RabbitMQ简介
  3. Publish/Subscribe模式
  4. RabbitMQ中的Exchange
  5. 实现Publish和Subscribe的步骤
  6. 代码示例
  7. 总结

引言

在现代分布式系统中,消息队列(Message Queue)是一种常见的通信机制,用于解耦生产者和消费者之间的直接依赖关系。RabbitMQ是一个广泛使用的开源消息代理,支持多种消息传递模式,包括Publish/Subscribe模式。本文将详细介绍如何使用RabbitMQ实现Publish和Subscribe模式。

RabbitMQ简介

RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)协议的消息代理,它允许应用程序通过消息进行通信。RabbitMQ的主要特点包括:

Publish/Subscribe模式

Publish/Subscribe模式是一种消息传递模式,其中消息的发送者(Publisher)将消息发送到一个特定的主题(Topic),而消息的接收者(Subscriber)可以订阅这些主题并接收消息。这种模式的主要优点是解耦了生产者和消费者之间的关系,使得系统更加灵活和可扩展。

在RabbitMQ中,Publish/Subscribe模式通过Exchange和Queue来实现。Publisher将消息发送到Exchange,Exchange根据其类型将消息路由到一个或多个Queue,Subscriber从Queue中接收消息。

RabbitMQ中的Exchange

Exchange是RabbitMQ中用于接收消息并将其路由到Queue的组件。RabbitMQ支持多种类型的Exchange,每种类型有不同的路由规则:

在Publish/Subscribe模式中,通常使用Fanout Exchange或Topic Exchange。

实现Publish和Subscribe的步骤

5.1 安装RabbitMQ

首先,需要在本地或服务器上安装RabbitMQ。可以通过以下步骤在Linux系统上安装RabbitMQ:

# 安装Erlang(RabbitMQ的依赖)
sudo apt-get install erlang

# 安装RabbitMQ
sudo apt-get install rabbitmq-server

# 启动RabbitMQ服务
sudo systemctl start rabbitmq-server

# 启用RabbitMQ管理插件
sudo rabbitmq-plugins enable rabbitmq_management

5.2 创建生产者

生产者(Publisher)负责将消息发送到Exchange。以下是使用Python和Java创建生产者的示例。

Python示例

import pika

# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个Fanout Exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 发送消息
message = "Hello, RabbitMQ!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")

# 关闭连接
connection.close()

Java示例

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Publisher {
    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = "Hello, RabbitMQ!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

5.3 创建消费者

消费者(Subscriber)负责从Queue中接收消息。以下是使用Python和Java创建消费者的示例。

Python示例

import pika

# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个Fanout Exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 声明一个临时Queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 将Queue绑定到Exchange
channel.queue_bind(exchange='logs', queue=queue_name)

# 定义回调函数
def callback(ch, method, properties, body):
    print(f" [x] Received {body}")

# 开始消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Java示例

import com.rabbitmq.client.*;

public class Subscriber {
    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

5.4 运行示例

  1. 启动RabbitMQ服务。
  2. 运行生产者代码,发送消息到Exchange。
  3. 运行消费者代码,接收消息并打印。

代码示例

6.1 Python示例

生产者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = "Hello, RabbitMQ!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")

connection.close()

消费者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

def callback(ch, method, properties, body):
    print(f" [x] Received {body}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

6.2 Java示例

生产者

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Publisher {
    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = "Hello, RabbitMQ!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者

import com.rabbitmq.client.*;

public class Subscriber {
    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

总结

通过本文的介绍,我们了解了如何使用RabbitMQ实现Publish/Subscribe模式。RabbitMQ提供了灵活的Exchange和Queue机制,使得消息的发布和订阅变得简单而高效。无论是Python还是Java,RabbitMQ都提供了丰富的客户端库,方便开发者快速集成到自己的应用中。希望本文能帮助你更好地理解和使用RabbitMQ。

推荐阅读:
  1. 如何运行和管理RabbitMQ
  2. RabbitMQ实战:可用性分析和实现

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

rabbitmq publish subscribe

上一篇:怎么进行ROS中cmake的简单使用

下一篇:linux中如何删除用户组

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》