怎么使用nsq消息中间件

发布时间:2021-11-16 14:03:31 作者:iii
来源:亿速云 阅读:130

本篇内容主要讲解“怎么使用nsq消息中间件”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么使用nsq消息中间件”吧!

组成

nsq是一款轻量级的消息中间件,查看nsq官网给出的解释,可知nsq的组成和分工:


nsqd is the daemon that receives, queues, and delivers messages to clients.
It can be run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it will announce topics and channels for discovery).
It listens on two TCP ports, one for clients and another for the HTTP API. It can optionally listen on a third port for HTTPS.

从上面可以看出:


nsqlookupd is the daemon that manages topology information. Clients query nsqlookupd to discover nsqd producers for a specific topic and nsqd nodes broadcasts topic and channel information.
There are two interfaces: A TCP interface which is used by nsqd for broadcasts and an HTTP interface for clients to perform discovery and administrative actions.

从上面可以看出:

nsqadmin is a Web UI to view aggregated cluster stats in realtime and perform various administrative tasks.

从上面可以看出:

安装

这里为了快速搭建,使用docker compose方式安装(docker-compose.yaml见附件)

拷贝docker-compose.yaml到虚拟机,相关命令如下:

怎么使用nsq消息中间件

分别启动 nsqlookupd/nsqadmin/nsqd,对应三个容器和端口映射

浏览器中可打开 http://192.168.1.91:32770 访问 nsqadmin(虚拟机IP为192.168.1.91)

测试

package main

import (
	"bufio"
	"fmt"
	"github.com/bitly/go-nsq"
	"nsq-demo/src/config"
	"os"
)

var producer *nsq.Producer

func InitProducer(addr string) {
    var err error
	producer, err = nsq.NewProducer(addr, nsq.NewConfig())
	if err != nil {
		panic(err)
	}
	fmt.Println("connect to ", producer.String())
}

func Publish(topic, msg string) error {
	if producer == nil {// check producer
		return fmt.Errorf("producer is nil")
	}
	if msg == "" {// void empty msg
		return nil
	}
	return producer.Publish(topic, []byte(msg))// publish msg
}

func main() {
	InitProducer(config.Nsqd01)
	running := true

	reader := bufio.NewReader(os.Stdin)
	for running {
		data, _, _ := reader.ReadLine()
		command := string(data)
		if command == "stop" {
			running = false
		}

		for err := Publish(config.Topic, command); err != nil; err = Publish(config.Topic, command) {
			config.ExchangeNsqdIPs()
			InitProducer(config.Nsqd01)
		}
	}
	producer.Stop()
}

// producer直连nsqd后,接收来自控制台的输入,然后将消息发送给nsqd

package main

import (
	"fmt"
	"github.com/bitly/go-nsq"
	"nsq-demo/src/config"
	"time"
)

type MyConsumer struct{}

func (*MyConsumer) HandleMessage(msg *nsq.Message) error {// implementation Handler interface
	fmt.Println("receive from ", msg.NSQDAddress, "msg:", string(msg.Body))
	return nil
}

func InitConsumer(topic, channel, addr string) {
	conf := nsq.NewConfig()
	conf.LookupdPollInterval = time.Second
	c, err := nsq.NewConsumer(topic, channel, conf)
	if err != nil {
		panic(err)
	}
	c.SetLogger(nil, 0)// set system log
	c.AddHandler(&MyConsumer{})// set Hander to handle msg

	//if err := c.ConnectToNSQLookupd(addr); err != nil {
	//	panic(err)
	//}

	//if err := c.ConnectToNSQDs(config.GetNsqdIPs()); err != nil {
	//	panic(err)
	//}

	if err := c.ConnectToNSQD(config.Nsqd01); err != nil {
		panic(err)
	}
}

func main() {
	InitConsumer(config.Topic, config.Channel, config.Lookupd)
	select {}
}

// consumer直连nsqd后,通过自定义的Handler来处理消息

附录

version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160" # for the nsqd
      - "4161" # for the nsqadmin
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 # connect to nsqlookupd
    depends_on:
      - nsqlookupd
    ports:
      - "4150" # for clients
      - "4151" # for the HTTP API
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 # connect to nsqlookupd
    depends_on:
      - nsqlookupd  
    ports:
      - "4171"

# docker-compose.yaml of simple nsq

到此,相信大家对“怎么使用nsq消息中间件”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

推荐阅读:
  1. 消息中间件Rabbitmq的使用
  2. 消息中间件概述

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

nsq

上一篇:如何提高InnoDB表BLOB列的存储效率

下一篇:MySQL handler相关状态参数有哪些呢

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》