Node.js 在 Linux 上的并发处理
并发模型概览
Linux 下的并发手段
如何选择并发策略
| 场景 | 首选方案 | 说明 |
|---|---|---|
| 高并发 HTTP 服务、充分利用多核 | cluster | 多进程共享端口,主进程保活与重启,横向扩展吞吐 |
| 调用外部程序/脚本、任务解耦 | child_process | 并行执行命令或脚本,隔离性好 |
| 计算密集(图像处理、视频转码等) | worker_threads | 避免阻塞事件循环,利用多核;可共享内存 |
| 大量 I/O 并发(DB/缓存/文件/网络) | 异步 I/O + 事件循环 | 非阻塞 I/O 与 epoll 高效等待,通常无需额外线程 |
| 需要共享内存与细粒度并行 | worker_threads + SharedArrayBuffer | 减少拷贝,提升 CPU 密集任务并行效率 |
关键实践与注意事项
最小可用示例
// server.js
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isPrimary) {
console.log(`主进程 ${process.pid} 启动,CPU 核数: ${numCPUs}`);
for (let i = 0; i < numCPUs; i++) cluster.fork();
cluster.on('exit', (worker, code, signal) => {
console.warn(`工作进程 ${worker.process.pid} 退出 (${signal || code}),重启中...`);
cluster.fork();
});
} else {
http.createServer((req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end(`Hello from worker ${process.pid}\n`);
}).listen(3000, () => {
console.log(`Worker ${process.pid} 监听 3000`);
});
}
// worker.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
if (isMainThread) {
function runWorker(data) {
return new Promise((resolve, reject) => {
const worker = new Worker(__filename, { workerData: data });
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) reject(new Error(`Worker 退出码 ${code}`));
});
});
}
(async () => {
const results = await Promise.all(
Array.from({ length: 4 }, (_, i) => runWorker({ taskId: i, n: 1e8 }))
);
console.log('All done:', results);
})();
} else {
// 模拟 CPU 密集任务
const { taskId, n } = workerData;
let sum = 0;
for (let i = 0; i < n; i++) sum += i;
parentPort.postMessage({ taskId, sum });
}
// spawnTasks.js
const { spawn } = require('child_process');
const tasks = ['sleep 1 && echo A', 'sleep 2 && echo B', 'sleep 1 && echo C'];
tasks.forEach(cmd => {
const [prog, ...args] = cmd.split(' ');
const child = spawn(prog, args);
child.stdout.on('data', data => process.stdout.write(data));
child.stderr.on('data', data => process.stderr.write(data));
child.on('close', code => console.log(`[${cmd}] 退出码: ${code}`));
});
// limitedParallel.js
const pLimit = require('p-limit'); // 需 npm i p-limit
const limit = pLimit(10); // 同时最多 10 个
async function task(i) {
// 模拟异步任务
await new Promise(r => setTimeout(r, Math.random() * 1000));
return `task-${i}`;
}
(async () => {
const tasks = Array.from({ length: 100 }, (_, i) => limit(() => task(i)));
const results = await Promise.all(tasks);
console.log('完成数量:', results.length);
})();
上述示例覆盖了 cluster、worker_threads、child_process 与 并发控制 的常见用法,可直接在 Linux 环境运行并根据业务调整并发度与容错策略。