node:worker_threads 模块允许使用并行执行 JavaScript 的线程。使用下面这个方式引入:
const worker = require('node:worker_threads');
Worker 线程对于执行 CPU 密集型的 JavaScript 操作很有用。它们对 I/O 密集型的工作帮助不大。Node.js 内置的异步 I/O 操作比 Worker 线程更高效。
与 child_process 或 cluster 不同,worker_threads 可以共享内存。它们通过传输 ArrayBuffer 实例或共享 SharedArrayBuffer 实例来实现。
const {
Worker, isMainThread, parentPort, workerData,
} = require('node:worker_threads');
if (isMainThread) {
module.exports = function parseJSAsync(script) {
return new Promise((resolve, reject) => {
const worker = new Worker(__filename, {
workerData: script,
});
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`));
});
});
};
} else {
const { parse } = require('some-js-parsing-library');
const script = workerData;
parentPort.postMessage(parse(script));
}
上面的示例为每个 parseJSAsync() 调用生成一个 Worker 线程。在实践中,使用 Worker 池来完成这类任务。否则,创建 Workers 的开销可能会超过其收益。
worker_threads 一些重要的属性、方法和类。
worker_threads 主要属性
isMainThread
isMainThread用来判断代码是否在主线程中运行:
const { Worker, isMainThread } = require('worker_threads');
if (isMainThread) {
console.log('在主线程中');
new Worker(__filename);
} else {
console.log('在工作线程中');
console.log(isMainThread); // 打印 'false'。
}
parentPort
parentPort是一个MessagePort类型,parentPort主要用于worker线程和主线程进行消息交互。
通过parentPort.postMessage()发送的消息,在主线程中将可以通过worker.on('message')接收。
主线程中通过worker.postMessage()发送的消息,将可以在工作线程中通过parentPort.on('message')接收。
const { Worker, isMainThread, parentPort } = require('node:worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
// 接收worker 线程消息,并打印消息
worker.on('message', (message1) => {
console.log('收到了 worker 线程发来的消息');
});
// 收到woker 消息后的回调
worker.once('message', (message) => {
console.log(message); // 打印 'Hello, world!'.
});
worker.postMessage('Hello, world!');
} else {
// 收到来自父线程的消息后,将其发回:
parentPort.once('message', (message) => {
parentPort.postMessage(message);
});
}
SHARE_ENV
SHARE_ENV是传递给worker构造函数的一个env变量,通过设置这个变量,我们可以在主线程与工作线程进行共享环境变量的读写。
const { Worker, SHARE_ENV } = require('node:worker_threads');
new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
.on('exit', () => {
console.log(process.env.SET_IN_WORKER); // Prints 'foo'.
});
workerData
除了postMessage(),还可以通过在主线程中传递workerData给worker的构造函数,从而将主线程中的数据传递给worker:
const { Worker, isMainThread, workerData } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename, { workerData: 'Hello, world!' });
} else {
console.log(workerData); // Prints 'Hello, world!'.
}
worker_threads 主要方法
receiveMessageOnPort
除了port的on('message')方法之外,我们还可以使用receiveMessageOnPort来手动接收消息:
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });
console.log(receiveMessageOnPort(port2));
// Prints: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Prints: undefined
moveMessagePortToContext
先了解一下nodejs中的Context的概念,我们可以从vm中创建context,它是一个隔离的上下文环境,从而保证不同运行环境的安全性,context 的例子:
const vm = require('vm');
const x = 1;
const context = { x: 2 };
vm.createContext(context); // 上下文隔离化对象。
const code = 'x += 40; var y = 17;';
// `x` and `y` 是上下文中的全局变量。
// 最初,x 的值为 2,因为这是 context.x 的值。
vm.runInContext(code, context);
console.log(context.x); // 42
console.log(context.y); // 17
console.log(x); // 1; y 没有定义。
在worker中,我们可以将一个MessagePort move到其他的context中。
worker.moveMessagePortToContext(port, contextifiedSandbox)
这个方法接收两个参数,第一个参数就是要move的MessagePort,第二个参数就是vm.createContext()创建的context对象。
worker_threads 主要类
MessageChannel
MessageChannel代表的是一个异步双向通信channel。MessageChannel中没有方法,主要通过MessageChannel来连接两端的MessagePort。
// MessageChannel 结构
class MessageChannel {
readonly port1: MessagePort;
readonly port2: MessagePort;
}
当我们使用new MessageChannel()的时候,会自动创建两个MessagePort。
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// Prints: received { foo: 'bar' } from the `port1.on('message')` listener
通过MessageChannel,我们可以进行MessagePort间的通信。
MessagePort
MessagePort继承自EventEmitter,它表示的是异步双向通信channel的一端。这个channel就叫做MessageChannel,MessagePort通过MessageChannel来进行通信。
我们可以通过 MessagePort 在不同的 Worker 之间来传输结构体数据,内存区域或者其他的 MessagePorts。
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
// Prints:
// foobar
// closed!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('closed!'));
port1.postMessage('foobar');
port1.close();
Worker
Worker的定义:
class Worker extends EventEmitter {
readonly stdin: Writable | null;
readonly stdout: Readable;
readonly stderr: Readable;
readonly threadId: number;
readonly resourceLimits?: ResourceLimits;
constructor(filename: string | URL, options?: WorkerOptions);
postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
ref(): void;
unref(): void;
terminate(): Promise<number>;
getHeapSnapshot(): Promise<Readable>;
addListener(event: "error", listener: (err: Error) => void): this;
addListener(event: "exit", listener: (exitCode: number) => void): this;
addListener(event: "message", listener: (value: any) => void): this;
addListener(event: "online", listener: () => void): this;
addListener(event: string | symbol, listener: (...args: any[]) => void): this;
...
}
worker继承自EventEmitter,并且包含了4个重要的事件:error,exit,message和online。
worker表示的是一个独立的 JavaScript 执行线程,我们可以通过传递filename或者URL来构造worker。
每一个worker都有一对内置的MessagePort,在worker创建的时候就会相互关联。worker使用这对内置的MessagePort来和父线程进行通信。
通过parentPort.postMessage()发送的消息在主线程中将可以通过worker.on('message')接收。
主线程中通过worker.postMessage()发送的消息将可以在工作线程中通过parentPort.on('message')接收。
当然,你也可以显式的创建MessageChannel 对象,然后将MessagePort作为消息传递给其他线程,例子:
const assert = require('assert');
const {
Worker, MessageChannel, MessagePort, isMainThread, parentPort
} = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
const subChannel = new MessageChannel();
worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
subChannel.port2.on('message', (value) => {
console.log('接收到:', value);
});
} else {
parentPort.once('message', (value) => {
assert(value.hereIsYourPort instanceof MessagePort);
value.hereIsYourPort.postMessage('工作线程正在发送此消息');
value.hereIsYourPort.close();
});
}
上面的例子中,我们借助了worker和parentPort本身的消息传递功能,传递了一个显式的MessageChannel中的MessagePort。
然后又通过该MessagePort来进行消息的分发。
Worker 的重要事件:
worker message 事件:
worker.on('message', (data) => {});
只要 worker 将数据发送到父线程,就会发出 message 事件。
worker error 事件:
worker.on('error', (error) => {});
只要 worker 中有未捕获的异常,就会发出 error 事件。然后终止 worker,错误可以作为提供的回调中的第一个参数。
worker exit 事件:
worker.on('exit', (exitCode) => {});
在 worker 退出时会发出 exit 事件。如果在worker中调用了 process.exit(),那么 exitCode 将被提供给回调。如果 worker 以 worker.terminate() 终止,则代码为1。
worker online 事件:
worker.on('online', () => {});
只要 worker 停止解析 JavaScript 代码并开始执行,就会发出 online 事件。它不常用,但在特定情况下可以提供信息。
实例
利用 Worker threads 解析Excel 文件:https://github.com/liqiang88/nodejs-worker-threads-sample
分享笔记