您好,登录后才能下订单哦!
# Node.js中有几种stream
## 前言
在Node.js中,流(Stream)是处理流式数据的抽象接口,是Node.js最重要的核心模块之一。流提供了一种高效处理数据的方式,特别是当处理大量数据或数据从外部来源逐渐到达时。本文将深入探讨Node.js中的流类型、实现原理以及实际应用场景。
## 一、流的基本概念
### 1.1 什么是流
流是数据的集合,就像数组或字符串一样。不同的是,流的数据可能不会一次性全部可用,也不需要一次性全部加载到内存中。这使得流在处理大型数据集或来自外部源的数据时特别高效。
### 1.2 为什么需要流
传统的数据处理方式需要将全部数据加载到内存中才能进行处理,这会导致:
- 高内存消耗
- 处理延迟(需要等待所有数据加载完成)
- 不适用于实时数据处理
流解决了这些问题,它允许我们:
- 分段处理数据
- 内存使用更高效
- 实现实时处理
### 1.3 流的基本类型
Node.js中有四种基本的流类型:
1. 可读流(Readable)
2. 可写流(Writable)
3. 双工流(Duplex)
4. 转换流(Transform)
## 二、可读流(Readable Stream)
### 2.1 可读流概述
可读流是数据的源头,表示可以从中读取数据的流。常见例子包括:
- 文件读取流
- HTTP请求
- TCP sockets
- 标准输入(stdin)
### 2.2 可读流的两种模式
可读流有两种读取模式:
#### 2.2.1 流动模式(Flowing Mode)
数据自动从底层系统读取,并通过事件尽可能快地提供给应用程序。
```javascript
const fs = require('fs');
const readable = fs.createReadStream('largefile.txt');
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
必须显式调用stream.read()
方法来从流中读取数据片段。
const readable = fs.createReadStream('largefile.txt');
readable.on('readable', () => {
let chunk;
while ((chunk = readable.read()) !== null) {
console.log(`Received ${chunk.length} bytes of data.`);
}
});
const { Readable } = require('stream');
class MyReadable extends Readable {
constructor(options) {
super(options);
this.data = ['a', 'b', 'c', 'd', 'e'];
this.index = 0;
}
_read(size) {
if (this.index >= this.data.length) {
this.push(null); // 结束流
} else {
this.push(this.data[this.index++]);
}
}
}
const myReadable = new MyReadable();
myReadable.on('data', (chunk) => {
console.log(chunk.toString());
});
可写流是数据的目标,表示可以向其中写入数据的流。常见例子包括: - 文件写入流 - HTTP响应 - TCP sockets - 标准输出(stdout) - 标准错误(stderr)
const fs = require('fs');
const writable = fs.createWriteStream('output.txt');
writable.write('Hello, ');
writable.write('World!');
writable.end(); // 结束写入
const { Writable } = require('stream');
class MyWritable extends Writable {
constructor(options) {
super(options);
this.data = [];
}
_write(chunk, encoding, callback) {
this.data.push(chunk.toString());
callback(); // 表示写入完成
}
}
const myWritable = new MyWritable();
myWritable.write('Hello');
myWritable.write('World');
myWritable.end();
双工流既是可读的又是可写的,可以看作是可读流和可写流的组合。常见例子包括: - TCP sockets - WebSocket连接 - 压缩/解压缩流
const { Duplex } = require('stream');
class MyDuplex extends Duplex {
constructor(options) {
super(options);
this.data = [];
}
_write(chunk, encoding, callback) {
this.data.push(chunk.toString());
callback();
}
_read(size) {
if (this.data.length === 0) {
this.push(null);
} else {
this.push(this.data.shift());
}
}
}
const duplex = new MyDuplex();
duplex.on('data', (chunk) => {
console.log('Received:', chunk.toString());
});
duplex.write('Hello');
duplex.write('World');
duplex.end();
转换流是一种特殊的双工流,它的输出与输入是相关的。常见例子包括: - zlib压缩/解压缩 - crypto加密/解密 - 数据格式转换
const { Transform } = require('stream');
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
}
const upperCase = new UpperCaseTransform();
process.stdin.pipe(upperCase).pipe(process.stdout);
一种特殊的转换流,只是简单地传递数据而不做任何修改。
const { PassThrough } = require('stream');
const passThrough = new PassThrough();
passThrough.on('data', (chunk) => {
console.log('Received:', chunk.toString());
});
passThrough.write('Hello');
passThrough.end();
默认情况下,流处理的是Buffer或String类型的数据。对象模式允许流处理任意JavaScript对象。
const { Readable } = require('stream');
const objectStream = new Readable({
objectMode: true,
read() {}
});
objectStream.push({ name: 'Alice' });
objectStream.push({ name: 'Bob' });
objectStream.push(null);
objectStream.on('data', (obj) => {
console.log('Received object:', obj);
});
管道是将多个流连接在一起的机制,数据自动从一个流流向另一个流。
const fs = require('fs');
const zlib = require('zlib');
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'));
流中的错误需要通过事件监听器捕获:
const stream = fs.createReadStream('nonexistent.txt');
stream.on('error', (err) => {
console.error('Stream error:', err);
});
当数据生产速度大于消费速度时,流会自动处理背压问题。
使用destroy()
方法可以手动销毁流并释放资源。
// 文件复制
const readStream = fs.createReadStream('source.mp4');
const writeStream = fs.createWriteStream('destination.mp4');
readStream.pipe(writeStream);
const http = require('http');
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('largefile.txt');
fileStream.pipe(res);
});
server.listen(3000);
const csv = require('csv-parser');
const { Transform } = require('stream');
const { createWriteStream } = require('fs');
// CSV转JSON
fs.createReadStream('data.csv')
.pipe(csv())
.pipe(new Transform({
objectMode: true,
transform: (obj, encoding, callback) => {
callback(null, JSON.stringify(obj) + '\n');
}
}))
.pipe(fs.createWriteStream('data.jsonl'));
根据场景选择适当的流类型可以提高性能: - 纯读取:Readable - 纯写入:Writable - 双向独立:Duplex - 数据转换:Transform
const readStream = fs.createReadStream('largefile.txt', {
highWaterMark: 1024 * 1024 // 1MB
});
const { pipeline } = require('stream');
const zlib = require('zlib');
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
确保正确处理流结束和错误事件,避免未关闭的流导致内存泄漏。
确保正确处理end
事件,避免在流结束前关闭资源。
调整highWaterMark
或使用pause()
/resume()
控制流量。
使用pipeline
代替pipe
可以更好地传播错误。
确保流之间的编码和数据格式兼容。
Node.js持续优化流API,提高性能和易用性。
Node.js正在逐步实现Web Streams标准,提高与浏览器API的兼容性。
Node.js流现在支持异步迭代器,提供了新的消费流的方式:
async function processStream() {
const readable = fs.createReadStream('data.txt');
for await (const chunk of readable) {
console.log(chunk.toString());
}
}
Node.js中的流是处理数据的高效工具,主要分为四种基本类型:可读流、可写流、双工流和转换流。每种流类型都有其特定的用途和优势:
掌握这些流类型及其特性,可以帮助开发者构建高效、可扩展的Node.js应用程序,特别是在处理I/O密集型任务时。通过合理使用流,可以显著降低内存使用,提高应用程序性能,并实现实时数据处理能力。
随着Node.js的发展,流API也在不断改进,引入了更多现代JavaScript特性如异步迭代器,以及与Web标准的对齐。这些改进使得流API更加易用和强大,成为Node.js生态系统中不可或缺的一部分。
fs
:文件系统流http
/https
:HTTP流zlib
:压缩/解压缩流crypto
:加密流net
/dgram
:TCP/UDP流child_process
:子进程流stream
:核心流模块”`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。