要查看Kafka的偏移量,您可以使用Kafka的命令行工具或编程API。以下是两种常用方法:
方法1:使用Kafka命令行工具
Kafka提供了一个名为kafka-consumer-groups.sh
的命令行工具,可以用来查看消费者组的偏移量。请按照以下步骤操作:
kafka-consumer-groups.sh
工具:./kafka-consumer-groups.sh --bootstrap-server <kafka-broker-address> --describe --group <consumer-group-id>
将<kafka-broker-address>
替换为您的Kafka代理地址(例如:localhost:9092
),将<consumer-group-id>
替换为您要查询的消费者组ID。
运行此命令后,您将看到类似以下的输出:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-consumer-group my-topic 0 100 200 100 consumer-1-86753f8a-1234-4567-89ab-cdef12345678 /127.0.0.1 consumer-1
在这个例子中,您可以查看消费者组my-consumer-group
在主题my-topic
的每个分区的当前偏移量(CURRENT-OFFSET
)、日志结束偏移量(LOG-END-OFFSET
)和lag(LAG
)。
方法2:使用编程API
您还可以使用Kafka客户端库(如Java、Python、Go等)编写程序来查询偏移量。以下是使用Java API的示例:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaOffsetViewer {
public static void main(String[] args) {
String kafkaBootstrapServer = "<kafka-broker-address>";
String consumerGroupId = "<consumer-group-id>";
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBootstrapServer);
props.put("group.id", consumerGroupId);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("<topic-name>"));
consumer.seekToEnd(consumer.assignment());
while (true) {
consumer.poll(Duration.ofMillis(1000));
consumer.position(consumer.assignment().iterator().next());
System.out.println("Topic: " + consumer.topic() + ", Partition: " + consumer.partition() + ", Offset: " + consumer.position());
}
}
}
将<kafka-broker-address>
替换为您的Kafka代理地址,将<consumer-group-id>
替换为您要查询的消费者组ID,将<topic-name>
替换为您要查询的主题名称。
运行此Java程序后,您将看到类似以下的输出:
Topic: my-topic, Partition: 0, Offset: 200
在这个例子中,您可以查看主题my-topic
在分区0的当前偏移量。