runoops.com

Node.js 多线程-Worker 线程

node:worker_threads 模块允许使用并行执行 JavaScript 的线程。使用下面这个方式引入:

const worker = require('node:worker_threads');

Worker 线程对于执行 CPU 密集型的 JavaScript 操作很有用。它们对 I/O 密集型的工作帮助不大。Node.js 内置的异步 I/O 操作比 Worker 线程更高效。

与 child_process 或 cluster 不同,worker_threads 可以共享内存。它们通过传输 ArrayBuffer 实例或共享 SharedArrayBuffer 实例来实现。

const {
  Worker, isMainThread, parentPort, workerData,
} = require('node:worker_threads');

if (isMainThread) {
  module.exports = function parseJSAsync(script) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, {
        workerData: script,
      });
      worker.on('message', resolve);
      worker.on('error', reject);
      worker.on('exit', (code) => {
        if (code !== 0)
          reject(new Error(`Worker stopped with exit code ${code}`));
      });
    });
  };
} else {
  const { parse } = require('some-js-parsing-library');
  const script = workerData;
  parentPort.postMessage(parse(script));
}

上面的示例为每个 parseJSAsync() 调用生成一个 Worker 线程。在实践中,使用 Worker 池来完成这类任务。否则,创建 Workers 的开销可能会超过其收益。

worker_threads 一些重要的属性、方法和类。

worker_threads 主要属性

isMainThread

isMainThread用来判断代码是否在主线程中运行:

const { Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
    console.log('在主线程中');
  new Worker(__filename);
} else {
  console.log('在工作线程中');
  console.log(isMainThread);  // 打印 'false'。
}

parentPort

parentPort是一个MessagePort类型,parentPort主要用于worker线程和主线程进行消息交互。

通过parentPort.postMessage()发送的消息,在主线程中将可以通过worker.on('message')接收。

主线程中通过worker.postMessage()发送的消息,将可以在工作线程中通过parentPort.on('message')接收。

const { Worker, isMainThread, parentPort } = require('node:worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);
  // 接收worker 线程消息,并打印消息
  worker.on('message', (message1) => {
    console.log('收到了 worker 线程发来的消息');  
  });
  // 收到woker 消息后的回调
  worker.once('message', (message) => {
    console.log(message);  // 打印 'Hello, world!'.
  });
  worker.postMessage('Hello, world!');
} else {

  // 收到来自父线程的消息后,将其发回:
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}

SHARE_ENV

SHARE_ENV是传递给worker构造函数的一个env变量,通过设置这个变量,我们可以在主线程与工作线程进行共享环境变量的读写。

const { Worker, SHARE_ENV } = require('node:worker_threads');
new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
  .on('exit', () => {
    console.log(process.env.SET_IN_WORKER);  // Prints 'foo'.
  });

workerData

除了postMessage(),还可以通过在主线程中传递workerData给worker的构造函数,从而将主线程中的数据传递给worker:

const { Worker, isMainThread, workerData } = require('worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename, { workerData: 'Hello, world!' });
} else {
  console.log(workerData);  // Prints 'Hello, world!'.
}

worker_threads 主要方法

receiveMessageOnPort

除了port的on('message')方法之外,我们还可以使用receiveMessageOnPort来手动接收消息:

const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });

console.log(receiveMessageOnPort(port2));
// Prints: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Prints: undefined

moveMessagePortToContext

先了解一下nodejs中的Context的概念,我们可以从vm中创建context,它是一个隔离的上下文环境,从而保证不同运行环境的安全性,context 的例子:

const vm = require('vm');

const x = 1;

const context = { x: 2 };
vm.createContext(context); // 上下文隔离化对象。

const code = 'x += 40; var y = 17;';
// `x` and `y` 是上下文中的全局变量。
// 最初,x 的值为 2,因为这是 context.x 的值。
vm.runInContext(code, context);

console.log(context.x); // 42
console.log(context.y); // 17

console.log(x); // 1; y 没有定义。

在worker中,我们可以将一个MessagePort move到其他的context中。

worker.moveMessagePortToContext(port, contextifiedSandbox)

这个方法接收两个参数,第一个参数就是要move的MessagePort,第二个参数就是vm.createContext()创建的context对象。

worker_threads 主要类

MessageChannel

MessageChannel代表的是一个异步双向通信channel。MessageChannel中没有方法,主要通过MessageChannel来连接两端的MessagePort。

// MessageChannel 结构
class MessageChannel {
    readonly port1: MessagePort;
    readonly port2: MessagePort;
}  

当我们使用new MessageChannel()的时候,会自动创建两个MessagePort。

const { MessageChannel } = require('worker_threads');

const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// Prints: received { foo: 'bar' } from the `port1.on('message')` listener

通过MessageChannel,我们可以进行MessagePort间的通信。

MessagePort

MessagePort继承自EventEmitter,它表示的是异步双向通信channel的一端。这个channel就叫做MessageChannel,MessagePort通过MessageChannel来进行通信。

我们可以通过 MessagePort 在不同的 Worker 之间来传输结构体数据,内存区域或者其他的 MessagePorts。

const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();

// Prints:
//   foobar
//   closed!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('closed!'));

port1.postMessage('foobar');
port1.close();

Worker

Worker的定义:

    class Worker extends EventEmitter {
        readonly stdin: Writable | null;
        readonly stdout: Readable;
        readonly stderr: Readable;
        readonly threadId: number;
        readonly resourceLimits?: ResourceLimits;

        constructor(filename: string | URL, options?: WorkerOptions);

        postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
        ref(): void;
        unref(): void;

        terminate(): Promise<number>;

        getHeapSnapshot(): Promise<Readable>;

        addListener(event: "error", listener: (err: Error) => void): this;
        addListener(event: "exit", listener: (exitCode: number) => void): this;
        addListener(event: "message", listener: (value: any) => void): this;
        addListener(event: "online", listener: () => void): this;
        addListener(event: string | symbol, listener: (...args: any[]) => void): this;

       ... 
    }

worker继承自EventEmitter,并且包含了4个重要的事件:error,exit,message和online。

worker表示的是一个独立的 JavaScript 执行线程,我们可以通过传递filename或者URL来构造worker。

每一个worker都有一对内置的MessagePort,在worker创建的时候就会相互关联。worker使用这对内置的MessagePort来和父线程进行通信。

通过parentPort.postMessage()发送的消息在主线程中将可以通过worker.on('message')接收。

主线程中通过worker.postMessage()发送的消息将可以在工作线程中通过parentPort.on('message')接收。

当然,你也可以显式的创建MessageChannel 对象,然后将MessagePort作为消息传递给其他线程,例子:

const assert = require('assert');
const {
  Worker, MessageChannel, MessagePort, isMainThread, parentPort
} = require('worker_threads');
if (isMainThread) {
  const worker = new Worker(__filename);
  const subChannel = new MessageChannel();
  worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  subChannel.port2.on('message', (value) => {
    console.log('接收到:', value);
  });
} else {
  parentPort.once('message', (value) => {
    assert(value.hereIsYourPort instanceof MessagePort);
    value.hereIsYourPort.postMessage('工作线程正在发送此消息');
    value.hereIsYourPort.close();
  });
}

上面的例子中,我们借助了worker和parentPort本身的消息传递功能,传递了一个显式的MessageChannel中的MessagePort。

然后又通过该MessagePort来进行消息的分发。

Worker 的重要事件:

worker message 事件:

worker.on('message', (data) => {});

只要 worker 将数据发送到父线程,就会发出 message 事件。

worker error 事件:

worker.on('error', (error) => {});

只要 worker 中有未捕获的异常,就会发出 error 事件。然后终止 worker,错误可以作为提供的回调中的第一个参数。

worker exit 事件:

worker.on('exit', (exitCode) => {});

在 worker 退出时会发出 exit 事件。如果在worker中调用了 process.exit(),那么 exitCode 将被提供给回调。如果 worker 以 worker.terminate() 终止,则代码为1。

worker online 事件:

worker.on('online', () => {});

只要 worker 停止解析 JavaScript 代码并开始执行,就会发出 online 事件。它不常用,但在特定情况下可以提供信息。

实例

利用 Worker threads 解析Excel 文件:https://github.com/liqiang88/nodejs-worker-threads-sample

Captcha Code

0 笔记

分享笔记

Inline Feedbacks
View all notes