- assert 断言
- async_hooks 异步钩子
- async_hooks/context 异步上下文
- buffer 缓冲区
- C++插件
- C/C++插件(使用 Node-API)
- C++嵌入器
- child_process 子进程
- cluster 集群
- CLI 命令行
- console 控制台
- Corepack 核心包
- crypto 加密
- crypto/webcrypto 网络加密
- debugger 调试器
- deprecation 弃用
- dgram 数据报
- diagnostics_channel 诊断通道
- dns 域名服务器
- domain 域
- Error 错误
- events 事件触发器
- fs 文件系统
- global 全局变量
- http 超文本传输协议
- http2 超文本传输协议 2.0
- https 安全超文本传输协议
- inspector 检查器
- Intl 国际化
- module 模块
- module/cjs CommonJS 模块
- module/esm ECMAScript 模块
- module/package 包模块
- net 网络
- os 操作系统
- path 路径
- perf_hooks 性能钩子
- permission 权限
- process 进程
- punycode 域名代码
- querystring 查询字符串
- readline 逐行读取
- repl 交互式解释器
- report 诊断报告
- stream 流
- stream/web 网络流
- string_decoder 字符串解码器
- test 测试
- timers 定时器
- tls 安全传输层
- trace_events 跟踪事件
- tty 终端
- url 网址
- util 实用工具
- v8 引擎
- vm 虚拟机
- wasi 网络汇编系统接口
- worker_threads 工作线程
- zlib 压缩
Node.js v16.20.2 文档
- Node.js v16.20.2
-
目录
- 异步上下文跟踪
- 介绍
- 类:
AsyncLocalStorage - 类:
AsyncResourcenew AsyncResource(type[, options])- 静态方法:
AsyncResource.bind(fn[, type[, thisArg]]) asyncResource.bind(fn[, thisArg])asyncResource.runInAsyncScope(fn[, thisArg, ...args])asyncResource.emitDestroy()asyncResource.asyncId()asyncResource.triggerAsyncId()- 在
Worker线程池中使用AsyncResource - 将
AsyncResource与EventEmitter集成
- 异步上下文跟踪
-
导航
- assert 断言
- async_hooks 异步钩子
- async_hooks/context 异步上下文
- buffer 缓冲区
- C++插件
- C/C++插件(使用 Node-API)
- C++嵌入器
- child_process 子进程
- cluster 集群
- CLI 命令行
- console 控制台
- Corepack 核心包
- crypto 加密
- crypto/webcrypto 网络加密
- debugger 调试器
- deprecation 弃用
- dgram 数据报
- diagnostics_channel 诊断通道
- dns 域名服务器
- domain 域
- Error 错误
- events 事件触发器
- fs 文件系统
- global 全局变量
- http 超文本传输协议
- http2 超文本传输协议 2.0
- https 安全超文本传输协议
- inspector 检查器
- Intl 国际化
- module 模块
- module/cjs CommonJS 模块
- module/esm ECMAScript 模块
- module/package 包模块
- net 网络
- os 操作系统
- path 路径
- perf_hooks 性能钩子
- permission 权限
- process 进程
- punycode 域名代码
- querystring 查询字符串
- readline 逐行读取
- repl 交互式解释器
- report 诊断报告
- stream 流
- stream/web 网络流
- string_decoder 字符串解码器
- test 测试
- timers 定时器
- tls 安全传输层
- trace_events 跟踪事件
- tty 终端
- url 网址
- util 实用工具
- v8 引擎
- vm 虚拟机
- wasi 网络汇编系统接口
- worker_threads 工作线程
- zlib 压缩
- 其他版本
异步上下文跟踪#>
【Asynchronous context tracking】
源代码: lib/async_hooks.js
介绍#>
【Introduction】
这些类用于关联状态并在回调和 Promise 链中传播它。它们允许在 Web 请求的整个生命周期或任何其他异步期间存储数据。这类似于其他语言中的线程本地存储。
【These classes are used to associate state and propagate it throughout callbacks and promise chains. They allow storing data throughout the lifetime of a web request or any other asynchronous duration. It is similar to thread-local storage in other languages.】
AsyncLocalStorage 和 AsyncResource 类是 node:async_hooks 模块的一部分:
【The AsyncLocalStorage and AsyncResource classes are part of the
node:async_hooks module:】
import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks';const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');
类:AsyncLocalStorage#>
【Class: AsyncLocalStorage】
此类创建通过异步操作保持一致的存储。
【This class creates stores that stay coherent through asynchronous operations.】
虽然你可以在 node:async_hooks 模块的基础上创建自己的实现,但应该优先使用 AsyncLocalStorage,因为它是一个高性能且内存安全的实现,包含许多不易实现的优化。
【While you can create your own implementation on top of the node:async_hooks
module, AsyncLocalStorage should be preferred as it is a performant and memory
safe implementation that involves significant optimizations that are non-obvious
to implement.】
以下示例使用 AsyncLocalStorage 构建一个简单的日志记录器,为传入的 HTTP 请求分配 ID,并将其包含在每个请求中记录的消息中。
【The following example uses AsyncLocalStorage to build a simple logger
that assigns IDs to incoming HTTP requests and includes them in messages
logged within each request.】
import http from 'node:http';
import { AsyncLocalStorage } from 'node:async_hooks';
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}
let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// Imagine any chain of async operations here
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);
http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
// 0: start
// 1: start
// 0: finish
// 1: finishconst http = require('node:http');
const { AsyncLocalStorage } = require('node:async_hooks');
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}
let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// Imagine any chain of async operations here
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);
http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
// 0: start
// 1: start
// 0: finish
// 1: finish
每个 AsyncLocalStorage 实例都维护一个独立的存储上下文。多个实例可以同时存在而不会互相干扰彼此的数据。
【Each instance of AsyncLocalStorage maintains an independent storage context.
Multiple instances can safely exist simultaneously without risk of interfering
with each other's data.】
new AsyncLocalStorage()#>
创建一个新的 AsyncLocalStorage 实例。存储仅在 run() 调用中或在 enterWith() 调用之后提供。
【Creates a new instance of AsyncLocalStorage. Store is only provided within a
run() call or after an enterWith() call.】
asyncLocalStorage.disable()#>
禁用 AsyncLocalStorage 的实例。之后对 asyncLocalStorage.getStore() 的所有调用都会返回 undefined,直到再次调用 asyncLocalStorage.run() 或 asyncLocalStorage.enterWith()。
【Disables the instance of AsyncLocalStorage. All subsequent calls
to asyncLocalStorage.getStore() will return undefined until
asyncLocalStorage.run() or asyncLocalStorage.enterWith() is called again.】
调用 asyncLocalStorage.disable() 时,与该实例关联的所有当前上下文将被退出。
【When calling asyncLocalStorage.disable(), all current contexts linked to the
instance will be exited.】
在 asyncLocalStorage 可以被垃圾回收之前,必须调用 asyncLocalStorage.disable()。这不适用于 asyncLocalStorage 提供的存储,因为这些对象会随着相应的异步资源一起被垃圾回收。
【Calling asyncLocalStorage.disable() is required before the
asyncLocalStorage can be garbage collected. This does not apply to stores
provided by the asyncLocalStorage, as those objects are garbage collected
along with the corresponding async resources.】
当当前进程中不再使用 asyncLocalStorage 时,请使用此方法。
【Use this method when the asyncLocalStorage is not in use anymore
in the current process.】
asyncLocalStorage.getStore()#>
- 返回:<any>
返回当前存储。如果在未通过调用 asyncLocalStorage.run() 或 asyncLocalStorage.enterWith() 初始化的异步上下文之外调用,它将返回 undefined。
【Returns the current store.
If called outside of an asynchronous context initialized by
calling asyncLocalStorage.run() or asyncLocalStorage.enterWith(), it
returns undefined.】
asyncLocalStorage.enterWith(store)#>
store<any>
在当前同步执行的其余部分中切换到上下文,然后在随后的任何异步调用中持久化存储。
【Transitions into the context for the remainder of the current synchronous execution and then persists the store through any following asynchronous calls.】
示例:
【Example:】
const store = { id: 1 };
// Replaces previous store with the given store object
asyncLocalStorage.enterWith(store);
asyncLocalStorage.getStore(); // Returns the store object
someAsyncOperation(() => {
asyncLocalStorage.getStore(); // Returns the same object
});
这种过渡将持续整个同步执行过程。这意味着,例如,如果上下文在事件处理程序中被进入,后续的事件处理程序也将在该上下文中运行,除非使用 AsyncResource 明确绑定到另一个上下文。这就是为什么除非有强烈理由使用后者方法,否则应优先使用 run() 而不是 enterWith()。
【This transition will continue for the entire synchronous execution.
This means that if, for example, the context is entered within an event
handler subsequent event handlers will also run within that context unless
specifically bound to another context with an AsyncResource. That is why
run() should be preferred over enterWith() unless there are strong reasons
to use the latter method.】
const store = { id: 1 };
emitter.on('my-event', () => {
asyncLocalStorage.enterWith(store);
});
emitter.on('my-event', () => {
asyncLocalStorage.getStore(); // Returns the same object
});
asyncLocalStorage.getStore(); // Returns undefined
emitter.emit('my-event');
asyncLocalStorage.getStore(); // Returns the same object
asyncLocalStorage.run(store, callback[, ...args])#>
store<any>callback<Function>...args<any>
在上下文中同步运行一个函数并返回其返回值。存储在回调函数外不可访问。存储可以被回调函数内创建的任何异步操作访问。
【Runs a function synchronously within a context and returns its return value. The store is not accessible outside of the callback function. The store is accessible to any asynchronous operations created within the callback.】
可选的 args 会传递给回调函数。
【The optional args are passed to the callback function.】
如果回调函数抛出错误,run() 也会抛出该错误。堆栈跟踪不会受到此调用的影响,并且上下文会被退出。
【If the callback function throws an error, the error is thrown by run() too.
The stacktrace is not impacted by this call and the context is exited.】
示例:
【Example:】
const store = { id: 2 };
try {
asyncLocalStorage.run(store, () => {
asyncLocalStorage.getStore(); // Returns the store object
setTimeout(() => {
asyncLocalStorage.getStore(); // Returns the store object
}, 200);
throw new Error();
});
} catch (e) {
asyncLocalStorage.getStore(); // Returns undefined
// The error will be caught here
}
asyncLocalStorage.exit(callback[, ...args])#>
callback<Function>...args<any>
在上下文之外同步运行一个函数并返回其返回值。回调函数内或回调函数内创建的异步操作中无法访问存储。回调函数中进行的任何 getStore() 调用都将始终返回 undefined。
【Runs a function synchronously outside of a context and returns its
return value. The store is not accessible within the callback function or
the asynchronous operations created within the callback. Any getStore()
call done within the callback function will always return undefined.】
可选的 args 会传递给回调函数。
【The optional args are passed to the callback function.】
如果回调函数抛出错误,exit() 也会抛出该错误。栈跟踪不会受到此调用的影响,并且上下文会被重新进入。
【If the callback function throws an error, the error is thrown by exit() too.
The stacktrace is not impacted by this call and the context is re-entered.】
示例:
【Example:】
// Within a call to run
try {
asyncLocalStorage.getStore(); // Returns the store object or value
asyncLocalStorage.exit(() => {
asyncLocalStorage.getStore(); // Returns undefined
throw new Error();
});
} catch (e) {
asyncLocalStorage.getStore(); // Returns the same object or value
// The error will be caught here
}
async/await 的使用#>
【Usage with async/await】
如果在异步函数中,只有一个 await 调用需要在某个上下文中运行,应使用以下模式:
【If, within an async function, only one await call is to run within a context,
the following pattern should be used:】
async function fn() {
await asyncLocalStorage.run(new Map(), () => {
asyncLocalStorage.getStore().set('key', value);
return foo(); // The return value of foo will be awaited
});
}
在这个例子中,该存储仅在回调函数以及 foo 调用的函数中可用。在 run 外部,调用 getStore 将返回 undefined。
【In this example, the store is only available in the callback function and the
functions called by foo. Outside of run, calling getStore will return
undefined.】
故障排除:上下文丢失#>
【Troubleshooting: Context loss】
在大多数情况下,AsyncLocalStorage 都能正常工作。在少数情况下,当前存储会在某个异步操作中丢失。
【In most cases, AsyncLocalStorage works without issues. In rare situations, the
current store is lost in one of the asynchronous operations.】
如果你的代码是基于回调的,只需使用 util.promisify() 将其转换为 Promise,就可以开始使用原生 Promise。
【If your code is callback-based, it is enough to promisify it with
util.promisify() so it starts working with native promises.】
如果你需要使用基于回调的 API,或者你的代码假设有自定义的 thenable 实现,请使用 AsyncResource 类将异步操作与正确的执行上下文关联起来。通过在你怀疑导致上下文丢失的调用之后记录 asyncLocalStorage.getStore() 的内容,找到导致上下文丢失的函数调用。当代码记录为 undefined 时,最后调用的回调很可能是导致上下文丢失的原因。
【If you need to use a callback-based API or your code assumes
a custom thenable implementation, use the AsyncResource class
to associate the asynchronous operation with the correct execution context.
Find the function call responsible for the context loss by logging the content
of asyncLocalStorage.getStore() after the calls you suspect are responsible
for the loss. When the code logs undefined, the last callback called is
probably responsible for the context loss.】
类:AsyncResource#>
【Class: AsyncResource】
AsyncResource 类被设计为可以被嵌入者的异步资源扩展。使用它,用户可以轻松触发自己资源的生命周期事件。
【The class AsyncResource is designed to be extended by the embedder's async
resources. Using this, users can easily trigger the lifetime events of their
own resources.】
init 钩子将在实例化 AsyncResource 时触发。
【The init hook will trigger when an AsyncResource is instantiated.】
以下是 AsyncResource API 的概述。
【The following is an overview of the AsyncResource API.】
import { AsyncResource, executionAsyncId } from 'node:async_hooks';
// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false }
);
// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();
// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();
// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();const { AsyncResource, executionAsyncId } = require('node:async_hooks');
// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false }
);
// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();
// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();
// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();
new AsyncResource(type[, options])#>
用法示例:
【Example usage:】
class DBQuery extends AsyncResource {
constructor(db) {
super('DBQuery');
this.db = db;
}
getInfo(query, callback) {
this.db.get(query, (err, data) => {
this.runInAsyncScope(callback, null, err, data);
});
}
close() {
this.db = null;
this.emitDestroy();
}
}
静态方法:AsyncResource.bind(fn[, type[, thisArg]])#>
【Static method: AsyncResource.bind(fn[, type[, thisArg]])】
fn<Function> 绑定到当前执行上下文的函数。type<string> 可选名称,用于与底层的AsyncResource关联。thisArg<any>
将给定函数绑定到当前执行上下文。
【Binds the given function to the current execution context.】
返回的函数将具有一个 asyncResource 属性,该属性引用函数所绑定的 AsyncResource。
【The returned function will have an asyncResource property referencing
the AsyncResource to which the function is bound.】
asyncResource.bind(fn[, thisArg])#>
fn<Function> 绑定到当前AsyncResource的函数。thisArg<any>
将给定的函数绑定到此 AsyncResource 的作用域中执行。
【Binds the given function to execute to this AsyncResource's scope.】
返回的函数将具有一个 asyncResource 属性,该属性引用函数所绑定的 AsyncResource。
【The returned function will have an asyncResource property referencing
the AsyncResource to which the function is bound.】
asyncResource.runInAsyncScope(fn[, thisArg, ...args])#>
fn<Function> 在此异步资源的执行上下文中调用的函数。thisArg<any> 用于函数调用的接收对象。...args<any> 可选参数,传递给函数。
在异步资源的执行上下文中使用提供的参数调用提供的函数。这将建立上下文,触发 AsyncHooks 的回调前钩子,调用函数,触发 AsyncHooks 的回调后钩子,然后恢复原始的执行上下文。
【Call the provided function with the provided arguments in the execution context of the async resource. This will establish the context, trigger the AsyncHooks before callbacks, call the function, trigger the AsyncHooks after callbacks, and then restore the original execution context.】
asyncResource.emitDestroy()#>
- 返回:<AsyncResource> 对
asyncResource的引用。
调用所有 destroy 钩子。此方法应该只被调用一次。如果被调用多次,将会抛出错误。此方法 必须 手动调用。如果资源被留给垃圾回收器回收,则 destroy 钩子将永远不会被调用。
【Call all destroy hooks. This should only ever be called once. An error will
be thrown if it is called more than once. This must be manually called. If
the resource is left to be collected by the GC then the destroy hooks will
never be called.】
asyncResource.asyncId()#>
- 返回:<number> 分配给该资源的唯一
asyncId。
asyncResource.triggerAsyncId()#>
- 返回值:<number> 与传递给
AsyncResource构造函数的triggerAsyncId相同。
在 Worker 线程池中使用 AsyncResource#>
【Using AsyncResource for a Worker thread pool】
以下示例演示了如何使用 AsyncResource 类为 Worker 池正确提供异步跟踪。其他资源池,如数据库连接池,也可以遵循类似的模型。
【The following example shows how to use the AsyncResource class to properly
provide async tracking for a Worker pool. Other resource pools, such as
database connection pools, can follow a similar model.】
假设任务是将两个数字相加,使用名为 task_processor.js 的文件,其内容如下:
【Assuming that the task is adding two numbers, using a file named
task_processor.js with the following content:】
import { parentPort } from 'node:worker_threads';
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});const { parentPort } = require('node:worker_threads');
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
它周围的工作池可以使用以下结构:
【a Worker pool around it could use the following structure:】
import { AsyncResource } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import path from 'node:path';
import { Worker } from 'node:worker_threads';
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
export default class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(new URL('task_processer.js', import.meta.url));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}const { AsyncResource } = require('node:async_hooks');
const { EventEmitter } = require('node:events');
const path = require('node:path');
const { Worker } = require('node:worker_threads');
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}
module.exports = WorkerPool;
如果没有 WorkerPoolTaskInfo 对象添加的显式跟踪,看起来回调是与各个 Worker 对象相关联的。然而,Worker 的创建并不是与任务的创建相关联的,并且不能提供任务何时被调度的信息。
【Without the explicit tracking added by the WorkerPoolTaskInfo objects,
it would appear that the callbacks are associated with the individual Worker
objects. However, the creation of the Workers is not associated with the
creation of the tasks and does not provide information about when tasks
were scheduled.】
该池可以按如下方式使用:
【This pool could be used as follows:】
import WorkerPool from './worker_pool.js';
import os from 'node:os';
const pool = new WorkerPool(os.cpus().length);
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}const WorkerPool = require('./worker_pool.js');
const os = require('node:os');
const pool = new WorkerPool(os.cpus().length);
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
将 AsyncResource 与 EventEmitter 集成#>
【Integrating AsyncResource with EventEmitter】
由 EventEmitter 触发的事件监听器可能在与调用 eventEmitter.on() 时不同的执行上下文中运行。
【Event listeners triggered by an EventEmitter may be run in a different
execution context than the one that was active when eventEmitter.on() was
called.】
以下示例演示了如何使用 AsyncResource 类将事件监听器正确地关联到相应的执行上下文。相同的方法也可以应用于 Stream 或类似的事件驱动类。
【The following example shows how to use the AsyncResource class to properly
associate an event listener with the correct execution context. The same
approach can be applied to a Stream or a similar event-driven class.】
import { createServer } from 'node:http';
import { AsyncResource, executionAsyncId } from 'node:async_hooks';
const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);const { createServer } = require('node:http');
const { AsyncResource, executionAsyncId } = require('node:async_hooks');
const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);