Kafka安全认证配置指南
Kafka支持多种安全认证机制,其中SASL(Simple Authentication and Security Layer)和SSL/TLS是最常用的组合(SASL负责身份认证,SSL/TLS负责传输加密)。以下是具体配置步骤:
SASL提供多种机制(如PLAIN、SCRAM、Kerberos),其中SCRAM(基于哈希的挑战响应机制)和PLAIN(用户名/密码明文,需配合SSL使用)是生产环境常用选项。
使用kafka-configs.sh工具为用户创建SCRAM凭证(存储于ZooKeeper):
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
--alter \
--add-config 'SCRAM-SHA-256=[password=your_password],SCRAM-SHA-512=[password=your_password]' \
--entity-type users \
--entity-name your_username
注:若使用PLAIN机制,需通过JAAS文件配置用户信息(见下文)。
在$KAFKA_HOME/config/目录下创建kafka_server_jaas.conf,定义Broker的认证模块(以SCRAM为例):
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required;
};
若使用PLAIN机制(需配合SSL),配置如下:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin_password";
};
修改server.properties文件,添加以下配置:
# 指定Broker监听的协议(SASL_PLAINTEXT为明文,SASL_SSL为加密)
listeners=SASL_SSL://:9093
# Broker间通信协议(需与listeners一致)
security.inter.broker.protocol=SASL_SSL
# 启用的SASL机制(需与JAAS文件中的模块匹配)
sasl.enabled.mechanisms=SCRAM-SHA-256
# 指定Broker间通信的SASL机制
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# 指定JAAS文件路径(JVM参数)
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required;
bin/kafka-server-stop.sh
bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
SSL/TLS用于加密Broker与客户端、Broker之间的通信,防止数据泄露。
使用keytool生成密钥库(Keystore)和信任库(Truststore):
# 生成密钥库(包含Broker私钥和自签名证书)
keytool -genkey -alias kafka -keyalg RSA -keystore $KAFKA_HOME/config/kafka.keystore.jks \
-validity 365 -keysize 2048 -storepass your_keystore_password -keypass your_key_password
# 导出证书(用于客户端信任)
keytool -export -alias kafka -file $KAFKA_HOME/config/kafka.crt \
-keystore $KAFKA_HOME/config/kafka.keystore.jks -storepass your_keystore_password
# 导入证书到信任库(客户端需信任此证书)
keytool -import -alias kafka -file $KAFKA_HOME/config/kafka.crt \
-keystore $KAFKA_HOME/config/kafka.truststore.jks -storepass your_truststore_password -noprompt
修改server.properties文件,添加以下配置:
# 指定Broker监听的SSL端口
listeners=SSL://:9093
# Broker间通信协议
security.inter.broker.protocol=SSL
# 密钥库路径及密码
ssl.keystore.location=$KAFKA_HOME/config/kafka.keystore.jks
ssl.keystore.password=your_keystore_password
ssl.key.password=your_key_password
# 信任库路径及密码
ssl.truststore.location=$KAFKA_HOME/config/kafka.truststore.jks
ssl.truststore.password=your_truststore_password
# 要求客户端进行身份验证(可选,生产环境建议开启)
ssl.client.auth=required
# 支持的SSL协议(推荐TLSv1.2及以上)
ssl.enabled.protocols=TLSv1.2
客户端(生产者/消费者)需配置信任库以验证Broker证书:
# 指定安全协议
security.protocol=SSL
# 信任库路径及密码(客户端需信任Broker的证书)
ssl.truststore.location=$KAFKA_HOME/config/kafka.truststore.jks
ssl.truststore.password=your_truststore_password
# 可选:客户端证书(若Broker要求双向认证)
ssl.keystore.location=$KAFKA_HOME/config/kafka.keystore.jks
ssl.keystore.password=your_keystore_password
ssl.key.password=your_key_password
bin/kafka-server-stop.sh
bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
# 测试SSL连接(生产者)
bin/kafka-console-producer.sh --broker-list localhost:9093 \
--topic test_topic \
--producer.config $KAFKA_HOME/config/producer_ssl.properties
# 测试SSL连接(消费者)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 \
--topic test_topic \
--from-beginning \
--consumer.config $KAFKA_HOME/config/consumer_ssl.properties
生产环境建议同时启用SASL(身份认证)和SSL(传输加密),配置示例如下:
server.properties)listeners=SASL_SSL://:9093
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required;
ssl.keystore.location=$KAFKA_HOME/config/kafka.keystore.jks
ssl.keystore.password=your_keystore_password
ssl.key.password=your_key_password
ssl.truststore.location=$KAFKA_HOME/config/kafka.truststore.jks
ssl.truststore.password=your_truststore_password
ssl.client.auth=required
producer_ssl.properties)security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="your_username" \
password="your_password";
ssl.truststore.location=$KAFKA_HOME/config/kafka.truststore.jks
ssl.truststore.password=your_truststore_password
kafka-acls.sh工具为用户分配Topic权限(如READ、WRITE)。