Node.js Stream(流)(长文讲解)

什么是 Node.js Stream(流)?从生活场景说起

在日常生活中,我们常会遇到“流水线”式的操作。比如去餐厅吃饭,厨师在后厨切菜、炒菜、装盘,一个接一个地进行,而不是等所有菜都做好了才一起端上桌。这种“边生产边消费”的模式,就是“流”的本质。

Node.js Stream(流) 正是这样一种机制:它允许你以分块的方式处理数据,而不是一次性加载全部内容到内存中。想象一下,你要读取一个 1GB 的日志文件,如果一次性读取,内存会瞬间爆满。但使用流,你可以一边读一边处理,就像水流一样,源源不断,不堆积。

这种设计特别适合处理大文件、网络请求、实时数据等场景。它不仅是 Node.js 的核心特性,更是提升应用性能的关键工具。

Node.js Stream 的三大核心类型

Node.js 的 Stream 模块提供了三种基本流类型:Readable(可读流)、Writable(可写流)和 Duplex(双工流)。它们共同构成了数据流动的骨架。

可读流(Readable)

可读流用于“读取”数据。你可以把它想象成一个水龙头,它持续地输出水(数据),而你通过监听事件来接收这些“水流”。

const fs = require('fs');

// 创建一个可读流,读取文件内容
const readableStream = fs.createReadStream('large-file.log', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 每次读取 64KB 数据
});

// 监听 data 事件:每当有数据块到达时触发
readableStream.on('data', (chunk) => {
  console.log('收到数据块:', chunk);
  // chunk 是 Buffer 或字符串,表示当前读取的一小部分数据
});

// 监听 end 事件:数据读取完毕
readableStream.on('end', () => {
  console.log('文件读取完成');
});

// 监听错误事件
readableStream.on('error', (err) => {
  console.error('读取出错:', err);
});

说明:highWaterMark 控制每次读取的数据量。设置为 64KB 意味着每次最多读取 64KB,避免内存压力。

可写流(Writable)

可写流用于“输出”数据。比如你要把日志写入文件,或者把数据发送到网络。它就像一个水桶,你不断地往里面倒水,水桶会自动把水“排出去”。

const fs = require('fs');

// 创建一个可写流,写入文件
const writableStream = fs.createWriteStream('output.txt', {
  encoding: 'utf8'
});

// 写入数据
writableStream.write('这是第一行数据\n');
writableStream.write('这是第二行数据\n');

// 关闭流,确保所有数据写入磁盘
writableStream.end();

// 监听 finish 事件:写入完成
writableStream.on('finish', () => {
  console.log('所有数据已写入文件');
});

// 监听错误事件
writableStream.on('error', (err) => {
  console.error('写入出错:', err);
});

说明:end() 方法会触发流的关闭,确保缓冲区中的数据被写入。如果忘记调用,可能会导致数据丢失。

双工流(Duplex)

双工流既能读又能写,就像一个双向管道。它常用于网络通信、代理服务器等场景。

const { Duplex } = require('stream');

// 创建一个自定义双工流
const duplexStream = new Duplex({
  // 实现读取逻辑
  read(size) {
    // 模拟异步读取数据
    setTimeout(() => {
      const data = 'Hello from duplex stream\n';
      // 将数据推入内部队列
      this.push(data);
      // 如果没有更多数据,push(null) 表示结束
      // this.push(null);
    }, 1000);
  },

  // 实现写入逻辑
  write(chunk, encoding, callback) {
    console.log('接收到写入数据:', chunk.toString());
    // 处理写入数据
    callback(); // 必须调用 callback,表示处理完成
  }
});

// 读取数据
duplexStream.on('data', (chunk) => {
  console.log('读取到:', chunk.toString());
});

// 写入数据
duplexStream.write('你好,世界!\n');
duplexStream.end();

说明:this.push(data) 是向可读端推送数据,callback() 是通知写入完成,否则流会卡住。

流的管道机制:pipe() 方法的妙用

在真实项目中,我们常常需要把一个流的输出,直接作为另一个流的输入。比如:读取文件 → 处理数据 → 写入新文件。这时,pipe() 方法就派上用场了。

const fs = require('fs');

// 创建可读流
const readStream = fs.createReadStream('input.txt', {
  encoding: 'utf8'
});

// 创建可写流
const writeStream = fs.createWriteStream('output.txt', {
  encoding: 'utf8'
});

// 使用 pipe 连接两个流
readStream.pipe(writeStream);

// 监听完成事件
writeStream.on('finish', () => {
  console.log('文件复制完成');
});

// 监听错误
readStream.on('error', (err) => {
  console.error('读取错误:', err);
});

writeStream.on('error', (err) => {
  console.error('写入错误:', err);
});

说明:pipe() 会自动管理数据流动,避免内存溢出。它会监听 data 事件,把数据推给目标流,并在结束时自动关闭。

pipe() 的高级用法:链式操作

你还可以把多个流串联起来,形成“数据流水线”:

const fs = require('fs');
const zlib = require('zlib'); // 压缩模块

// 读取文件 → 压缩 → 写入压缩包
const readStream = fs.createReadStream('large-file.txt');
const compressStream = zlib.createGzip();
const writeStream = fs.createWriteStream('large-file.txt.gz');

readStream
  .pipe(compressStream)
  .pipe(writeStream);

writeStream.on('finish', () => {
  console.log('压缩完成,文件已保存为 large-file.txt.gz');
});

说明:zlib.createGzip() 是一个内置的双工流,可以压缩数据。整个过程无需手动管理缓冲区,非常高效。

流的背压机制:如何防止内存溢出?

背压(Backpressure)是流的核心机制之一。它就像“水龙头的水流速度不能超过水管的承载能力”。如果写入速度太快,而接收端处理不过来,就会导致内存堆积。

Node.js 的流系统会自动处理背压:当写入端发现接收端“处理不过来”时,会暂停数据输出,直到接收端准备好。

如何检测背压?

const fs = require('fs');

const writeStream = fs.createWriteStream('output.txt');

// 监听可写流的 'drain' 事件
writeStream.on('drain', () => {
  console.log('缓冲区已清空,可以继续写入');
});

// 写入大量数据
for (let i = 0; i < 10000; i++) {
  const data = `第 ${i} 行数据\n`;
  const result = writeStream.write(data);
  
  // 如果返回 false,说明缓冲区已满,需要等待 drain
  if (!result) {
    console.log('缓冲区已满,暂停写入');
  }
}

说明:write() 方法返回 true 表示可以继续写,返回 false 表示缓冲区满,应暂停写入,直到 drain 事件触发。

背压的最佳实践

  • 使用 pipe() 而不是手动 write(),因为 pipe() 自动处理背压。
  • 在自定义流中,调用 push(null) 结束读取。
  • 避免在 write() 中进行耗时操作,否则会阻塞整个流。

实战案例:用流处理大文件日志分析

假设你有一个 5GB 的日志文件,需要统计其中包含 "ERROR" 的行数。

传统方式(不推荐)

const fs = require('fs');

// 一次性读取全部内容,内存爆炸!
const data = fs.readFileSync('huge-log.log', 'utf8');
const lines = data.split('\n');
const errorCount = lines.filter(line => line.includes('ERROR')).length;

console.log('ERROR 行数:', errorCount);

❌ 问题:5GB 文件无法全部加载到内存,会导致程序崩溃。

流式处理(推荐)

const fs = require('fs');

// 创建可读流
const readStream = fs.createReadStream('huge-log.log', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 每次读取 64KB
});

let errorCount = 0;

// 监听 data 事件
readStream.on('data', (chunk) => {
  // 将 chunk 按行分割
  const lines = chunk.split('\n');
  
  // 处理每一行
  lines.forEach(line => {
    if (line.includes('ERROR')) {
      errorCount++;
    }
  });
});

// 监听 end 事件
readStream.on('end', () => {
  console.log('统计完成,共找到', errorCount, '个 ERROR');
});

// 监听错误
readStream.on('error', (err) => {
  console.error('读取失败:', err);
});

✅ 优点:内存占用稳定,处理大文件毫无压力。

总结:掌握 Node.js Stream(流) 的核心价值

Node.js Stream(流) 不仅是技术特性,更是一种思维方式。它教会我们:不要一次性处理全部数据,而是分块、持续、高效地处理

无论是读取大文件、处理网络数据,还是构建高性能服务器,流都提供了优雅的解决方案。它能显著降低内存占用,提升系统稳定性。

掌握流,意味着你真正理解了 Node.js 的非阻塞异步模型。它不是“高级技巧”,而是构建可扩展应用的基础。

从今天开始,遇到大文件、大数据流时,别再想着“全部加载”,试试用流吧——让数据像河水一样,自然流动,高效处理。