Kafka depends on a Java runtime and requires JDK 1.8 or later. On CentOS, OpenJDK 8 can be installed with the following commands:
sudo yum install java-1.8.0-openjdk-devel -y
# Verify the installation
java -version
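Java 8 reports itself as 1.8, while later releases report 11, 17, and so on, so a script that gates the install on the Java version has to handle both schemes. The following is a minimal sketch of such a check; the function name java_major_version is an assumption made for illustration and is not part of the JDK or Kafka tooling.

```shell
# Hypothetical helper: extract the Java major version from a `java -version` banner.
# Handles the legacy "1.8.0_x" scheme as well as the modern "11.0.x" / "17" scheme.
java_major_version() {
  local banner="$1" ver
  # Pull out the quoted version string, e.g. 1.8.0_392 or 17.0.9
  ver=$(printf '%s\n' "$banner" | sed -n 's/.*version "\([^"]*\)".*/\1/p' | head -n 1)
  case "$ver" in
    "")  return 1 ;;                                        # no version string found
    1.*) ver=${ver#1.}; printf '%s\n' "${ver%%.*}" ;;       # 1.8.0_392 -> 8
    *)   printf '%s\n' "${ver%%[.+-]*}" ;;                  # 17.0.9 -> 17
  esac
}
```

Since java -version writes its banner to standard error, a caller would typically pass it in as java_major_version "$(java -version 2>&1)" and compare the result against 8.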
Download a stable release (for example 3.5.2) from the Apache website and extract it to a target directory (for example /opt/kafka):
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz
tar -zxvf kafka_2.12-3.5.2.tgz
sudo mv kafka_2.12-3.5.2 /opt/kafka
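Before extracting a downloaded archive it can be worth comparing it against the published checksum. The sketch below assumes the .sha512 file sits next to the archive and uses the "filename: HEX HEX ..." layout (uppercase hex groups, possibly wrapped over several lines); if the file you download uses a different layout, the normalization step needs adjusting. The helper names normalize_digest and verify_sha512 are hypothetical.

```shell
# Hypothetical helper: reduce a checksum file to a bare lowercase hex digest.
# Assumes the "filename: HEX HEX ..." layout described above.
normalize_digest() {
  # Drop the optional "filename:" prefix, remove all whitespace, lowercase the hex
  sed 's/^[^:]*://' "$1" | tr -d ' \t\r\n' | tr 'A-F' 'a-f'
}

# Hypothetical helper: succeed only if the archive matches the published digest.
# Usage: verify_sha512 <archive> <checksum-file>
verify_sha512() {
  local expected actual
  expected=$(normalize_digest "$2")
  actual=$(sha512sum "$1" | cut -d ' ' -f 1)
  [ -n "$expected" ] && [ "$expected" = "$actual" ]
}
```

With the archive and its checksum file in the current directory, the call would look like verify_sha512 kafka_2.12-3.5.2.tgz kafka_2.12-3.5.2.tgz.sha512, and extraction should proceed only if it succeeds.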
Add Kafka's bin directory to the system PATH so the tools can be called from anywhere:
echo 'export PATH=$PATH:/opt/kafka/bin' >> ~/.bashrc
source ~/.bashrc
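Running the echo command above more than once appends a duplicate line each time. If the setup is scripted, one option is to append the line only when it is missing. This is a minimal sketch; the function name append_path_once is an assumption for illustration.

```shell
# Hypothetical helper: add a PATH export to a shell rc file only if it is not already there.
# Usage: append_path_once <rc-file> <directory>
append_path_once() {
  local rc_file="$1" dir="$2" line
  line="export PATH=\$PATH:$dir"
  touch "$rc_file"
  # -F fixed string, -x whole-line match, -q quiet
  grep -Fxq "$line" "$rc_file" || printf '%s\n' "$line" >> "$rc_file"
}
```

For the layout used here the call would be append_path_once ~/.bashrc /opt/kafka/bin, followed by source ~/.bashrc as before.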
Kafka's main configuration file is /opt/kafka/config/server.properties. Adjust the following key parameters:
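# With multiple brokers, give each one a unique ID: 0, 1, 2, ...
# (Comments must be on their own line; a trailing "# ..." becomes part of the value.)
broker.id=0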
listeners=PLAINTEXT://your_server_ip:9092
advertised.listeners=PLAINTEXT://your_server_ip:9092
log.dirs=/opt/kafka/logs,/data/kafka/logs
num.partitions=8
default.replication.factor=3
# For a ZooKeeper cluster, use zk1:2181,zk2:2181,zk3:2181 instead
zookeeper.connect=localhost:2181
# min.insync.replicas must not exceed default.replication.factor
min.insync.replicas=2
auto.leader.rebalance.enable=true
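The replication settings above only make sense together: min.insync.replicas must not be larger than default.replication.factor, otherwise writes with acks=all cannot succeed. A small pre-flight check can catch this before the broker is started. This is a sketch under assumptions: get_prop and check_replication are hypothetical helper names, and the parser only understands plain key=value lines.

```shell
# Hypothetical helper: print the value of a key from a properties file (last occurrence wins).
# Usage: get_prop <file> <key>
get_prop() {
  local file="$1" key
  # Escape dots so they match literally in the sed pattern
  key=$(printf '%s' "$2" | sed 's/\./\\./g')
  sed -n "s/^[[:space:]]*${key}[[:space:]]*=[[:space:]]*//p" "$file" | tail -n 1
}

# Hypothetical helper: succeed only if min.insync.replicas <= default.replication.factor.
check_replication() {
  local rf isr
  rf=$(get_prop "$1" default.replication.factor)
  isr=$(get_prop "$1" min.insync.replicas)
  # Kafka's built-in default is 1 for both settings
  rf=${rf:-1}
  isr=${isr:-1}
  # Non-numeric values (for example a value with a trailing inline comment) also fail
  [ "$isr" -le "$rf" ] 2>/dev/null
}
```

A start script could run check_replication /opt/kafka/config/server.properties and refuse to launch the broker when it fails.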
Open the Kafka and ZooKeeper ports so that clients can connect:
# Open the Kafka port (9092)
sudo firewall-cmd --zone=public --add-port=9092/tcp --permanent
# Open the ZooKeeper port (2181)
sudo firewall-cmd --zone=public --add-port=2181/tcp --permanent
# Reload the firewall rules
sudo firewall-cmd --reload
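Change into the Kafka directory and start the broker in the background. ZooKeeper must already be reachable at the address set in zookeeper.connect (for a local instance, the distribution ships bin/zookeeper-server-start.sh and config/zookeeper.properties):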
cd /opt/kafka
nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
Once the broker is up, verify it by listing topics, creating a test topic, and exchanging messages through it:
# List existing topics
bin/kafka-topics.sh --list --bootstrap-server your_server_ip:9092
# Create a test topic
bin/kafka-topics.sh --create --bootstrap-server your_server_ip:9092 --replication-factor 1 --partitions 1 --topic test_topic
# Producer: send messages
bin/kafka-console-producer.sh --topic test_topic --bootstrap-server your_server_ip:9092
# Consumer: receive messages
bin/kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server your_server_ip:9092
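Raise the open-file limit in /etc/security/limits.conf:
* soft nofile 65536
* hard nofile 65536
Raise the memory-map limit in /etc/sysctl.conf:
vm.max_map_count=262144
Apply the sysctl change:
sudo sysctl -p
Tune the broker's thread, flush, and socket buffer settings in server.properties:
num.network.threads=8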
num.io.threads=16
log.flush.interval.messages=10000
log.flush.interval.ms=1000
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
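When tuning values like these are applied from a script, appending them blindly leaves duplicate keys in server.properties. One way to avoid that is to replace any existing definition of a key before writing the new one. The following is a minimal sketch; set_prop is a hypothetical helper name, and it assumes plain key=value lines with comments on separate lines.

```shell
# Hypothetical helper: set key=value in a properties file, replacing earlier definitions.
# Usage: set_prop <file> <key> <value>
set_prop() {
  local file="$1" key="$2" value="$3" pattern tmp
  # Escape dots so the key matches literally
  pattern=$(printf '%s' "$key" | sed 's/\./\\./g')
  tmp=$(mktemp)
  # Keep every line except existing definitions of the key
  # (grep exits non-zero when nothing is kept, which is fine here)
  grep -v "^[[:space:]]*${pattern}[[:space:]]*=" "$file" > "$tmp" || true
  printf '%s=%s\n' "$key" "$value" >> "$tmp"
  # Write back through the original file so its owner and mode are preserved
  cat "$tmp" > "$file"
  rm -f "$tmp"
}
```

For example, set_prop /opt/kafka/config/server.properties num.io.threads 16 can be run any number of times and leaves exactly one num.io.threads line. The broker has to be restarted for the change to take effect.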
To enable SASL/PLAIN authentication, edit server.properties:
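# The inter-broker protocol needs a listener of the same type
listeners=SASL_PLAINTEXT://your_server_ip:9092
advertised.listeners=SASL_PLAINTEXT://your_server_ip:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN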
Create the JAAS configuration file kafka_server_jaas.conf:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
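Because the JAAS file holds passwords in clear text, it is reasonable to generate it with restrictive permissions rather than edit it by hand. The sketch below writes the same structure as the file above; write_jaas is a hypothetical helper name, and it assumes the user name and password contain no double quotes, dollar signs, or backslashes.

```shell
# Hypothetical helper: write a SASL/PLAIN JAAS file readable only by its owner.
# Usage: write_jaas <path> <user> <password>
write_jaas() {
  local path="$1" user="$2" pass="$3"
  (
    # Files created in this subshell get mode 600
    umask 077
    cat > "$path" <<EOF
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="${user}"
    password="${pass}"
    user_${user}="${pass}";
};
EOF
  )
}
```

Calling write_jaas /opt/kafka/config/kafka_server_jaas.conf admin admin-secret reproduces the file shown above. In the generated file, username and password are the credentials the broker itself uses for inter-broker connections, and each user_NAME entry defines an account that clients may log in with.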
Point the broker at the JAAS file when starting it:
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
bin/kafka-server-start.sh config/server.properties
To encrypt traffic with SSL (the keystore and truststore must be generated first), edit server.properties:
listeners=SSL://your_server_ip:9093
ssl.keystore.location=/opt/kafka/config/kafka.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/opt/kafka/config/kafka.truststore.jks
ssl.truststore.password=password
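Set the log retention period in server.properties (168 hours is 7 days):
log.retention.hours=168
Use the kafka-topics.sh and kafka-consumer-groups.sh tools to check topic status regularly.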