1. 网络层安全防护
通过防火墙规则限制Kafka端口的访问范围,仅允许可信IP(如客户端、运维管理节点)连接。例如,使用firewalld开放Kafka服务端口(默认9092)并限制源IP:
firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="192.168.1.0/24" port port="9092" protocol="tcp" accept'
firewall-cmd --reload
若无需远程访问,可禁用不必要的端口(如ZooKeeper的2181端口),降低攻击面。对于更高安全需求,可通过网络隔离(如Kubernetes NetworkPolicy、VPC peering)将Kafka集群与其他网络环境分隔,防止横向渗透。
2. 认证机制强化
SCRAM-SHA-256/SCRAM-SHA-512(推荐)或PLAIN(不推荐用于生产)机制。配置server.properties关键参数:listeners=SASL_SSL://:9093 # 同时启用SSL加密
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 # broker间通信机制
sasl.enabled.mechanisms=SCRAM-SHA-256 # 支持的客户端机制
创建kafka_server_jaas.conf文件定义用户凭证(如admin用户):KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="strong-password-123";
};
启动Kafka时指定JAAS配置:export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf"
bin/kafka-server-start.sh config/server.properties
producer.properties:security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="producer-user" password="producer-password";
3. 授权控制(ACL)
通过Kafka的**ACL(访问控制列表)**实现细粒度的权限管理,遵循“最小权限原则”。例如,授予user1对orders_topic的读写权限及consumer_group1的消费权限:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:user1 --operation Read --topic orders_topic \
--operation Write --topic orders_topic --group consumer_group1
配置server.properties启用ACL授权器:
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false # 无ACL时拒绝访问
定期通过kafka-acls.sh --list命令审计ACL规则,确保权限分配符合业务需求。
4. 数据传输加密
启用SSL/TLS加密保护客户端与Broker、Broker与Broker之间的通信,防止数据泄露。步骤如下:
keytool生成Broker证书:keytool -genkey -alias kafka-server -keystore kafka.server.keystore.jks -validity 365 -keyalg RSA -storepass server-pass -keypass server-key-pass
keytool -export -alias kafka-server -file kafka.server.crt -keystore kafka.server.keystore.jks -storepass server-pass
keytool -import -alias kafka-server -file kafka.server.crt -keystore kafka.server.truststore.jks -storepass truststore-pass
server.properties:listeners=SSL://:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/path/to/kafka.server.keystore.jks
ssl.keystore.password=server-pass
ssl.key.password=server-key-pass
ssl.truststore.location=/path/to/kafka.server.truststore.jks
ssl.truststore.password=truststore-pass
ssl.client.auth=required # 要求客户端提供证书(双向TLS)
client.truststore.jks):security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=truststore-pass
5. 操作系统级加固
kafka)运行Kafka进程,限制其对系统资源的访问:useradd -r -s /sbin/nologin kafka
chown -R kafka:kafka /opt/kafka # Kafka安装目录归属
kafka_port_t上下文允许Kafka端口访问:semanage port -a -t kafka_port_t -p tcp 9093
或临时设置为宽松模式(生产环境不推荐):setenforce 0
sed -i 's/^SELINUX=.*/SELINUX=permissive/' /etc/selinux/config
6. 审计与监控
server.properties):log4j.logger.kafka=INFO
log4j.logger.org.apache.zookeeper=INFO
log4j.logger.kafka.authorizer.logger=DEBUG # 记录ACL操作
日志文件默认存储在logs目录,定期通过logrotate工具归档(如保留30天)。kafka_server_brokertopicmetrics_messages_in_total异常增长时触发告警)。7. 补丁与配置管理
kafka-config-checker)扫描配置文件,识别弱密码、未启用的加密协议(如SSLv3)等风险,确保配置符合安全基线。