kafka

golang操作kafka怎样实现

小樊
81
2024-12-17 11:05:41
栏目: 编程语言

在Golang中操作Kafka,你可以使用第三方库,如saramaconfluent-kafka-go。这里我将为你提供一个使用sarama库的示例。首先,你需要安装sarama库:

go get github.com/Shopify/sarama

接下来,我将为你提供一个简单的示例,展示如何使用sarama库创建一个Kafka生产者和一个消费者。

  1. 创建一个Kafka生产者:
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalf("Error creating producer: %v", err)
	}
	defer func() {
		if err := producer.Close(); err != nil {
			log.Fatalf("Error closing producer: %v", err)
		}
	}()

	topic := "test_topic"
	message := "Hello, Kafka!"

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(message),
	})
	if err != nil {
		log.Printf("Error sending message: %v", err)
		return
	}

	fmt.Printf("Message sent to topic: %s, partition: %d, offset: %d\n", topic, partition, offset)
}
  1. 创建一个Kafka消费者:
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V2_6_0

	brokers := []string{"localhost:9092"}
	groupID := "test_group"

	consumer, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalf("Error closing consumer: %v", err)
		}
	}()

	topic := "test_topic"
	handler := exampleConsumerGroupHandler{}

	go func() {
		for {
			err := consumer.Consume(context.Background(), []string{topic}, handler)
			if err != nil {
				log.Printf("Error consuming messages: %v", err)
			}
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals

	consumer.GracefulStop()
}

type exampleConsumerGroupHandler struct{}

func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("Message: %s, Partition: %d, Offset: %d\n", string(msg.Value), msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
	}

	return nil
}

这个示例中,我们创建了一个Kafka生产者,将一条消息发送到名为test_topic的主题。然后,我们创建了一个Kafka消费者,订阅了相同的主题,并在接收到消息时打印其内容。

注意:在运行这些示例之前,请确保你已经启动了一个Kafka实例。你可以使用Docker运行一个简单的Kafka实例,如下所示:

docker run -d --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=localhost:9092 -e KAFKA_BROKER_ID=1 confluentinc/cp-kafka:2.6.0

这将启动一个名为kafka的Docker容器,监听本地的9092端口。

0
看了该问题的人还看了