Filebeat与Kafka配合使用的完整流程
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch),创建Filebeat仓库文件(/etc/yum.repos.d/filebeat.repo),再通过yum install filebeat安装。kafka1:9092,kafka2:9092)、Topic名称(如filebeat-log)、实例接入地址(CKafka需登录控制台获取)。编辑Filebeat主配置文件(filebeat.yml),核心是配置output.kafka section,关键参数说明如下:
基础连接配置:
hosts:Kafka broker地址列表(如["kafka1:9092", "kafka2:9092"]);
topic:目标Topic名称(需提前在Kafka中创建);
version:Kafka集群版本(需与Filebeat版本兼容,如Filebeat 8.2+支持Kafka 2.2.0+)。
批量发送优化(提升性能):
通过batching或producer设置批量发送参数,例如:
output.kafka:
batching:
count: 5000 # 累计5000条日志后发送
period: 10s # 或10秒内发送(以先到为准)
producer:
compression: gzip # 启用gzip压缩(减少网络传输量)
```。
认证与加密(可选,生产环境必配):
若Kafka启用了SASL/SSL认证,需添加以下配置:
output.kafka:
sasl:
mechanism: SCRAM-SHA-256 # 认证机制(如SCRAM-SHA-256/PLAIN)
username: "instance#username" # CKafka需拼接实例ID和用户名
password: "your-password"
ssl:
enabled: true # 启用SSL加密
certificate_authorities: ["/path/to/ca.crt"] # CA证书路径
```。
根据日志来源选择输入类型,常见场景如下:
监控日志文件(Filebeat核心功能):
使用filebeat.inputs(Filebeat 7.0+版本)配置日志路径和类型,例如监控/var/log/*.log:
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/*.log # 支持通配符,监控所有.log文件
```。
启用预定义模块(简化配置):
Filebeat提供logstash、nginx、mysql等预定义模块,一键配置日志解析和输出。例如启用logstash模块:
filebeat.modules:
- module: logstash
enabled: true
var.logstash_host: "localhost"
var.logstash_port: 5000
```。
启动Filebeat:
使用以下命令启动(前台模式,便于查看日志):
sudo ./filebeat -e -c filebeat.yml # -e输出日志到stderr,-c指定配置文件
或设置为系统服务(开机自启):
sudo systemctl start filebeat
sudo systemctl enable filebeat
```。
验证数据发送:
通过Kafka消费者工具(如kafkacat、kafka-console-consumer)查看Topic中的数据:
kafkacat -b kafka1:9092 -t filebeat-log -C # 消费Topic数据(JSON格式)
kafka-console-consumer --bootstrap-server kafka1:9092 --topic filebeat-log --from-beginning # Kafka原生工具
若能看到Filebeat采集的日志内容(如message字段包含日志文本),则说明配置成功。