Kafka消息压缩配置主要涉及生产者和Broker端,具体如下:
compression.type
属性来选择压缩算法,可选值为none
、gzip
、snappy
、lz4
、zstd
,默认为none
。对于Gzip压缩,还可通过compression.level
属性设置压缩级别(1-9),数值越大压缩比越高但CPU消耗也越大。示例代码如下:Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
props.put(ProducerConfig.COMPRESSION_LEVEL_CONFIG, "9");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
server.properties
文件中设置compression.type
属性,其默认值为producer
,表示继承Producer端的压缩方式。也可通过Kafka命令行工具在Topic级别进行配置,使用kafka-topics.sh
或kafka-configs.sh
命令,如kafka-topics.sh --create --topic my-topic --config compression.type=snappy
。