在Kafka中,偏移量(offset)是消费者用来跟踪它们已经读取的消息的位置的标识符
Kafka提供了一个名为kafka-consumer-groups.sh
的命令行工具,可以用来查询消费者组的偏移量。要查询特定消费者组的偏移量,请运行以下命令:
./kafka-consumer-groups.sh --bootstrap-server <kafka-broker-address> --describe --group <consumer-group-id>
这将显示消费者组的详细信息,包括每个分区的当前偏移量、日志开始(oldest offset)和日志结束(newest offset)等。
如果你正在使用Kafka Consumer API编写自定义应用程序,你可以通过查询ConsumerGroupMetadata
来获取消费者组的偏移量。以下是一个Java示例:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.ConsumerGroupMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import java.util.Collections;
import java.util.Properties;
public class OffsetQuery {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "<kafka-broker-address>");
props.put("group.id", "<consumer-group-id>");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("<topic-name>"));
ConsumerGroupMetadata consumerGroupMetadata = consumer.groupMetadata();
KafkaFuture<Node> leaderForTopic = consumerGroupMetadata.leaderFor(consumerGroupMetadata.topicPartitions().get("<topic-name>"));
Node leader = leaderForTopic.get();
System.out.println("Leader for topic <topic-name>: " + leader.host() + ":" + leader.port());
// 查询特定分区的偏移量
int partition = 0; // 替换为实际的分区号
long offset = consumer.position(consumerGroupMetadata.topicPartitions().get(partition));
System.out.println("Offset for partition " + partition + ": " + offset);
}
}
这个示例将连接到Kafka集群,查询指定消费者组和主题的偏移量,并将结果打印到控制台。请注意,你需要根据实际情况替换<kafka-broker-address>
、<consumer-group-id>
和<topic-name>
等占位符。