Kafka Send 避免重复发送消息可以通过以下方法实现:
enable.idempotence
为 true
。同时,还需要为 producer 设置一个唯一的 ID(transactional.id
),以便 Kafka 可以跟踪和识别它。Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");
props.put("transactional.id", "my-unique-producer-id");
Producer<String, String> producer = new KafkaProducer<>(props);
transactional.id
,并调用 initTransactions()
、beginTransaction()
、commitTransaction()
和 abortTransaction()
方法来管理事务。Properties props = new Properties();
// ... 其他配置 ...
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
// 发送消息到分区 0 和主题 A
producer.send(new ProducerRecord<>("topicA", key, value));
producer.send(new ProducerRecord<>("topicB", key, value));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
throw e;
} finally {
producer.close();
}
ListOffsets
API 来实现。在发送消息之前,你可以先检查消息的偏移量,如果已经存在,则跳过发送。这种方法需要额外的逻辑来实现,但可以提供更高的灵活性。请注意,这些方法并不能完全保证消息不会重复发送,但它们可以大大降低重复发送的可能性。在实际应用中,你可能需要根据具体需求选择合适的方法。