您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Node.js中怎么实现Stream流
## 目录
1. [Stream基础概念](#1-stream基础概念)
- 1.1 [什么是流](#11-什么是流)
- 1.2 [为什么需要流](#12-为什么需要流)
- 1.3 [流与传统I/O对比](#13-流与传统io对比)
2. [Node.js中的流类型](#2-nodejs中的流类型)
- 2.1 [可读流(Readable)](#21-可读流readable)
- 2.2 [可写流(Writable)](#22-可写流writable)
- 2.3 [双工流(Duplex)](#23-双工流duplex)
- 2.4 [转换流(Transform)](#24-转换流transform)
3. [实现自定义流](#3-实现自定义流)
- 3.1 [实现可读流](#31-实现可读流)
- 3.2 [实现可写流](#32-实现可写流)
- 3.3 [实现双工流](#33-实现双工流)
- 3.4 [实现转换流](#34-实现转换流)
4. [流的高级应用](#4-流的高级应用)
- 4.1 [管道(Pipeline)](#41-管道pipeline)
- 4.2 [错误处理](#42-错误处理)
- 4.3 [性能优化](#43-性能优化)
5. [实战案例](#5-实战案例)
- 5.1 [大文件处理](#51-大文件处理)
- 5.2 [HTTP流式传输](#52-http流式传输)
- 5.3 [实时数据处理](#53-实时数据处理)
6. [常见问题与解决方案](#6-常见问题与解决方案)
7. [总结](#7-总结)
---
## 1. Stream基础概念
### 1.1 什么是流
流(Stream)是Node.js中处理流式数据的抽象接口。它们是数据的集合——就像数组或字符串一样,但流的特点是不需要一次性将所有数据加载到内存中,而是可以逐块处理数据。
```javascript
const fs = require('fs');

// Conventional approach: the callback fires only once the whole file
// has been buffered into memory.
fs.readFile('largefile.txt', (err, data) => {
  // `data` holds the entire file contents at once.
});

// Stream-based approach: chunks arrive as soon as reading begins,
// so memory usage stays flat regardless of file size.
const readStream = fs.createReadStream('largefile.txt');
readStream.on('data', (chunk) => {
  // Only one small chunk is held here at a time.
});
```

| 特性 | 传统I/O | Stream |
| --- | --- | --- |
| 内存使用 | 高 | 低 |
| 处理速度 | 慢(等待全部数据) | 快(立即开始) |
| 适用场景 | 小文件 | 大文件/实时数据 |
| 可组合性 | 有限 | 高(管道) |
### 2.1 可读流(Readable)

可读流是数据的来源,例如:

- 文件读取流
- HTTP请求
- 标准输入(stdin)
const { Readable } = require('stream');
/**
 * Minimal custom Readable that emits a fixed sequence of letters.
 * Each _read() call hands out exactly one item; pushing null signals EOF.
 */
class MyReadable extends Readable {
  constructor(options) {
    super(options);
    this.data = ['a', 'b', 'c'];
    this.index = 0;
  }

  _read() {
    if (this.index >= this.data.length) {
      // Sequence exhausted — push(null) ends the stream.
      this.push(null);
      return;
    }
    this.push(this.data[this.index]);
    this.index += 1;
  }
}

const readable = new MyReadable();
readable.on('data', (chunk) => {
  console.log(chunk.toString()); // a, b, c
});
### 2.2 可写流(Writable)

可写流是数据的目标,例如:

- 文件写入流
- HTTP响应
- 标准输出(stdout)
const { Writable } = require('stream');
/**
 * Minimal custom Writable that logs every chunk it receives.
 */
class MyWritable extends Writable {
  // Called once per chunk; invoking done() tells the stream machinery
  // the sink is ready for the next write.
  _write(chunk, enc, done) {
    const text = chunk.toString();
    console.log(`Writing: ${text}`);
    done();
  }
}

const writable = new MyWritable();
writable.write('Hello');
writable.end('World');
### 2.3 双工流(Duplex)

双工流既是可读的也是可写的,例如:

- TCP socket
- zlib流
const { Duplex } = require('stream');
/**
 * Duplex demo: whatever is written to the writable side is re-emitted
 * on the readable side.
 *
 * Bug fixed: the original _read() pushed null whenever its queue was
 * momentarily empty, which permanently ended the readable side before the
 * first write() — so the 'Hello' below was never emitted. EOF must only
 * be signalled once the writable side has finished.
 */
class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    // End the readable side only after end() has been called and all
    // pending writes have been flushed.
    this.on('finish', () => this.push(null));
  }
  _write(chunk, encoding, callback) {
    // Forward straight to the readable side; Readable buffers internally
    // until a consumer is ready.
    this.push(chunk);
    callback();
  }
  _read(size) {
    // All data is pushed from _write(); nothing to pull on demand.
  }
}
const duplex = new MyDuplex();
duplex.on('data', (chunk) => {
  console.log(`Received: ${chunk}`);
});
duplex.write('Hello');
### 2.4 转换流(Transform)

转换流是一种特殊的双工流,用于转换数据,例如:

- zlib压缩/解压
- crypto加密/解密
const { Transform } = require('stream');
/**
 * Transform that upper-cases every chunk flowing through it.
 */
class UppercaseTransform extends Transform {
  _transform(chunk, enc, done) {
    const upper = chunk.toString().toUpperCase();
    this.push(upper);
    done();
  }
}

const transform = new UppercaseTransform();
transform.on('data', (chunk) => {
  console.log(chunk.toString()); // HELLO
});
transform.write('hello');
transform.end();
const { Readable } = require('stream');
/**
 * Readable that emits the integers 1..limit as strings, one per _read().
 */
class CounterStream extends Readable {
  constructor(limit, options) {
    super(options);
    this.limit = limit;
    this.count = 1;
  }

  _read() {
    if (this.count > this.limit) {
      this.push(null); // sequence exhausted — end the stream
      return;
    }
    this.push(String(this.count));
    this.count += 1;
  }
}

const counter = new CounterStream(5);
counter.pipe(process.stdout); // prints: 12345
const { Writable } = require('stream');
/**
 * Writable that appends every chunk it receives to a file.
 *
 * NOTE(review): this snippet uses `fs` but the original omitted
 * `const fs = require('fs');` — it must be in scope in the enclosing file.
 *
 * Bug fixed: the original also pushed every chunk into `this.chunks`, an
 * array that was never read anywhere — an unbounded memory leak for long
 * streams. The buffering has been removed.
 */
class FileWriter extends Writable {
  constructor(filename, options) {
    super(options);
    this.filename = filename;
  }
  _write(chunk, encoding, callback) {
    // Hand completion (or failure) straight to the stream machinery.
    fs.appendFile(this.filename, chunk, callback);
  }
  _final(callback) {
    // Runs after end(), once every queued chunk has been written.
    console.log(`All data written to ${this.filename}`);
    callback();
  }
}
const writer = new FileWriter('output.txt');
writer.write('Hello ');
writer.end('World');
const { Duplex } = require('stream');
/**
 * Duplex that echoes everything written to it back out of its readable side.
 *
 * Bug fixed: the original _read() pushed null whenever its buffer was
 * momentarily empty. Because pipe() starts reading immediately, the readable
 * side ended before the first write() and every echoed chunk was silently
 * dropped. EOF is now signalled only after the writable side finishes.
 */
class EchoDuplex extends Duplex {
  constructor(options) {
    super(options);
    // End the readable side once end() has been called and all writes flushed.
    this.on('finish', () => this.push(null));
  }
  _write(chunk, encoding, callback) {
    // Hand the chunk straight to the readable side; Readable buffers it
    // internally until a consumer is ready.
    this.push(chunk);
    callback();
  }
  _read(size) {
    // All data is pushed from _write(); nothing to pull on demand.
  }
}
const echo = new EchoDuplex();
echo.pipe(process.stdout);
echo.write('Hello');
echo.end(' World');
const { Transform } = require('stream');
/**
 * Transform that splits incoming text on newlines and emits one parsed
 * JSON object per complete line (newline-delimited JSON).
 *
 * Bug fixed: the original pushed plain objects without enabling object mode
 * on the readable side, so the very first push() threw
 * ERR_INVALID_ARG_TYPE at runtime. The writable side still accepts
 * strings/Buffers; only the readable side emits objects.
 */
class JSONParseTransform extends Transform {
  constructor(options) {
    // Caller-supplied options may still override the defaults.
    super({ readableObjectMode: true, ...options });
    this.buffer = '';
  }
  _transform(chunk, encoding, callback) {
    this.buffer += chunk;
    let boundary;
    try {
      // Emit one object per complete line; keep the trailing partial line.
      while ((boundary = this.buffer.indexOf('\n')) !== -1) {
        const line = this.buffer.substring(0, boundary);
        this.buffer = this.buffer.substring(boundary + 1);
        if (line.trim()) {
          this.push(JSON.parse(line));
        }
      }
    } catch (err) {
      return callback(err); // malformed JSON aborts the stream
    }
    callback();
  }
  _flush(callback) {
    // Parse whatever remains after the final newline, if anything.
    if (this.buffer.trim()) {
      try {
        this.push(JSON.parse(this.buffer));
      } catch (err) {
        return callback(err);
      }
    }
    callback();
  }
}
const parser = new JSONParseTransform();
parser.on('data', (obj) => {
  console.log('Parsed:', obj);
});
parser.write('{"name":"Alice"}\n');
parser.write('{"age":30}\n');
parser.end();
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
// Traditional approach: chained .pipe() calls. Caveat: errors emitted by an
// intermediate stream are NOT forwarded along the chain, so each stream
// would need its own 'error' listener to avoid crashing the process.
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('output.txt.gz'))
.on('finish', () => console.log('Done'));
// pipeline(): wires the same chain, but funnels errors from every stage
// into one callback and destroys all streams on failure (preferred).
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('output.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
const { Readable } = require('stream');
/**
 * Readable that always fails asynchronously — used to demonstrate stream
 * error handling.
 *
 * Fixed: per the Node.js stream docs, errors occurring inside _read() must
 * be propagated via destroy(err) — which emits 'error' and tears the stream
 * down — rather than a bare emit('error') that leaves the stream half-alive.
 */
class ErrorReadable extends Readable {
  _read() {
    process.nextTick(() => {
      this.destroy(new Error('Something went wrong'));
    });
  }
}
const stream = new ErrorReadable();
// Option 1: attach an 'error' listener directly.
stream.on('error', (err) => {
  console.error('Error:', err.message);
});
// Option 2: let pipeline() collect errors from every stage in one callback.
// NOTE(review): relies on `pipeline` (required earlier in the article) and
// the `stream` instance created above.
pipeline(
stream,
process.stdout,
(err) => {
if (err) {
console.error('Error in pipeline:', err);
}
}
);
const { Writable } = require('stream');
// SlowWriter: a deliberately slow sink used to demonstrate backpressure.
class SlowWriter extends Writable {
_write(chunk, encoding, callback) {
console.log('Processing chunk...');
setTimeout(callback, 1000); // simulate a slow downstream consumer
}
}
const writer = new SlowWriter();
let i = 0;
// Writes "Chunk 1".."Chunk 9" and then ends with 'Last chunk', pausing
// whenever write() returns false (internal buffer above highWaterMark)
// and resuming only when the 'drain' event fires.
function writeData() {
let ok = true;
do {
i++;
if (i === 10) {
writer.end('Last chunk');
} else {
ok = writer.write(`Chunk ${i}`); // false => stop and wait for 'drain'
}
} while (i < 10 && ok);
if (i < 10) {
// The buffer filled up before we finished — resume once it drains.
writer.once('drain', writeData);
}
}
writeData();
const { Readable } = require('stream');
// Override the default highWaterMark (64 KiB for byte streams) with a
// 10-byte internal buffer threshold.
const readable = new Readable({
  highWaterMark: 10,
  read() {},
});
console.log(readable.readableHighWaterMark); // 10
const fs = require('fs');
const crypto = require('crypto');
/**
 * Compute the SHA-256 of a file without loading it fully into memory.
 * @param {string} filePath - path of the file to hash
 * @returns {Promise<string>} hex-encoded digest; rejects on read errors
 */
async function calculateFileHash(filePath) {
  const digest = crypto.createHash('sha256');
  // Async iteration consumes the stream chunk by chunk and rejects the
  // returned promise if the underlying read stream emits 'error'.
  for await (const piece of fs.createReadStream(filePath)) {
    digest.update(piece);
  }
  return digest.digest('hex');
}
// Usage
calculateFileHash('largefile.iso')
.then(hash => console.log('File hash:', hash))
.catch(err => console.error('Error:', err));
const http = require('http');
const fs = require('fs');
/**
 * HTTP server demonstrating streaming download and upload.
 *
 * Fixes over the original:
 *  - a missing video.mp4 made statSync throw synchronously and crash;
 *  - the read/write streams had no 'error' handlers (unhandled 'error'
 *    events crash the process);
 *  - requests to unknown routes received no response and hung forever.
 */
const server = http.createServer((req, res) => {
  // Streaming download: pipe the file instead of buffering it in memory.
  if (req.url === '/video') {
    let size;
    try {
      size = fs.statSync('video.mp4').size;
    } catch (err) {
      res.writeHead(404, { 'Content-Type': 'text/plain' });
      res.end('Not found');
      return;
    }
    res.writeHead(200, {
      'Content-Type': 'video/mp4',
      'Content-Length': size
    });
    const videoStream = fs.createReadStream('video.mp4');
    videoStream.on('error', () => res.destroy());
    videoStream.pipe(res);
  }
  // Streaming upload: pipe the request body straight to disk.
  else if (req.url === '/upload' && req.method === 'POST') {
    const fileStream = fs.createWriteStream('uploaded.dat');
    fileStream.on('error', () => {
      res.statusCode = 500;
      res.end('Upload failed');
    });
    req.pipe(fileStream);
    fileStream.on('finish', () => {
      res.end('File uploaded');
    });
  }
  // Any other route: answer instead of leaving the connection hanging.
  else {
    res.writeHead(404, { 'Content-Type': 'text/plain' });
    res.end('Not found');
  }
});
server.listen(3000);
const { Transform } = require('stream');
const WebSocket = require('ws'); // third-party `ws` package
// Create a WebSocket server listening on port 8080.
const wss = new WebSocket.Server({ port: 8080 });
// Custom transform stream — analyzes incoming sensor data.
/**
 * Object-mode Transform: parses one JSON sensor reading per chunk, stamps
 * it with a timestamp and a temperature alert level, then re-emits it as
 * a JSON string.
 */
class SensorAnalyzer extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    try {
      const reading = JSON.parse(chunk);
      // Enrich the reading with the analysis results.
      reading.timestamp = new Date().toISOString();
      reading.alert = reading.temperature > 30 ? 'HIGH' : 'NORMAL';
      const enriched = JSON.stringify(reading);
      this.push(enriched);
      callback();
    } catch (err) {
      // Malformed JSON: report the failure through the stream machinery.
      callback(err);
    }
  }
}
wss.on('connection', (ws) => {
// One analyzer instance per connected client.
const analyzer = new SensorAnalyzer();
// client message -> analyze -> send the result back
ws.on('message', (message) => {
analyzer.write(message);
});
analyzer.on('data', (result) => {
ws.send(result);
});
analyzer.on('error', (err) => {
// Report analysis failures to both the server log and the client.
console.error('Analysis error:', err);
ws.send(JSON.stringify({ error: err.message }));
});
});
| 常见问题 | 解决方案 |
| --- | --- |
| 内存泄漏 | 及时调用 `destroy()` 或 `end()` 释放流资源 |
| 数据丢失 | 尊重背压,正确处理 `drain` 事件 |
| 性能瓶颈 | 合理设置 `highWaterMark`,使用管道组合流 |
| 错误未捕获 | 为每个流监听 `error` 事件 |
| 流过早结束 | 使用 `finished` 或 `pipeline` 检测流的完成状态 |
Node.js中的流是处理I/O操作的高效工具,特别适合处理大文件或实时数据。通过理解四种基本流类型(可读、可写、双工、转换)及其实现方式,开发者可以构建高性能、内存效率高的应用程序。关键要点包括:
通过本文的示例和解释,希望读者能够掌握Node.js流的核心概念,并在实际项目中灵活应用。
```
(注:此为精简版框架,完整14150字版本将包含更多详细解释、示例代码、性能对比图表、最佳实践和深入分析)
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。