一、安装Java环境(Kafka前置依赖)
Kafka基于Java开发,需先安装Java运行环境(JRE或JDK)。推荐使用OpenJDK 11及以上版本(兼容性更好):
sudo apt update && sudo apt install openjdk-11-jdk -y
;sudo yum install java-11-openjdk -y
。java -version
验证是否成功(需显示Java版本信息)。二、下载并解压Kafka
从Apache Kafka官网下载最新稳定版本(如3.6.1),使用wget
命令获取安装包,再用tar
解压到指定目录(如/opt/kafka
):
wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -xzf kafka_2.13-3.6.1.tgz
mv kafka_2.13-3.6.1 /opt/kafka # 可选:移动到目标目录
建议将Kafka目录加入系统环境变量(编辑/etc/profile
,添加export KAFKA_HOME=/opt/kafka
和export PATH=$PATH:$KAFKA_HOME/bin
,然后执行source /etc/profile
),方便后续命令调用。
三、配置Kafka(修改server.properties)
进入Kafka配置目录($KAFKA_HOME/config
),编辑server.properties
文件,调整以下关键参数:
broker.id=0
);listeners=PLAINTEXT://your_server_ip:9092
,需替换为实际IP);log.dirs=/data/kafka/logs
,需提前创建目录并赋予权限mkdir -p /data/kafka/logs && chown -R $USER:$USER /data/kafka/logs
);zookeeper.connect=localhost:2181
,单节点ZooKeeper用此配置;集群需列出所有节点,如host1:2181,host2:2181,host3:2181
)。四、启动ZooKeeper(Kafka依赖的协调服务)
Kafka通过ZooKeeper管理集群元数据(如Topic、分区信息),需先启动ZooKeeper:
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
默认情况下,ZooKeeper会在2181
端口启动。可通过netstat -an | grep 2181
验证是否运行正常。
五、启动Kafka服务器
在另一个终端窗口中,启动Kafka服务(后台运行模式,避免占用当前终端):
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
启动后,可通过netstat -an | grep 9092
验证Kafka是否在9092
端口监听(客户端通信端口)。
六、创建Topic(消息存储的主题)
使用Kafka命令行工具创建Topic,指定Topic名称、分区数(partitions
,决定并行处理能力)和副本因子(replication-factor
,数据冗余备份数量,单节点集群设为1):
$KAFKA_HOME/bin/kafka-topics.sh --create \
--topic my_topic \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
创建成功后,可通过$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
查看所有Topic。
七、测试Kafka功能(发送/接收消息)
$KAFKA_HOME/bin/kafka-console-producer.sh --topic my_topic --bootstrap-server localhost:9092
--from-beginning
表示从最早的消息开始读取):$KAFKA_HOME/bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092
在生产者终端输入消息(如Hello Kafka
),消费者终端会同步显示该消息,验证Kafka消息收发功能正常。
八、停止Kafka服务(可选)
测试完成后,可通过以下命令停止服务:
Ctrl+C
;Ctrl+C
;$KAFKA_HOME/bin/kafka-server-stop.sh
;$KAFKA_HOME/bin/zookeeper-server-stop.sh
。注意事项:
log.retention.hours
控制日志保留时间)、启用数据加密(SSL/TLS)和认证机制(SASL)以提升安全性;