一、升级前准备工作
server.properties、zookeeper.properties)、数据目录(log.dirs指定路径)、日志文件及自定义Topic配置,防止升级过程中数据丢失。kafka_2.13-3.6.0.tgz),建议选择稳定 release 版本。inter.broker.protocol.version、log.message.format.version的默认值调整);kafka-topics.sh的参数调整)。二、具体升级步骤
停止现有服务:
先停止Kafka Broker,再停止Zookeeper(Kafka依赖Zookeeper管理元数据),避免数据损坏:
# 停止Kafka(若使用systemd)
sudo systemctl stop kafka
# 或通过脚本停止
/opt/kafka_old/bin/kafka-server-stop.sh /opt/kafka_old/config/server.properties
# 停止Zookeeper
/opt/zk_old/bin/zkServer.sh stop
解压新版本包:
将下载的新版本包解压到指定目录(如/opt/kafka_new),保持目录结构清晰:
tar -xzf kafka_2.13-3.6.0.tgz -C /opt/
mv /opt/kafka_2.13-3.6.0 /opt/kafka_new
迁移配置文件:
将旧版本的config目录复制到新版本中,修改关键配置以适配新版本要求:
cp -r /opt/kafka_old/config/* /opt/kafka_new/config/
需重点检查的配置项:
broker.id:确保集群内唯一;listeners/advertised.listeners:更新Broker的监听地址(如从PLAINTEXT://:9092改为PLAINTEXT://your.domain:9092);zookeeper.connect:确认Zookeeper集群地址正确;log.dirs:确保新目录存在且有写入权限;message.format.version若升级到3.5+需设置为对应版本)。启动新版本服务:
先启动Zookeeper,再启动Kafka Broker,确保元数据服务正常:
# 启动Zookeeper
/opt/kafka_new/config/zookeeper-server-start.sh /opt/kafka_new/config/zookeeper.properties &
# 启动Kafka Broker
/opt/kafka_new/bin/kafka-server-start.sh /opt/kafka_new/config/server.properties &
验证升级结果:
使用Kafka命令行工具检查集群状态,确认新版本运行正常:
# 检查Broker版本(应显示新版本号)
/opt/kafka_new/bin/kafka-topics.sh --version
# 列出所有Topic,确认元数据正常
/opt/kafka_new/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 生产测试消息(向test Topic发送一条消息)
echo "Upgrade test message" | /opt/kafka_new/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 消费测试消息(从test Topic读取)
/opt/kafka_new/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
三、后续操作建议
kafka-consumer-groups.sh查看);zkServer.sh status)。/opt/kafka_old),释放磁盘空间。# 停止新版本Kafka
sudo systemctl stop kafka
# 启动旧版本Kafka
/opt/kafka_old/bin/kafka-server-start.sh /opt/kafka_old/config/server.properties
注意事项
LogSegment结构调整),需使用kafka-reassign-partitions.sh工具迁移数据,确保兼容性。