# How to Read and Write txt Files with Flink Connectors

## 1. Introduction

Apache Flink is an open-source framework for unified stream and batch processing, and its core strength is the efficient handling of real-time data streams. In practice, however, plain text files (txt format) remain one of the most common data carriers. This article looks at how to read and write txt files through Flink Connectors, covering everything from basic API usage to advanced configuration.

## 2. Environment Setup

### 2.1 Dependency Configuration

Add the following dependencies to your Maven project:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
```
## 3. Reading txt Files

### 3.1 The readTextFile Method

The quickest way to load a text file is `readTextFile` on the execution environment:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // a parallelism of 1 makes debugging easier during development

DataStream<String> textData = env.readTextFile("input/example.txt");
textData.print();
```
### 3.2 The FileSource Connector

The `FileSource` API from `flink-connector-files` is the recommended way to read files in recent Flink versions:

```java
FileSource<String> source = FileSource.forRecordStreamFormat(
        new TextLineInputFormat(),
        new Path("input/example.txt")
).build();

DataStream<String> lines = env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "file-source"
);
```
### 3.3 Continuous Directory Monitoring

For streaming scenarios, the source can watch a directory and pick up new files as they arrive:

```java
FileSource<String> monitoringSource = FileSource.forRecordStreamFormat(
        new TextLineInputFormat(),
        new Path("input/")
)
.monitorContinuously(Duration.ofSeconds(5)) // check for new files every 5 seconds
.build();
```
## 4. Writing txt Files

### 4.1 The writeAsText Method

For quick tests, a stream can be written directly to a text file:

```java
DataStream<String> outputData = ...; // your data stream
outputData.writeAsText("output/result.txt");
```
### 4.2 The FileSink Connector

For production use, `FileSink` provides rolling policies, part-file management, and fault tolerance:

```java
final FileSink<String> sink = FileSink.forRowFormat(
        new Path("output"),
        new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(1024 * 1024 * 128) // 128 MB
            .build())
    .build();

outputData.sinkTo(sink);
```
### 4.3 Rolling Policy Parameters

| Parameter | Description | Example Value |
|---|---|---|
| `withRolloverInterval` | Maximum time a part file stays open before it is rolled | `Duration.ofHours(1)` |
| `withInactivityInterval` | Inactivity period after which the current part file is rolled | `Duration.ofMinutes(10)` |
| `withMaxPartSize` | Maximum size of a single part file | 128 MB |
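For instance, a sink configured with the example values from the table could look like the following sketch (the `output` path and `hourlySink` name are illustrative):

```java
FileSink<String> hourlySink = FileSink.forRowFormat(
        new Path("output"),
        new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofHours(1))      // roll at least once per hour
            .withInactivityInterval(Duration.ofMinutes(10)) // roll after 10 minutes without new records
            .withMaxPartSize(1024 * 1024 * 128)             // roll once a part file reaches 128 MB
            .build())
    .build();
```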
## 5. Parsing Text Data

### 5.1 Custom Line Parsing

Raw text lines usually need to be turned into structured records. A `MapFunction` can split each line into fields:

```java
public class CustomParser implements MapFunction<String, POJO> {
    @Override
    public POJO map(String value) {
        String[] parts = value.split("\\|"); // fields are separated by "|"
        return new POJO(parts[0], Integer.parseInt(parts[1]));
    }
}
```
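The `POJO` type is not spelled out above; a minimal placeholder and the corresponding map call might look like this (both names are illustrative):

```java
// Hypothetical record type matching the parser: a name plus an integer value.
// A public no-arg constructor and public fields keep it a valid Flink POJO.
public class POJO {
    public String name;
    public int value;

    public POJO() {}

    public POJO(String name, int value) {
        this.name = name;
        this.value = value;
    }
}

// Applying the parser to the raw text stream
DataStream<POJO> parsed = textData.map(new CustomParser());
```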
### 5.2 JSON Lines

For text lines in JSON format (this example assumes the org.json library is on the classpath):

```java
DataStream<JSONObject> jsonData = textData.map(line -> {
    try {
        return new JSONObject(line);
    } catch (JSONException e) {
        return new JSONObject(); // fall back to an empty object for malformed lines
    }
});
```
## 6. Advanced Configuration

### 6.1 Parallelism

Parallelism is set on the DataStream operators rather than on the `FileSource`/`FileSink` objects themselves:

```java
// Reading stage: 4 parallel reader subtasks
DataStream<String> input = env
        .fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
        .setParallelism(4);

// Writing stage: 2 parallel writer subtasks
input.sinkTo(sink).setParallelism(2);
```
### 6.2 Output File Naming

`OutputFileConfig` controls the prefix and suffix of the generated part files:

```java
OutputFileConfig config = OutputFileConfig.builder()
        .withPartPrefix("batch")
        .withPartSuffix(".tmp")
        .build();

FileSink<String> sink = FileSink.forRowFormat(...)
        .withOutputFileConfig(config)
        .build();
```
### 6.3 Input Validation

Validation can be hooked into the reader by extending `TextLineInputFormat` and checking the file before the default line reader takes over (`validHeader` is a user-defined placeholder):

```java
FileSource<String> source = FileSource.forRecordStreamFormat(
        new TextLineInputFormat() {
            @Override
            public Reader createReader(Configuration config, FSDataInputStream stream)
                    throws IOException {
                // Verify the file header before reading lines
                if (!validHeader(stream)) {
                    throw new IOException("Invalid file format");
                }
                stream.seek(0); // rewind so the reader starts from the beginning
                return super.createReader(config, stream);
            }
        },
        path
).build();
```
## 7. Fault Tolerance

A restart strategy lets the job recover automatically from transient failures:

```java
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                             // maximum number of restart attempts
        Time.of(10, TimeUnit.SECONDS)  // delay between attempts
));
```
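In streaming mode, `FileSink` finalizes its in-progress part files when checkpoints complete, so checkpointing should also be enabled for the output to become visible; a minimal sketch (the 10-second interval is only an example):

```java
// Enable checkpointing so FileSink can commit finished part files
env.enableCheckpointing(10_000); // checkpoint every 10 seconds
```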
## 8. Practical Examples

### 8.1 Log Analysis

```java
DataStream<LogEntry> logs = env.readTextFile("logs/access.log")
        .filter(line -> !line.startsWith("#")) // skip comment lines
        .map(new LogParser());                 // custom log parsing

logs.keyBy(entry -> entry.getStatusCode())
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .aggregate(new ErrorCounter())
        .sinkTo(FileSink.forRowFormat(...).build());
```
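Event-time windows such as the one above also require timestamps and watermarks. Assuming `LogEntry` exposes its event time through a hypothetical `getTimestamp()` returning epoch milliseconds, the assignment could look like this:

```java
DataStream<LogEntry> timestamped = logs.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<LogEntry>forBoundedOutOfOrderness(Duration.ofSeconds(30)) // tolerate 30s of out-of-order events
                .withTimestampAssigner((entry, recordTs) -> entry.getTimestamp()));
```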
### 8.2 Data Cleansing

```java
DataStream<String> rawData = env.fromSource(
        fileSource,
        WatermarkStrategy.noWatermarks(),
        "file-source"
).process(new DataCleanser());

rawData.sinkTo(FileSink.forRowFormat(...).build());
```
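`DataCleanser` is not shown above; one possible shape, assuming the goal is simply to trim whitespace and drop blank lines, is:

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical cleansing step: trims lines and drops blank ones
public class DataCleanser extends ProcessFunction<String, String> {
    @Override
    public void processElement(String line, Context ctx, Collector<String> out) {
        String trimmed = line.trim();
        if (!trimmed.isEmpty()) {
            out.collect(trimmed);
        }
    }
}
```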
## 9. Production Considerations and Summary

In production, make sure the output path uses the right file system scheme (`hdfs://` vs `file://`) for your deployment.

This article has covered:

- Using the FileSource/FileSink APIs for efficient file operations
- Configuration and tuning techniques for different scenarios
- Key points to watch in production environments

Flink's file connectors expose a highly flexible interface, and developers can combine the different strategies according to their business needs to build robust data processing pipelines.
## Appendix: Complete Example Code
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class TxtFileProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: read the input file line by line
        FileSource<String> source = FileSource.forRecordStreamFormat(
                        new TextLineInputFormat(),
                        new Path("input/data.txt"))
                .build();

        // Processing: upper-case every line
        DataStream<String> processed = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
                .map(line -> line.toUpperCase());

        // Sink: write the result as UTF-8 text with the default rolling policy
        FileSink<String> sink = FileSink.forRowFormat(
                        new Path("output"),
                        new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder().build())
                .build();

        processed.sinkTo(sink);

        env.execute("Txt File Processing");
    }
}
```