readable.forEach(fn[, options])


稳定性: 1 - 实验性

  • fn <Function> | <AsyncFunction> 是一个在流的每个数据块上调用的函数。
    • data <any> 从流中获取一块数据。
    • options <Object>
      • signal <AbortSignal> 如果流被销毁则中止,从而允许提前中止 fn 调用。
  • options <Object>
    • concurrency <number>fn 在流上同时调用的最大并发次数。默认值: 1
    • signal <AbortSignal> 允许在信号被中止时销毁流。
  • 返回:<Promise> 一个表示流已完成的承诺。

此方法允许迭代一个流。对于流中的每个块,将调用 fn 函数。如果 fn 函数返回一个 Promise,该 Promise 将被 await

【This method allows iterating a stream. For each chunk in the stream the fn function will be called. If the fn function returns a promise - that promise will be awaited.】

这种方法不同于 for await...of 循环,因为它可以选择并发处理数据块。此外,forEach 迭代只能通过传递 signal 选项并中止相关的 AbortController 来停止,而 for await...of 可以通过 breakreturn 来停止。在任何情况下,流都将被销毁。

【This method is different from for await...of loops in that it can optionally process chunks concurrently. In addition, a forEach iteration can only be stopped by having passed a signal option and aborting the related AbortController while for await...of can be stopped with break or return. In either case the stream will be destroyed.】

这种方法与监听 'data' 事件不同,它使用底层机制中的 readable 事件,并且可以限制并发 fn 调用的数量。

【This method is different from listening to the 'data' event in that it uses the readable event in the underlying machinery and can limit the number of concurrent fn calls.】

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
  // Logs result, similar to `for await (const result of dnsResults)`
  console.log(result);
});
console.log('done'); // Stream has finished