A Practical Roadmap for Big Data Processing with Rust on Linux
1. Environment and Toolchain
Install the toolchain with curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh, then run source $HOME/.cargo/env and verify the install with cargo --version. For data-heavy workloads, enable full optimization in Cargo.toml:

[profile.release]
opt-level = 3
lto = true
codegen-units = 1

Build with cargo build --release.

2. Core Stack for Single-Machine In-Memory Processing

The example in the next section rests on two crates: Polars, a columnar DataFrame engine with a lazy query optimizer, and Rayon, a work-stealing data-parallelism library.
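Before hand-rolling chunked I/O, note that Polars' lazy engine can already scan and aggregate a CSV in parallel by itself. A minimal sketch, assuming a file large.csv with columns key and val (the file name and column names are placeholders):

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Lazily scan the CSV: nothing is read until collect(), and Polars
    // parallelizes both the scan and the group-by internally.
    let df = LazyCsvReader::new("large.csv")
        .finish()?
        .group_by([col("key")])
        .agg([col("val").sum()])
        .collect()?;
    println!("{}", df);
    Ok(())
}

The manual approach below is still worth walking through: it shows how to coordinate byte-range splitting, Rayon, and per-chunk aggregation yourself.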
3. A Zero-to-One Example: Processing a Large CSV in Parallel

Declare the dependencies in Cargo.toml:
[dependencies]
polars = { version = "0.40", features = ["csv", "parquet", "lazy", "serde"] }
rayon = "1.10"
num_cpus = "1.16"
use polars::prelude::*;
use rayon::prelude::*;
use std::fs::File;
use std::io::{self, BufRead, BufReader, Seek, SeekFrom};
use std::path::Path;
// Split the file into n roughly equal byte ranges, returned as (start, end)
// offsets. Interior boundaries are raw byte positions and may fall in the
// middle of a line; process_chunk compensates for that.
fn split_offsets(path: &Path, n: usize) -> io::Result<Vec<(u64, u64)>> {
    let file_size = std::fs::metadata(path)?.len();
    let chunk_size = file_size / n as u64;
    let mut offsets = Vec::with_capacity(n);
    for i in 0..n as u64 {
        let start = i * chunk_size;
        // The last range absorbs the division remainder up to end of file.
        let end = if i == n as u64 - 1 { file_size } else { (i + 1) * chunk_size };
        offsets.push((start, end));
    }
    Ok(offsets)
}
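Since interior boundaries will usually land mid-line, the two functions follow a simple contract: every chunk except the first skips the partial line at its start (counting the skipped bytes toward its budget), and every chunk reads one line past its end. Each line therefore lands in exactly one chunk, even when a boundary happens to coincide with a newline.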
// Process one range: read from start (inclusive) to end (exclusive, plus the
// line straddling end) and return the chunk's pre-aggregated DataFrame.
fn process_chunk(path: &Path, start: u64, end: u64) -> PolarsResult<DataFrame> {
    let f = File::open(path)?;
    let mut reader = BufReader::new(f);
    reader.seek(SeekFrom::Start(start))?;
    let mut bytes = 0u64;
    // Skip the partial first line (the previous chunk reads across its own
    // end and picks it up); count the skipped bytes toward this budget.
    if start != 0 {
        let mut first = String::new();
        bytes += reader.read_line(&mut first)? as u64;
    }
    let lf = b'\n';
    let mut buf = Vec::with_capacity(64 * 1024);
    let mut chunk = Vec::new();
    // Read whole lines until the byte budget is spent. Using <= rather than <
    // makes this chunk also consume a line that begins exactly at end, which
    // the next chunk unconditionally skips.
    while bytes <= end - start {
        let n = reader.read_until(lf, &mut buf)?;
        if n == 0 { break; }
        bytes += n as u64;
        // Minimal CSV parsing for the example: split on commas, take column 1
        // as the key and column 2 as a numeric value (no quoting handled).
        if let Ok(line) = std::str::from_utf8(&buf) {
            let mut cols = line.trim_end().split(',');
            if let (Some(k), Some(v)) = (cols.next(), cols.next()) {
                if let Ok(v) = v.parse::<f64>() {
                    chunk.push((k.to_string(), v));
                }
            }
        }
        buf.clear();
    }
    // Build a Polars DataFrame and pre-aggregate within the chunk, so the
    // final merge in main() sees at most one row per key per chunk.
    let df = df! {
        "key" => chunk.iter().map(|(k, _)| k.clone()).collect::<Vec<_>>(),
        "val" => chunk.iter().map(|(_, v)| *v).collect::<Vec<_>>(),
    }?;
    df.lazy()
        .group_by([col("key")])
        .agg([col("val").sum()])
        .collect()
}
fn main() -> PolarsResult<()> {
    let path = Path::new("large.csv");
    let n_workers = num_cpus::get();
    let offsets = split_offsets(path, n_workers)?;
    // Process the byte ranges in parallel; the first error aborts the run.
    let results: Vec<DataFrame> = offsets
        .par_iter()
        .map(|&(s, e)| process_chunk(path, s, e))
        .collect::<Result<_, _>>()?;
    // Stack the per-chunk results, then aggregate once more to merge
    // identical keys that appeared in different chunks.
    let merged = results
        .into_iter()
        .reduce(|a, b| a.vstack(&b).unwrap())
        .unwrap();
    let final_agg = merged
        .lazy()
        .group_by([col("key")])
        .agg([col("val").sum()])
        .collect()?;
    println!("{:?}", final_agg);
    Ok(())
}
Build with cargo build --release and run ./target/release/your_app.

4. Performance Optimization and Distributed Scaling
Profile with perf record -g ./target/release/your_app && perf report, and use flamegraph to visualize the hotspots.
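For perf and flamegraph to show readable function names, the release binary must keep debug symbols. A minimal sketch of the Cargo.toml addition, extending the profile from section 1:

[profile.release]
opt-level = 3
lto = true
codegen-units = 1
debug = true   # keep debug symbols so profilers can resolve names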