Kafka消息压缩配置指南
一 核心配置项与生效位置
二 配置示例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.type", "lz4"); // 可选:gzip、snappy、lz4、zstd
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key", "value");
// 仅本条消息使用 gzip(需在客户端开启对 per-record 压缩的支持)
record.headers().add("compression.type", "gzip".getBytes(StandardCharsets.UTF_8));
producer.send(record);
# 推荐:保留生产者压缩
compression.type=producer
# 如显式指定算法,可能与生产者不一致导致重压缩
# compression.type=snappy
# 生产端指定压缩
kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test-topic \
--compression-type gzip
# 消费端无需额外配置
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic --from-beginning
kafka-topics.sh --describe \
--topic your_topic_name \
--bootstrap-server your_kafka_broker
# 在输出中查看 Compression Type 字段
三 何时会发生二次压缩与性能影响
四 算法选型与批量参数建议