在 CentOS 上使用 C++ 进行大数据处理
一 环境准备与工具链
二 单机内存受限时的核心技术与代码骨架
流式处理:文本用 std::ifstream + std::getline 逐行处理;二进制用固定缓冲区分块读取,避免一次性装入内存。
内存映射:对大文件的随机访问,使用 mmap 减少系统调用与拷贝开销。
并行加速:用 OpenMP 或 TBB 做数据并行/任务并行;注意线程安全与负载均衡。
内存与数据结构:选择 std::unordered_map 等高效容器;对海量键空间可引入 布隆过滤器 做快速存在性预判。
I/O 优化:合理增大缓冲区、顺序读写优先、必要时采用异步 I/O 或内存映射。
示例骨架(逐行流式处理 + OpenMP 并行规约,需外部归并)
#include
struct WordCount { std::unordered_map<std::string, size_t> local; void merge(const std::unordered_map<std::string, size_t>& other) { for (const auto& [k, v] : other) local[k] += v; } };
int main(int argc, char* argv[]) { if (argc != 2) return 1; const std::string path = argv[1]; const size_t chunk_size = 64 * 1024 * 1024; // 64MB per task std::vectorstd::string files; if (std::filesystem::is_regular_file(path)) { files.push_back(path); } else { for (const auto& entry : std::filesystem::directory_iterator(path)) if (entry.is_regular_file()) files.push_back(entry.path()); }
std::vector
#pragma omp parallel for schedule(dynamic) for (size_t i = 0; i < files.size(); ++i) { auto& local = locals[omp_get_thread_num()]; std::ifstream in(files[i]); std::string line; while (std::getline(in, line)) { std::string w; for (char c : line) { if (std::isalpha©) w += std::tolower©; else if (!w.empty()) { ++local.local[w]; w.clear(); } } if (!w.empty()) ++local.local[w]; } }
// 归并到主线程 WordCount total; for (auto& lc : locals) total.merge(lc);
// 输出 Top-N(示例前 10) std::vector<std::pair<std::string, size_t>> top; for (const auto& kv : total.local) top.emplace_back(kv.first, kv.second); std::partial_sort(top.begin(), top.begin() + std::min<size_t>(10, top.size()), top.end(), [](auto& a, auto& b){ return a.second > b.second; }); for (size_t i = 0; i < std::min<size_t>(10, top.size()); ++i) std::cout << top[i].first << “\t” << top[i].second << “\n”; }
// 编译示例:g++ -O3 -std=c++17 -fopenmp -o wordcount_stream wordcount_stream.cpp -ltbb 要点:流式处理避免 OOM;按文件粒度并行,最后在主线程归并;跨块词需由解析器状态机处理(此处简化为行边界)。
三 超越内存与分布式扩展
四 性能优化与工程实践
五 典型落地路径