让消息队列达到最大吞吐量的方法教程

发布时间:2021-10-11 10:31:24 作者:iii
来源:亿速云 阅读:97

这篇文章主要讲解了“让消息队列达到最大吞吐量的方法教程”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“让消息队列达到最大吞吐量的方法教程”吧!

关于吞吐量的一些思考

解决方案和实现

让消息队列达到最大吞吐量的方法教程

借用一下 Rob Pike 的一张图,这个跟队列消费异曲同工。左边4个 gopher 从队列里取,右边4个 gopher 接过去处理。比较理想的结果是左边和右边速率基本一致,没有谁浪费,没有谁等待,中间交换处也没有堆积。

我们来看看 go-zero 是怎么实现的:

	for {
		select {
		case <-q.quit:
			logx.Info("Quitting producer")
			return
		default:
			if v, ok := q.produceOne(producer); ok {
				q.channel <- v
			}
		}
	}

没有退出事件就会通过 produceOne 去读取一个消息,成功后写入 channel。利用 chan 就可以很好的解决读取和消费的衔接问题。

	for {
		select {
		case message, ok := <-q.channel:
			if ok {
				q.consumeOne(consumer, message)
			} else {
				logx.Info("Task channel was closed, quitting consumer...")
				return
			}
		case event := <-eventChan:
			consumer.OnEvent(event)
		}
	}

这里如果拿到消息就去处理,当 okfalse 的时候表示 channel 已被关闭,可以退出整个处理循环了。同时我们还在 redis queue 上支持了 pause/resume,我们原来在社交场景里大量使用这样的队列,可以通知 consumer 暂停和继续。

func (q *Queue) Start() {
	q.startProducers(q.producerCount)
	q.startConsumers(q.consumerCount)

	q.producerRoutineGroup.Wait()
	close(q.channel)
	q.consumerRoutineGroup.Wait()
}

这里需要注意的是,先要停掉 producer,再去等 consumer 处理完。

到这里核心控制代码基本就讲完了,其实看起来还是挺简单的,也可以到 https://github.com/tal-tech/go-zero/tree/master/core/queue 去看完整实现。

使用

基本的使用流程:

  1. 创建 producerconsumer

  2. 启动 queue

  3. 生产消息 / 消费消息

对应到 queue 中,大致如下:

创建 queue

// 生产者创建工厂
producer := newMockedProducer()
// 消费者创建工厂
consumer := newMockedConsumer()
// 将生产者以及消费者的创建工厂函数传递给 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
  return producer, nil
}, func() (Consumer, error) {
  return consumer, nil
})

我们看看 NewQueue 需要什么参数:

  1. producer 工厂方法

  2. consumer 工厂方法

producer & consumer 的工厂函数传递 queue ,由它去负责创建。框架提供了 ProducerConsumer 的接口以及工厂方法定义,然后整个流程的控制 queue 实现会自动完成。

生产 message

我们通过自定义一个 mockedProducer 来模拟:

type mockedProducer struct {
	total int32
	count int32
  // 使用waitgroup来模拟任务的完成
	wait  sync.WaitGroup
}
// 实现 Producer interface 的方法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
	if atomic.AddInt32(&p.count, 1) <= p.total {
		p.wait.Done()
		return "item", true
	}
	time.Sleep(time.Second)
	return "", false
}

queue 中的生产者编写都必须实现:

消费 message

我们通过自定义一个 mockedConsumer 来模拟:

type mockedConsumer struct {
	count  int32
}

func (c *mockedConsumer) Consume(string) error {
	atomic.AddInt32(&c.count, 1)
	return nil
}

启动 queue

启动,然后验证我们上述的生产者和消费者之间的数据是否传输成功:

func main() {
	// 创建 queue
	q := NewQueue(func() (Producer, error) {
		return newMockedProducer(), nil
	}, func() (Consumer, error) {
		return newMockedConsumer(), nil
	})
  // 启动panic了也可以确保stop被执行以清理资源
  defer q.Stop()
	// 启动
	q.Start()
}

以上就是 queue 最简易的实现示例。我们通过这个 core/queue 框架实现了基于 rediskafka 等的消息队列服务,在不同业务场景中经过了充分的实践检验。你也可以根据自己的业务实际情况,实现自己的消息队列服务。

整体设计

让消息队列达到最大吞吐量的方法教程

整体流程如上图:

  1. 全体的通信都由 channel 进行

  2. ProducerConsumer 的数量可以设定以匹配不同业务需求

  3. ProduceConsume 具体实现由开发者定义,queue 负责整体流程

感谢各位的阅读,以上就是“让消息队列达到最大吞吐量的方法教程”的内容了,经过本文的学习后,相信大家对让消息队列达到最大吞吐量的方法教程这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

推荐阅读:
  1. redis实现消息队列的方法
  2. MySQL自增ID达到上限的解决方法

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

go

上一篇:如何使用VBS从XML文件中获取信息

下一篇:如何实现SQL获取表结构的show_table.vbs

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》