在配置Kafka安全认证前,需完成以下准备工作:
keytool(Java自带,用于SSL证书管理)、openssl(可选,用于生成证书)、kafka-configs.sh(Kafka自带的ACL管理工具);SASL(Simple Authentication and Security Layer)是Kafka主流的认证框架,支持多种机制(如PLAIN、SCRAM、Kerberos)。以下以SCRAM-SHA-256(推荐,密码加密存储)为例:
在Kafka配置目录(如config/)下创建kafka_server_jaas.conf,内容如下(替换admin/admin-secret为实际用户名和密码):
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};
server.properties)添加或修改以下参数,启用SASL认证及SCRAM机制:
# 指定监听协议(SASL_PLAINTEXT为明文传输,SASL_SSL为加密传输,生产环境推荐后者)
listeners=SASL_SSL://:9093
# Broker间通信协议
security.inter.broker.protocol=SASL_SSL
# 启用的SASL机制(需与JAAS文件中的机制一致)
sasl.enabled.mechanisms=SCRAM-SHA-256
# Broker间通信的SASL机制
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# 指定JAAS配置文件路径(通过JVM参数加载)
security.protocol=SASL_SSL
若Kafka与Zookeeper交互,需在Zookeeper的zoo.cfg中开启SASL认证:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
在Kafka启动脚本(如kafka-server-start.sh)中添加JVM参数,加载JAAS文件:
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka/config/kafka_server_jaas.conf"
./kafka-server-start.sh /path/to/kafka/config/server.properties
客户端需通过jaas.conf文件指定认证信息,以Java Producer为例:
在客户端配置目录下创建kafka_client_jaas.conf,内容如下(替换test/test-secret为客户端用户名和密码):
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="test"
    password="test-secret";
};
在Producer/Consumer代码中添加以下属性:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9093");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test\" password=\"test-secret\";");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
SSL/TLS用于加密Kafka客户端与Broker之间的通信,防止数据泄露。以下是配置步骤:
使用keytool生成自签名证书(生产环境建议使用CA签发的证书):
# 创建密钥库(Keystore,存储Broker私钥和证书)
keytool -genkey -alias kafka -keyalg RSA -keystore /path/to/kafka.keystore.jks -validity 365 -keysize 2048
# 导出证书(用于客户端信任)
keytool -export -alias kafka -keystore /path/to/kafka.keystore.jks -file /path/to/kafka.crt
# 创建信任库(Truststore,存储客户端信任的证书)
keytool -import -alias kafka -file /path/to/kafka.crt -keystore /path/to/kafka.truststore.jks
server.properties)添加或修改以下参数,启用SSL加密:
# 监听协议(SSL为加密传输)
listeners=SSL://:9093
# Broker间通信协议
security.inter.broker.protocol=SSL
# 密钥库配置(Broker私钥和证书)
ssl.keystore.location=/path/to/kafka.keystore.jks
ssl.keystore.password=keystore-secret
ssl.key.password=key-secret
# 信任库配置(客户端证书)
ssl.truststore.location=/path/to/kafka.truststore.jks
ssl.truststore.password=truststore-secret
# 要求客户端提供证书(生产环境推荐)
ssl.client.auth=required
# 启用的加密协议(推荐TLSv1.2及以上)
ssl.enabled.protocols=TLSv1.2
客户端需信任Broker的证书,配置如下(以Java Producer为例):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9093");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/path/to/kafka.truststore.jks");
props.put("ssl.truststore.password", "truststore-secret");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ACL(Access Control Lists)用于定义用户对Kafka资源(如Topic、Group)的操作权限(读、写、创建等)。以下是配置步骤:
在server.properties中设置授权类为SimpleAclAuthorizer(默认启用):
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 指定超级用户(可跳过ACL检查)
super.users=User:admin
使用kafka-configs.sh工具创建SCRAM用户(若已通过JAAS配置创建,可跳过):
./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=test-secret]' --entity-type users --entity-name test
使用kafka-acls.sh工具为用户分配Topic权限,例如:
# 给test用户分配test-topic的读写权限
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
    --add --allow-principal User:test \
    --operation Read --operation Write \
    --topic test
# 给test用户分配test-group的消费权限
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
    --add --allow-principal User:test \
    --operation Read \
    --group test-group
使用以下命令查看Topic的ACL列表:
./kafka-acls.sh --bootstrap-server localhost:9093 --topic test --list
SCRAM-SHA-256或SCRAM-SHA-512;kafka-configs.sh更新用户密码,避免密码泄露;server.properties、kafka_server_jaas.conf等关键配置文件;以上配置覆盖了Kafka安全认证的核心需求,可根据实际环境调整参数(如协议版本、加密算法)。建议参考Kafka官方文档(Security)获取最新配置选项。