消息队列原理之如何掌握rabbitmq

发布时间:2021-10-21 13:52:42 作者:iii
来源:亿速云 阅读:123

这篇文章主要讲解了“消息队列原理之如何掌握rabbitmq”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“消息队列原理之如何掌握rabbitmq”吧!

介绍

RabbitMQ 是一个由 Erlang 开发的 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的开源实现,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。支持多种客户端语言。

架构

整体架构对照下面的图说明

消息队列原理之如何掌握rabbitmq

先看看图片上各个名次的解释:

这里面比较难理解的概念是 RoutingKey,Exchange,Binding ,消费发送时不会直接发送给 Queue ,而是先发送给 Exchange,由 Exchange 按照一定的规则投递到与它绑定的 Queue 中,那这个规则是什么呢? 规则就与 Exchange 的 Type、BindingRoutingKey 相关,Exchange 支持的类型有 4 种,direct,fanout,topic,headers,含义如下:

对照上面图和名次解释应该比较清晰明了了,下面我们通过几个例子说明如何使用。

用法(golang)

direct

消息队列原理之如何掌握rabbitmq

先看看 Rabbitmq 默认的 exchange ,其中第一个(AMQP default) 是默认的,默认绑定了所有的 Queue ,会把消息投递到 Routekey 对应的队列中,即: Routekey==QueueName

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func handlerError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

var url = "amqp://username:password@ip:port"

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	queueNameCar := "car"
	if _, err := channel.QueueDeclare(queueNameCar, false, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decare Queue")
	}

	if err := channel.Publish("", queueNameCar, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
}

在创建队列有几个参数可以关注一下 消息队列原理之如何掌握rabbitmq

我们自己创建一个 direct 类型的 exchange 并绑定一些队列看看是什么效果。

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	directExchangeNameCar := "direct.car"
	if err := channel.ExchangeDeclare(directExchangeNameCar, "direct", true, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decalare exchange")
	}

	queueNameCar := "car"
	queueNameBigCar := "big-car"
	queueNameMiddleCar := "middle-car"
	queueNameSmallCar := "small-car"
	channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

	if err := channel.QueueBind(queueNameCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "big.car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameMiddleCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameMiddleCar, "middler.car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameSmallCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameSmallCar, "small.car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.Publish(directExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
}

Queue 的创建上面已经讲过了,这里有 Exchange 的创建,那么再看看创建 Exchange 有哪些参数 消息队列原理之如何掌握rabbitmq

fanout

fanout 工作方式类似于广播,看看下面的代码

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	fanoutExchangeNameCar := "fanout.car"
	if err := channel.ExchangeDeclare(fanoutExchangeNameCar, "fanout", true, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decalare exchange")
	}

	queueNameCar := "car"
	queueNameBigCar := "big-car"
	queueNameMiddleCar := "middle-car"
	queueNameSmallCar := "small-car"
	channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

	if err := channel.QueueBind(queueNameCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "big.car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameMiddleCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameMiddleCar, "middler.car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameSmallCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameSmallCar, "small.car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.Publish(fanoutExchangeNameCar, "middle.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
}

topic

topic 比较有意思了,和之前的简单粗暴的用法不一样了,先看看下面的代码,声明了一个 topic 类型的 exchange, 4个 queue

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	topicExchangeNameCar := "topic.car"
	if err := channel.ExchangeDeclare(topicExchangeNameCar, "topic", true, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decalare exchange")
	}

	queueNameCar := "car"
	queueNameBigCar := "big-car"
	queueNameMiddleCar := "middle-car"
	queueNameSmallCar := "small-car"
	channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

	if err := channel.QueueBind(queueNameCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "big.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }

    if err := channel.QueueBind(queueNameMiddleCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameMiddleCar, "middler.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }

    if err := channel.QueueBind(queueNameSmallCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "*.small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "#.small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
}

现在思考每个 producer 产生消息之后,会有哪些 queue 会收到消息。

	if err := channel.Publish(topicExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
	if err := channel.Publish(topicExchangeNameCar, "small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
	if err := channel.Publish(topicExchangeNameCar, "benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
	if err := channel.Publish(topicExchangeNameCar, "auto.blue.benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
	if err := channel.Publish(topicExchangeNameCar, "bike", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}

headers

这种类型很少有实际的应用场景。

感谢各位的阅读,以上就是“消息队列原理之如何掌握rabbitmq”的内容了,经过本文的学习后,相信大家对消息队列原理之如何掌握rabbitmq这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

推荐阅读:
  1. Python38 RabbitMQ 消息队列
  2. Centos7 RabbitMQ消息队列集群

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

rabbitmq

上一篇:SetRegistryKey函数怎么用

下一篇:如何理解Yarn的日志聚合

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》