Kafka Producer Ack 是一种机制,用于确保消息被成功发送到 Kafka 集群。在 Kafka Producer 中,有两种确认方式:同步(synchronous)和异步(asynchronous)。
在同步确认模式下,Producer 在发送消息后,会等待 Kafka Broker 返回确认消息(Ack)。只有收到确认消息后,Producer 才会认为该消息已成功发送。这种方式可以确保消息的可靠传输,但会降低 Producer 的吞吐量。
要使用同步确认,需要在 Kafka Producer 配置中设置 acks
参数为 all
。例如:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
在异步确认模式下,Producer 发送消息后,不会等待 Kafka Broker 返回确认消息。而是在后台启动一个线程,通过轮询的方式检查消息是否已成功发送。这种方式可以提高 Producer 的吞吐量,但可能导致消息丢失或重复发送。
要使用异步确认,需要在 Kafka Producer 配置中设置 acks
参数为 1
,并实现 org.apache.kafka.clients.producer.Callback
接口。例如:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "1");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Message sent to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
}
}
});
总之,Kafka Producer Ack 提供了同步和异步两种消息确认方式,可以根据实际需求选择合适的确认方式。