linux

Linux中Golang如何实现消息队列

小樊
49
2025-10-01 14:38:53
栏目: 编程语言

在Linux中,使用Golang实现消息队列有多种方法。以下是一些常见的实现方式:

1. 使用RabbitMQ

RabbitMQ是一个广泛使用的消息代理和队列服务器,支持多种消息协议。

安装RabbitMQ

首先,你需要在Linux上安装RabbitMQ。可以使用以下命令:

sudo apt-get update
sudo apt-get install rabbitmq-server

使用Golang客户端库

你可以使用streadway/amqp库来与RabbitMQ交互。

go get github.com/streadway/amqp

示例代码

以下是一个简单的生产者示例:

package main

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		true,    // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	body := "Hello World!"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	fmt.Println(" [x] Sent %s", body)
}

消费者示例:

package main

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		true,    // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			fmt.Printf("Received a message: %s\n", d.Body)
		}
	}()

	fmt.Println(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

2. 使用Kafka

Kafka是一个分布式流处理平台,也可以用作消息队列。

安装Kafka

你可以使用以下命令安装Kafka:

wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0

启动Zookeeper和Kafka服务器:

bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &

使用Golang客户端库

你可以使用confluent-kafka-go库来与Kafka交互。

go get github.com/confluentinc/confluent-kafka-go/kafka

示例代码

生产者示例:

package main

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	topic := "test-topic"
	p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("Hello Kafka"),
	}, nil)

	p.Flush(15 * 1000)
}

消费者示例:

package main

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "test-consumer-group",
		"auto.offset.reset": "earliest",
	})

	if err != nil {
		panic(err)
	}
	defer c.Close()

	c.SubscribeTopics([]string{"test-topic"}, nil)

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Received message: %s\n", string(msg.Value))
		} else {
			fmt.Printf("Consumer error: %v\n", err)
		}
	}
}

3. 使用Redis

Redis也可以用作消息队列,特别是使用其发布/订阅功能。

安装Redis

你可以使用以下命令安装Redis:

sudo apt-get update
sudo apt-get install redis-server

使用Golang客户端库

你可以使用go-redis库来与Redis交互。

go get github.com/go-redis/redis/v8

示例代码

生产者示例:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // no password set
		DB:       0,  // use default DB
	})

	err := rdb.Publish(ctx, "channel", "Hello Redis").Err()
	if err != nil {
		panic(err)
	}
	fmt.Println("Message published")
}

消费者示例:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // no password set
		DB:       0,  // use default DB
	})

	pubsub := rdb.Subscribe(ctx, "channel")
	defer pubsub.Close()

	ch := pubsub.Channel()

	for msg := range ch {
		fmt.Printf("Received message: %s\n", msg.Payload)
	}
}

这些示例展示了如何在Linux中使用Golang实现消息队列。你可以根据自己的需求选择合适的消息队列系统,并使用相应的Golang客户端库进行开发。

0
看了该问题的人还看了