根据业务需求调整Kafka配置可从以下方面入手,结合具体场景选择参数并动态优化:
batch.size
(如1MB~10MB),提高linger.ms
(如50ms~100ms),启用compression.type=lz4/snappy
,设置acks=1
平衡吞吐与可靠性。num.io.threads
(如CPU核数×2),调整log.segment.bytes
(如2GB~5GB)减少日志切换开销,使用SSD存储提升磁盘IO。fetch.min.bytes
(如1MB~5MB),设置max.poll.records
(如500~1000),启用多线程并行消费。linger.ms
(如10ms~20ms),设置acks=1
或acks=0
,避免批量发送增加延迟。log.flush.interval.ms
(如100ms~500ms),启用unclean.leader.election.enable=false
确保数据一致性。fetch.max.wait.ms=100ms
,使用手动提交偏移量精准控制消费进度。acks=all
,增加retries
(如10次)和retry.backoff.ms
(如500ms)。replication.factor=3
,启用min.insync.replicas=2
,关闭log.retention.ms
或设置为极长周期(如7天)。enable.auto.commit=false
手动提交偏移量,增加session.timeout.ms
(如30秒)避免误判失联。kafka-configs.sh
动态修改参数,例如调整主题副本数:./kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config replication.factor=3 --entity-type topics --entity-name my_topic
// 增加分区数示例
adminClient.incrementalAlterTopicPartitions(Collections.singletonMap("my_topic",
Collections.singletonList(new AlterConfigOp(new ConfigEntry("num.partitions", "6"), AlterConfigOp.OpType.SET))));
net.core.rmem_max
和net.core.wmem_max
增大网络缓冲区。RecordsSentPerSec
)、消费者延迟(records-lag
)、Broker磁盘IO和CPU使用率。kafka-producer-perf-test
)验证配置效果,避免生产环境直接调整导致风险。参考来源: