stream.pipeline(streams, callback)


一个模块方法,用于在流和生成器之间进行传输,转发错误并正确清理,并在管道完成时提供回调。

【A module method to pipe between streams and generators forwarding errors and properly cleaning up and provide a callback when the pipeline is complete.】

const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  },
); 

pipeline API 提供了一个 承诺版本

【The pipeline API provides a promise version.】

stream.pipeline() 会对除以下情况外的所有流调用 stream.destroy(err)

  • 已经触发 'end''close' 事件的 Readable 流。
  • 已经触发 'finish''close' 事件的 Writable 流。

stream.pipeline() 在调用 callback 后会在流上留下悬空的事件监听器。在流在失败后被重复使用的情况下,这可能会导致事件监听器泄漏和错误被吞掉。如果最后一个流是可读的,悬空的事件监听器将会被移除,以便最后一个流可以在以后被使用。

stream.pipeline() 在发生错误时会关闭所有流。 将 IncomingRequestpipeline 一起使用可能会导致意外行为,因为它会在未发送预期响应的情况下销毁套接字。 请参阅下面的示例:

const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt');
  pipeline(fileStream, res, (err) => {
    if (err) {
      console.log(err); // No such file
      // this message can't be sent once `pipeline` already destroyed the socket
      return res.end('error!!!');
    }
  });
});