Перейти к содержанию

Рабочие потоки

v18.x.x

Стабильность: 2 – Стабильная

АПИ является удовлетворительным. Совместимость с NPM имеет высший приоритет и не будет нарушена кроме случаев явной необходимости.

Модуль node:worker_threads позволяет использовать потоки, параллельно выполняющие JavaScript. Чтобы получить к нему доступ:

1
const worker = require('node:worker_threads');

Рабочие (потоки) полезны для выполнения операций JavaScript, требующих больших затрат процессора. Они не очень помогают при выполнении интенсивных операций ввода-вывода. Встроенные в Node.js асинхронные операции ввода-вывода более эффективны, чем могут быть Workers.

В отличие от child_process или cluster, worker_threads могут совместно использовать память. Они делают это путем передачи экземпляров ArrayBuffer или совместного использования экземпляров SharedArrayBuffer.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const {
    Worker,
    isMainThread,
    parentPort,
    workerData,
} = require('node:worker_threads');

if (isMainThread) {
    module.exports = function parseJSAsync(script) {
        return new Promise((resolve, reject) => {
            const worker = new Worker(__filename, {
                workerData: script,
            });
            worker.on('message', resolve);
            worker.on('error', reject);
            worker.on('exit', (code) => {
                if (code !== 0)
                    reject(
                        new Error(
                            `Worker stopped with exit code ${code}`
                        )
                    );
            });
        });
    };
} else {
    const { parse } = require('some-js-parsing-library');
    const script = workerData;
    parentPort.postMessage(parse(script));
}

Приведенный выше пример порождает поток Worker для каждого вызова parseJSAsync(). На практике для таких задач следует использовать пул Worker'ов. В противном случае накладные расходы на создание рабочих потоков, скорее всего, превысят их пользу.

При реализации пула рабочих используйте API AsyncResource для информирования диагностических инструментов (например, для предоставления асинхронных трассировок стека) о взаимосвязи между задачами и их результатами. Пример реализации см. в "Использование AsyncResource для пула потоков Worker" в документации async_hooks.

Рабочие потоки по умолчанию наследуют неспецифические для процесса опции. Обратитесь к Опции конструктора рабочего потока, чтобы узнать, как настроить опции рабочего потока, в частности опции argv и execArgv.

worker.getEnvironmentData(key)

  • key <any> Любое произвольное, клонируемое значение JavaScript, которое может быть использовано в качестве ключа {Map}.
  • Возвращает: <any>

Внутри рабочего потока worker.getEnvironmentData() возвращает клон данных, переданных в порождающий поток worker.setEnvironmentData(). Каждый новый Worker получает свою собственную копию данных окружения автоматически.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
const {
    Worker,
    isMainThread,
    setEnvironmentData,
    getEnvironmentData,
} = require('node:worker_threads');

if (isMainThread) {
    setEnvironmentData('Hello', 'World!');
    const worker = new Worker(__filename);
} else {
    console.log(getEnvironmentData('Hello')); // Печатает 'World!'.
}

worker.isMainThread

Является true, если этот код не выполняется внутри потока Worker.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
const {
    Worker,
    isMainThread,
} = require('node:worker_threads');

if (isMainThread) {
    // Это перезагружает текущий файл внутри экземпляра Worker.
    new Worker(__filename);
} else {
    console.log('Внутри Worker!');
    console.log(isMainThread); // Выводит 'false'.
}

worker.markAsUntransferable(object)

Пометить объект как непередаваемый. Если object встречается в списке передачи вызова port.postMessage(), он игнорируется.

В частности, это имеет смысл для объектов, которые можно клонировать, а не передавать, и которые используются другими объектами на передающей стороне. Например, Node.js помечает этим ArrayBuffer, который он использует для своего пула Buffer.

Эта операция не может быть отменена.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
const {
    MessageChannel,
    markAsUntransferable,
} = require('node:worker_threads');

const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);

markAsUntransferable(pooledBuffer);

const { port1 } = new MessageChannel();
port1.postMessage(typedArray1, [typedArray1.buffer]);

// Следующая строка печатает содержимое typedArray1 - он все еще владеет
// своей памятью и был клонирован, а не передан. Без
// `markAsUntransferable()`, это вывело бы пустой Uint8Array.
// typedArray2 также не поврежден.
console.log(typedArray1);
console.log(typedArray2);

В браузерах эквивалента этому API нет.

worker.moveMessagePortToContext(port, contextifiedSandbox)

  • port {MessagePort} Порт сообщения для передачи.

  • contextifiedSandbox <Object> Объект contextified, возвращенный методом vm.createContext().

  • Возвращает: {MessagePort}

Переносит порт сообщения в другой vm Context. Исходный объект port становится непригодным для использования, а его место занимает возвращаемый экземпляр MessagePort.

Возвращаемый MessagePort является объектом в целевом контексте и наследует от его глобального класса Object. Объекты, передаваемые слушателю port.onmessage(), также создаются в целевом контексте и наследуются от его глобального класса Object.

Однако созданный MessagePort больше не наследуется от EventTarget, и только port.onmessage() может быть использован для получения событий с его помощью.

worker.parentPort

  • {null|MessagePort}

Если данный поток является Worker, то это MessagePort, обеспечивающий связь с родительским потоком. Сообщения, отправленные с помощью parentPort.postMessage(), доступны в родительском потоке с помощью worker.on('message'), а сообщения, отправленные из родительского потока с помощью worker.postMessage(), доступны в этом потоке с помощью parentPort.on('message').

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
const {
    Worker,
    isMainThread,
    parentPort,
} = require('node:worker_threads');

if (isMainThread) {
    const worker = new Worker(__filename);
    worker.once('message', (message) => {
        console.log(message); // Печатает 'Hello, world!'.
    });
    worker.postMessage('Hello, world!');
} else {
    // Когда получено сообщение от родительского потока, отправьте его обратно:
    parentPort.once('message', (message) => {
        parentPort.postMessage(message);
    });
}

worker.receiveMessageOnPort(port)

  • port {MessagePort|BroadcastChannel}

  • Возвращает: {Object|undefined}

Получение одного сообщения от заданного MessagePort. Если сообщение недоступно, возвращается undefined, иначе - объект с единственным свойством message, содержащим полезную нагрузку сообщения, соответствующую самому старому сообщению в очереди MessagePort.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
const {
    MessageChannel,
    receiveMessageOnPort,
} = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });

console.log(receiveMessageOnPort(port2));
// Prints: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Prints: undefined

Когда используется эта функция, событие 'message' не испускается и слушатель onmessage не вызывается.

worker.resourceLimits

Предоставляет набор ограничений на ресурсы JS-движка внутри этого потока Worker. Если параметр resourceLimits был передан конструктору Worker, этот параметр соответствует его значениям.

Если этот параметр используется в главном потоке, его значением будет пустой объект.

worker.SHARE_ENV

  • {символ}

Специальное значение, которое может быть передано в качестве опции env конструктора Worker, чтобы указать, что текущий поток и поток Worker должны иметь общий доступ на чтение и запись к одному и тому же набору переменных окружения.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
const {
    Worker,
    SHARE_ENV,
} = require('node:worker_threads');
new Worker('process.env.SET_IN_WORKER = "foo"', {
    eval: true,
    env: SHARE_ENV,
}).on('exit', () => {
    console.log(process.env.SET_IN_WORKER); // Печатает 'foo'.
});

worker.setEnvironmentData(key[, value])

  • key <any> Любое произвольное, клонируемое значение JavaScript, которое может быть использовано в качестве ключа {Map}.
  • value <any> Любое произвольное, клонируемое значение JavaScript, которое будет клонироваться и автоматически передаваться всем новым экземплярам Worker. Если value передано как undefined, любое ранее установленное значение для key будет удалено.

API worker.setEnvironmentData() устанавливает содержимое worker.getEnvironmentData() в текущем потоке и во всех новых экземплярах Worker, порожденных из текущего контекста.

worker.threadId

Целочисленный идентификатор текущего потока. На соответствующем объекте worker (если он есть) он доступен как worker.threadId. Это значение уникально для каждого экземпляра Worker внутри одного процесса.

worker.workerData

Произвольное значение JavaScript, содержащее клон данных, переданных в конструктор Worker этого потока.

Данные клонируются как при использовании postMessage(), в соответствии с алгоритмом HTML structured clone algorithm.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
const {
    Worker,
    isMainThread,
    workerData,
} = require('node:worker_threads');

if (isMainThread) {
    const worker = new Worker(__filename, {
        workerData: 'Hello, world!',
    });
} else {
    console.log(workerData); // Печатает 'Hello, world!'.
}

Класс: BroadcastChannel расширяет EventTarget

Экземпляры BroadcastChannel позволяют асинхронную связь "один ко многим" со всеми другими экземплярами BroadcastChannel, привязанными к тому же имени канала.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
'use strict';

const {
    isMainThread,
    BroadcastChannel,
    Worker,
} = require('node:worker_threads');

const bc = new BroadcastChannel('hello');

if (isMainThread) {
    let c = 0;
    bc.onmessage = (event) => {
        console.log(event.data);
        if (++c === 10) bc.close();
    };
    for (let n = 0; n < 10; n++) new Worker(__filename);
} else {
    bc.postMessage('hello from every worker');
    bc.close();
}

new BroadcastChannel(name)

  • name <any> Имя канала, к которому нужно подключиться. Допускается любое значение JavaScript, которое может быть преобразовано в строку с помощью ${name}.

broadcastChannel.close()

Закрывает соединение BroadcastChannel.

broadcastChannel.onmessage

  • Тип: <Function> Вызывается с одним аргументом MessageEvent при получении сообщения.

broadcastChannel.onmessageerror

  • Тип: <Function> Вызывается при получении сообщения, которое не может быть десериализовано.

broadcastChannel.postMessage(message)

  • message <any> Любое клонируемое значение JavaScript.

broadcastChannel.ref()

Противоположность unref(). Вызов ref() на ранее unref() BroadcastChannel не позволяет программе выйти, если это единственный оставшийся активный хэндл (поведение по умолчанию). Если порт был ref(), повторный вызов ref() не имеет никакого эффекта.

broadcastChannel.unref()

Вызов unref() на BroadcastChannel позволяет потоку выйти, если это единственный активный хэндл в системе событий. Если BroadcastChannel уже был unref(), повторный вызов unref() не имеет никакого эффекта.

Класс: MessageChannel

Экземпляры класса worker.MessageChannel представляют асинхронный, двусторонний канал связи. У MessageChannel нет собственных методов. new MessageChannel() выдает объект со свойствами port1 и port2, которые ссылаются на связанные экземпляры MessagePort.

1
2
3
4
5
6
7
8
const { MessageChannel } = require('node:worker_threads');

const { port1, port2 } = new MessageChannel();
port1.on('message', (message) =>
    console.log('received', message)
);
port2.postMessage({ foo: 'bar' });
// Выводит: получено { foo: 'bar' } от слушателя `port1.on('message')`.

Класс: MessagePort

  • Расширяет: {EventTarget}

Экземпляры класса worker.MessagePort представляют собой один конец асинхронного двустороннего канала связи. Он может использоваться для передачи структурированных данных, областей памяти и других MessagePort между различными Workers.

Эта реализация соответствует browser MessagePorts.

Событие: close

Событие close происходит, когда одна из сторон канала отключена.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
const { MessageChannel } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();

// Печатает:
// foobar
// закрыто!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('закрыто!'));

port1.postMessage('foobar');
port1.close();

Событие: message

  • значение <any> Передаваемое значение

Событие 'message' испускается для любого входящего сообщения, содержащего клонированный вход port.postMessage().

Слушатели этого события получают клон параметра value, переданного в postMessage(), и никаких дополнительных аргументов.

Событие: messageerror

  • error <Error> Объект ошибки

Событие 'messageerror' возникает при неудачной десериализации сообщения.

В настоящее время это событие возникает, когда происходит ошибка при инстанцировании размещенного JS-объекта на принимающей стороне. Такие ситуации редки, но могут произойти, например, когда определенные объекты API Node.js получены в vm.Context (где API Node.js в настоящее время недоступны).

port.close()

Отключает дальнейшую отправку сообщений с обеих сторон соединения. Этот метод может быть вызван, когда дальнейшее взаимодействие через этот MessagePort не будет происходить.

Событие close испускается на обоих экземплярах MessagePort, которые являются частью канала.

port.postMessage(value[, transferList])

  • значение <any>
  • transferList {Object[]}

Отправляет значение JavaScript на принимающую сторону этого канала. Передача value осуществляется способом, совместимым с HTML structured clone algorithm.

В частности, существенными отличиями от JSON являются:

  • значение может содержать круговые ссылки.
  • значение может содержать экземпляры встроенных типов JS, таких как RegExp, BigInt, Map, Set и т.д.
  • значение может содержать типизированные массивы, как с использованием ArrayBuffer, так и SharedArrayBuffer.
  • значение может содержать экземпляры WebAssembly.Module.
  • value не может содержать нативные (поддерживаемые C++) объекты, кроме:
    • {CryptoKey}s,
    • <FileHandle>s,
    • {Histogram}s,
    • {KeyObject}s,
    • {MessagePort}s,
    • {net.BlockList}s,
    • {net.SocketAddress}es,
    • {X509Certificate}s.
1
2
3
4
5
6
7
8
9
const { MessageChannel } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const circularData = {};
circularData.foo = circularData;
// Печатает: { foo: [Circular] }
port2.postMessage(circularData);

transferList может быть списком объектов ArrayBuffer, MessagePort и FileHandle. После передачи они больше не могут использоваться на передающей стороне канала (даже если они не содержатся в value). В отличие от дочерних процессов, передача хэндлов, таких как сетевые сокеты, в настоящее время не поддерживается.

Если value содержит экземпляры SharedArrayBuffer, то они доступны из любого потока. Они не могут быть перечислены в transferList.

value может по-прежнему содержать экземпляры ArrayBuffer, не включенные в transferList; в этом случае базовая память копируется, а не перемещается.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const { MessageChannel } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const uint8Array = new Uint8Array([1, 2, 3, 4]);
// This posts a copy of `uint8Array`:
port2.postMessage(uint8Array);
// This does not copy data, but renders `uint8Array` unusable:
port2.postMessage(uint8Array, [uint8Array.buffer]);

// The memory for the `sharedUint8Array` is accessible from both the
// original and the copy received by `.on('message')`:
const sharedUint8Array = new Uint8Array(
    new SharedArrayBuffer(4)
);
port2.postMessage(sharedUint8Array);

// This transfers a freshly created message port to the receiver.
// This can be used, for example, to create communication channels between
// multiple `Worker` threads that are children of the same parent thread.
const otherChannel = new MessageChannel();
port2.postMessage({ port: otherChannel.port1 }, [
    otherChannel.port1,
]);

Объект сообщения клонируется немедленно, и может быть изменен после размещения без побочных эффектов.

Для получения дополнительной информации о механизмах сериализации и десериализации, лежащих в основе этого API, смотрите serialization API модуля node:v8.

Соображения при передаче типизированных массивов и буферов

Все экземпляры TypedArray и Buffer являются представлениями над базовым ArrayBuffer. То есть, именно ArrayBuffer фактически хранит исходные данные, а объекты TypedArray и Buffer предоставляют способ просмотра и манипулирования данными. Для одного и того же экземпляра ArrayBuffer может быть создано несколько представлений. При использовании списка передачи для передачи ArrayBuffer следует быть очень осторожным, так как это приводит к тому, что все экземпляры TypedArray и Buffer, которые совместно используют тот же ArrayBuffer, становятся непригодными для использования.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
const ab = new ArrayBuffer(10);

const u1 = new Uint8Array(ab);
const u2 = new Uint16Array(ab);

console.log(u2.length); // печатает 5

port.postMessage(u1, [u1.buffer]);

console.log(u2.length); // печатает 0

Для экземпляров Buffer, в частности, можно ли передавать или клонировать лежащий в основе ArrayBuffer, полностью зависит от того, как были созданы экземпляры, что часто не может быть надежно определено.

Буфер ArrayBuffer может быть помечен markAsUntransferable(), чтобы указать, что его всегда следует клонировать и никогда не передавать.

В зависимости от того, как был создан экземпляр Buffer, он может владеть или не владеть своим базовым ArrayBuffer. Буфер ArrayBuffer не должен передаваться, если не известно, что экземпляр Buffer владеет им. В частности, для Buffer, созданных из внутреннего пула Buffer (используя, например, Buffer.from() или Buffer.allocUnsafe()), передача их невозможна, и они всегда клонируются, что отправляет копию всего пула Buffer. Такое поведение может привести к непреднамеренному увеличению использования памяти и возможным проблемам безопасности.

Более подробную информацию о пуле Buffer смотрите в Buffer.allocUnsafe().

Массив ArrayBuffer для экземпляров Buffer, созданных с помощью Buffer.alloc() или Buffer.allocUnsafeSlow(), всегда можно передать, но это делает непригодными все другие существующие представления этих ArrayBuffer.

Соображения при клонировании объектов с прототипами, классами и аксессорами

Поскольку при клонировании объектов используется алгоритм HTML structured clone algorithm, неперечислимые свойства, аксессоры свойств и прототипы объектов не сохраняются. В частности, объекты Buffer будут прочитаны как обычные Uint8Arrays на принимающей стороне, а экземпляры классов JavaScript будут клонированы как обычные объекты JavaScript.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
const b = Symbol('b');

class Foo {
    #a = 1;
    constructor() {
        this[b] = 2;
        this.c = 3;
    }

    get d() {
        return 4;
    }
}

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => console.log(data);

port2.postMessage(new Foo());

// Prints: { c: 3 }

Это ограничение распространяется на многие встроенные объекты, такие как глобальный объект URL:

1
2
3
4
5
6
7
const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => console.log(data);

port2.postMessage(new URL('https://example.org'));

// Prints: { }

port.hasRef()

Стабильность: 1 – Экспериментальная

Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.

Если true, то объект MessagePort будет поддерживать активным цикл событий Node.js.

port.ref()

Противоположность unref(). Вызов ref() на порту, который ранее был unref(), не позволяет программе завершить работу, если это единственный оставшийся активный хэндл (поведение по умолчанию). Если порт был ref()ed, повторный вызов ref() не имеет никакого эффекта.

Если слушатели подключаются или удаляются с помощью .on('message'), порт автоматически ref()отключается и unref()отключается в зависимости от того, существуют ли слушатели для данного события.

port.start()

Начинает принимать сообщения на этом MessagePort. При использовании этого порта в качестве эмиттера событий, этот метод вызывается автоматически, как только подключаются слушатели сообщений.

Этот метод существует для паритета с API Web MessagePort. В Node.js он полезен только для игнорирования сообщений при отсутствии слушателей событий. Node.js также расходится в своей обработке .onmessage. Установка этого параметра автоматически вызывает .start(), но снятие этого параметра позволяет сообщениям стоять в очереди до тех пор, пока не будет установлен новый обработчик или порт не будет отброшен.

port.unref()

Вызов unref() на порту позволяет потоку выйти, если это единственный активный хэндл в системе событий. Если порт уже был unref()идентифицирован, повторный вызов unref() не имеет эффекта.

Если слушатели подключаются или удаляются с помощью .on('message'), порт автоматически ref()отключается и unref()отключается в зависимости от того, существуют ли слушатели для данного события.

Класс: Worker

Класс Worker представляет собой независимый поток выполнения JavaScript. Большинство API Node.js доступны внутри него.

Заметными отличиями внутри среды Worker являются:

  • Потоки process.stdin, process.stdout и process.stderr могут быть перенаправлены родительским потоком.
  • Свойство require('node:worker_threads').isMainThread установлено в false.
  • Порт сообщения require('node:worker_threads').parentPort доступен.
  • process.exit() не останавливает всю программу, только отдельный поток, а process.abort() недоступен.
  • process.chdir() и методы process, задающие идентификаторы групп или пользователей, недоступны.
  • process.env - это копия переменных окружения родительского потока, если не указано иное. Изменения в одной копии не видны в других потоках и не видны встроенным дополнениям (если только worker.SHARE_ENV не передан в качестве опции env конструктору Worker).
  • process.title не может быть изменен.
  • Сигналы не передаются через process.on('...').
  • Выполнение может остановиться в любой момент в результате вызова worker.terminate().
  • IPC-каналы от родительских процессов недоступны.
  • Модуль trace_events не поддерживается.
  • Нативные дополнения могут быть загружены из нескольких потоков только при выполнении определенных условий.

Создание экземпляров Worker внутри других Worker возможно.

Подобно Web Workers и модулю node:cluster, двусторонняя связь может быть достигнута посредством межпоточной передачи сообщений. Внутри Worker имеет встроенную пару MessagePort, которые уже связаны друг с другом при создании Worker. Хотя объект MessagePort на родительской стороне напрямую не раскрывается, его функциональность раскрывается через worker.postMessage() и событие worker.on('message') на объекте Worker для родительского потока.

Для создания пользовательских каналов обмена сообщениями (что рекомендуется вместо использования глобального канала по умолчанию, поскольку это облегчает разделение проблем) пользователи могут создать объект MessageChannel в любом потоке и передать один из MessagePort этого MessageChannel в другой поток через заранее существующий канал, например, глобальный.

Смотрите port.postMessage() для получения дополнительной информации о том, как передаются сообщения и какие значения JavaScript могут быть успешно переданы через барьер потоков.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const assert = require('node:assert');
const {
    Worker,
    MessageChannel,
    MessagePort,
    isMainThread,
    parentPort,
} = require('node:worker_threads');
if (isMainThread) {
    const worker = new Worker(__filename);
    const subChannel = new MessageChannel();
    worker.postMessage(
        { hereIsYourPort: subChannel.port1 },
        [subChannel.port1]
    );
    subChannel.port2.on('message', (value) => {
        console.log('received:', value);
    });
} else {
    parentPort.once('message', (value) => {
        assert(value.hereIsYourPort instanceof MessagePort);
        value.hereIsYourPort.postMessage(
            'рабочий отправляет это'
        );
        value.hereIsYourPort.close();
    });
}

new Worker(filename[, options])

  • filename <string> | <URL> Путь к основному скрипту или модулю Рабочего. Должен быть либо абсолютным путем, либо относительным путем (т.е. относительно текущего рабочего каталога), начинающимся с ./ или ../, либо объектом WHATWG URL, использующим протокол file: или data:. При использовании data: URL данные интерпретируются на основе MIME-типа с помощью ECMAScript module loader. Если options.eval имеет значение true, то это строка, содержащая код JavaScript, а не путь.
  • options <Object>
    • argv {любой[]} Список аргументов, которые будут строфицированы и добавлены в process.argv в рабочем. Это в основном похоже на workerData, но значения доступны в глобальном process.argv, как если бы они были переданы как опции CLI скрипту.
    • env <Object> Если установлено, определяет начальное значение process.env внутри потока Worker. В качестве специального значения можно использовать worker.SHARE_ENV, чтобы указать, что родительский и дочерний потоки должны совместно использовать свои переменные окружения; в этом случае изменения объекта process.env одного потока влияют и на другой поток. По умолчанию: process.env.
    • eval <boolean> Если true и первый аргумент является строкой, интерпретируйте первый аргумент конструктора как сценарий, который будет выполнен, как только рабочий будет запущен.
    • execArgv <string[]> Список опций CLI узла, передаваемых рабочему. Опции V8 (такие как --max-old-space-size) и опции, влияющие на процесс (такие как --title), не поддерживаются. Если опция задана, она передается как process.execArgv внутри рабочего. По умолчанию опции наследуются от родительского потока.
    • stdin <boolean> Если параметр имеет значение true, то worker.stdin предоставляет записываемый поток, содержимое которого отображается как process.stdin внутри рабочего. По умолчанию данные не предоставляются.
    • stdout <boolean> Если установлено значение true, то worker.stdout не будет автоматически передаваться в process.stdout родителя.
    • stderr <boolean> Если установлено значение true, то worker.stderr не будет автоматически передаваться в process.stderr родителя.
    • workerData <any> Любое значение JavaScript, которое клонируется и становится доступным как require('node:worker_threads').workerData. Клонирование происходит, как описано в HTML structured clone algorithm, и если объект не может быть клонирован (например, потому что он содержит функции), возникает ошибка.
    • trackUnmanagedFds <boolean> Если установлено значение true, то Worker отслеживает необработанные файловые дескрипторы, управляемые через fs.open() и fs.close(), и закрывает их при выходе Worker, аналогично другим ресурсам, таким как

Событие: error

Событие 'error' происходит, если рабочий поток бросает не пойманное исключение. В этом случае рабочий поток завершается.

Событие: exit

Событие 'exit' испускается, когда рабочий остановился. Если рабочий завершился вызовом process.exit(), то параметром exitCode будет переданный код завершения. Если рабочий был завершен, параметр exitCode равен 1.

Это последнее событие, выдаваемое любым экземпляром Worker.

Событие: message

  • значение <any> Переданное значение

Событие 'message' происходит, когда рабочий поток вызвал require('node:worker_threads').parentPort.postMessage(). Подробнее см. событие port.on('message').

Все сообщения, отправленные из рабочего потока, выдаются до того, как на объекте Worker произойдет событие 'exit'.

Событие: messageerror

  • error <Error> Объект ошибки

Событие 'messageerror' возникает, когда десериализация сообщения не удалась.

Событие: 'online'

Событие 'online' генерируется, когда рабочий поток начинает выполнять код JavaScript.

worker.getHeapSnapshot([options])

  • options <Object>
    • exposeInternals <boolean> Если true, раскрывать внутренние компоненты в снимке кучи. По умолчанию: false.
    • exposeNumericValues <boolean> Если true, раскрывать числовые значения в искусственных полях. По умолчанию: false.
  • Возвращает: <Promise> Обещание для читаемого потока, содержащего снимок кучи V8.

Возвращает читаемый поток для V8-снимка текущего состояния Worker. Подробнее см. в v8.getHeapSnapshot().

Если поток Worker больше не запущен, что может произойти до наступления события 'exit'``](#event-exit), возвращенныйPromiseнемедленно отклоняется с ошибкой [ERR_WORKER_NOT_RUNNING`.

worker.performance

Объект, который можно использовать для запроса информации о производительности рабочего экземпляра. Аналогичен perf_hooks.performance.

performance.eventLoopUtilization([utilization1[, utilization2]]).

  • utilization1 <Object> Результат предыдущего вызова eventLoopUtilization().
  • utilization2 <Object> Результат предыдущего вызова eventLoopUtilization() перед utilization1.
  • Возвращает <Object>

Тот же вызов, что и perf_hooks eventLoopUtilization(), за исключением того, что возвращаются значения рабочего экземпляра.

Одно из отличий заключается в том, что, в отличие от основного потока, загрузка рабочего выполняется в цикле событий. Поэтому использование цикла событий становится доступным сразу после того, как сценарий рабочего начинает выполняться.

Время idle, которое не увеличивается, не указывает на то, что рабочий застрял в bootstrap. В следующих примерах показано, как за все время работы рабочего никогда не накапливается время idle, но он все еще способен обрабатывать сообщения.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const {
    Worker,
    isMainThread,
    parentPort,
} = require('node:worker_threads');

if (isMainThread) {
    const worker = new Worker(__filename);
    setInterval(() => {
        worker.postMessage('hi');
        console.log(
            worker.performance.eventLoopUtilization()
        );
    }, 100).unref();
    return;
}

parentPort.on('message', () => console.log('msg')).unref();
(function r(n) {
    if (--n < 0) return;
    const t = Date.now();
    while (Date.now() - t < 300);
    setImmediate(r, n);
})(10);

Использование цикла событий для рабочего доступно только после испускания 'online' события, а если оно вызвано до этого или после 'exit' события, то все свойства имеют значение 0.

worker.postMessage(value[, transferList])

  • значение <any>
  • transferList {Object[]}

Отправка сообщения на рабочий, полученного через require('node:worker_threads').parentPort.on('message'). Подробнее см. в port.postMessage().

worker.ref()

В противоположность unref(), вызов ref() на ранее unref()ed worker не позволяет программе выйти, если это единственный оставшийся активный хэндл (поведение по умолчанию). Если рабочий был ref()ed, повторный вызов ref() не имеет никакого эффекта.

worker.resourceLimits

Предоставляет набор ограничений ресурсов JS-движка для этого потока Worker. Если параметр resourceLimits был передан конструктору Worker, то он соответствует его значениям.

Если рабочий остановлен, возвращаемое значение - пустой объект.

worker.stderr

Это читаемый поток, который содержит данные, записанные в process.stderr внутри рабочего потока. Если stderr: true не было передано в конструктор Worker, то данные передаются в поток process.stderr родительского потока.

worker.stdin

  • {null|stream.Writable}

Если конструктору Worker было передано значение stdin: true, то это поток с возможностью записи. Данные, записанные в этот поток, будут доступны в рабочем потоке как process.stdin.

worker.stdout

Это читаемый поток, который содержит данные, записанные в process.stdout внутри рабочего потока. Если в конструктор Worker не было передано stdout: true, то данные передаются в поток process.stdout родительского потока.

worker.terminate()

  • Возвращает: { Обещание}

Остановить выполнение JavaScript в рабочем потоке как можно скорее. Возвращает обещание для кода выхода, который выполняется, когда происходит событие 'exit''.

worker.threadId

Целочисленный идентификатор для ссылающегося потока. Внутри рабочего потока он доступен как require('node:worker_threads').threadId. Это значение уникально для каждого экземпляра Worker внутри одного процесса.

worker.unref()

Вызов unref() на рабочем позволяет потоку выйти, если это единственный активный хэндл в системе событий. Если рабочий уже был unref()ed, повторный вызов unref() не имеет никакого эффекта.

Примечания

Синхронная блокировка stdio

Worker использует передачу сообщений через {MessagePort} для реализации взаимодействия с stdio. Это означает, что вывод stdio, исходящий от Worker, может быть заблокирован синхронным кодом на принимающей стороне, который блокирует цикл событий Node.js.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { Worker, isMainThread } from 'worker_threads';

if (isMainThread) {
    new Worker(new URL(import.meta.url));
    for (let n = 0; n < 1e10; n++) {
        // Looping to simulate work.
    }
} else {
    // This output will be blocked by the for loop in the main thread.
    console.log('foo');
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
'use strict';

const {
    Worker,
    isMainThread,
} = require('node:worker_threads');

if (isMainThread) {
    new Worker(__filename);
    for (let n = 0; n < 1e10; n++) {
        // Looping to simulate work.
    }
} else {
    // This output will be blocked by the for loop in the main thread.
    console.log('foo');
}

Запуск рабочих потоков из скриптов предварительной загрузки

Будьте осторожны при запуске рабочих потоков из скриптов предварительной загрузки (скрипты, загруженные и запущенные с помощью флага командной строки -r). Если опция execArgv не установлена явно, новые рабочие потоки автоматически наследуют флаги командной строки от запущенного процесса и будут загружать те же сценарии предварительной загрузки, что и основной поток. Если сценарий предварительной загрузки безоговорочно запускает рабочий поток, каждый порожденный поток будет порождать другой, пока приложение не завершится.

Комментарии