- assert 断言
- async_hooks 异步钩子
- async_hooks/context 异步上下文
- buffer 缓冲区
- C++插件
- C/C++插件(使用 Node-API)
- C++嵌入器
- child_process 子进程
- cluster 集群
- CLI 命令行
- console 控制台
- crypto 加密
- crypto/webcrypto 网络加密
- debugger 调试器
- deprecation 弃用
- dgram 数据报
- diagnostics_channel 诊断通道
- dns 域名服务器
- domain 域
- env 环境变量
- Error 错误
- events 事件触发器
- fs 文件系统
- global 全局变量
- http 超文本传输协议
- http2 超文本传输协议 2.0
- https 安全超文本传输协议
- inspector 检查器
- Intl 国际化
- module 模块
- module/cjs CommonJS 模块
- module/esm ECMAScript 模块
- module/package 包模块
- module/typescript TS 模块
- net 网络
- os 操作系统
- path 路径
- perf_hooks 性能钩子
- permission 权限
- process 进程
- punycode 域名代码
- querystring 查询字符串
- readline 逐行读取
- repl 交互式解释器
- report 诊断报告
- sea 单个可执行应用程序
- sqlite 轻型数据库
- stream 流
- stream/web 网络流
- string_decoder 字符串解码器
- test 测试
- timers 定时器
- tls 安全传输层
- trace_events 跟踪事件
- tty 终端
- url 网址
- util 实用工具
- v8 引擎
- vm 虚拟机
- wasi 网络汇编系统接口
- worker_threads 工作线程
- zlib 压缩
Node.js v25.3.0 文档
- Node.js v25.3.0
-
目录
- 网络流 API
- 概述
- 应用接口
- 类:
ReadableStreamnew ReadableStream([underlyingSource [, strategy]])readableStream.lockedreadableStream.cancel([reason])readableStream.getReader([options])readableStream.pipeThrough(transform[, options])readableStream.pipeTo(destination[, options])readableStream.tee()readableStream.values([options])- 异步迭代
- 使用
postMessage()进行传输
ReadableStream.from(iterable)- 类:
ReadableStreamDefaultReader - 类:
ReadableStreamBYOBReader - 类:
ReadableStreamDefaultController - 类:
ReadableByteStreamController - 类:
ReadableStreamBYOBRequest - 类:
WritableStream - 类:
WritableStreamDefaultWriternew WritableStreamDefaultWriter(stream)writableStreamDefaultWriter.abort([reason])writableStreamDefaultWriter.close()writableStreamDefaultWriter.closedwritableStreamDefaultWriter.desiredSizewritableStreamDefaultWriter.readywritableStreamDefaultWriter.releaseLock()writableStreamDefaultWriter.write([chunk])
- 类:
WritableStreamDefaultController - 类:
TransformStream - 类:
TransformStreamDefaultController - 类:
ByteLengthQueuingStrategy - 类:
CountQueuingStrategy - 类:
TextEncoderStream - 类:
TextDecoderStream - 类:
CompressionStream - 类:
DecompressionStream - 实用工具消费者
- 类:
- 网络流 API
-
导航
- assert 断言
- async_hooks 异步钩子
- async_hooks/context 异步上下文
- buffer 缓冲区
- C++插件
- C/C++插件(使用 Node-API)
- C++嵌入器
- child_process 子进程
- cluster 集群
- CLI 命令行
- console 控制台
- crypto 加密
- crypto/webcrypto 网络加密
- debugger 调试器
- deprecation 弃用
- dgram 数据报
- diagnostics_channel 诊断通道
- dns 域名服务器
- domain 域
- env 环境变量
- Error 错误
- events 事件触发器
- fs 文件系统
- global 全局变量
- http 超文本传输协议
- http2 超文本传输协议 2.0
- https 安全超文本传输协议
- inspector 检查器
- Intl 国际化
- module 模块
- module/cjs CommonJS 模块
- module/esm ECMAScript 模块
- module/package 包模块
- module/typescript TS 模块
- net 网络
- os 操作系统
- path 路径
- perf_hooks 性能钩子
- permission 权限
- process 进程
- punycode 域名代码
- querystring 查询字符串
- readline 逐行读取
- repl 交互式解释器
- report 诊断报告
- sea 单个可执行应用程序
- sqlite 轻型数据库
- stream 流
- stream/web 网络流
- string_decoder 字符串解码器
- test 测试
- timers 定时器
- tls 安全传输层
- trace_events 跟踪事件
- tty 终端
- url 网址
- util 实用工具
- v8 引擎
- vm 虚拟机
- wasi 网络汇编系统接口
- worker_threads 工作线程
- zlib 压缩
- 其他版本
网络流 API#>
【Web Streams API】
WHATWG 流标准 的一个实现。
【An implementation of the WHATWG Streams Standard.】
概述#>
【Overview】
WHATWG 流标准(或“网络流”)定义了一个用于处理流数据的 API。它类似于 Node.js 的 溪流 API,但出现时间较晚,已经成为许多 JavaScript 环境中处理流数据的“标准”API。
【The WHATWG Streams Standard (or "web streams") defines an API for handling streaming data. It is similar to the Node.js Streams API but emerged later and has become the "standard" API for streaming data across many JavaScript environments.】
存在三种主要类型的对象:
【There are three primary types of objects:】
ReadableStream- 表示一个流数据的源。WritableStream- 表示数据流的目标。TransformStream- 表示用于转换流数据的算法。
示例 ReadableStream#>
【Example ReadableStream】
这个例子创建了一个简单的 ReadableStream,每秒永远推送一次当前的 performance.now() 时间戳。使用异步可迭代对象从流中读取数据。
【This example creates a simple ReadableStream that pushes the current
performance.now() timestamp once every second forever. An async iterable
is used to read the data from the stream.】
import {
ReadableStream,
} from 'node:stream/web';
import {
setInterval as every,
} from 'node:timers/promises';
import {
performance,
} from 'node:perf_hooks';
const SECOND = 1000;
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND))
controller.enqueue(performance.now());
},
});
for await (const value of stream)
console.log(value);const {
ReadableStream,
} = require('node:stream/web');
const {
setInterval: every,
} = require('node:timers/promises');
const {
performance,
} = require('node:perf_hooks');
const SECOND = 1000;
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND))
controller.enqueue(performance.now());
},
});
(async () => {
for await (const value of stream)
console.log(value);
})();
Node.js 流互操作性#>
【Node.js streams interoperability】
Node.js 流可以通过 stream.Readable、stream.Writable 和 stream.Duplex 对象上的 toWeb 和 fromWeb 方法相互转换为网页流,反之亦然。
【Node.js streams can be converted to web streams and vice versa via the toWeb and fromWeb methods present on stream.Readable, stream.Writable and stream.Duplex objects.】
有关更多详细信息,请参阅相关文档:
【For more details refer to the relevant documentation:】
应用接口#>
【API】
类:ReadableStream#>
【Class: ReadableStream】
new ReadableStream([underlyingSource [, strategy]])#>
underlyingSource<Object>start<Function> 用户定义的函数,在ReadableStream创建时立即调用。controller<ReadableStreamDefaultController> | <ReadableByteStreamController>- 返回:
undefined或一个以undefined完成的 Promise。
pull<Function> 一个用户定义的函数,当ReadableStream内部队列未满时会被重复调用。该操作可以是同步或异步的。如果是异步的,在前一个返回的 Promise 被解决之前,该函数不会再次被调用。controller<ReadableStreamDefaultController> | <ReadableByteStreamController>- 返回:一个以
undefined解决的 Promise。
cancel<Function> 当ReadableStream被取消时调用的用户自定义函数。reason<any>- 返回:一个以
undefined解决的 Promise。
type<string> 必须是'bytes'或undefined。autoAllocateChunkSize<number> 仅在type等于'bytes'时使用。设置为非零值时,将自动为ReadableByteStreamController.byobRequest分配一个视图缓冲区。如果未设置,则必须使用流的内部队列通过默认读取器ReadableStreamDefaultReader传输数据。
strategy<Object>highWaterMark<number> 在施加反压之前的最大内部队列大小。size<Function> 一个用户定义的函数,用于确定每个数据块的大小。
readableStream.locked#>
- 类型:<boolean> 如果有此 <ReadableStream> 的活动读取器,则设置为
true。
readableStream.locked 属性默认值为 false,当有活动的读取器正在消费流的数据时,该属性会切换为 true。
【The readableStream.locked property is false by default, and is
switched to true while there is an active reader consuming the
stream's data.】
readableStream.cancel([reason])#>
reason<any>- 返回:一旦取消完成,将返回一个以
undefined作为结果的已完成 Promise。
readableStream.getReader([options])#>
options<Object>mode<string>'byob'或undefined
- 返回:<ReadableStreamDefaultReader> | <ReadableStreamBYOBReader>
import { ReadableStream } from 'node:stream/web';
const stream = new ReadableStream();
const reader = stream.getReader();
console.log(await reader.read());const { ReadableStream } = require('node:stream/web');
const stream = new ReadableStream();
const reader = stream.getReader();
reader.read().then(console.log);
导致 readableStream.locked 为 true。
【Causes the readableStream.locked to be true.】
readableStream.pipeThrough(transform[, options])#>
transform<Object>readable<ReadableStream>ReadableStream,transform.writable将会把从该ReadableStream接收到的可能被修改的数据推送到这里。writable<WritableStream> 将写入此ReadableStream数据的WritableStream。
options<Object>preventAbort<boolean> 当为true时,该ReadableStream中的错误不会导致transform.writable被中止。preventCancel<boolean> 当设置为true时,目标transform.writable中的错误不会导致此ReadableStream被取消。preventClose<boolean> 当为true时,关闭此ReadableStream不会导致transform.writable被关闭。signal<AbortSignal> 允许使用 <AbortController> 取消数据传输。
- 返回:从
transform.readable获取 <ReadableStream>。
将此 <ReadableStream> 连接到 transform 参数中提供的 <ReadableStream> 和 <WritableStream> 对,使得此 <ReadableStream> 的数据被写入 transform.writable,可能经过转换,然后推送到 transform.readable。一旦管道配置完成,将返回 transform.readable。
【Connects this <ReadableStream> to the pair of <ReadableStream> and
<WritableStream> provided in the transform argument such that the
data from this <ReadableStream> is written in to transform.writable,
possibly transformed, then pushed to transform.readable. Once the
pipeline is configured, transform.readable is returned.】
在管道操作进行时,会导致 readableStream.locked 为 true。
【Causes the readableStream.locked to be true while the pipe operation
is active.】
import {
ReadableStream,
TransformStream,
} from 'node:stream/web';
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
const transformedStream = stream.pipeThrough(transform);
for await (const chunk of transformedStream)
console.log(chunk);
// Prints: Aconst {
ReadableStream,
TransformStream,
} = require('node:stream/web');
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
const transformedStream = stream.pipeThrough(transform);
(async () => {
for await (const chunk of transformedStream)
console.log(chunk);
// Prints: A
})();
readableStream.pipeTo(destination[, options])#>
destination<WritableStream> 一个 <WritableStream>,这个ReadableStream的数据将被写入其中。options<Object>preventAbort<boolean> 当值为true时,此ReadableStream中的错误不会导致destination被中止。preventCancel<boolean> 当为true时,destination中的错误不会导致此ReadableStream被取消。preventClose<boolean> 当为true时,关闭此ReadableStream不会导致destination被关闭。signal<AbortSignal> 允许使用 <AbortController> 取消数据传输。
- 返回:一个以
undefined解决的 Promise
在管道操作进行时,会导致 readableStream.locked 为 true。
【Causes the readableStream.locked to be true while the pipe operation
is active.】
readableStream.tee()#>
返回一对新的 <ReadableStream> 实例,该 ReadableStream 的数据将被转发到这两个实例。每个实例都会接收到相同的数据。
【Returns a pair of new <ReadableStream> instances to which this
ReadableStream's data will be forwarded. Each will receive the
same data.】
导致 readableStream.locked 为 true。
【Causes the readableStream.locked to be true.】
readableStream.values([options])#>
options<Object>preventCancel<boolean> 当值为true时,可防止在异步迭代器意外终止时关闭 <ReadableStream>。默认值:false。
创建并返回一个异步迭代器,可用于消费此 ReadableStream 的数据。
【Creates and returns an async iterator usable for consuming this
ReadableStream's data.】
在异步迭代器处于活动状态时,会导致 readableStream.locked 为 true。
【Causes the readableStream.locked to be true while the async iterator
is active.】
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream.values({ preventCancel: true }))
console.log(Buffer.from(chunk).toString());
异步迭代#>
【Async Iteration】
<ReadableStream> 对象支持使用 for await 语法的异步迭代器协议。
【The <ReadableStream> object supports the async iterator protocol using
for await syntax.】
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream)
console.log(Buffer.from(chunk).toString());
异步迭代器将会消耗 <ReadableStream> 直到它终止。
【The async iterator will consume the <ReadableStream> until it terminates.】
默认情况下,如果异步迭代器提前退出(通过 break、return 或 throw),<ReadableStream> 将会被关闭。要防止 <ReadableStream> 自动关闭,可以使用 readableStream.values() 方法获取异步迭代器,并将 preventCancel 选项设置为 true。
【By default, if the async iterator exits early (via either a break,
return, or a throw), the <ReadableStream> will be closed. To prevent
automatic closing of the <ReadableStream>, use the readableStream.values()
method to acquire the async iterator and set the preventCancel option to
true.】
<ReadableStream> 不得被锁定(即不得已有活动读取器)。在异步迭代期间,<ReadableStream> 将被锁定。
【The <ReadableStream> must not be locked (that is, it must not have an existing active reader). During the async iteration, the <ReadableStream> will be locked.】
使用 postMessage() 进行传输#>
【Transferring with postMessage()】
可以使用 <MessagePort> 转移 <ReadableStream> 实例。
【A <ReadableStream> instance can be transferred using a <MessagePort>.】
const stream = new ReadableStream(getReadableSourceSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getReader().read().then((chunk) => {
console.log(chunk);
});
};
port2.postMessage(stream, [stream]);
ReadableStream.from(iterable)#>
iterable<Iterable> 实现了Symbol.asyncIterator或Symbol.iterator可迭代协议的对象。
一个实用方法,用于从可迭代对象创建一个新的 <ReadableStream>。
【A utility method that creates a new <ReadableStream> from an iterable.】
import { ReadableStream } from 'node:stream/web';
async function* asyncIterableGenerator() {
yield 'a';
yield 'b';
yield 'c';
}
const stream = ReadableStream.from(asyncIterableGenerator());
for await (const chunk of stream)
console.log(chunk); // Prints: 'a', 'b', 'c'const { ReadableStream } = require('node:stream/web');
async function* asyncIterableGenerator() {
yield 'a';
yield 'b';
yield 'c';
}
(async () => {
const stream = ReadableStream.from(asyncIterableGenerator());
for await (const chunk of stream)
console.log(chunk); // Prints: 'a', 'b', 'c'
})();
将生成的 <ReadableStream> 管道到 <WritableStream> 中,<Iterable> 应该会产生一系列 <Buffer>、<TypedArray> 或 <DataView> 对象。
【To pipe the resulting <ReadableStream> into a <WritableStream> the <Iterable> should yield a sequence of <Buffer>, <TypedArray>, or <DataView> objects.】
import { ReadableStream } from 'node:stream/web';
import { Buffer } from 'node:buffer';
async function* asyncIterableGenerator() {
yield Buffer.from('a');
yield Buffer.from('b');
yield Buffer.from('c');
}
const stream = ReadableStream.from(asyncIterableGenerator());
await stream.pipeTo(createWritableStreamSomehow());const { ReadableStream } = require('node:stream/web');
const { Buffer } = require('node:buffer');
async function* asyncIterableGenerator() {
yield Buffer.from('a');
yield Buffer.from('b');
yield Buffer.from('c');
}
const stream = ReadableStream.from(asyncIterableGenerator());
(async () => {
await stream.pipeTo(createWritableStreamSomehow());
})();
类:ReadableStreamDefaultReader#>
【Class: ReadableStreamDefaultReader】
默认情况下,调用 readableStream.getReader() 而不传入任何参数时,会返回一个 ReadableStreamDefaultReader 实例。默认读取器将通过流传递的数据块视为不透明值,这允许 <ReadableStream> 可以处理几乎任何 JavaScript 值。
【By default, calling readableStream.getReader() with no arguments
will return an instance of ReadableStreamDefaultReader. The default
reader treats the chunks of data passed through the stream as opaque
values, which allows the <ReadableStream> to work with generally any
JavaScript value.】
new ReadableStreamDefaultReader(stream)#>
stream<ReadableStream>
创建一个新的 <ReadableStreamDefaultReader>,并锁定到指定的 <ReadableStream>。
【Creates a new <ReadableStreamDefaultReader> that is locked to the given <ReadableStream>.】
readableStreamDefaultReader.cancel([reason])#>
reason<any>- 返回:一个以
undefined解决的 Promise。
取消 <ReadableStream> 并返回一个 promise,当底层流被取消时该 promise 将被兑现。
【Cancels the <ReadableStream> and returns a promise that is fulfilled when the underlying stream has been canceled.】
readableStreamDefaultReader.closed#>
- 类型:<Promise> 当关联的 <ReadableStream> 被关闭或拒绝,或者如果流在完成关闭前出现错误或阅读器的锁被释放时,返回
undefined。
readableStreamDefaultReader.read()#>
从底层 <ReadableStream> 请求下一块数据,并返回一个在数据可用时被完成的 Promise。
【Requests the next chunk of data from the underlying <ReadableStream> and returns a promise that is fulfilled with the data once it is available.】
readableStreamDefaultReader.releaseLock()#>
释放此读取器对底层 <ReadableStream> 的锁定。
【Releases this reader's lock on the underlying <ReadableStream>.】
类:ReadableStreamBYOBReader#>
【Class: ReadableStreamBYOBReader】
ReadableStreamBYOBReader 是字节导向 <ReadableStream> 的另一种消费方式(那些在创建 ReadableStream 时,其 underlyingSource.type 被设置为 'bytes' 的流)。
【The ReadableStreamBYOBReader is an alternative consumer for
byte-oriented <ReadableStream>s (those that are created with
underlyingSource.type set equal to 'bytes' when the
ReadableStream was created).】
BYOB 是 “bring your own buffer”(自备缓冲区)的缩写。这是一种模式,它允许更高效地读取字节导向的数据,并避免不必要的复制。
【The BYOB is short for "bring your own buffer". This is a
pattern that allows for more efficient reading of byte-oriented
data that avoids extraneous copying.】
import {
open,
} from 'node:fs/promises';
import {
ReadableStream,
} from 'node:stream/web';
import { Buffer } from 'node:buffer';
class Source {
type = 'bytes';
autoAllocateChunkSize = 1024;
async start(controller) {
this.file = await open(new URL(import.meta.url));
this.controller = controller;
}
async pull(controller) {
const view = controller.byobRequest?.view;
const {
bytesRead,
} = await this.file.read({
buffer: view,
offset: view.byteOffset,
length: view.byteLength,
});
if (bytesRead === 0) {
await this.file.close();
this.controller.close();
}
controller.byobRequest.respond(bytesRead);
}
}
const stream = new ReadableStream(new Source());
async function read(stream) {
const reader = stream.getReader({ mode: 'byob' });
const chunks = [];
let result;
do {
result = await reader.read(Buffer.alloc(100));
if (result.value !== undefined)
chunks.push(Buffer.from(result.value));
} while (!result.done);
return Buffer.concat(chunks);
}
const data = await read(stream);
console.log(Buffer.from(data).toString());
new ReadableStreamBYOBReader(stream)#>
stream<ReadableStream>
创建一个新的 ReadableStreamBYOBReader,该读取器被锁定到给定的 <ReadableStream>。
【Creates a new ReadableStreamBYOBReader that is locked to the
given <ReadableStream>.】
readableStreamBYOBReader.cancel([reason])#>
reason<any>- 返回:一个以
undefined解决的 Promise。
取消 <ReadableStream> 并返回一个 promise,当底层流被取消时该 promise 将被兑现。
【Cancels the <ReadableStream> and returns a promise that is fulfilled when the underlying stream has been canceled.】
readableStreamBYOBReader.closed#>
- 类型:<Promise> 当关联的 <ReadableStream> 被关闭或拒绝,或者如果流在完成关闭前出现错误或阅读器的锁被释放时,返回
undefined。
readableStreamBYOBReader.read(view[, options])#>
view<Buffer> | <TypedArray> | <DataView>options<Object>min<number> 设置后,返回的 promise 只有在min数量的元素可用时才会被兑现。未设置时,当至少有一个元素可用时,promise 就会被兑现。
- 返回值:一个已兑现的对象的 Promise:
value<TypedArray> | <DataView>done<boolean>
从底层 <ReadableStream> 请求下一块数据,并返回一个在数据可用时被完成的 Promise。
【Requests the next chunk of data from the underlying <ReadableStream> and returns a promise that is fulfilled with the data once it is available.】
不要将池化的 <Buffer> 对象实例传入此方法。池化的 Buffer 对象是使用 Buffer.allocUnsafe() 或 Buffer.from() 创建的,或者通常由各种 node:fs 模块回调返回。这些类型的 Buffer 使用共享的底层 <ArrayBuffer> 对象,该对象包含所有池化 Buffer 实例的所有数据。当将 Buffer、<TypedArray> 或 <DataView> 传入 readableStreamBYOBReader.read() 时,该视图的底层 ArrayBuffer 会被_分离_,使其上可能存在的所有现有视图失效。这可能对你的应用造成严重后果。
【Do not pass a pooled <Buffer> object instance in to this method.
Pooled Buffer objects are created using Buffer.allocUnsafe(),
or Buffer.from(), or are often returned by various node:fs module
callbacks. These types of Buffers use a shared underlying
<ArrayBuffer> object that contains all of the data from all of
the pooled Buffer instances. When a Buffer, <TypedArray>,
or <DataView> is passed in to readableStreamBYOBReader.read(),
the view's underlying ArrayBuffer is detached, invalidating
all existing views that may exist on that ArrayBuffer. This
can have disastrous consequences for your application.】
readableStreamBYOBReader.releaseLock()#>
释放此读取器对底层 <ReadableStream> 的锁定。
【Releases this reader's lock on the underlying <ReadableStream>.】
类:ReadableStreamDefaultController#>
【Class: ReadableStreamDefaultController】
每个 <ReadableStream> 都有一个控制器,负责流队列的内部状态和管理。ReadableStreamDefaultController 是用于非字节导向 ReadableStream 的默认控制器实现。
【Every <ReadableStream> has a controller that is responsible for
the internal state and management of the stream's queue. The
ReadableStreamDefaultController is the default controller
implementation for ReadableStreams that are not byte-oriented.】
readableStreamDefaultController.close()#>
关闭与此控制器关联的 <ReadableStream>。
【Closes the <ReadableStream> to which this controller is associated.】
readableStreamDefaultController.desiredSize#>
- 类型:<number>
返回填充 <ReadableStream> 队列所剩余的数据量。
【Returns the amount of data remaining to fill the <ReadableStream>'s queue.】
readableStreamDefaultController.enqueue([chunk])#>
chunk<any>
将一段新数据附加到 <ReadableStream> 的队列中。
【Appends a new chunk of data to the <ReadableStream>'s queue.】
readableStreamDefaultController.error([error])#>
error<any>
发出导致 <ReadableStream> 出错并关闭的错误信号。
【Signals an error that causes the <ReadableStream> to error and close.】
类:ReadableByteStreamController#>
【Class: ReadableByteStreamController】
每个 <ReadableStream> 都有一个控制器,负责流队列的内部状态和管理。ReadableByteStreamController 用于面向字节的 ReadableStream。
【Every <ReadableStream> has a controller that is responsible for
the internal state and management of the stream's queue. The
ReadableByteStreamController is for byte-oriented ReadableStreams.】
readableByteStreamController.byobRequest#>
readableByteStreamController.close()#>
关闭与此控制器关联的 <ReadableStream>。
【Closes the <ReadableStream> to which this controller is associated.】
readableByteStreamController.desiredSize#>
- 类型:<number>
返回填充 <ReadableStream> 队列所剩余的数据量。
【Returns the amount of data remaining to fill the <ReadableStream>'s queue.】
readableByteStreamController.enqueue(chunk)#>
chunk<Buffer> | <TypedArray> | <DataView>
将一段新数据附加到 <ReadableStream> 的队列中。
【Appends a new chunk of data to the <ReadableStream>'s queue.】
readableByteStreamController.error([error])#>
error<any>
发出导致 <ReadableStream> 出错并关闭的错误信号。
【Signals an error that causes the <ReadableStream> to error and close.】
类:ReadableStreamBYOBRequest#>
【Class: ReadableStreamBYOBRequest】
在字节导向的流中使用 ReadableByteStreamController 时,以及在使用 ReadableStreamBYOBReader 时,readableByteStreamController.byobRequest 属性提供对 ReadableStreamBYOBRequest 实例的访问,该实例表示当前的读取请求。该对象用于访问为读取请求提供的 ArrayBuffer/TypedArray,并提供用于表示数据已提供的方法。
【When using ReadableByteStreamController in byte-oriented
streams, and when using the ReadableStreamBYOBReader,
the readableByteStreamController.byobRequest property
provides access to a ReadableStreamBYOBRequest instance
that represents the current read request. The object
is used to gain access to the ArrayBuffer/TypedArray
that has been provided for the read request to fill,
and provides methods for signaling that the data has
been provided.】
readableStreamBYOBRequest.respond(bytesWritten)#>
bytesWritten<number>
表示已向 readableStreamBYOBRequest.view 写入了 bytesWritten 数量的字节。
【Signals that a bytesWritten number of bytes have been written
to readableStreamBYOBRequest.view.】
readableStreamBYOBRequest.respondWithNewView(view)#>
view<Buffer> | <TypedArray> | <DataView>
表示请求已完成,并将字节写入新的 Buffer、TypedArray 或 DataView。
【Signals that the request has been fulfilled with bytes written
to a new Buffer, TypedArray, or DataView.】
readableStreamBYOBRequest.view#>
- 类型: <Buffer> | <TypedArray> | <DataView>
类:WritableStream#>
【Class: WritableStream】
WritableStream 是数据流发送到的目标。
【The WritableStream is a destination to which stream data is sent.】
import {
WritableStream,
} from 'node:stream/web';
const stream = new WritableStream({
write(chunk) {
console.log(chunk);
},
});
await stream.getWriter().write('Hello World');
new WritableStream([underlyingSink[, strategy]])#>
underlyingSink<Object>start<Function> 用户定义的函数,在WritableStream创建时立即调用。controller<WritableStreamDefaultController>- 返回:
undefined或一个以undefined完成的 Promise。
write<Function> 当一块数据被写入WritableStream时调用的用户定义函数。chunk<any>controller<WritableStreamDefaultController>- 返回:一个以
undefined解决的 Promise。
close<Function> 当WritableStream被关闭时调用的用户自定义函数。- 返回:一个以
undefined解决的 Promise。
- 返回:一个以
abort<Function> 一个用户定义的函数,用于突然关闭WritableStream。reason<any>- 返回:一个以
undefined解决的 Promise。
type<any>type选项保留供将来使用,必须 未定义。
strategy<Object>highWaterMark<number> 在施加反压之前的最大内部队列大小。size<Function> 一个用户定义的函数,用于确定每个数据块的大小。
writableStream.abort([reason])#>
reason<any>- 返回:一个以
undefined解决的 Promise。
突然终止 WritableStream。所有排队的写入操作将被取消,并且其相关的承诺将被拒绝。
【Abruptly terminates the WritableStream. All queued writes will be
canceled with their associated promises rejected.】
writableStream.close()#>
- 返回:一个以
undefined解决的 Promise。
当不再有额外写入时,关闭 WritableStream。
【Closes the WritableStream when no additional writes are expected.】
writableStream.getWriter()#>
创建并返回一个新的写入器实例,可用于向 WritableStream 写入数据。
【Creates and returns a new writer instance that can be used to write
data into the WritableStream.】
writableStream.locked#>
- 类型:<boolean>
writableStream.locked 属性默认值为 false,当有活动写入器连接到此 WritableStream 时,该属性会切换为 true。
【The writableStream.locked property is false by default, and is
switched to true while there is an active writer attached to this
WritableStream.】
使用 postMessage() 传输#>
【Transferring with postMessage()】
可以使用 <MessagePort> 转移 <WritableStream> 实例。
【A <WritableStream> instance can be transferred using a <MessagePort>.】
const stream = new WritableStream(getWritableSinkSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getWriter().write('hello');
};
port2.postMessage(stream, [stream]);
类:WritableStreamDefaultWriter#>
【Class: WritableStreamDefaultWriter】
new WritableStreamDefaultWriter(stream)#>
stream<WritableStream>
创建一个新的 WritableStreamDefaultWriter,该写入器被锁定到指定的 WritableStream 上。
【Creates a new WritableStreamDefaultWriter that is locked to the given
WritableStream.】
writableStreamDefaultWriter.abort([reason])#>
reason<any>- 返回:一个以
undefined解决的 Promise。
突然终止 WritableStream。所有排队的写入操作将被取消,并且其相关的承诺将被拒绝。
【Abruptly terminates the WritableStream. All queued writes will be
canceled with their associated promises rejected.】
writableStreamDefaultWriter.close()#>
- 返回:一个以
undefined解决的 Promise。
当不再有额外写入时,关闭 WritableStream。
【Closes the WritableStream when no additional writes are expected.】
writableStreamDefaultWriter.closed#>
- 类型:<Promise> 当关联的 <WritableStream> 被关闭或拒绝,或者如果流在完成关闭前出现错误或写入器锁被释放时,值为
undefined。
writableStreamDefaultWriter.desiredSize#>
- 类型:<number>
填充 <WritableStream> 队列所需的数据量。
【The amount of data required to fill the <WritableStream>'s queue.】
writableStreamDefaultWriter.ready#>
- 类型:<Promise> 当写入器准备好使用时,填充为
undefined。
writableStreamDefaultWriter.releaseLock()#>
释放此写入器对底层 <ReadableStream> 的锁定。
【Releases this writer's lock on the underlying <ReadableStream>.】
writableStreamDefaultWriter.write([chunk])#>
chunk<any>- 返回:一个以
undefined解决的 Promise。
将一段新数据附加到 <WritableStream> 的队列中。
【Appends a new chunk of data to the <WritableStream>'s queue.】
类:WritableStreamDefaultController#>
【Class: WritableStreamDefaultController】
WritableStreamDefaultController 管理 <WritableStream> 的内部状态。
【The WritableStreamDefaultController manages the <WritableStream>'s
internal state.】
writableStreamDefaultController.error([error])#>
error<any>
由用户代码调用以表示在处理 WritableStream 数据时发生了错误。调用时,<WritableStream> 将被中止,当前待处理的写入将被取消。
【Called by user-code to signal that an error has occurred while processing
the WritableStream data. When called, the <WritableStream> will be aborted,
with currently pending writes canceled.】
writableStreamDefaultController.signal#>
- 类型:<AbortSignal> 一个
AbortSignal,可以在 <WritableStream> 被中止时用来取消挂起的写入或关闭操作。
类:TransformStream#>
【Class: TransformStream】
TransformStream 由 <ReadableStream> 和 <WritableStream> 组成,它们相互连接,使得写入 WritableStream 的数据可以被接收,并在被推入 `ReadableStream`` 的队列之前可能会被转换。
【A TransformStream consists of a <ReadableStream> and a <WritableStream> that
are connected such that the data written to the WritableStream is received,
and potentially transformed, before being pushed into the ReadableStream's
queue.】
import {
TransformStream,
} from 'node:stream/web';
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
await Promise.all([
transform.writable.getWriter().write('A'),
transform.readable.getReader().read(),
]);
new TransformStream([transformer[, writableStrategy[, readableStrategy]]])#>
transformer<Object>start<Function> 用户定义的函数,在创建TransformStream时立即调用。controller<TransformStreamDefaultController>- 返回:
undefined或一个已完成而值为undefined的 Promise
transform<Function> 一个用户自定义函数,它接收写入transformStream.writable的数据块,并可能对其进行修改,然后再将其转发到transformStream.readable。chunk<any>controller<TransformStreamDefaultController>- 返回:一个以
undefined解决的 Promise。
flush<Function> 用户定义的函数,在TransformStream的可写端关闭之前立即调用,用于标志转换过程的结束。controller<TransformStreamDefaultController>- 返回:一个以
undefined解决的 Promise。
readableType<any>readableType选项保留供将来使用,并且 必须 为undefined。writableType<any>writableType选项保留供将来使用,并且 必须 为undefined。
writableStrategy<Object>highWaterMark<number> 在施加反压之前的最大内部队列大小。size<Function> 一个用户定义的函数,用于确定每个数据块的大小。
readableStrategy<Object>highWaterMark<number> 在施加反压之前的最大内部队列大小。size<Function> 一个用户定义的函数,用于确定每个数据块的大小。
transformStream.readable#>
transformStream.writable#>
- 类型: <WritableStream>
使用 postMessage() 传输#>
【Transferring with postMessage()】
可以使用 <MessagePort> 转移 <TransformStream> 实例。
【A <TransformStream> instance can be transferred using a <MessagePort>.】
const stream = new TransformStream();
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
const { writable, readable } = data;
// ...
};
port2.postMessage(stream, [stream]);
类:TransformStreamDefaultController#>
【Class: TransformStreamDefaultController】
TransformStreamDefaultController 管理 TransformStream 的内部状态。
【The TransformStreamDefaultController manages the internal state
of the TransformStream.】
transformStreamDefaultController.desiredSize#>
- 类型:<number>
填充可读端队列所需的数据量。
【The amount of data required to fill the readable side's queue.】
transformStreamDefaultController.enqueue([chunk])#>
chunk<any>
将一大块数据附加到可读端的队列中。
【Appends a chunk of data to the readable side's queue.】
transformStreamDefaultController.error([reason])#>
reason<any>
向可读和可写两端发出信号,指示在处理转换数据时发生了错误,导致两端都被突然关闭。
【Signals to both the readable and writable side that an error has occurred while processing the transform data, causing both sides to be abruptly closed.】
transformStreamDefaultController.terminate()#>
关闭传输的可读端,并导致可写端因错误而被突然关闭。
【Closes the readable side of the transport and causes the writable side to be abruptly closed with an error.】
类:ByteLengthQueuingStrategy#>
【Class: ByteLengthQueuingStrategy】
new ByteLengthQueuingStrategy(init)#>
byteLengthQueuingStrategy.highWaterMark#>
- 类型:<number>
byteLengthQueuingStrategy.size#>
- 类型:<Function>
类:CountQueuingStrategy#>
【Class: CountQueuingStrategy】
new CountQueuingStrategy(init)#>
countQueuingStrategy.highWaterMark#>
- 类型:<number>
countQueuingStrategy.size#>
- 类型:<Function>
类:TextEncoderStream#>
【Class: TextEncoderStream】
new TextEncoderStream()#>
创建一个新的 TextEncoderStream 实例。
【Creates a new TextEncoderStream instance.】
textEncoderStream.encoding#>
- 类型:<string>
TextEncoderStream 实例支持的编码。
【The encoding supported by the TextEncoderStream instance.】
textEncoderStream.readable#>
textEncoderStream.writable#>
- 类型: <WritableStream>
类:TextDecoderStream#>
【Class: TextDecoderStream】
new TextDecoderStream([encoding[, options]])#>
创建一个新的 TextDecoderStream 实例。
【Creates a new TextDecoderStream instance.】
textDecoderStream.encoding#>
- 类型:<string>
TextDecoderStream 实例支持的编码。
【The encoding supported by the TextDecoderStream instance.】
textDecoderStream.fatal#>
- 类型:<boolean>
如果解码错误导致抛出 TypeError,该值将为 true。
【The value will be true if decoding errors result in a TypeError being
thrown.】
textDecoderStream.ignoreBOM#>
- 类型:<boolean>
如果解码结果将包含字节顺序标记,则该值将为 true。
【The value will be true if the decoding result will include the byte order
mark.】
textDecoderStream.readable#>
textDecoderStream.writable#>
- 类型: <WritableStream>
类:CompressionStream#>
【Class: CompressionStream】
new CompressionStream(format)#>
format<string> 可为'deflate'、'deflate-raw'、'gzip'或'brotli'之一。
compressionStream.readable#>
compressionStream.writable#>
- 类型: <WritableStream>
类:DecompressionStream#>
【Class: DecompressionStream】
new DecompressionStream(format)#>
format<string> 可为'deflate'、'deflate-raw'、'gzip'或'brotli'之一。
decompressionStream.readable#>
decompressionStream.writable#>
- 类型: <WritableStream>
实用工具消费者#>
【Utility Consumers】
实用程序消费者函数提供用于消费流的常用选项。
【The utility consumer functions provide common options for consuming streams.】
使用以下方式访问它们:
【They are accessed using:】
import {
arrayBuffer,
blob,
buffer,
json,
text,
} from 'node:stream/consumers';const {
arrayBuffer,
blob,
buffer,
json,
text,
} = require('node:stream/consumers');
streamConsumers.arrayBuffer(stream)#>
stream<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回:<Promise> 以包含流的完整内容的
ArrayBuffer形式完成。
import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';
const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { TextEncoder } = require('node:util');
const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76
});
streamConsumers.blob(stream)#>
stream<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回:<Promise> 将以包含流完整内容的 <Blob> 完成。
import { blob } from 'node:stream/consumers';
const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27const { blob } = require('node:stream/consumers');
const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
blob(readable).then((data) => {
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27
});
streamConsumers.buffer(stream)#>
stream<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回:<Promise> 以包含流的完整内容的 <Buffer> 成功完成。
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
});
streamConsumers.json(stream)#>
stream<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回:<Promise>,以解析为 UTF-8 编码字符串的流内容作为结果,然后通过
JSON.parse()处理。
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const items = Array.from(
{
length: 100,
},
() => ({
message: 'hello world from consumers!',
}),
);
const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const items = Array.from(
{
length: 100,
},
() => ({
message: 'hello world from consumers!',
}),
);
const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100
});
streamConsumers.text(stream)#>
stream<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回:<Promise> 使用解析为 UTF-8 编码字符串的流内容完成。
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
});