在配置消费者组前,需确保Linux系统已安装Java(JDK 8+)、Zookeeper和Kafka,且Zookeeper(默认端口2181)和Kafka(默认端口9092)服务已正常启动。
消费者组的配置主要通过consumer.properties文件完成(可自定义路径),关键参数如下:
group.id:消费者组的唯一标识符(如my-consumer-group),同一组内的消费者会协同消费分区。bootstrap.servers:Kafka集群的broker地址列表(如localhost:9092),用于建立初始连接。enable.auto.commit:是否自动提交消费偏移量(默认true,生产环境建议设为false,避免因自动提交导致数据重复或丢失)。auto.commit.interval.ms:自动提交的间隔时间(默认5000ms,仅在enable.auto.commit=true时生效)。auto.offset.reset:当无初始偏移量或偏移量无效时的处理策略(earliest:从最早偏移量开始;latest:从最新偏移量开始;none:抛出异常)。max.poll.records:每次poll()调用返回的最大记录数(默认500,根据消费者处理能力调整,避免单次拉取过多导致内存溢出)。session.timeout.ms:消费者与Kafka集群的心跳超时时间(默认45s,需小于max.poll.interval.ms,建议设置为10-30s,超时会触发再平衡)。heartbeat.interval.ms:消费者发送心跳的频率(默认3s,需小于session.timeout.ms的1/3,确保及时检测消费者存活状态)。使用Kafka自带的命令行工具启动消费者并加入指定组:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your_topic --group my-consumer-group --from-beginning
--bootstrap-server:指定Kafka broker地址。--topic:要消费的主题名称。--group:消费者组ID(若组不存在则自动创建)。--from-beginning:从主题最早偏移量开始消费(可选,省略则从当前偏移量开始)。使用kafka-consumer-groups.sh工具列出所有已注册的消费者组:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
该命令会返回所有消费者组的ID,如my-consumer-group、group-A等。
查看指定消费者组的详细信息(包括分区分配、当前偏移量、LAG(日志末端偏移量-当前偏移量)等):
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group
输出示例:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-consumer-group topic-1 0 100 200 100 consumer-1 /192.168.1.10 consumer-app
my-consumer-group topic-1 1 50 150 100 consumer-2 /192.168.1.11 consumer-app
CURRENT-OFFSET:消费者当前处理的偏移量。LOG-END-OFFSET:主题分区的最新偏移量。LAG:未消费的消息数量(需关注,避免积压)。动态修改消费者组的配置(如调整max.poll.records):
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --alter --group my-consumer-group --add-config max.poll.records=1000
--add-config:指定要修改的参数及值(支持多个参数,用逗号分隔)。删除指定的消费者组(仅删除组元数据,不会删除主题数据):
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-consumer-group
删除后,该组再次启动时会从头消费主题消息(若未指定auto.offset.reset则为latest)。
当需要调整消费者组的消费位置时(如从最早开始消费、重置到指定偏移量),可使用--reset-offsets命令:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-consumer-group --topic your_topic --reset-offsets --to-earliest --execute
reset-offset.csv),内容格式为topic:partition:offset:your_topic:0:100
your_topic:1:200
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-consumer-group --reset-offsets --from-file reset-offset.csv --execute
--to-earliest:重置到分区最早偏移量。--to-latest:重置到分区最新偏移量。--execute:实际执行重置(默认--dry-run为预览,不执行)。消费者组触发再平衡(如新增/删除消费者、分区数量变化)时,会暂停消费并重新分配分区。为减少再平衡次数,需:
session.timeout.ms(10-30s)和max.poll.interval.ms(根据消息处理时间调整,如处理一条消息需1分钟,则设为3分钟以上)。enable.auto.commit=false),通过commitSync()或commitAsync()手动提交偏移量(如在消息处理完成后提交),确保数据一致性。LAG指标(可通过Kafka自带的kafka-consumer-groups.sh或第三方工具如Prometheus+Grafana),避免LAG持续增长(可能导致消息积压)。max.poll.records(如每秒处理100条,则设为500-1000)。fetch.min.bytes(如1024字节)和fetch.max.wait.ms(如500ms),减少网络请求次数,提高消费效率。使用监控工具(如Prometheus+Kafka Exporter、Confluent Control Center)监控消费者组的以下指标:
heartbeat是否正常)。LAG增长趋势(是否超过阈值)。