Kafka依赖Java运行环境,推荐使用OpenJDK 11(兼容性更好):
sudo apt update && sudo apt install -y openjdk-11-jdk
java -version # 验证安装(需显示Java版本信息)
Kafka需Zookeeper存储元数据,建议单独安装(而非使用Kafka自带的临时Zookeeper):
# 下载并解压Zookeeper(以3.7.0为例)
wget https://downloads.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
tar -xzvf apache-zookeeper-3.7.0-bin.tar.gz
sudo mv apache-zookeeper-3.7.0 /opt/zookeeper
# 配置Zookeeper(创建数据目录并修改配置文件)
sudo mkdir -p /opt/zookeeper/data
sudo cat > /opt/zookeeper/conf/zoo.cfg << EOF
tickTime=2000
dataDir=/opt/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
EOF
# 启动Zookeeper并设置开机自启
sudo systemctl daemon-reload
sudo systemctl enable --now zookeeper
sudo systemctl status zookeeper # 验证状态(需显示"active (running)")
选择最新稳定版本(如3.7.0),解压至/opt目录(便于系统管理):
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzvf kafka_2.13-3.7.0.tgz
sudo mv kafka_2.13-3.7.0 /opt/kafka
编辑/opt/kafka/config/server.properties,设置以下核心参数:
# Broker唯一标识(集群中需唯一)
broker.id=0
# 监听地址(0.0.0.0允许所有IP访问;若需远程访问,需替换为服务器公网IP)
listeners=PLAINTEXT://0.0.0.0:9092
# 对外暴露的地址(远程客户端需使用此地址连接)
advertised.listeners=PLAINTEXT://your_server_public_ip:9092
# 日志存储目录(需提前创建,建议使用独立分区)
log.dirs=/var/lib/kafka/logs
sudo mkdir -p /var/lib/kafka/logs
sudo chown -R $USER:$USER /var/lib/kafka/logs # 赋予当前用户权限
# Zookeeper连接地址(集群中需列出所有Zookeeper节点,用逗号分隔)
zookeeper.connect=localhost:2181
# 消息保留策略(以下两项任选其一,避免数据无限增长)
log.retention.hours=168 # 保留7天(默认值)
# log.retention.bytes=1073741824 # 保留1GB(根据磁盘空间调整)
# 分区数(新Topic的默认分区数,根据并发需求调整)
num.partitions=3
# 副本数(集群中需>=1,建议设置为2或3以保证高可用)
default.replication.factor=1
# 删除Topic开关(设为true允许通过命令删除Topic)
delete.topic.enable=true
将Kafka命令加入系统PATH,方便全局调用:
echo 'export KAFKA_HOME=/opt/kafka' >> ~/.bashrc
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> ~/.bashrc
source ~/.bashrc
# 后台启动Kafka(使用配置文件)
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
# 验证Kafka是否运行(需显示9092端口监听)
sudo netstat -tulnp | grep 9092
# 创建测试Topic(名称:test,1分区,1副本)
/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# 生产消息(向test Topic发送消息)
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
# 消费消息(从test Topic读取消息)
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
若能在生产者终端输入消息并在消费者终端看到,说明配置成功。
允许Kafka(9092)和Zookeeper(2181)端口访问:
sudo ufw allow 9092/tcp
sudo ufw allow 2181/tcp
sudo ufw reload
编辑server.properties,优化以下参数:
# IO线程数(建议为CPU核心数的1-2倍)
num.io.threads=8
# 网络线程数(建议为CPU核心数的0.5-1倍)
num.network.threads=3
# 日志分段大小(1GB,减少文件数量,提高IO效率)
log.segment.bytes=1073741824
# 消息压缩(减少网络传输开销,可选LZ4/Snappy/GZIP)
compression.type=lz4
# 批次大小(1MB,提高吞吐量)
batch.size=1048576
# 等待时间(100ms,平衡延迟与吞吐量)
linger.ms=100
# 增加文件描述符限制(Kafka需处理大量文件)
echo "ulimit -n 65535" >> ~/.bashrc
source ~/.bashrc
# 调整内核参数(优化磁盘IO)
echo "vm.swappiness=10" | sudo tee -a /etc/sysctl.conf
echo "vm.dirty_background_ratio=5" | sudo tee -a /etc/sysctl.conf
echo "vm.dirty_ratio=10" | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
若需搭建Kafka集群,需完成以下步骤:
zoo.cfg中添加所有Zookeeper节点(如server.0=zoo1:2888:3888;server.1=zoo2:2888:3888;server.2=zoo3:2888:3888),并在每台服务器的dataDir目录下创建myid文件(内容为对应节点ID,如1)。broker.id需唯一(如0、1、2)。advertised.listeners需设置为服务器公网IP。zookeeper.connect需指向集群中的所有Zookeeper节点。log.dirs目录所在磁盘有足够空间(建议预留2-3倍数据容量)。/opt/zookeeper/data)和Kafka日志目录(/var/lib/kafka/logs)。