Kafka消费者组的配置主要包括以下几个方面:
bootstrap.servers
bootstrap.servers=broker1:9092,broker2:9092
group.id
group.id=my-consumer-group
enable.auto.commit
true
。enable.auto.commit=true
auto.commit.interval.ms
auto.commit.interval.ms=5000
auto.offset.reset
earliest
: 从最早的偏移量开始消费。latest
: 从最新的偏移量开始消费。none
: 如果没有找到合适的偏移量,则抛出异常。auto.offset.reset=earliest
max.poll.records
poll()
方法返回的最大记录数。max.poll.records=500
fetch.min.bytes
fetch.min.bytes=1024
fetch.max.wait.ms
fetch.min.bytes
,则等待的最长时间。fetch.max.wait.ms=500
session.timeout.ms
session.timeout.ms=30000
heartbeat.interval.ms
heartbeat.interval.ms=3000
max.partition.fetch.bytes
max.partition.fetch.bytes=1048576
isolation.level
read_committed
: 只读取已提交的数据。read_uncommitted
: 读取所有数据,包括未提交的数据。isolation.level=read_committed
security.protocol
SSL
, SASL_PLAINTEXT
, SASL_SSL
等。security.protocol=SASL_SSL
sasl.mechanism
PLAIN
, GSSAPI
等。sasl.mechanism=PLAIN
sasl.jaas.config
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";
bootstrap.servers=broker1:9092,broker2:9092
group.id=my-consumer-group
enable.auto.commit=true
auto.commit.interval.ms=5000
auto.offset.reset=earliest
max.poll.records=500
fetch.min.bytes=1024
fetch.max.wait.ms=500
session.timeout.ms=30000
heartbeat.interval.ms=3000
max.partition.fetch.bytes=1048576
isolation.level=read_committed
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";
security.protocol
和sasl.*
)应根据实际的安全需求进行设置。通过合理配置这些参数,可以优化Kafka消费者的性能和可靠性。