Kafka 消息发送的消息确认机制是通过生产者客户端来实现的。生产者客户端在发送消息到 Kafka 集群时,可以配置不同的确认策略来确保消息的可靠传输。以下是 Kafka 生产者提供的两种主要确认方式:
同步确认(Synchronous Acknowledgment):
异步确认(Asynchronous Acknowledgment):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 设置为 all 表示需要所有副本都确认
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Message sent to topic: " + metadata.topic() + " partition: " + metadata.partition() + " offset: " + metadata.offset());
}
}
});
producer.close();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "1"); // 设置为 1 表示只需要 leader 确认
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Message sent to topic: " + metadata.topic() + " partition: " + metadata.partition() + " offset: " + metadata.offset());
}
}
});
producer.close();
根据具体的应用场景和需求,可以选择合适的确认策略来确保消息的可靠传输。