Kafka ProducerRecord 本身并不能直接确认消息是否已经被成功发送。Kafka ProducerRecord 只是将消息封装成一个对象,包含需要发送的主题、键、值等信息。实际的消息发送过程是由 Kafka Producer 完成的。
Kafka Producer 在发送消息时会返回一个 Future 对象,你可以使用这个 Future 对象来检查消息是否已经成功发送。具体方法如下:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerResult;
// ...
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
// 发送消息并获取 Future 对象
Future<RecordMetadata> future = producer.send(record);
// 检查消息是否成功发送
if (future.isDone()) {
try {
RecordMetadata metadata = future.get();
System.out.println("Message sent to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
} catch (Exception e) {
e.printStackTrace();
}
} else {
System.out.println("Message is still being sent.");
}
在这个示例中,我们首先创建了一个 KafkaProducer 对象,然后创建了一个 ProducerRecord 对象。接着,我们调用 producer.send()
方法发送消息,并获取一个 Future 对象。通过调用 future.isDone()
方法,我们可以检查消息是否已经成功发送。如果消息已经成功发送,我们可以使用 future.get()
方法获取一个 RecordMetadata 对象,其中包含了消息发送的详细信息。