您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Flume入门知识点有哪些
## 一、Flume概述
### 1.1 什么是Flume
Apache Flume是一个分布式、可靠且可用的服务,用于高效收集、聚合和移动大量日志数据。其核心设计理念是基于流式数据流架构,特别适合处理日志类事件数据。
主要特性包括:
- **可靠性**:事务型数据传输机制保障数据不丢失
- **可扩展性**:采用多Agent架构,支持水平扩展
- **可管理性**:提供Web UI和Shell命令管理接口
- **定制化**:支持自定义Source、Channel和Sink组件
### 1.2 典型应用场景
- 日志收集(Web服务器日志、应用日志等)
- 社交媒体数据采集
- 物联网设备数据汇聚
- 数据仓库ETL流程的前置环节
## 二、核心架构解析
### 2.1 基础架构模型
Flume采用三层架构:
Event Source → Channel → Event Sink ↑________Agent________↑
### 2.2 核心组件详解
#### 2.2.1 Event
数据传输基本单元,包含:
- 头部信息(键值对元数据)
- 有效载荷(byte数组形式的消息体)
#### 2.2.2 Source
数据摄入端点,常见类型:
- **NetCat Source**:监听指定端口
- **Exec Source**:执行命令获取输出
- **Spooling Directory**:监控目录新增文件
- **Kafka Source**:从Kafka消费数据
#### 2.2.3 Channel
数据缓冲区,保证可靠性:
- **Memory Channel**:内存队列(高性能但易丢失)
- **File Channel**:基于WAL日志(持久化)
- **JDBC Channel**:数据库存储
- **Kafka Channel**:利用Kafka作为存储
#### 2.2.4 Sink
数据输出端:
- **HDFS Sink**:写入Hadoop HDFS
- **Logger Sink**:日志输出
- **Avro Sink**:转发到其他Agent
- **Kafka Sink**:输出到Kafka主题
### 2.3 Agent运行机制
```mermaid
graph LR
S[Source] -->|Put事务| C[Channel]
C -->|Take事务| K[Sink]
# 解压安装包
tar -zxvf apache-flume-1.9.0-bin.tar.gz
cd apache-flume-1.9.0-bin
# 基础配置
cp conf/flume-env.sh.template conf/flume-env.sh
vi conf/flume-env.sh # 设置JAVA_HOME
# 定义Agent组件
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
# 配置Source
agent1.sources.r1.type = netcat
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 44444
# 配置Channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
# 配置Sink
agent1.sinks.k1.type = logger
# 绑定组件
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
两阶段提交过程: 1. Put事务: - doPut:临时写入 - commit:正式提交 - rollback:失败回滚
常用拦截器: - Timestamp Interceptor:添加时间戳 - Host Interceptor:添加主机信息 - Regex Filtering:正则过滤 - JSON Processing:JSON解析
配置示例:
agent1.sources.r1.interceptors = i1
agent1.sources.r1.interceptors.i1.type = timestamp
graph LR
A[WebServer] -->|Avro| B[Collector Agent]
B -->|HDFS| C[Hadoop Cluster]
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 5
参数 | 说明 | 推荐值 |
---|---|---|
channel.capacity | Channel容量 | 根据内存调整 |
transactionCapacity | 事务批大小 | 100-1000 |
batchSize | 批量写入大小 | 100-500 |
keep-alive | 线程等待时间 | 3(秒) |
agent1.sources.r1.threads = 5
agent1.sinks.k1.type = hdfs
agent1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/
agent1.sinks.k1.hdfs.filePrefix = logs-
agent1.sinks.k1.hdfs.round = true
agent1.sinks.k1.hdfs.roundValue = 30
agent1.sinks.k1.hdfs.roundUnit = minute
agent1.sources.r1.selector.type = multiplexing
agent1.sources.r1.selector.header = type
agent1.sources.r1.selector.mapping.login = c1
agent1.sources.r1.selector.mapping.order = c2
bin/flume-ng agent -n agent1 -c conf -f conf/example.conf -Dflume.root.logger=DEBUG,console
# Source配置
agent1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.r1.kafka.bootstrap.servers = kafka1:9092
agent1.sources.r1.kafka.topics = flume-test
# Sink配置
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.kafka.topic = output-topic
val flumeStream = FlumeUtils.createStream(ssc, hostname, port)
版本 | 重要改进 |
---|---|
1.9.0 | Kafka 2.0+支持 |
1.8.0 | 性能提升40% |
1.7.0 | 安全增强 |
最佳实践建议:生产环境建议使用File Channel保证数据可靠性,对于延迟敏感场景可考虑Memory Channel配合冗余部署。定期监控Channel填充率,当达到容量的80%时需要及时告警。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。