1. 启用Kafka JMX监控(基础前提)
Kafka通过JMX(Java Management Extensions)暴露CPU、内存、吞吐量、延迟等核心指标,是监控的基础。需修改Kafka启动脚本(kafka-server-start.sh),添加以下JMX配置:
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=<broker-ip> -Dcom.sun.management.jmxremote.port=9999"
重启Kafka Broker使配置生效,之后可通过JMX客户端(如jconsole、VisualVM)连接<broker-ip>:9999查看指标。
2. 使用Kafka自带命令行工具(轻量级监控)
Kafka自带的命令行工具可直接查看集群状态,无需额外安装:
bin/kafka-topics.sh --bootstrap-server <broker-list> --describe(显示分区Leader、副本分布、ISR集合等);bin/kafka-consumer-groups.sh --bootstrap-server <broker-list> --describe --group <group-name>(查看各分区的消费偏移量、滞后量LAG);bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec --jmx-url service:jmx:rmi:///jndi/rmi://<broker-ip>:9999/jmxrmi --attributes OneMinuteRate(实时获取每秒消息摄入速率)。3. 第三方图形化工具(直观可视化)
Kafka Manager:开源Web工具,支持集群状态、Topic/Consumer Group管理、Broker指标监控。安装步骤:
wget https://github.com/yahoo/CMAK/releases/download/0.11.0/kafka_manager-0.11.0.tgz
tar xvf kafka_manager-0.11.0.tgz
cd kafka_manager-0.11.0
./bin/cmak -Dconfig.file=conf/application.conf
访问http://<host>:9000,添加Kafka集群信息即可监控。
Kafdrop:基于Web的轻量级工具,支持Topic浏览、消息查看、Consumer Group管理。通过Docker快速部署:
docker run -d --rm -p 9000:9000 -e KAFKA_BROKERCONNECT=<broker-ip>:9092 -e SERVER_SERVLET_CONTEXTPATH="/" obsidiandynamics/kafdrop
访问http://<host>:9000即可使用。
4. Prometheus+Grafana(专业监控方案)
/etc/prometheus/prometheus.yml),添加kafka_exporter抓取任务:scrape_configs:
- job_name: 'kafka'
static_configs:
- targets: ['<kafka-exporter-host>:9308']
version: '3.1'
services:
kafka-exporter:
image: bitnami/kafka-exporter:latest
command: "--kafka.server=<broker-ip>:9092"
ports:
- "9308:9308"
5. ELK Stack(日志与指标结合)
通过ELK(Elasticsearch+Logstash+Kibana)收集Kafka日志和指标,实现日志分析与可视化:
/etc/collectd/collectd.conf,添加Kafka插件配置;logstash.conf解析Kafka日志;6. 告警规则配置(及时预警)
结合Prometheus设置告警规则,及时发现异常:
alert: KAFKA_broker_down expr: up{job="kafka"} == 0 for: 2m labels: severity=critical;alert: kafka_consumer_lag_high expr: sum(kafka_consumergroup_lag_sum{job="kafka-exporter"}) by (consumergroup, topic) > 5000 for: 2m labels: severity=warning。