Kafka命令行工具提供了多种方式来消费消息,以下是几种常见的选择:
使用kafka-console-consumer.sh
脚本:
这是Kafka自带的一个简单的命令行消费者工具,可以用来消费指定主题的消息。使用方式如下:
./kafka-console-consumer.sh --bootstrap-server <kafka-broker-address> --topic <topic-name> --from-beginning
其中,<kafka-broker-address>
是Kafka集群的地址,<topic-name>
是要消费的主题名称。--from-beginning
参数表示从消息队列的开头开始消费。
使用kafka-console-consumer.sh
脚本并指定消费者组:
如果你需要使用消费者组来消费消息,可以使用--group
参数来指定消费者组名称。使用方式如下:
./kafka-console-consumer.sh --bootstrap-server <kafka-broker-address> --topic <topic-name> --from-beginning --group <consumer-group-name>
其中,<consumer-group-name>
是消费者组的名称。
使用自定义的消费者程序:
如果你需要更复杂的消费逻辑,可以使用Kafka客户端库(如Java、Python、Go等)编写自定义的消费者程序。这些程序可以实现更多的功能,如消息过滤、消息转换、分布式处理等。
以Java为例,你可以使用Kafka的Java客户端库编写一个消费者程序,如下所示:
Properties props = new Properties();
props.put("bootstrap.servers", "<kafka-broker-address>");
props.put("group.id", "<consumer-group-name>");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("<topic-name>"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
其中,<kafka-broker-address>
是Kafka集群的地址,<consumer-group-name>
是消费者组的名称,<topic-name>
是要消费的主题名称。