在开始配置前,需确保Linux系统已安装Java运行环境(JRE 8+)(Kafka依赖Java运行),并下载Kafka二进制包(如kafka_2.13-3.2.0.tgz)。可通过以下命令快速安装Java:
sudo apt update && sudo apt install -y openjdk-11-jdk
/opt/kafka):tar -xzf kafka_2.13-3.2.0.tgz -C /opt
cd /opt/kafka_2.13-3.2.0
config/server.properties文件,设置以下关键参数以实现持久化:# 指定Kafka日志存储目录(需提前创建并授权)
log.dirs=/opt/kafka/logs
# 消息保留时间(168小时=7天,可根据需求调整)
log.retention.hours=168
# 单个日志段最大大小(1GB,避免单个文件过大)
log.segment.bytes=1073741824
# 日志清理检查间隔(5分钟,及时清理过期数据)
log.retention.check.interval.ms=300000
# 副本因子(设置为3,确保数据高可用)
default.replication.factor=3
# 最小同步副本数(设置为2,保证数据写入成功)
min.insync.replicas=2
log.dirs指定的目录存在且有正确权限:mkdir -p /opt/kafka/logs
chmod -R 755 /opt/kafka/logs
Kafka依赖ZooKeeper管理集群元数据,需先启动ZooKeeper再启动Kafka:
# 启动ZooKeeper(默认端口2181)
bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka(后台模式)
bin/kafka-server-start.sh config/server.properties &
创建Topic时,需指定副本因子(replication-factor)(≥2)和分区数(partitions)(≥1),确保数据冗余:
# 创建名为"my_topic"的Topic,3个分区,3个副本
bin/kafka-topics.sh --create \
--topic my_topic \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 3
通过生产者-消费者流程验证消息是否持久化:
bin/kafka-console-producer.sh --topic my_topic --bootstrap-server localhost:9092
输入任意消息(如Hello Kafka Persistence)并按回车。--from-beginning表示从最早消息开始):bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092
若能读取到之前发送的消息,则说明持久化成功。df -h /opt/kafka/logs
log.dirs目录(如使用rsync):rsync -av /opt/kafka/logs /backup/kafka_logs_backup
log.retention.hours(保留时间)、log.retention.bytes(最大大小)等参数,平衡存储成本与数据保留需求。通过以上步骤,可在Linux系统上实现Kafka消息的持久化存储,确保数据可靠性和高可用性。