Apache Kafka 是一个分布式流处理平台,它能够在 Linux 系统上实现消息的持久化。以下是在 Linux 中实现 Kafka 消息持久化的关键步骤:
安装 Kafka:
KAFKA_HOME 和 PATH。配置 Kafka Broker:
server.properties,通常位于 KAFKA_HOME/config 目录下。log.dirs 属性,指定 Kafka 存储日志的目录。例如:log.dirs=/var/lib/kafka/logs
配置消息持久化:
log.retention.hours 或 log.retention.bytes 属性设置合理,以控制消息的保留时间或大小。log.retention.hours=168
启动 Kafka Broker:
bin/kafka-server-start.sh config/server.properties
创建 Topic:
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3
生产者和消费者:
编写生产者和消费者代码,确保消息被正确发送和接收。
生产者代码示例(使用 Java):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<String, String>("my-topic", "key", "value");
producer.send(record);
producer.close();
消费者代码示例(使用 Java):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
通过以上步骤,你可以在 Linux 系统上实现 Kafka 消息的持久化。确保 Kafka Broker 和 Topic 的配置正确,并且生产者和消费者代码能够正确处理消息。