Ubuntu 下 HDFS 与 Kafka 配合的常用方案
一、方案总览与适用场景
- Spark Structured Streaming:从 Kafka 持续消费,做转换后写入 HDFS(支持 Parquet/ORC/CSV 等),适合需要实时计算、窗口聚合、Exactly-once 语义的场景。
- Apache Flume:以 Kafka Source → Memory/File Channel → HDFS Sink 的管道方式落盘,适合日志归档、解耦生产与存储、按时间/大小滚动文件。
- Hudi DeltaStreamer:借助 Hudi 将 Kafka 数据以 COPY_ON_WRITE/MERGE_ON_READ 模式写入 HDFS,并可与 Hive 同步,适合增量更新、近实时湖仓一体。
二、方案一 Spark Structured Streaming 消费 Kafka 写入 HDFS
- 前置准备
- 启动 HDFS(确保 NameNode/DataNode 正常,HDFS Web/CLI 可访问)。
- 启动 Kafka(单机或集群均可,创建测试 topic)。
- 准备 Spark(建议与 Kafka 客户端版本匹配),并在提交作业时引入 spark-sql-kafka 连接器。
- 核心配置与示例
- 连接器依赖(示例):org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3(按你的 Spark/Kafka 版本调整)。
- 读取 Kafka(关键参数示例)
- kafka.bootstrap.servers=localhost:9092
- subscribe=your_topic
- 反序列化:key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
- 容错:startingOffsets=earliest(首次消费从最早开始)
- 写入 HDFS(示例)
- 格式:parquet/orc/csv
- 分区:partitionBy(“dt”)
- 模式:append / complete / update(依据业务语义选择)
- 提交示例
- spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3
–class your.main.Class your-app.jar
–master yarn
–deploy-mode cluster
- 实践要点
- 合理设置 maxOffsetsPerTrigger、trigger(interval) 控制微批/持续处理速率。
- 选择列式格式(如 Parquet)并配合 partitionBy 提升后续查询性能。
- 若需端到端一致性,结合 checkpointLocation 与幂等/两阶段提交(如 foreachBatch + commit)。
三、方案二 Apache Flume 从 Kafka 写入 HDFS
- 组件与拓扑
- Source:Kafka(org.apache.flume.source.kafka.KafkaSource)
- Channel:Memory(快但易丢)或 File(持久化,容错更好)
- Sink:HDFS(HdfsSink)
- 关键配置示例(kafka-to-hdfs.conf)
- agent.sources = kafka-source
- agent.channels = file-channel
- agent.sinks = hdfs-sink
- agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
- agent.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
- agent.sources.kafka-source.kafka.topics = my-topic
- agent.sources.kafka-source.kafka.consumer.group.id = flume-hdfs
- agent.sources.kafka-source.kafka.consumer.auto.offset.reset = earliest
- agent.channels.file-channel.type = file
- agent.channels.file-channel.checkpointDir = /var/flume/checkpoint
- agent.channels.file-channel.dataDirs = /var/flume/data
- agent.sinks.hdfs-sink.type = hdfs
- agent.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/flume/kafka/%Y-%m-%d
- agent.sinks.hdfs-sink.hdfs.filePrefix = log-
- agent.sinks.hdfs-sink.hdfs.fileType = DataStream
- agent.sinks.hdfs-sink.hdfs.writeFormat = Text
- agent.sinks.hdfs-sink.hdfs.rollInterval = 3600
- agent.sinks.hdfs-sink.hdfs.rollSize = 134217728
- agent.sinks.hdfs-sink.hdfs.rollCount = 0
- agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
- agent.sources.kafka-source.channels = file-channel
- agent.sinks.hdfs-sink.channel = file-channel
- 运行与验证
- 启动 Flume:bin/flume-ng agent -n agent -c conf -f conf/kafka-to-hdfs.conf
- 验证:hdfs dfs -ls /flume/kafka/2025-08-14 查看按日期滚动落盘文件。
四、方案三 Hudi DeltaStreamer 近实时入湖并写入 HDFS
- 适用场景
- 需要 增量更新/去重/回撤、时光回溯、与 Hive 无缝对接的湖仓场景。
- 快速上手
- 准备 Hudi 工具包(HoodieDeltaStreamer 及其依赖),以及 Kafka 源配置(如 JSON 格式、bootstrap.servers、topic、消费组等)。
- 写入 COPY_ON_WRITE(快照查询快,写入放大高)
- spark-submit
–class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE
–table-type COPY_ON_WRITE
–source-class org.apache.hudi.utilities.sources.JsonKafkaSource
–source-ordering-field ts
–target-base-path /user/hive/warehouse/stock_ticks_cow
–target-table stock_ticks_cow
–props /var/demo/config/kafka-source.properties
–schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
- 写入 MERGE_ON_READ(写入延迟低,查询视合并策略而定)
- 在上述命令基础上将 –table-type 改为 MERGE_ON_READ,可按需关闭/开启压缩合并。
- Hive 同步
- /var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh
–jdbc-url jdbc:hive2://hiveserver:10000
–user hive --pass hive
–partitioned-by dt
–base-path /user/hive/warehouse/stock_ticks_cow
–database default --table stock_ticks_cow
- 结果验证
- HDFS 路径下可见分区目录、.hoodie 元数据目录与 parquet 数据文件;Hive 中可查询对应表。
五、实践要点与常见问题
- 时间分区与滚动策略
- 按 dt=yyyy-MM-dd 分区便于管理与查询;HDFS Sink 通过 rollInterval/rollSize/rollCount 控制文件滚动,避免小文件过多。
- 容错与一致性
- Flume 优先选 File Channel;Spark 启用 checkpoint 与幂等写入;Kafka 消费位点由 group.id 管理,必要时重置为 earliest。
- 权限与安全
- 确保运行用户对 HDFS 目标路径具备写权限;如使用 Kerberos,需 kinit 并在提交作业时配置 JAAS/keytab。
- 版本兼容
- 对齐 Scala/Spark/Kafka/Hudi 版本,避免依赖冲突;连接器与库版本需与集群一致。
- 监控与告警
- 关注 消费滞后(lag)、落盘延迟、HDFS 容量 与 小文件数量,按需调整批次大小与分区数。