升级前必须备份Kafka的核心数据目录(log.dirs指定的路径,如/tmp/kafka-logs)和配置文件(server.properties、zookeeper.properties),防止升级过程中数据丢失或配置错误导致无法恢复。
示例命令:
cp -r /tmp/kafka-logs /tmp/kafka-logs-backup
cp /usr/local/kafka/config/server.properties /usr/local/kafka/config/server.properties.bak
cp /usr/local/kafka/config/zookeeper.properties /usr/local/kafka/config/zookeeper.properties.bak
java -version
升级前需停止所有运行中的Kafka和Zookeeper进程,避免数据不一致:
# 停止Kafka(若使用自带脚本)
/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties
# 停止Zookeeper(若使用自带脚本)
/usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties
访问Apache Kafka官方下载页面(https://kafka.apache.org/downloads),选择所需版本(如kafka_2.13-3.5.2.tgz),使用wget下载:
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.13-3.5.2.tgz
tar -xzvf kafka_2.13-3.5.2.tgz
cd kafka_2.13-3.5.2
进入新版本Kafka目录,修改核心配置文件config/server.properties,重点检查以下参数:
broker.id:集群中每个broker的唯一标识(需与旧版本一致,若集群中有节点变更需调整);listeners:broker监听的地址和端口(如PLAINTEXT://0.0.0.0:9092,允许所有IP访问);advertised.listeners:对外暴露的broker地址(客户端连接的地址,如PLAINTEXT://your_server_ip:9092);zookeeper.connect:Zookeeper集群连接字符串(如localhost:2181或zk1:2181,zk2:2181,zk3:2181);log.dirs:日志存储目录(建议与旧版本一致,避免数据迁移麻烦)。示例修改:
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.1.100:9092
log.dirs=/var/lib/kafka/logs
zookeeper.connect=localhost:2181
若通过systemd管理Kafka和Zookeeper服务,需更新服务文件以确保自动启动:
sudo nano /etc/systemd/system/zookeeper.service
写入以下内容(根据实际路径调整):
[Unit]
Description=Apache Zookeeper server
Requires=network.target
After=network.target
[Service]
Type=simple
User=root
Group=root
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=always
[Install]
WantedBy=multi-user.target
sudo nano /etc/systemd/system/kafka.service
写入以下内容(根据实际路径调整):
[Unit]
Description=Apache Kafka Server
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=root
Group=root
Environment="JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=always
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl start zookeeper
sudo systemctl enable zookeeper
sudo systemctl start kafka
sudo systemctl enable kafka
sudo systemctl status zookeeper
sudo systemctl status kafka
确保服务状态为active (running)。
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test_upgrade
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
若能列出test_upgrade,说明Topic创建成功。
# 生产者发送消息
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_upgrade
# 消费者接收消息(新终端)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_upgrade --from-beginning
若能收到生产者发送的消息,说明升级成功。
kafka-topics.sh、kafka-consumer-groups.sh等工具监控Topic状态、消费者组偏移量等指标;logs/server.log)和Zookeeper(logs/zookeeper.out)的日志文件定位错误;