Ubuntu环境下Kafka集群管理指南
Kafka集群管理涵盖部署配置、日常运维、监控告警、故障处理及优化扩展等环节,以下是针对Ubuntu系统的具体操作框架:
sudo apt update && sudo apt install -y openjdk-11-jdk
java -version # 验证安装(需显示11.x版本)
zoo.cfg配置文件(/opt/zookeeper/conf/zoo.cfg),添加节点信息:dataDir=/opt/zookeeper/data
clientPort=2181
server.0=zookeeper1:2888:3888 # 节点ID:Leader选举端口:数据同步端口
server.1=zookeeper2:2888:3888
server.2=zookeeper3:2888:3888
每个节点需创建myid文件(/opt/zookeeper/data/myid),内容为对应节点ID(如zookeeper1节点写入0)。kafka_2.13-3.5.2.tgz),解压至/opt/kafka:wget https://downloads.apache.org/kafka/3.5.2/kafka_2.13-3.5.2.tgz
tar -xzf kafka_2.13-3.5.2.tgz -C /opt
sudo mv /opt/kafka_2.13-3.5.2 /opt/kafka
server.properties(关键参数):broker.id=1 # 每个Broker唯一ID(集群内不可重复)
listeners=PLAINTEXT://your_server_ip:9092 # 监听地址(替换为实际IP)
log.dirs=/opt/kafka/logs # 日志存储目录
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181 # Zookeeper集群地址
replication.factor=3 # Topic副本数(建议≥3,保障高可用)
min.insync.replicas=2 # 最小同步副本数(确保数据可靠性)
unclean.leader.election.enable=false # 禁止非同步副本成为Leader(避免数据丢失)
每个Broker节点需修改broker.id和listeners(指向自身IP)。在每台Zookeeper节点上执行:
cd /opt/zookeeper
bin/zkServer.sh start # 启动服务
bin/zkServer.sh status # 查看状态(Leader/Follower)
在每台Kafka节点上执行(后台运行):
cd /opt/kafka
bin/kafka-server-start.sh -daemon config/server.properties
验证Broker是否启动:
jps | grep Kafka # 应显示Kafka进程
bin/kafka-broker-api-versions.sh --bootstrap-server your_server_ip:9092
bin/kafka-topics.sh --list --bootstrap-server your_server_ip:9092
创建带副本的主题(如test-topic,3分区、3副本):
bin/kafka-topics.sh --create \
--bootstrap-server your_server_ip:9092 \
--replication-factor 3 \
--partitions 3 \
--topic test-topic
验证Topic详情:
bin/kafka-topics.sh --describe \
--bootstrap-server your_server_ip:9092 \
--topic test-topic
输出应显示每个分区的Leader及ISR(同步副本集)。
bin/kafka-topics.sh --alter \
--bootstrap-server your_server_ip:9092 \
--topic test-topic \
--partitions 5 # 分区数只能增加,不能减少
delete.topic.enable=true,默认开启):bin/kafka-topics.sh --delete \
--bootstrap-server your_server_ip:9092 \
--topic test-topic
bin/kafka-consumer-groups.sh --bootstrap-server your_server_ip:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server your_server_ip:9092 \
--describe \
--group test-group
test-group重置到最早位置):bin/kafka-consumer-groups.sh --bootstrap-server your_server_ip:9092 \
--group test-group \
--topic test-topic \
--reset-offsets --to-earliest --execute
bin/kafka-console-producer.sh --bootstrap-server your_server_ip:9092 --topic test-topic
bin/kafka-console-consumer.sh --bootstrap-server your_server_ip:9092 \
--topic test-topic \
--from-beginning # 从最早消息开始消费
log.dirs目录):通过log.retention.hours(日志保留时长,默认168小时)或log.retention.bytes(日志大小上限)自动清理,也可手动删除旧日志。server.properties、zoo.cfg及myid文件。application.conf(配置Zookeeper地址),启动kafka-manager即可访问。server.properties配置(如zookeeper.connect地址是否正确)、日志文件(logs/server.log)中的错误信息。kafka-preferred-replica-election.sh工具触发Leader重新选举:bin/kafka-preferred-replica-election.sh --bootstrap-server your_server_ip:9092
fetch.min.bytes(每次拉取的最小数据量)、fetch.max.wait.ms(拉取等待时间)参数。kafka-topics.sh --create重新创建同名Topic,Kafka会自动恢复元数据。log.dirs目录,删除损坏的分区目录,重启Broker后Kafka会重新生成数据(需确保副本数充足)。server.properties(分配新的broker.id,指向现有Zookeeper集群),启动Broker。Kafka会自动将分区副本分配到新节点,提升集群吞吐量。-Xmx4G -Xms4G),避免频繁GC。compression.type=snappy),减少网络传输量。通过以上步骤,可实现Ubuntu环境下Kafka集群的有效管理,保障其高可用性与稳定性。实际运维中需根据集群规模与业务需求调整配置,并定期进行演练与优化。