Kafka在CentOS上的升级流程及注意事项
升级前必须备份Kafka集群的核心数据,包括:
/var/lib/zookeeper
或自定义配置的dataDir
);/tmp/kafka-logs
或自定义配置的log.dirs
)。rsync
、tar
或云存储快照,确保数据可恢复。通过命令行工具确认当前集群版本(任选其一):
# 查看主题元数据中的版本信息(需指定已有主题)
bin/kafka-topics.sh --describe --topic your_topic_name | grep "Version"
# 直接查看Kafka命令行工具版本
bin/kafka-topics.sh --version
记录当前版本号(如3.5.2
),以便后续选择兼容的新版本。
访问Apache Kafka官方下载页面(https://kafka.apache.org/downloads),选择与当前版本兼容的新版本(如从3.5.2
升级到3.9.0
),下载对应的二进制分发包(.tgz
格式)。
查阅新版本的发布说明(Release Notes)和升级指南(Upgrade Guide),重点确认:
使用systemctl
命令停止运行中的Kafka服务:
sudo systemctl stop kafka
验证服务状态:
sudo systemctl status kafka # 应显示"inactive (dead)"
tar -xzf kafka_2.12-3.9.0.tgz # 替换为实际下载的文件名
mv kafka_2.12-3.9.0 /opt/kafka # 移动到目标目录(建议固定路径)
/opt/kafka_old
),便于后续回退。将旧版本的config
目录下的配置文件复制到新版本目录,并根据新版本要求修改关键配置:
cp -r /opt/kafka_old/config/* /opt/kafka/config/
重点检查的配置项:
server.properties
:
broker.id
:确保集群内唯一;listeners
:更新为当前节点的监听地址(如PLAINTEXT://your_server_ip:9092
);advertised.listeners
:更新为外部客户端访问的地址(如PLAINTEXT://your_public_ip:9092
);zookeeper.connect
:若未迁移到KRaft模式,需保持与旧版本一致的Zookeeper连接字符串(如localhost:2181
);log.dirs
:确认日志目录路径正确(与备份路径一致);inter.broker.protocol.version
:若跨大版本升级,需设置为旧版本的协议版本(如从3.5.2
升级到3.9.0
,可保留3.5
,待集群全部升级完成后再修改为3.9
);log.message.format.version
:同理,保留旧版本格式(如3.5
)。进入新版本目录,启动Kafka服务:
cd /opt/kafka/bin
./kafka-server-start.sh -daemon ../config/server.properties # 使用-daemon后台运行
验证服务是否启动成功:
ps -ef | grep kafka # 应存在Kafka进程
tail -n 50 ../logs/server.log # 检查日志是否有错误
/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
输出应显示新版本号(如3.9.0
)。# 创建主题
/opt/kafka/bin/kafka-topics.sh --create --topic test_upgrade --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
# 发送消息
/opt/kafka/bin/kafka-console-producer.sh --topic test_upgrade --bootstrap-server localhost:9092
# 接收消息
/opt/kafka/bin/kafka-console-consumer.sh --topic test_upgrade --from-beginning --bootstrap-server localhost:9092
确认消息收发正常,无报错。升级后需持续监控以下指标:
kafka-consumer-groups.sh --describe
);ERROR
或WARN
级别日志)。确认集群运行稳定后,可删除旧版本的Kafka目录(如/opt/kafka_old
),释放磁盘空间。
若升级后出现严重问题(如数据丢失、服务无法启动),可快速回退到旧版本:
# 停止新版本Kafka
sudo systemctl stop kafka
# 启动旧版本Kafka(需提前备份旧版本目录)
/opt/kafka_old/bin/kafka-server-start.sh -daemon /opt/kafka_old/config/server.properties
验证旧版本服务恢复正常后,重复升级前的备份步骤,再尝试重新升级。