Kafka客户端处理死信队列(Dead Letter Queue,DLQ)主要涉及到两个方面:消费者端和生产者端。下面分别介绍这两个方面的处理方式。
在消费者端,当处理消息时遇到错误,可以将消息发送到死信队列。以下是一个简单的示例,展示了如何在Java中使用Kafka消费者处理死信队列:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerExceptionHandler;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class DeadLetterQueueConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "dead-letter-queue-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "180000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
consumer.setExceptionHandler(new ConsumerExceptionHandler() {
@Override
public void handle(Exception thrown) {
System.out.println("Error occurred while processing message: " + thrown.getMessage());
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// 处理消息
System.out.printf("Processing record: key = %s, value = %s, partition = %d, offset = %d%n",
record.key(), record.value(), record.partition(), record.offset());
} catch (Exception e) {
// 将消息发送到死信队列
consumer.assign(Collections.singletonList(new TopicPartition("my-topic-dlq", record.partition())));
consumer.seek(new TopicPartition("my-topic-dlq", record.partition()), record.offset());
System.out.println("Sending record to dead letter queue: " + record.value());
}
}
}
}
}
在生产者端,当消息发送失败时,可以将消息发送到死信队列。以下是一个简单的示例,展示了如何在Java中使用Kafka生产者处理死信队列:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerException;
import java.util.Properties;
public class DeadLetterQueueProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i));
try {
// 发送消息
producer.send(record);
System.out.printf("Sent record: key = %s, value = %s%n", record.key(), record.value());
} catch (ProducerException e) {
// 将消息发送到死信队列
producer.send(new ProducerRecord<>("my-topic-dlq", record.key(), record.value()));
System.out.println("Sent record to dead letter queue: " + record.value());
}
}
producer.close();
}
}
在这个示例中,生产者将消息发送到名为my-topic
的主题。如果发送失败(例如,由于网络问题或分区不可用),生产者将消息发送到名为my-topic-dlq
的死信队列。消费者可以从死信队列中获取这些消息并进行相应的处理。