Kafka依赖Java运行环境,需安装JDK 1.8或更高版本(推荐OpenJDK 8):
sudo yum install java-1.8.0-openjdk-devel -y
java -version # 验证安装(需显示1.8.x版本)
从Apache官网下载最新稳定版Kafka(如3.5.2),解压至指定目录(如/opt/kafka):
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz
tar -zxvf kafka_2.12-3.5.2.tgz -C /opt/
mv /opt/kafka_2.12-3.5.2 /opt/kafka # 重命名便于管理
Kafka需通过Zookeeper实现集群管理和Leader选举,需先配置Zookeeper:
cd /opt/kafka/config
# 修改zookeeper.properties关键参数
dataDir=/opt/kafka/zookeeper_data # 数据存储目录
clientPort=2181 # 客户端连接端口
maxClientCnxns=0 # 最大客户端连接数(0表示无限制)
启动Zookeeper(后台运行):
nohup /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties > /dev/null 2>&1 &
编辑/opt/kafka/config/server.properties,调整以下关键参数:
# Broker唯一标识(集群中需唯一)
broker.id=0
# 监听地址与端口(本地监听所有IP)
listeners=PLAINTEXT://0.0.0.0:9092
# 对外暴露的地址(客户端连接的地址,需替换为服务器公网IP)
advertised.listeners=PLAINTEXT://your_server_ip:9092
# 日志存储目录(需提前创建,建议挂载大容量磁盘)
log.dirs=/opt/kafka/kafka-logs
# Zookeeper连接字符串(集群需列出所有节点)
zookeeper.connect=localhost:2181
# 默认分区数(新Topic的默认分区数量,根据吞吐量调整,如8)
num.partitions=8
# 默认副本因子(每个分区的副本数量,生产环境建议≥3)
default.replication.factor=3
# 最小同步副本数(写操作需确认的副本数,保证数据持久性,建议≤default.replication.factor)
min.insync.replicas=2
# 日志保留时间(168小时=7天,超时自动删除)
log.retention.hours=168
# 日志段大小(1GB,达到后滚动新段)
log.segment.bytes=1073741824
# 消息最大大小(100MB,根据业务调整)
message.max.bytes=100000000
# 前台启动(调试用,日志输出到终端)
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
# 后台启动(生产环境推荐)
nohup /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>&1 &
# 查看Kafka进程
ps -ef | grep kafka
# 创建测试Topic(1分区、1副本)
/opt/kafka/bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
# 列出Topic确认创建成功
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 生产者发送消息(终端1)
/opt/kafka/bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
# 消费者接收消息(终端2)
/opt/kafka/bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
sudo firewall-cmd --zone=public --add-port=9092/tcp --permanent
sudo firewall-cmd --reload
sudo setsebool -P httpd_can_network_connect 1 # 允许HTTP服务网络连接(如需通过Web监控)
编辑/opt/kafka/bin/kafka-server-start.sh,在exec $base_dir/kafka-run-class.sh前添加:
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G" # 设置堆内存为4GB(根据服务器内存调整)
修改server.properties:
# 网络线程数(处理客户端请求,建议2-8)
num.network.threads=3
# I/O线程数(处理磁盘读写,建议8-16)
num.io.threads=8
# 网络发送/接收缓冲区大小(提升网络吞吐量)
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
修改server.properties:
# 日志刷新间隔(消息数量达到1万条时刷新)
log.flush.interval.messages=10000
# 日志刷新时间间隔(1秒,避免频繁IO)
log.flush.interval.ms=1000
修改server.properties:
# 副本同步超时时间(10秒,超过则认为副本滞后)
replica.lag.time.max.ms=10000
# leader选举超时时间(10秒,加快故障恢复)
leader.imbalance.check.interval.seconds=30
修改server.properties,设置日志保留策略(按时间/大小清理):
# 按时间保留(7天)
log.retention.hours=168
# 按大小保留(10GB,超过则删除旧日志)
log.retention.bytes=10737418240
kafka-topics.sh、kafka-consumer-groups.sh等命令监控Topic、消费者组状态。zookeeper.connect配置是否正确(Zookeeper是否运行)。/opt/kafka/kafka.log)定位具体错误(如端口冲突、磁盘空间不足)。advertised.listeners配置是否为客户端可访问的地址(如公网IP)。sudo firewall-cmd --list-ports)。default.replication.factor≥3(副本数≥3)且min.insync.replicas≥2(最小同步副本≥2)。acks=all(生产者确认机制,确保消息写入所有同步副本)。