Kafka消息压缩需在**生产者(Producer)、Broker、消费者(Consumer)**端配置,具体步骤如下:
在Producer的配置文件(如producer.properties)或代码中,添加以下参数:
compression.type=gzip  # 可选值:gzip、snappy、lz4、zstd(默认不压缩)
# 仅对gzip有效:设置压缩级别(1-9,9为最高压缩比)
compression.gzip.level=9
示例(Java代码):
props.put("compression.type", "snappy");  // 指定压缩算法
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
修改Kafka Broker的配置文件server.properties:
# 允许的压缩类型(需与Producer端一致)
compression.type=snappy  # 可选值:gzip、snappy、lz4、zstd
# 可选:设置特定压缩算法的参数(如LZ4级别)
compression.codec.lz4.level=9
注意:Broker端通常无需单独配置压缩类型,会默认使用Producer发送的压缩格式,除非需覆盖。
消费者无需手动配置压缩,Kafka会自动解压缩消息。若需验证,可通过以下参数查看压缩状态:
# 打印消息压缩信息(调试用,生产环境不建议)
print.offset=true
创建或修改主题时,可指定压缩类型(覆盖全局配置):
# 创建主题时设置压缩
kafka-topics.sh --create --topic compressed-topic --config compression.type=lz4 --bootstrap-server localhost:9092
# 修改已有主题的压缩类型
kafka-topics.sh --alter --topic compressed-topic --config compression.type=zstd --bootstrap-server localhost:9092
使用命令行工具发送和接收消息,观察消息内容是否为压缩格式(如二进制数据):
# 发送压缩消息
kafka-console-producer --broker-list localhost:9092 --topic test-topic --property compression.type=gzip
# 接收并查看消息(正常应为解压后的原始内容)
kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
配置参考来源: