stream.compose(...streams)
stream.compose 是实验性的。- 'streams' <Stream[]> | <WritableStream[]> | <TransformStream[]>
- 返回:<stream.Duplex>
将两个或多个流合并为一个 Duplex 流,该流向第一个流写入数据,并从最后一个流读取数据。每个提供的流都会使用 stream.pipeline 管道传输到下一个流。如果其中任何一个流发生错误,则所有流都会被销毁,包括外部的 Duplex 流。
【Combines two or more streams into a Duplex stream that writes to the
first stream and reads from the last. Each provided stream is piped into
the next, using stream.pipeline. If any of the streams error then all
are destroyed, including the outer Duplex stream.】
因为 stream.compose 返回一个新的流,这个流可以(也应该)被连接到其他流中,从而实现组合。相比之下,当将流传递给 stream.pipeline 时,通常第一个流是可读流,最后一个流是可写流,形成一个封闭的回路。
【Because stream.compose returns a new stream that in turn can (and
should) be piped into other streams, it enables composition. In contrast,
when passing streams to stream.pipeline, typically the first stream is
a readable stream and the last a writable stream, forming a closed
circuit.】
如果传入的是 Function,它必须是一个接收 source Iterable 的工厂方法。
【If passed a Function it must be a factory method taking a source
Iterable.】
import { compose, Transform } from 'node:stream';
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''));
},
});
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
}
let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf;
}
console.log(res); // prints 'HELLOWORLD' stream.compose 可以用来将异步可迭代对象、生成器和函数转换为流。
AsyncIterable可以转换为可读的Duplex。不能产生null。AsyncGeneratorFunction会转换为可读/可写的转换Duplex。必须将源AsyncIterable作为第一个参数。不能生成null。AsyncFunction会转换为可写的Duplex。必须返回null或undefined。
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';
// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());
// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});
let res = '';
// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});
await finished(compose(s1, s2, s3));
console.log(res); // prints 'HELLOWORLD' 有关将 stream.compose 用作运算符,请参见 readable.compose(stream)。
【See readable.compose(stream) for stream.compose as operator.】