如何使用分布式任务+消息队列框架go-queue

发布时间:2021-10-15 13:58:48 作者:iii
来源:亿速云 阅读:130

这篇文章主要介绍“如何使用分布式任务+消息队列框架go-queue”,在日常操作中,相信很多人在如何使用分布式任务+消息队列框架go-queue问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何使用分布式任务+消息队列框架go-queue”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

为什么要写这个库?

在开始自研 go-queue 之前,针对以下我们调研目前的开源队列方案:

beanstalkd

beanstalkd 有一些特殊好用功能:支持任务priority、延时(delay)、超时重发(time-to-run)和预留(buried),能够很好的支持分布式的后台任务和定时任务处理。如下是 beanstalkd 基本部分:

很幸运的是官方提供了 go client:https://github.com/beanstalkd/go-beanstalk。

但是这对不熟悉 beanstalkd 操作的 go 开发者而言,需要学习成本。

kafka

类似基于 kafka 消息队列作为存储的方案,存储单元是消息,如果要实现延时执行,可以想到的方案是以延时执行的时间作为 topic,这样在大型的消息系统中,充斥大量一次性的 topicdq_1616324404788, dq_1616324417622),当时间分散,会容易造成磁盘随机写的情况。

而且在 go 生态中,

同时考虑以下因素:

所以我们自己基于以上两个基础组件开发了 go-queue

  1. 基于 beanstalkd 开发了 dq,支持定时和延时操作。同时加入 redis 保证消费唯一性。

  2. 基于 kafka 开发了 kq,简化生产者和消费者的开发API,同时在写入kafka使用批量写,节省IO。

整体设计如下:

如何使用分布式任务+消息队列框架go-queue

应用场景

首先在消费场景来说,一个是针对任务队列,一个是消息队列。而两者最大的区别:

所以在背后的基础设施选型上,也是基于这种消费场景。

而从其中 dq 的 API 中也可以看出:

// 延迟任务执行
- dq.Delay(msg, delayTime);

// 定时任务执行
- dq.At(msg, atTime);

而在我们内部:

如何使用

分别介绍 dqkq 的使用方式:

dq

// [Producer]
producer := dq.NewProducer([]dq.Beanstalk{
	{
		Endpoint: "localhost:11300",
		Tube:     "tube",
	},
	{
		Endpoint: "localhost:11301",
		Tube:     "tube",
	},
})	

for i := 1000; i < 1005; i++ {
	_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
	if err != nil {
		fmt.Println(err)
	}
}
// [Consumer]
consumer := dq.NewConsumer(dq.DqConf{
  Beanstalks: []dq.Beanstalk{
    {
      Endpoint: "localhost:11300",
      Tube:     "tube",
    },
    {
      Endpoint: "localhost:11301",
      Tube:     "tube",
    },
  },
  Redis: redis.RedisConf{
    Host: "localhost:6379",
    Type: redis.NodeType,
  },
})
consumer.Consume(func(body []byte) {
  // your consume logic
  fmt.Println(string(body))
})

和普通的 生产者-消费者 模型类似,开发者也只需要关注以下:

  1. 开发者只需要关注自己的任务类型「延时/定时」

  2. 消费端的消费逻辑

kq

producer.go

// message structure
type message struct {
	Key     string `json:"key"`
	Value   string `json:"value"`
	Payload string `json:"message"`
}

pusher := kq.NewPusher([]string{
	"127.0.0.1:19092",
	"127.0.0.1:19093",
	"127.0.0.1:19094",
}, "kq")

ticker := time.NewTicker(time.Millisecond)
for round := 0; round < 3; round++ {
	select {
	case <-ticker.C:
		count := rand.Intn(100)
		// 准备消息
		m := message{
			Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
			Value:   fmt.Sprintf("%d,%d", round, count),
			Payload: fmt.Sprintf("%d,%d", round, count),
		}
		body, err := json.Marshal(m)
		if err != nil {
			log.Fatal(err)
		}

		fmt.Println(string(body))
		// push to kafka broker
		if err := pusher.Push(string(body)); err != nil {
			log.Fatal(err)
		}
	}
}

config.yaml

Name: kq
Brokers:
  - 127.0.0.1:19092
  - 127.0.0.1:19092
  - 127.0.0.1:19092
Group: adhoc
Topic: kq
Offset: first
Consumers: 1

consumer.go

var c kq.KqConf
conf.MustLoad("config.yaml", &c)

// WithHandle: 具体的处理msg的logic
// 这也是开发者需要根据自己的业务定制化
q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
  fmt.Printf("=> %s\n", v)
  return nil
}))
defer q.Stop()
q.Start()

dq 不同的是:开发者不需要关注任务类型(在这里也没有任务的概念,传递的都是 message data)。

其他操作和 dq 类似,只是将 业务处理函数 当成配置直接传入消费者中。

到此,关于“如何使用分布式任务+消息队列框架go-queue”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

推荐阅读:
  1. Java分布式任务调度框架XXL-Job的介绍
  2. web分布式定时任务调度框架怎么使用

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

kafka

上一篇:CSS实现阴影技巧与注意细节有哪些

下一篇:Web前端和后端的区别有哪些

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》