什么是 Node.js Stream(流)?从生活场景说起
在日常生活中,我们常会遇到“流水线”式的操作。比如去餐厅吃饭,厨师在后厨切菜、炒菜、装盘,一个接一个地进行,而不是等所有菜都做好了才一起端上桌。这种“边生产边消费”的模式,就是“流”的本质。
Node.js Stream(流) 正是这样一种机制:它允许你以分块的方式处理数据,而不是一次性加载全部内容到内存中。想象一下,你要读取一个 1GB 的日志文件,如果一次性读取,内存会瞬间爆满。但使用流,你可以一边读一边处理,就像水流一样,源源不断,不堆积。
这种设计特别适合处理大文件、网络请求、实时数据等场景。它不仅是 Node.js 的核心特性,更是提升应用性能的关键工具。
Node.js Stream 的三大核心类型
Node.js 的 Stream 模块提供了三种基本流类型:Readable(可读流)、Writable(可写流)和 Duplex(双工流)。它们共同构成了数据流动的骨架。
可读流(Readable)
可读流用于“读取”数据。你可以把它想象成一个水龙头,它持续地输出水(数据),而你通过监听事件来接收这些“水流”。
const fs = require('fs');
// 创建一个可读流,读取文件内容
const readableStream = fs.createReadStream('large-file.log', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 每次读取 64KB 数据
});
// 监听 data 事件:每当有数据块到达时触发
readableStream.on('data', (chunk) => {
console.log('收到数据块:', chunk);
// chunk 是 Buffer 或字符串,表示当前读取的一小部分数据
});
// 监听 end 事件:数据读取完毕
readableStream.on('end', () => {
console.log('文件读取完成');
});
// 监听错误事件
readableStream.on('error', (err) => {
console.error('读取出错:', err);
});
说明:
highWaterMark控制每次读取的数据量。设置为 64KB 意味着每次最多读取 64KB,避免内存压力。
可写流(Writable)
可写流用于“输出”数据。比如你要把日志写入文件,或者把数据发送到网络。它就像一个水桶,你不断地往里面倒水,水桶会自动把水“排出去”。
const fs = require('fs');
// 创建一个可写流,写入文件
const writableStream = fs.createWriteStream('output.txt', {
encoding: 'utf8'
});
// 写入数据
writableStream.write('这是第一行数据\n');
writableStream.write('这是第二行数据\n');
// 关闭流,确保所有数据写入磁盘
writableStream.end();
// 监听 finish 事件:写入完成
writableStream.on('finish', () => {
console.log('所有数据已写入文件');
});
// 监听错误事件
writableStream.on('error', (err) => {
console.error('写入出错:', err);
});
说明:
end()方法会触发流的关闭,确保缓冲区中的数据被写入。如果忘记调用,可能会导致数据丢失。
双工流(Duplex)
双工流既能读又能写,就像一个双向管道。它常用于网络通信、代理服务器等场景。
const { Duplex } = require('stream');
// 创建一个自定义双工流
const duplexStream = new Duplex({
// 实现读取逻辑
read(size) {
// 模拟异步读取数据
setTimeout(() => {
const data = 'Hello from duplex stream\n';
// 将数据推入内部队列
this.push(data);
// 如果没有更多数据,push(null) 表示结束
// this.push(null);
}, 1000);
},
// 实现写入逻辑
write(chunk, encoding, callback) {
console.log('接收到写入数据:', chunk.toString());
// 处理写入数据
callback(); // 必须调用 callback,表示处理完成
}
});
// 读取数据
duplexStream.on('data', (chunk) => {
console.log('读取到:', chunk.toString());
});
// 写入数据
duplexStream.write('你好,世界!\n');
duplexStream.end();
说明:
this.push(data)是向可读端推送数据,callback()是通知写入完成,否则流会卡住。
流的管道机制:pipe() 方法的妙用
在真实项目中,我们常常需要把一个流的输出,直接作为另一个流的输入。比如:读取文件 → 处理数据 → 写入新文件。这时,pipe() 方法就派上用场了。
const fs = require('fs');
// 创建可读流
const readStream = fs.createReadStream('input.txt', {
encoding: 'utf8'
});
// 创建可写流
const writeStream = fs.createWriteStream('output.txt', {
encoding: 'utf8'
});
// 使用 pipe 连接两个流
readStream.pipe(writeStream);
// 监听完成事件
writeStream.on('finish', () => {
console.log('文件复制完成');
});
// 监听错误
readStream.on('error', (err) => {
console.error('读取错误:', err);
});
writeStream.on('error', (err) => {
console.error('写入错误:', err);
});
说明:
pipe()会自动管理数据流动,避免内存溢出。它会监听data事件,把数据推给目标流,并在结束时自动关闭。
pipe() 的高级用法:链式操作
你还可以把多个流串联起来,形成“数据流水线”:
const fs = require('fs');
const zlib = require('zlib'); // 压缩模块
// 读取文件 → 压缩 → 写入压缩包
const readStream = fs.createReadStream('large-file.txt');
const compressStream = zlib.createGzip();
const writeStream = fs.createWriteStream('large-file.txt.gz');
readStream
.pipe(compressStream)
.pipe(writeStream);
writeStream.on('finish', () => {
console.log('压缩完成,文件已保存为 large-file.txt.gz');
});
说明:
zlib.createGzip()是一个内置的双工流,可以压缩数据。整个过程无需手动管理缓冲区,非常高效。
流的背压机制:如何防止内存溢出?
背压(Backpressure)是流的核心机制之一。它就像“水龙头的水流速度不能超过水管的承载能力”。如果写入速度太快,而接收端处理不过来,就会导致内存堆积。
Node.js 的流系统会自动处理背压:当写入端发现接收端“处理不过来”时,会暂停数据输出,直到接收端准备好。
如何检测背压?
const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt');
// 监听可写流的 'drain' 事件
writeStream.on('drain', () => {
console.log('缓冲区已清空,可以继续写入');
});
// 写入大量数据
for (let i = 0; i < 10000; i++) {
const data = `第 ${i} 行数据\n`;
const result = writeStream.write(data);
// 如果返回 false,说明缓冲区已满,需要等待 drain
if (!result) {
console.log('缓冲区已满,暂停写入');
}
}
说明:
write()方法返回true表示可以继续写,返回false表示缓冲区满,应暂停写入,直到drain事件触发。
背压的最佳实践
- 使用
pipe()而不是手动write(),因为pipe()自动处理背压。 - 在自定义流中,调用
push(null)结束读取。 - 避免在
write()中进行耗时操作,否则会阻塞整个流。
实战案例:用流处理大文件日志分析
假设你有一个 5GB 的日志文件,需要统计其中包含 "ERROR" 的行数。
传统方式(不推荐)
const fs = require('fs');
// 一次性读取全部内容,内存爆炸!
const data = fs.readFileSync('huge-log.log', 'utf8');
const lines = data.split('\n');
const errorCount = lines.filter(line => line.includes('ERROR')).length;
console.log('ERROR 行数:', errorCount);
❌ 问题:5GB 文件无法全部加载到内存,会导致程序崩溃。
流式处理(推荐)
const fs = require('fs');
// 创建可读流
const readStream = fs.createReadStream('huge-log.log', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 每次读取 64KB
});
let errorCount = 0;
// 监听 data 事件
readStream.on('data', (chunk) => {
// 将 chunk 按行分割
const lines = chunk.split('\n');
// 处理每一行
lines.forEach(line => {
if (line.includes('ERROR')) {
errorCount++;
}
});
});
// 监听 end 事件
readStream.on('end', () => {
console.log('统计完成,共找到', errorCount, '个 ERROR');
});
// 监听错误
readStream.on('error', (err) => {
console.error('读取失败:', err);
});
✅ 优点:内存占用稳定,处理大文件毫无压力。
总结:掌握 Node.js Stream(流) 的核心价值
Node.js Stream(流) 不仅是技术特性,更是一种思维方式。它教会我们:不要一次性处理全部数据,而是分块、持续、高效地处理。
无论是读取大文件、处理网络数据,还是构建高性能服务器,流都提供了优雅的解决方案。它能显著降低内存占用,提升系统稳定性。
掌握流,意味着你真正理解了 Node.js 的非阻塞异步模型。它不是“高级技巧”,而是构建可扩展应用的基础。
从今天开始,遇到大文件、大数据流时,别再想着“全部加载”,试试用流吧——让数据像河水一样,自然流动,高效处理。