Перейти к содержанию

🚀 Где задеплоить проект?

Партнёрская ссылка. Дата-центры в РФ, оплата картой.

Потоки worker (worker_threads)

Стабильность: 2 – Стабильная

АПИ является удовлетворительным. Совместимость с NPM имеет высший приоритет и не будет нарушена кроме случаев явной необходимости.

Модуль node:worker_threads позволяет использовать потоки, в которых выполняется JavaScript параллельно. Подключение:

1
import worker_threads from 'node:worker_threads';
1
2
3
'use strict';

const worker_threads = require('node:worker_threads');

Потоки worker полезны для вычислительно тяжёлого кода на JavaScript. Для I/O-нагрузки они мало что дают: встроенные в Node.js асинхронные операции ввода-вывода обычно эффективнее.

В отличие от child_process или cluster, worker_threads могут разделять память — передавая экземпляры ArrayBuffer или используя общий SharedArrayBuffer.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import {
  Worker,
  isMainThread,
  parentPort,
  workerData,
} from 'node:worker_threads';

if (!isMainThread) {
  const { parse } = await import('some-js-parsing-library');
  const script = workerData;
  parentPort.postMessage(parse(script));
}

export default function parseJSAsync(script) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(new URL(import.meta.url), {
      workerData: script,
    });
    worker.on('message', resolve);
    worker.once('error', reject);
    worker.once('exit', (code) => {
      if (code !== 0)
        reject(new Error(`Worker stopped with exit code ${code}`));
    });
  });
};
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
'use strict';

const {
  Worker,
  isMainThread,
  parentPort,
  workerData,
} = require('node:worker_threads');

if (isMainThread) {
  module.exports = function parseJSAsync(script) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, {
        workerData: script,
      });
      worker.on('message', resolve);
      worker.once('error', reject);
      worker.once('exit', (code) => {
        if (code !== 0)
          reject(new Error(`Worker stopped with exit code ${code}`));
      });
    });
  };
} else {
  const { parse } = require('some-js-parsing-library');
  const script = workerData;
  parentPort.postMessage(parse(script));
}

В примере на каждый вызов parseJSAsync() создаётся новый поток Worker. На практике для таких задач используйте пул worker’ов — иначе накладные расходы на создание потоков могут перевесить пользу.

Реализуя пул, применяйте API AsyncResource, чтобы диагностические инструменты (например трассировки асинхронного стека) видели связь задач с результатами. Пример — в разделе «Использование AsyncResource для пула потоков Worker» документации async_hooks.

Потоки worker по умолчанию наследуют опции, не зависящие от процесса. См. параметры конструктора Worker — настройка argv и execArgv.

worker_threads.getEnvironmentData(key)

  • key <any> Произвольное клонируемое значение JavaScript, пригодное в качестве ключа Map.
  • Возвращает: <any>

В потоке worker worker.getEnvironmentData() возвращает клон данных, переданных через worker.setEnvironmentData() породившего потока. Каждый новый Worker автоматически получает свою копию этих данных.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import {
  Worker,
  isMainThread,
  setEnvironmentData,
  getEnvironmentData,
} from 'node:worker_threads';

if (isMainThread) {
  setEnvironmentData('Hello', 'World!');
  const worker = new Worker(new URL(import.meta.url));
} else {
  console.log(getEnvironmentData('Hello'));  // Выводит 'World!'.
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
'use strict';

const {
  Worker,
  isMainThread,
  setEnvironmentData,
  getEnvironmentData,
} = require('node:worker_threads');

if (isMainThread) {
  setEnvironmentData('Hello', 'World!');
  const worker = new Worker(__filename);
} else {
  console.log(getEnvironmentData('Hello'));  // Выводит 'World!'.
}

worker_threads.isInternalThread

true, если этот код выполняется во внутреннем потоке Worker (например, в потоке загрузчика).

1
node --experimental-loader ./loader.js main.js
1
2
3
// файл loader.js
import { isInternalThread } from 'node:worker_threads';
console.log(isInternalThread);  // выводит true
1
2
3
4
5
// файл loader.js
'use strict';

const { isInternalThread } = require('node:worker_threads');
console.log(isInternalThread);  // выводит true

1
2
3
// файл main.js
import { isInternalThread } from 'node:worker_threads';
console.log(isInternalThread);  // выводит false
1
2
3
4
5
// файл main.js
'use strict';

const { isInternalThread } = require('node:worker_threads');
console.log(isInternalThread);  // выводит false

worker_threads.isMainThread

true, если этот код выполняется не в потоке Worker.

1
2
3
4
5
6
7
8
9
import { Worker, isMainThread } from 'node:worker_threads';

if (isMainThread) {
  // Снова загружает текущий файл внутри экземпляра Worker.
  new Worker(new URL(import.meta.url));
} else {
  console.log('Inside Worker!');
  console.log(isMainThread);  // Выводит 'false'.
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
'use strict';

const { Worker, isMainThread } = require('node:worker_threads');

if (isMainThread) {
  // Снова загружает текущий файл внутри экземпляра Worker.
  new Worker(__filename);
} else {
  console.log('Inside Worker!');
  console.log(isMainThread);  // Выводит 'false'.
}

worker_threads.markAsUntransferable(object)

  • object <any> Произвольное значение JavaScript.

Помечает объект как непередаваемый. Если object попадает в список передачи вызова port.postMessage(), выбрасывается ошибка. Для примитивных значений object ничего не делает.

Это уместно для объектов, которые клонируются, а не передаются, и используются другими объектами на стороне отправителя. Например, Node.js помечает так ArrayBuffer, используемые для Buffer pool. ArrayBuffer.prototype.transfer() для таких буферов запрещён.

Эту операцию нельзя отменить.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { MessageChannel, markAsUntransferable } from 'node:worker_threads';

const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);

markAsUntransferable(pooledBuffer);

const { port1 } = new MessageChannel();
try {
  // Здесь будет ошибка: pooledBuffer нельзя передать (transfer).
  port1.postMessage(typedArray1, [ typedArray1.buffer ]);
} catch (error) {
  // у ошибки: error.name === 'DataCloneError'
}

// Следующая строка выводит содержимое typedArray1 — буфер по-прежнему
// принадлежит ему и не был передан. Без markAsUntransferable() здесь
// вывелся бы пустой Uint8Array, а вызов postMessage прошёл бы успешно.
// typedArray2 тоже остаётся в порядке.
console.log(typedArray1);
console.log(typedArray2);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
'use strict';

const { MessageChannel, markAsUntransferable } = require('node:worker_threads');

const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);

markAsUntransferable(pooledBuffer);

const { port1 } = new MessageChannel();
try {
  // Здесь будет ошибка: pooledBuffer нельзя передать (transfer).
  port1.postMessage(typedArray1, [ typedArray1.buffer ]);
} catch (error) {
  // у ошибки: error.name === 'DataCloneError'
}

// Следующая строка выводит содержимое typedArray1 — буфер по-прежнему
// принадлежит ему и не был передан. Без markAsUntransferable() здесь
// вывелся бы пустой Uint8Array, а вызов postMessage прошёл бы успешно.
// typedArray2 тоже остаётся в порядке.
console.log(typedArray1);
console.log(typedArray2);

В браузерах аналога этого API нет.

worker_threads.isMarkedAsUntransferable(object)

  • object <any> Любое значение JavaScript.
  • Возвращает: <boolean>

Проверяет, помечен ли объект как непередаваемый через markAsUntransferable().

1
2
3
4
5
6
import { markAsUntransferable, isMarkedAsUntransferable } from 'node:worker_threads';

const pooledBuffer = new ArrayBuffer(8);
markAsUntransferable(pooledBuffer);

isMarkedAsUntransferable(pooledBuffer);  // возвращает true.
1
2
3
4
5
6
7
8
'use strict';

const { markAsUntransferable, isMarkedAsUntransferable } = require('node:worker_threads');

const pooledBuffer = new ArrayBuffer(8);
markAsUntransferable(pooledBuffer);

isMarkedAsUntransferable(pooledBuffer);  // возвращает true.

В браузерах аналога этого API нет.

worker_threads.markAsUncloneable(object)

  • object <any> Произвольное значение JavaScript.

Помечает объект как неклонируемый. Если object используется как message в вызове port.postMessage(), выбрасывается ошибка. Для примитивных значений object ничего не делает.

Это не действует на ArrayBuffer и объекты, похожие на Buffer.

Эту операцию нельзя отменить.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { markAsUncloneable } from 'node:worker_threads';

const anyObject = { foo: 'bar' };
markAsUncloneable(anyObject);
const { port1 } = new MessageChannel();
try {
  // Здесь будет ошибка: anyObject нельзя клонировать.
  port1.postMessage(anyObject);
} catch (error) {
  // у ошибки: error.name === 'DataCloneError'
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
'use strict';

const { markAsUncloneable } = require('node:worker_threads');

const anyObject = { foo: 'bar' };
markAsUncloneable(anyObject);
const { port1 } = new MessageChannel();
try {
  // Здесь будет ошибка: anyObject нельзя клонировать.
  port1.postMessage(anyObject);
} catch (error) {
  // у ошибки: error.name === 'DataCloneError'
}

В браузерах аналога этого API нет.

worker_threads.moveMessagePortToContext(port, contextifiedSandbox)

Передаёт MessagePort в другой vm Context. Исходный объект port становится непригодным, на его месте используется возвращённый MessagePort.

Возвращённый MessagePort — объект в целевом контексте и наследует глобальный класс Object. Объекты, передаваемые в слушатель port.onmessage(), тоже создаются в целевом контексте и наследуют его глобальный класс Object.

Однако созданный MessagePort больше не наследует EventTarget, для получения событий можно использовать только port.onmessage().

worker_threads.parentPort

Если этот поток — Worker, это MessagePort для связи с родительским потоком. Сообщения, отправленные через parentPort.postMessage(), доступны родителю в worker.on('message'), а сообщения от родителя через worker.postMessage() — в этом потоке в parentPort.on('message').

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import { Worker, isMainThread, parentPort } from 'node:worker_threads';

if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url));
  worker.once('message', (message) => {
    console.log(message);  // Выводит 'Hello, world!'.
  });
  worker.postMessage('Hello, world!');
} else {
  // При сообщении от родительского потока отправляем его обратно:
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
'use strict';

const { Worker, isMainThread, parentPort } = require('node:worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);
  worker.once('message', (message) => {
    console.log(message);  // Выводит 'Hello, world!'.
  });
  worker.postMessage('Hello, world!');
} else {
  // При сообщении от родительского потока отправляем его обратно:
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}

worker_threads.postMessageToThread(threadId, value[, transferList][, timeout])

Стабильность: 1.1 – Активная разработка

  • threadId <number> ID целевого потока. Если ID недействителен, выбрасывается ERR_WORKER_MESSAGING_FAILED. Если ID совпадает с текущим потоком, выбрасывается ERR_WORKER_MESSAGING_SAME_THREAD.
  • value <any> Отправляемое значение.
  • transferList <Object[]> Если в value передаются один или несколько объектов, похожих на MessagePort, для них нужен transferList, иначе выбрасывается ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST. См. port.postMessage().
  • timeout <number> Ожидание доставки сообщения в миллисекундах. По умолчанию: undefined — ждать бесконечно. При таймауте выбрасывается ERR_WORKER_MESSAGING_TIMEOUT.
  • Возвращает: <Promise> промис выполняется, если целевой поток успешно обработал сообщение.

Отправляет значение другому worker, определяемому по ID потока.

Если в целевом потоке нет слушателя события workerMessage, выбрасывается ERR_WORKER_MESSAGING_FAILED.

Если целевой поток выбросил ошибку при обработке workerMessage, выбрасывается ERR_WORKER_MESSAGING_ERRORED.

Этот метод нужен, когда целевой поток не является прямым родителем или дочерним для текущего. Если потоки в отношении родитель–дочерний, используйте require('node:worker_threads').parentPort.postMessage() и worker.postMessage().

Ниже пример postMessageToThread: создаётся 10 вложенных потоков, последний пытается связаться с главным потоком.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import process from 'node:process';
import {
  postMessageToThread,
  threadId,
  workerData,
  Worker,
} from 'node:worker_threads';

const channel = new BroadcastChannel('sync');
const level = workerData?.level ?? 0;

if (level < 10) {
  const worker = new Worker(new URL(import.meta.url), {
    workerData: { level: level + 1 },
  });
}

if (level === 0) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    postMessageToThread(source, { message: 'pong' });
  });
} else if (level === 10) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    channel.postMessage('done');
    channel.close();
  });

  await postMessageToThread(0, { message: 'ping' });
}

channel.onmessage = channel.close;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
'use strict';

const process = require('node:process');
const {
  postMessageToThread,
  threadId,
  workerData,
  Worker,
} = require('node:worker_threads');

const channel = new BroadcastChannel('sync');
const level = workerData?.level ?? 0;

if (level < 10) {
  const worker = new Worker(__filename, {
    workerData: { level: level + 1 },
  });
}

if (level === 0) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    postMessageToThread(source, { message: 'pong' });
  });
} else if (level === 10) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    channel.postMessage('done');
    channel.close();
  });

  postMessageToThread(0, { message: 'ping' });
}

channel.onmessage = channel.close;

worker_threads.receiveMessageOnPort(port)

Принимает одно сообщение из MessagePort. Если сообщений нет, возвращается undefined, иначе объект с полем message с полезной нагрузкой — самое старое сообщение в очереди MessagePort.

1
2
3
4
5
6
7
8
import { MessageChannel, receiveMessageOnPort } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });

console.log(receiveMessageOnPort(port2));
// Вывод: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Вывод: undefined
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
'use strict';

const { MessageChannel, receiveMessageOnPort } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });

console.log(receiveMessageOnPort(port2));
// Вывод: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Вывод: undefined

При использовании этой функции событие 'message' не генерируется и слушатель onmessage не вызывается.

worker_threads.resourceLimits

Задаёт ограничения ресурсов JS-движка в этом потоке Worker. Если в конструктор Worker передавалась опция resourceLimits, значения совпадают с ней.

В главном потоке значение — пустой объект.

worker_threads.SHARE_ENV

Специальное значение для опции env конструктора Worker: текущий поток и поток Worker совместно читают и изменяют один и тот же набор переменных окружения.

1
2
3
4
5
6
import process from 'node:process';
import { Worker, SHARE_ENV } from 'node:worker_threads';
new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
  .once('exit', () => {
    console.log(process.env.SET_IN_WORKER);  // Выводит 'foo'.
  });
1
2
3
4
5
6
7
'use strict';

const { Worker, SHARE_ENV } = require('node:worker_threads');
new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
  .once('exit', () => {
    console.log(process.env.SET_IN_WORKER);  // Выводит 'foo'.
  });

worker_threads.setEnvironmentData(key[, value])

  • key <any> Произвольное клонируемое значение JavaScript, пригодное в качестве ключа Map.
  • value <any> Произвольное клонируемое значение; клонируется и автоматически передаётся всем новым Worker. Если передать undefined, ранее заданное значение для key удаляется.

API worker.setEnvironmentData() задаёт содержимое worker.getEnvironmentData() в текущем потоке и во всех новых Worker, созданных из текущего контекста.

worker_threads.threadId

Целочисленный идентификатор текущего потока. На соответствующем объекте worker (если есть) доступен как worker.threadId. Уникален для каждого Worker в одном процессе.

worker_threads.threadName

Строковый идентификатор текущего потока или null, если поток не выполняется. На соответствующем объекте worker (если есть) доступен как worker.threadName.

worker_threads.workerData

Произвольное значение JavaScript — клон данных, переданных в конструктор Worker этого потока.

Данные клонируются как при postMessage(), по алгоритму структурированного клонирования HTML.

1
2
3
4
5
6
7
import { Worker, isMainThread, workerData } from 'node:worker_threads';

if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url), { workerData: 'Hello, world!' });
} else {
  console.log(workerData);  // Выводит 'Hello, world!'.
}
1
2
3
4
5
6
7
8
9
'use strict';

const { Worker, isMainThread, workerData } = require('node:worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename, { workerData: 'Hello, world!' });
} else {
  console.log(workerData);  // Выводит 'Hello, world!'.
}

worker_threads.locks

Стабильность: 1 – Экспериментальная

Экземпляр LockManager для координации доступа к ресурсам, общим для нескольких потоков одного процесса. Семантика соответствует браузерному LockManager

Класс: Lock

Интерфейс Lock описывает блокировку, выданную через locks.request()

lock.name

Имя блокировки.

lock.mode

Режим блокировки: shared или exclusive.

Класс: LockManager

Интерфейс LockManager предоставляет методы запроса и просмотра блокировок. Экземпляр LockManager получают так:

1
import { locks } from 'node:worker_threads';
1
2
3
'use strict';

const { locks } = require('node:worker_threads');

Реализация соответствует API browser LockManager.

locks.request(name[, options], callback)

  • name <string>
  • options <Object>
    • mode <string> 'exclusive' или 'shared'. По умолчанию: 'exclusive'.
    • ifAvailable <boolean> Если true, запрос удовлетворяется только если блокировка ещё не удерживается. Иначе callback вызывается с null вместо Lock. По умолчанию: false.
    • steal <boolean> Если true, существующие блокировки с тем же именем снимаются и запрос выполняется сразу, опережая очередь. По умолчанию: false.
    • signal <AbortSignal> для отмены ожидающего (ещё не выданного) запроса блокировки.
  • callback <Function> Вызывается после выдачи блокировки (или сразу с null, если ifAvailable равен true и блокировка недоступна). Блокировка снимается при возврате из функции или, если возвращается промис, после его завершения.
  • Возвращает: <Promise> выполняется после снятия блокировки.
1
2
3
4
5
6
import { locks } from 'node:worker_threads';

await locks.request('my_resource', async (lock) => {
  // Блокировка получена.
});
// Здесь блокировка уже снята.
1
2
3
4
5
6
7
8
9
'use strict';

const { locks } = require('node:worker_threads');

locks.request('my_resource', async (lock) => {
  // Блокировка получена.
}).then(() => {
  // Здесь блокировка уже снята.
});

locks.query()

Выполняется с LockManagerSnapshot с текущими удерживаемыми и ожидающими блокировками для процесса.

1
2
3
4
5
6
7
8
9
import { locks } from 'node:worker_threads';

const snapshot = await locks.query();
for (const lock of snapshot.held) {
  console.log(`held lock: name ${lock.name}, mode ${lock.mode}`);
}
for (const pending of snapshot.pending) {
  console.log(`pending lock: name ${pending.name}, mode ${pending.mode}`);
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
'use strict';

const { locks } = require('node:worker_threads');

locks.query().then((snapshot) => {
  for (const lock of snapshot.held) {
    console.log(`held lock: name ${lock.name}, mode ${lock.mode}`);
  }
  for (const pending of snapshot.pending) {
    console.log(`pending lock: name ${pending.name}, mode ${pending.mode}`);
  }
});

Класс: BroadcastChannel extends EventTarget

Экземпляры BroadcastChannel обеспечивают асинхронную связь «один ко многим» со всеми BroadcastChannel с тем же именем канала.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import {
  isMainThread,
  BroadcastChannel,
  Worker,
} from 'node:worker_threads';

const bc = new BroadcastChannel('hello');

if (isMainThread) {
  let c = 0;
  bc.onmessage = (event) => {
    console.log(event.data);
    if (++c === 10) bc.close();
  };
  for (let n = 0; n < 10; n++)
    new Worker(new URL(import.meta.url));
} else {
  bc.postMessage('hello from every worker');
  bc.close();
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
'use strict';

const {
  isMainThread,
  BroadcastChannel,
  Worker,
} = require('node:worker_threads');

const bc = new BroadcastChannel('hello');

if (isMainThread) {
  let c = 0;
  bc.onmessage = (event) => {
    console.log(event.data);
    if (++c === 10) bc.close();
  };
  for (let n = 0; n < 10; n++)
    new Worker(__filename);
} else {
  bc.postMessage('hello from every worker');
  bc.close();
}

new BroadcastChannel(name)

  • name <any> Имя канала. Допустимо любое значение JavaScript, приводимое к строке через `${name}`.

broadcastChannel.close()

Закрывает соединение BroadcastChannel.

broadcastChannel.onmessage

  • Тип: <Function> Вызывается с одним аргументом MessageEvent при получении сообщения.

broadcastChannel.onmessageerror

  • Тип: <Function> Вызывается, если входящее сообщение нельзя десериализовать.

broadcastChannel.postMessage(message)

  • message <any> Любое клонируемое значение JavaScript.

broadcastChannel.ref()

Противоположность unref(). Вызов ref() у ранее unref()ed BroadcastChannel не позволяет завершить процесс, если это единственный активный handle (поведение по умолчанию). Если порт уже ref()ed, повторный ref() ничего не меняет.

broadcastChannel.unref()

Вызов unref() у BroadcastChannel позволяет потоку завершиться, если это единственный активный handle в системе событий. Если BroadcastChannel уже unref()ed, повторный unref() не действует.

Класс: MessageChannel

Класс worker.MessageChannel представляет асинхронный двусторонний канал связи. У MessageChannel нет собственных методов. new MessageChannel() возвращает объект с полями port1 и port2 — связанные экземпляры MessagePort.

1
2
3
4
5
6
import { MessageChannel } from 'node:worker_threads';

const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// Вывод: получено { foo: 'bar' } в слушателе port1.on('message')
1
2
3
4
5
6
7
8
'use strict';

const { MessageChannel } = require('node:worker_threads');

const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// Вывод: получено { foo: 'bar' } в слушателе port1.on('message')

Класс: MessagePort

Класс worker.MessagePort — один конец асинхронного двустороннего канала. Через него передают структурированные данные, области памяти и другие MessagePort между разными Worker.

Реализация соответствует browser MessagePort.

Событие: 'close'

Событие 'close' генерируется, когда любая сторона канала отключена.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { MessageChannel } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();

// Вывод:
//   foobar
//   closed!
port2.on('message', (message) => console.log(message));
port2.once('close', () => console.log('closed!'));

port1.postMessage('foobar');
port1.close();
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
'use strict';

const { MessageChannel } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();

// Вывод:
//   foobar
//   closed!
port2.on('message', (message) => console.log(message));
port2.once('close', () => console.log('closed!'));

port1.postMessage('foobar');
port1.close();

Событие: 'message'

  • value <any> Переданное значение

Событие 'message' генерируется для каждого входящего сообщения с клоном аргумента port.postMessage().

Слушатели получают клон параметра value, как в postMessage(), без дополнительных аргументов.

Событие: 'messageerror'

Событие 'messageerror' генерируется при ошибке десериализации сообщения.

Обычно оно возникает при ошибке создания переданного JS-объекта на приёмной стороне. Такое редко, но возможно, например, если объекты API Node.js попадают в vm.Context (где API Node.js сейчас недоступны).

port.close()

Отключает дальнейшую отправку сообщений с обеих сторон соединения. Вызывают, когда обмен по этому MessagePort больше не нужен.

Событие 'close' генерируется на обоих MessagePort этого канала.

port.postMessage(value[, transferList])

Отправляет значение JavaScript на приёмную сторону канала. value передаётся совместимо с алгоритмом структурированного клонирования HTML.

Отличия от JSON:

  • value может содержать циклические ссылки.
  • value может содержать встроенные типы JS: RegExp, BigInt, Map, Set и т.д.
  • value может содержать типизированные массивы на ArrayBuffer и SharedArrayBuffer.
  • value может содержать экземпляры WebAssembly.Module.
  • нельзя передавать нативные (C++) объекты, кроме:
1
2
3
4
5
6
7
8
9
import { MessageChannel } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const circularData = {};
circularData.foo = circularData;
// Вывод: { foo: [Circular] }
port2.postMessage(circularData);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
'use strict';

const { MessageChannel } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const circularData = {};
circularData.foo = circularData;
// Вывод: { foo: [Circular] }
port2.postMessage(circularData);

transferList — список ArrayBuffer, MessagePort и FileHandle. После передачи они недоступны на стороне отправителя (даже если не входят в value). В отличие от дочерних процессов, передача сокетов и подобных handle сейчас не поддерживается.

Если value содержит SharedArrayBuffer, память доступна из любого потока. Их нельзя указывать в transferList.

value может включать ArrayBuffer, не перечисленные в transferList; тогда память копируется, а не переносится.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import { MessageChannel } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// Отправляет копию uint8Array:
port2.postMessage(uint8Array);
// Данные не копируются, но uint8Array на этой стороне становится недействительным:
port2.postMessage(uint8Array, [ uint8Array.buffer ]);

// Память sharedUint8Array доступна и здесь, и в копии на стороне
// обработчика .on('message'):
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
port2.postMessage(sharedUint8Array);

// Передаёт только что созданный порт сообщений получателю.
// Так, например, можно связать несколько потоков Worker — детей одного родителя.
const otherChannel = new MessageChannel();
port2.postMessage({ port: otherChannel.port1 }, [ otherChannel.port1 ]);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
'use strict';

const { MessageChannel } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// Отправляет копию uint8Array:
port2.postMessage(uint8Array);
// Данные не копируются, но uint8Array на этой стороне становится недействительным:
port2.postMessage(uint8Array, [ uint8Array.buffer ]);

// Память sharedUint8Array доступна и здесь, и в копии на стороне
// обработчика .on('message'):
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
port2.postMessage(sharedUint8Array);

// Передаёт только что созданный порт сообщений получателю.
// Так, например, можно связать несколько потоков Worker — детей одного родителя.
const otherChannel = new MessageChannel();
port2.postMessage({ port: otherChannel.port1 }, [ otherChannel.port1 ]);

Объект сообщения клонируется сразу; после отправки исходный можно менять без побочных эффектов.

Подробнее о сериализации и десериализации см. API сериализации модуля node:v8.

Передача TypedArray и Buffer

Все TypedArray | Buffer — представления над ArrayBuffer: сырые данные хранит ArrayBuffer, а объекты TypedArray и Buffer дают доступ к ним. Часто несколько представлений ссылаются на один ArrayBuffer. При передаче ArrayBuffer через transfer list нужна осторожность: все TypedArray и Buffer, разделяющие этот буфер, становятся недействительными.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
const ab = new ArrayBuffer(10);

const u1 = new Uint8Array(ab);
const u2 = new Uint16Array(ab);

console.log(u2.length); // выводит 5

port.postMessage(u1, [u1.buffer]);

console.log(u2.length); // выводит 0

Для Buffer возможность передать или клонировать базовый ArrayBuffer зависит от способа создания, который не всегда можно надёжно определить.

ArrayBuffer можно пометить markAsUntransferable(), чтобы он всегда клонировался, а не передавался.

В зависимости от создания Buffer он может владеть или не владеть своим ArrayBuffer. Передавать ArrayBuffer нельзя, если неизвестно, владеет ли им Buffer. Для Buffer из внутреннего пула (Buffer.from(), Buffer.allocUnsafe() и т.п.) передача невозможна — всегда клонирование, что может копировать весь пул Buffer. Это ведёт к лишнему расходу памяти и рискам безопасности.

См. Buffer.allocUnsafe() о пуле Buffer.

ArrayBuffer у Buffer, созданных через Buffer.alloc() или Buffer.allocUnsafeSlow(), можно передавать, но тогда все остальные представления этого буфера становятся недействительными.

Клонирование объектов с прототипами, классами и аксессорами

Клонирование следует алгоритму структурированного клонирования HTML: неперечислимые свойства, аксессоры и прототипы не сохраняются. Buffer на приёмной стороне станет обычным Uint8Array, экземпляры классов — простыми объектами.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
const b = Symbol('b');

class Foo {
    #a = 1;
    constructor() {
        this[b] = 2;
        this.c = 3;
    }

    get d() {
        return 4;
    }
}

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => console.log(data);

port2.postMessage(new Foo());

// Вывод: { c: 3 }

То же касается многих встроенных объектов, например глобального URL:

1
2
3
4
5
6
7
const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => console.log(data);

port2.postMessage(new URL('https://example.org'));

// Вывод: { }

port.hasRef()

Если истина, MessagePort удерживает цикл событий Node.js активным.

port.ref()

Противоположность unref(). Вызов ref() у ранее unref()ed порта не завершает процесс, если это единственный handle (поведение по умолчанию). Если порт уже ref()ed, повторный ref() не действует.

При подключении или снятии слушателей через .on('message') порт автоматически ref() и unref() в зависимости от наличия слушателей.

port.start()

Начинает приём сообщений на этом MessagePort. При использовании порта как event emitter вызывается автоматически при появлении слушателей 'message'.

Метод нужен для совместимости с Web MessagePort API. В Node.js полезен, чтобы игнорировать сообщения без слушателя. Поведение .onmessage отличается: установка вызывает .start(), снятие оставляет сообщения в очереди до нового обработчика или уничтожения порта.

port.unref()

unref() у порта позволяет потоку завершиться, если это единственный активный handle. Если порт уже unref()ed, повторный вызов не действует.

При подключении или снятии слушателей через .on('message') порт автоматически ref() и unref() в зависимости от наличия слушателей.

Класс: Worker

Класс Worker представляет независимый поток выполнения JavaScript. Большинство API Node.js в нём доступно.

Отличия среды Worker:

  • Потоки process.stdin, process.stdout, process.stderr могут перенаправлять родительский поток.
  • Свойство require('node:worker_threads').isMainThread равно false.
  • Доступен порт сообщений require('node:worker_threads').parentPort.
  • process.exit() завершает только этот поток, не всю программу; process.abort() недоступен.
  • process.chdir() и методы process для смены группы/пользователя недоступны.
  • process.env — копия переменных окружения родителя, если не указано иное. Изменения в одной копии не видны другим потокам и нативным аддонам (кроме случая worker.SHARE_ENV в опции env конструктора Worker). В Windows копия переменных, в отличие от главного потока, учитывает регистр.
  • process.title нельзя изменить.
  • Сигналы не доставляются через process.on('...').
  • Выполнение может прерваться в любой момент при worker.terminate().
  • IPC-каналы родительского процесса недоступны.
  • Модуль trace_events не поддерживается.
  • Нативные аддоны в нескольких потоках — только при выполнении условий.

Worker можно создавать внутри других Worker.

Как у Web Workers и модуля node:cluster, двусторонняя связь достижима через передачу сообщений. У Worker внутри есть пара связанных MessagePort, созданных при инициализации. Родительский MessagePort не экспонируется напрямую; функции доступны через worker.postMessage() и событие worker.on('message') у объекта Worker в родительском потоке.

Для своих каналов связи (предпочтительнее глобального канала из‑за разделения ответственности) можно создать MessageChannel в любом потоке и передать один из MessagePort другому потоку через уже существующий канал, например глобальный.

Подробнее о передаче сообщений и допустимых значениях см. port.postMessage().

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import assert from 'node:assert';
import {
  Worker, MessageChannel, MessagePort, isMainThread, parentPort,
} from 'node:worker_threads';
if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url));
  const subChannel = new MessageChannel();
  worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  subChannel.port2.on('message', (value) => {
    console.log('received:', value);
  });
} else {
  parentPort.once('message', (value) => {
    assert(value.hereIsYourPort instanceof MessagePort);
    value.hereIsYourPort.postMessage('the worker is sending this');
    value.hereIsYourPort.close();
  });
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
'use strict';

const assert = require('node:assert');
const {
  Worker, MessageChannel, MessagePort, isMainThread, parentPort,
} = require('node:worker_threads');
if (isMainThread) {
  const worker = new Worker(__filename);
  const subChannel = new MessageChannel();
  worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  subChannel.port2.on('message', (value) => {
    console.log('received:', value);
  });
} else {
  parentPort.once('message', (value) => {
    assert(value.hereIsYourPort instanceof MessagePort);
    value.hereIsYourPort.postMessage('the worker is sending this');
    value.hereIsYourPort.close();
  });
}

new Worker(filename[, options])

  • filename <string> | <URL> Путь к основному скрипту или модулю Worker. Должен быть либо абсолютным путём, либо относительным (относительно текущего рабочего каталога) с префиксом ./ или ../, либо объектом WHATWG URL с протоколом file: или data:. При использовании data: URL данные интерпретируются по MIME-типу с помощью загрузчика модулей ECMAScript. Если options.eval равен true, это строка с кодом JavaScript, а не путь.
  • options <Object>
    • argv <any[]> Список аргументов, которые преобразуются в строки и добавляются к process.argv в worker. По смыслу близко к workerData, но значения доступны в глобальном process.argv, как если бы они были переданы скрипту в командной строке.
    • env <Object> Если задано, задаёт начальное значение process.env в потоке Worker. В качестве особого значения можно указать worker.SHARE_ENV, чтобы родительский и дочерний потоки разделяли переменные окружения; тогда изменения объекта process.env в одном потоке видны и в другом. По умолчанию: process.env.
    • eval <boolean> Если true и первый аргумент — строка, первый аргумент конструктора трактуется как скрипт, выполняемый после перехода worker в состояние online.
    • execArgv <string[]> Список опций CLI Node.js, передаваемых worker. Опции V8 (например --max-old-space-size) и опции, влияющие на процесс (например --title), не поддерживаются. Если задано, внутри worker доступно как process.execArgv. По умолчанию опции наследуются от родительского потока.
    • stdin <boolean> Если true, worker.stdin — поток записи, содержимое которого попадает в process.stdin внутри Worker. По умолчанию данные не подаются.
    • stdout <boolean> Если true, worker.stdout не перенаправляется автоматически в process.stdout родителя.
    • stderr <boolean> Если true, worker.stderr не перенаправляется автоматически в process.stderr родителя.
    • workerData <any> Любое значение JavaScript, клонируемое и доступное как require('node:worker_threads').workerData. Клонирование выполняется по алгоритму структурированного клонирования HTML; если объект клонировать нельзя (например, из‑за наличия function), выбрасывается ошибка.
    • trackUnmanagedFds <boolean> Если true, Worker отслеживает «сырые» дескрипторы файлов, открытые через fs.open() и fs.close(), и закрывает их при завершении Worker, аналогично другим ресурсам (сетевые сокеты, дескрипторы через API FileHandle). Опция автоматически наследуется всеми вложенными Worker. По умолчанию: true.
    • transferList <Object[]> Если в workerData передан один или несколько объектов, похожих на MessagePort, для них нужен transferList, иначе выбрасывается ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST. Подробнее см. port.postMessage().
    • resourceLimits <Object> Необязательный набор ограничений ресурсов для нового экземпляра JS-движка. При достижении лимитов экземпляр Worker завершается. Ограничения действуют только на движок JS, не на внешние данные, в том числе на ArrayBuffer. Даже при заданных лимитах процесс может аварийно завершиться при глобальной нехватке памяти.
      • maxOldGenerationSizeMb <number> Максимальный размер основной кучи в МБ. Если задан аргумент командной строки --max-old-space-size, он переопределяет это значение.
      • maxYoungGenerationSizeMb <number> Максимальный размер области кучи для недавно созданных объектов. Если задан аргумент --max-semi-space-size, он переопределяет это значение.
      • codeRangeSizeMb <number> Размер заранее выделенного диапазона памяти для сгенерированного кода.
      • stackSizeMb <number> Максимальный размер стека потока по умолчанию. Слишком малые значения могут сделать экземпляры Worker непригодными. По умолчанию: 4.
    • name <string> Необязательное имя name, подставляемое в имя потока и заголовок worker для отладки и идентификации; итоговый заголовок вида [worker ${id}] ${name}. Максимальная длина зависит от ОС. Если имя длиннее допустимого, оно обрезается.
      • Максимальные размеры:
        • Windows: 32 767 символов
        • macOS: 64 символа
        • Linux: 16 символов
        • NetBSD: ограничено PTHREAD_MAX_NAMELEN_NP
        • FreeBSD и OpenBSD: ограничено MAXCOMLEN По умолчанию: 'WorkerThread'.

Событие: 'error'

Событие 'error' генерируется, если в потоке worker выброшено необработанное исключение. В этом случае worker завершается.

Событие: 'exit'

Событие 'exit' генерируется после остановки worker. Если worker завершился вызовом process.exit(), параметр exitCode — переданный код выхода. Если worker был принудительно завершён, exitCode равен 1.

Это последнее событие, которое генерирует любой экземпляр Worker.

Событие: 'message'

  • value <any> Переданное значение

Событие 'message' генерируется, когда поток worker вызывает require('node:worker_threads').parentPort.postMessage(). Подробнее см. событие port.on('message').

Все сообщения, отправленные из потока worker, доставляются до генерации события 'exit' на объекте Worker.

Событие: 'messageerror'

Событие 'messageerror' генерируется при ошибке десериализации сообщения.

Событие: 'online'

Событие 'online' генерируется, когда поток worker начал выполнение кода JavaScript.

worker.cpuUsage([prev])

Метод возвращает Promise, который выполнится объектом, совпадающим с process.threadCpuUsage(), или будет отклонён с ERR_WORKER_NOT_RUNNING, если worker уже не работает. Позволяет получать статистику использования CPU потока извне, не находясь в нём.

worker.getHeapSnapshot([options])

  • options <Object>
    • exposeInternals <boolean> Если true, в снимок кучи включаются внутренности движка. По умолчанию: false.
    • exposeNumericValues <boolean> Если true, числовые значения попадают в искусственные поля. По умолчанию: false.
  • Возвращает: <Promise> Промис с потоком чтения (Readable), содержащим снимок кучи V8

Возвращает поток чтения со снимком V8 текущего состояния Worker. Подробнее см. v8.getHeapSnapshot().

Если поток Worker уже не выполняется (это возможно до генерации события 'exit'), возвращённый Promise сразу отклоняется с ошибкой ERR_WORKER_NOT_RUNNING.

worker.getHeapStatistics()

Метод возвращает Promise, который выполнится объектом, совпадающим с v8.getHeapStatistics(), или будет отклонён с ERR_WORKER_NOT_RUNNING, если worker уже не работает. Позволяет получать статистику кучи извне, не находясь в потоке worker.

worker.performance

Объект для запроса сведений о производительности экземпляра worker.

performance.eventLoopUtilization([utilization1[, utilization2]])

  • utilization1 <Object> Результат предыдущего вызова eventLoopUtilization().
  • utilization2 <Object> Результат предыдущего вызова eventLoopUtilization() до момента utilization1.
  • Возвращает: <Object>

Тот же вызов, что и perf_hooks eventLoopUtilization(), но возвращаются значения для экземпляра worker.

Отличие: в отличие от главного потока, инициализация в worker выполняется внутри цикла событий, поэтому загрузку цикла событий можно измерить сразу после начала выполнения скрипта worker.

Если время idle не растёт, это не значит, что worker «застрял» в инициализации. Ниже показано, как за всё время жизни worker не накапливается idle, но сообщения всё равно обрабатываются.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import { Worker, isMainThread, parentPort } from 'node:worker_threads';

if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url));
  setInterval(() => {
    worker.postMessage('hi');
    console.log(worker.performance.eventLoopUtilization());
  }, 100).unref();
} else {
  parentPort.on('message', () => console.log('msg')).unref();
  (function r(n) {
    if (--n < 0) return;
    const t = Date.now();
    while (Date.now() - t < 300);
    setImmediate(r, n);
  })(10);
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
'use strict';

const { Worker, isMainThread, parentPort } = require('node:worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);
  setInterval(() => {
    worker.postMessage('hi');
    console.log(worker.performance.eventLoopUtilization());
  }, 100).unref();
} else {
  parentPort.on('message', () => console.log('msg')).unref();
  (function r(n) {
    if (--n < 0) return;
    const t = Date.now();
    while (Date.now() - t < 300);
    setImmediate(r, n);
  })(10);
}

Загрузку цикла событий worker можно получить только после генерации события 'online'; при вызове до этого или после события 'exit' все свойства равны 0.

worker.postMessage(value[, transferList])

Отправляет сообщение worker; оно принимается в require('node:worker_threads').parentPort.on('message'). Подробнее см. port.postMessage().

worker.ref()

Противоположность unref(): вызов ref() у ранее unref()ed worker не даёт процессу завершиться, если это единственный активный handle (поведение по умолчанию). Если worker уже ref()ed, повторный ref() не действует.

worker.resourceLimits

Задаёт ограничения ресурсов JS-движка для этого потока Worker. Если в конструктор Worker передавалась опция resourceLimits, значения совпадают с ней.

Если worker остановлен, возвращается пустой объект.

worker.startCpuProfile()

Запускает профиль CPU и возвращает Promise, который выполняется с ошибкой или объектом CPUProfileHandle. Этот API поддерживает синтаксис await using.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
const { Worker } = require('node:worker_threads');

const worker = new Worker(`
  const { parentPort } = require('worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

worker.on('online', async () => {
  const handle = await worker.startCpuProfile();
  const profile = await handle.stop();
  console.log(profile);
  worker.terminate();
});

Пример с await using.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
const { Worker } = require('node:worker_threads');

const w = new Worker(`
  const { parentPort } = require('node:worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

w.on('online', async () => {
  // При выходе из области видимости профиль останавливается и отбрасывается
  await using handle = await w.startCpuProfile();
});

worker.startHeapProfile([options])

  • options <Object>
    • sampleInterval <number> Средний интервал выборки в байтах. По умолчанию: 524288 (512 KiB).
    • stackDepth <integer> Максимальная глубина стека для образцов. По умолчанию: 16.
    • forceGC <boolean> Принудительная сборка мусора перед снятием профиля. По умолчанию: false.
    • includeObjectsCollectedByMajorGC <boolean> Включать объекты, собранные major GC. По умолчанию: false.
    • includeObjectsCollectedByMinorGC <boolean> Включать объекты, собранные minor GC. По умолчанию: false.
  • Возвращает: <Promise>

Запускает профиль кучи и возвращает Promise, который выполняется с ошибкой или объектом HeapProfileHandle. Этот API поддерживает синтаксис await using.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
const { Worker } = require('node:worker_threads');

const worker = new Worker(`
  const { parentPort } = require('worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

worker.on('online', async () => {
  const handle = await worker.startHeapProfile();
  const profile = await handle.stop();
  console.log(profile);
  worker.terminate();
});
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import { Worker } from 'node:worker_threads';

const worker = new Worker(`
  const { parentPort } = require('node:worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

worker.on('online', async () => {
  const handle = await worker.startHeapProfile();
  const profile = await handle.stop();
  console.log(profile);
  worker.terminate();
});

Пример с await using.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
const { Worker } = require('node:worker_threads');

const w = new Worker(`
  const { parentPort } = require('node:worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

w.on('online', async () => {
  // При выходе из области видимости профиль останавливается и отбрасывается
  await using handle = await w.startHeapProfile();
});
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { Worker } from 'node:worker_threads';

const w = new Worker(`
  const { parentPort } = require('node:worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

w.on('online', async () => {
  // При выходе из области видимости профиль останавливается и отбрасывается
  await using handle = await w.startHeapProfile();
});

worker.stderr

Поток чтения с данными, записанными в process.stderr внутри потока worker. Если в конструктор Worker не передано stderr: true, данные перенаправляются в process.stderr родительского потока.

worker.stdin

Если в конструктор Worker передано stdin: true, это поток записи. Данные, записанные в него, доступны в потоке worker как process.stdin.

worker.stdout

Поток чтения с данными, записанными в process.stdout внутри потока worker. Если в конструктор Worker не передано stdout: true, данные перенаправляются в process.stdout родительского потока.

worker.terminate()

Останавливает выполнение JavaScript в потоке worker как можно скорее. Возвращает Promise с кодом выхода, который выполняется при генерации события 'exit'.

worker.threadId

Целочисленный идентификатор соответствующего потока. В потоке worker доступен как require('node:worker_threads').threadId. Уникален для каждого экземпляра Worker в одном процессе.

worker.threadName

Строковый идентификатор потока или null, если поток не выполняется. В потоке worker доступен как require('node:worker_threads').threadName.

worker.unref()

Вызов unref() у worker позволяет потоку завершиться, если это единственный активный handle в системе событий. Если worker уже unref()ed, повторный unref() не действует.

worker[Symbol.asyncDispose]()

Вызывает worker.terminate() при выходе из области dispose.

1
2
3
4
async function example() {
  await using worker = new Worker('for (;;) {}', { eval: true });
  // Worker автоматически завершается при выходе из области видимости.
}

Примечания

Синхронная блокировка stdio

Экземпляры Worker используют передачу сообщений через MessagePort для взаимодействия со stdio. Поэтому вывод stdio из worker может блокироваться синхронным кодом на принимающей стороне, который удерживает цикл событий Node.js.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import {
  Worker,
  isMainThread,
} from 'node:worker_threads';

if (isMainThread) {
  new Worker(new URL(import.meta.url));
  for (let n = 0; n < 1e10; n++) {
    // Долгий цикл — имитация работы.
  }
} else {
  // Этот вывод заблокирован циклом for в главном потоке.
  console.log('foo');
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
'use strict';

const {
  Worker,
  isMainThread,
} = require('node:worker_threads');

if (isMainThread) {
  new Worker(__filename);
  for (let n = 0; n < 1e10; n++) {
    // Долгий цикл — имитация работы.
  }
} else {
  // Этот вывод заблокирован циклом for в главном потоке.
  console.log('foo');
}

Запуск потоков worker из preload-скриптов

Будьте осторожны при запуске потоков worker из preload-скриптов (скриптов, подключаемых флагом -r). Пока явно не задана опция execArgv, новые потоки Worker наследуют флаги командной строки текущего процесса и подгружают те же preload-скрипты, что и главный поток. Если preload-скрипт безусловно создаёт поток worker, каждый новый поток породит ещё один, пока приложение не упадёт.

Комментарии