1. 认证配置:确保客户端与Broker身份可信
认证是Kafka安全的基础,用于验证客户端(生产者/消费者)和Broker的身份。常见机制包括SASL(支持PLAIN、SCRAM、Kerberos等)和SSL客户端认证:
server.properties设置security.protocol=SASL_SSL(推荐,同时启用加密)、sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256(Broker间认证机制)、sasl.enabled.mechanisms=SCRAM-SHA-256(启用的机制);创建JAAS配置文件(如kafka_server_jaas.conf),内容示例:KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="strong-password";
};
启动时通过-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf加载配置。keytool -genkey -alias kafka-server -keystore server.keystore.jks)和信任库(keytool -export/import交换证书),配置server.properties:ssl.keystore.location=/path/to/server.keystore.jks
ssl.keystore.password=keystore-pwd
ssl.key.password=key-pwd
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=truststore-pwd
ssl.enabled.protocols=TLSv1.2
ssl.client.auth=required # 要求客户端提供证书
客户端需配置对应的ssl.keystore.location和ssl.truststore.location。2. 授权管理:精细化控制访问权限
通过ACL(访问控制列表)定义用户/用户组对主题、集群的操作权限(如读、写、创建、删除):
server.properties中设置authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer(默认实现),allow.everyone.if.no.acl.found=false(无ACL时拒绝访问)。kafka-acls.sh工具,示例:为用户alice授予对主题orders的读写权限:kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:alice --operation Read --operation Write --topic orders
可通过--group参数为用户组授权,或--cluster参数管理集群级权限(如创建主题、修改配置)。3. 加密通信:防止数据泄露
通过SSL/TLS加密客户端与Broker、Broker之间的通信,避免数据被窃听或篡改:
security.inter.broker.protocol=SASL_SSL(若使用SASL),配置ssl.keystore和ssl.truststore参数。security.protocol=SASL_SSL(或SSL,若无需SASL),指定信任库和密钥库路径。4. 网络隔离:限制访问范围
通过防火墙和网络配置,减少攻击面:
firewall-cmd开放Kafka端口(默认9092/9093),仅允许可信IP访问:firewall-cmd --permanent --zone=public --add-rich-rule='rule family="ipv4" source address="192.168.1.0/24" port port="9093" protocol="tcp" accept'
firewall-cmd --reload
advertised.listeners为内部IP或域名,避免暴露真实地址。5. 系统层安全加固:提升整体安全性
passwd -l <用户名>);设置强口令策略(包含大小写字母、数字、特殊字符,长度≥10位)。log.dirs)、配置文件(server.properties)属主设为kafka用户,限制其他用户访问:chown -R kafka:kafka /var/lib/kafka/data
chmod -R 750 /var/lib/kafka/data
setenforce 0)或调整策略允许Kafka运行;修改/etc/profile设置TMOUT=300(root账户5分钟自动注销)。6. 审计与监控:及时发现异常
server.properties中配置日志记录客户端操作(如log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender),或使用第三方审计工具(如ELK)收集日志。7. 定期维护:保持安全状态