Kafka依赖Java运行环境(JRE/JDK),需先安装OpenJDK 1.8及以上版本:
sudo yum install java-1.8.0-openjdk-devel -y
# 验证Java安装
java -version
确保输出显示Java版本信息,否则Kafka无法启动。
从Apache Kafka官网下载最新稳定版本(如3.5.2),解压至指定目录(如/opt/kafka
):
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz
tar -zxvf kafka_2.12-3.5.2.tgz
sudo mv kafka_2.12-3.5.2 /opt/kafka
建议将Kafka安装至系统目录(如/opt
),便于后续管理。
Kafka通过Zookeeper实现集群管理和元数据存储,需先配置Zookeeper:
cd /opt/kafka/config
zookeeper.properties
文件,设置数据目录和客户端端口:dataDir=/var/lib/zookeeper # Zookeeper数据存储路径
clientPort=2181 # Zookeeper监听端口
nohup
或systemd):/opt/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties
若需后台运行,可添加nohup
:nohup /opt/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties > /dev/null 2>&1 &
编辑Kafka主配置文件server.properties
,设置关键参数:
# Broker唯一标识(集群中每个broker需不同)
broker.id=0
# 监听地址(本地测试可设为localhost,生产环境需设为服务器IP)
listeners=PLAINTEXT://your_server_ip:9092
# 对外暴露的地址(客户端连接用,生产环境需设为服务器IP)
advertised.listeners=PLAINTEXT://your_server_ip:9092
# 日志存储目录(需提前创建)
log.dirs=/var/lib/kafka-logs
# Zookeeper连接字符串(集群需列出所有节点)
zookeeper.connect=localhost:2181
注意:
9092
(Kafka)和2181
(Zookeeper)端口;PLAINTEXT
协议,启用SASL/SSL加密(需额外配置)。nohup /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /dev/null 2>&1 &
jps | grep Kafka
若输出Kafka
,说明启动成功。/opt/kafka/bin/kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
/opt/kafka/bin/kafka-console-producer.sh --topic test_topic --bootstrap-server localhost:9092
输入消息(如Hello Kafka
)并按回车。/opt/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server localhost:9092
应能看到生产者发送的消息,说明Kafka配置成功。为避免服务器重启后Kafka和Zookeeper手动启动,可创建systemd服务:
/etc/systemd/system/kafka.service
):[Unit]
Description=Apache Kafka Server
After=network.target zookeeper.service
[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
/etc/systemd/system/zookeeper.service
):[Unit]
Description=Apache Zookeeper
After=network.target
[Service]
Type=simple
User=zookeeper
Group=zookeeper
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable zookeeper kafka
sudo systemctl start zookeeper kafka
通过systemctl status kafka
可查看Kafka服务状态。num.partitions
:增加默认分区数(如8
);default.replication.factor
:设置副本因子(如3
,需与broker数量匹配);min.insync.replicas
:最小同步副本数(如2
,确保数据一致性);log.retention.hours=168
,即7天)。9092
(Kafka)和2181
(Zookeeper)端口:sudo firewall-cmd --zone=public --add-port=9092/tcp --permanent
sudo firewall-cmd --zone=public --add-port=2181/tcp --permanent
sudo firewall-cmd --reload
sudo setsebool -P httpd_can_network_connect 1