Kafka 消费者通过 Group ID 来将来自一个主题的消息分发给多个消费者
使用命令行工具创建消费者组:
你可以使用 kafka-consumer-groups.sh
工具来查看已存在的消费者组,或者使用 --new-consumer-group
参数创建一个新的消费者组。例如:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --create --group my-new-group
这将创建一个名为 my-new-group
的新消费者组。
使用 Kafka 管理工具(如 Confluent Control Center、Kafka Manager 等)创建消费者组:
这些工具通常提供了一个 Web 界面,你可以在其中管理 Kafka 集群、主题和消费者组。在这些工具中,你可以找到创建消费者组的功能,并按照提示操作即可。
使用编程语言(如 Java、Python、Go 等)创建消费者组:
在你的应用程序中,你需要使用 Kafka 客户端库来创建一个消费者,并指定消费者组 ID。以下是一个使用 Python 的 confluent_kafka
库创建消费者组的示例:
from confluent_kafka import Consumer, KafkaError
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-new-group'
}
consumer = Consumer(conf)
consumer.subscribe(['my-topic'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
else:
raise KafkaException(msg.error())
else:
print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
这段代码创建了一个名为 my-new-group
的消费者组,并订阅了名为 my-topic
的主题。当运行此代码时,消费者将从该主题中读取消息。