Перейти к содержанию

Web Streams API

latest

Добавлено в: v16.5.0

Стабильность: 2 – Стабильная

АПИ является удовлетворительным. Совместимость с NPM имеет высший приоритет и не будет нарушена кроме случаев явной необходимости.

Реализация стандарта WHATWG Streams.

Обзор

Стандарт WHATWG Streams («веб-потоки») задаёт API для работы с потоковыми данными. Он близок к API Streams в Node.js, но появился позже и стал распространённым «стандартным» API потоков во многих средах JavaScript.

Три основных типа объектов:

  • ReadableStream — источник потоковых данных.
  • WritableStream — приёмник потоковых данных.
  • TransformStream — алгоритм преобразования потоковых данных.

Пример ReadableStream

Пример создаёт простой ReadableStream, который каждую секунду помещает в очередь текущее значение performance.now(). Для чтения используется асинхронный итератор.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import {
  ReadableStream,
} from 'node:stream/web';

import {
  setInterval as every,
} from 'node:timers/promises';

import {
  performance,
} from 'node:perf_hooks';

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

for await (const value of stream)
  console.log(value);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const {
  ReadableStream,
} = require('node:stream/web');

const {
  setInterval: every,
} = require('node:timers/promises');

const {
  performance,
} = require('node:perf_hooks');

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

(async () => {
  for await (const value of stream)
    console.log(value);
})();

Совместимость с потоками Node.js

Потоки Node.js можно преобразовать в веб-потоки и обратно методами toWeb и fromWeb у stream.Readable, stream.Writable и stream.Duplex.

Подробнее в соответствующих разделах:

API

Класс: ReadableStream

Добавлено в: v16.5.0

new ReadableStream([underlyingSource [, strategy]])

  • underlyingSource <Object>
  • start <Function> Пользовательская функция, вызываемая сразу при создании ReadableStream.
  • pull <Function> Пользовательская функция, вызываемая повторно, пока внутренняя очередь ReadableStream не заполнена. Операция может быть синхронной или асинхронной. Если асинхронная, функция не вызывается снова, пока не выполнится ранее возвращённый промис.
  • cancel <Function> Пользовательская функция, вызываемая при отмене ReadableStream.
    • reason <any>
    • Возвращает: промис, выполняемый с undefined.
  • type <string> Должно быть 'bytes' или undefined.
  • autoAllocateChunkSize <number> Используется только при type, равном 'bytes'. При ненулевом значении буфер представления автоматически выделяется для ReadableByteStreamController.byobRequest. Если не задано, данные передаются через внутренние очереди потока и обычный читатель ReadableStreamDefaultReader.
  • strategy <Object>
  • highWaterMark <number> Максимальный размер внутренней очереди до срабатывания обратного давления.
  • size <Function> Пользовательская функция для определения размера каждого фрагмента данных.

readableStream.locked

Свойство readableStream.locked по умолчанию false и становится true, пока активный читатель потребляет данные потока.

readableStream.cancel([reason])

  • reason <any>
  • Возвращает: промис, выполняемый с undefined после завершения отмены.

readableStream.getReader([options])

1
2
3
4
5
6
7
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream();

const reader = stream.getReader();

console.log(await reader.read());
1
2
3
4
5
6
7
const { ReadableStream } = require('node:stream/web');

const stream = new ReadableStream();

const reader = stream.getReader();

reader.read().then(console.log);

Устанавливает readableStream.locked в true.

readableStream.pipeThrough(transform[, options])

  • transform <Object>
  • readable <ReadableStream> ReadableStream, в который transform.writable помещает возможно изменённые данные, полученные из этого ReadableStream.
  • writable <WritableStream> WritableStream, в который записываются данные этого ReadableStream.
  • options <Object>
  • preventAbort <boolean> Если true, ошибки в этом ReadableStream не приводят к прерыванию transform.writable.
  • preventCancel <boolean> Если true, ошибки в целевом transform.writable не отменяют этот ReadableStream.
  • preventClose <boolean> Если true, закрытие этого ReadableStream не закрывает transform.writable.
  • signal <AbortSignal> Позволяет отменить передачу данных через AbortController.
  • Возвращает: <ReadableStream> из transform.readable.

Соединяет этот ReadableStream с парой ReadableStream и WritableStream из аргумента transform: данные из этого ReadableStream записываются в transform.writable, при необходимости преобразуются и попадают в transform.readable. После настройки конвейера возвращается transform.readable.

Пока активна операция pipe, readableStream.locked равен true.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import {
  ReadableStream,
  TransformStream,
} from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

for await (const chunk of transformedStream)
  console.log(chunk);
  // Prints: A
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const {
  ReadableStream,
  TransformStream,
} = require('node:stream/web');

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

(async () => {
  for await (const chunk of transformedStream)
    console.log(chunk);
    // Prints: A
})();

readableStream.pipeTo(destination[, options])

  • destination <WritableStream> WritableStream, в который записываются данные этого ReadableStream.
  • options <Object>
  • preventAbort <boolean> Если true, ошибки в этом ReadableStream не приводят к прерыванию destination.
  • preventCancel <boolean> Если true, ошибки в destination не отменяют этот ReadableStream.
  • preventClose <boolean> Если true, закрытие этого ReadableStream не закрывает destination.
  • signal <AbortSignal> Позволяет отменить передачу данных через AbortController.
  • Возвращает: промис, выполняемый с undefined

Пока активна операция pipe, readableStream.locked равен true.

readableStream.tee()

Добавлено в: v16.5.0

Возвращает пару новых экземпляров ReadableStream, в которые пересылаются данные этого ReadableStream. Оба получают одинаковые данные.

Устанавливает readableStream.locked в true.

readableStream.values([options])

  • options <Object>
  • preventCancel <boolean> Если true, ReadableStream не закрывается при резком завершении асинхронного итератора. По умолчанию: false.

Создаёт и возвращает асинхронный итератор для чтения данных этого ReadableStream.

Пока активен асинхронный итератор, readableStream.locked равен true.

1
2
3
4
5
6
import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream.values({ preventCancel: true }))
  console.log(Buffer.from(chunk).toString());

Асинхронная итерация

Объект ReadableStream поддерживает протокол асинхронного итератора через синтаксис for await.

1
2
3
4
5
6
import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream)
  console.log(Buffer.from(chunk).toString());

Асинхронный итератор читает ReadableStream до его завершения.

По умолчанию при раннем выходе из итератора (break, return или throw) ReadableStream закрывается. Чтобы не закрывать ReadableStream автоматически, получите итератор через readableStream.values() и установите опцию preventCancel в true.

ReadableStream не должен быть заблокирован (не должно быть активного читателя). На время асинхронной итерации ReadableStream блокируется.

Передача через postMessage()

Экземпляр ReadableStream можно передать через MessagePort.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
const stream = new ReadableStream(getReadableSourceSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getReader().read().then((chunk) => {
    console.log(chunk);
  });
};

port2.postMessage(stream, [stream]);

ReadableStream.from(iterable)

  • iterable <Iterable> Объект, реализующий протокол итерируемости Symbol.asyncIterator или Symbol.iterator.

Вспомогательный метод создаёт новый ReadableStream из итерируемого объекта.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import { ReadableStream } from 'node:stream/web';

async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}

const stream = ReadableStream.from(asyncIterableGenerator());

for await (const chunk of stream)
  console.log(chunk); // Prints: 'a', 'b', 'c'
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
const { ReadableStream } = require('node:stream/web');

async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}

(async () => {
  const stream = ReadableStream.from(asyncIterableGenerator());

  for await (const chunk of stream)
    console.log(chunk); // Prints: 'a', 'b', 'c'
})();

Чтобы направить получившийся ReadableStream в WritableStream, Iterable должен отдавать последовательность объектов Buffer, TypedArray или DataView.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import { ReadableStream } from 'node:stream/web';
import { Buffer } from 'node:buffer';

async function* asyncIterableGenerator() {
  yield Buffer.from('a');
  yield Buffer.from('b');
  yield Buffer.from('c');
}

const stream = ReadableStream.from(asyncIterableGenerator());

await stream.pipeTo(createWritableStreamSomehow());
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
const { ReadableStream } = require('node:stream/web');
const { Buffer } = require('node:buffer');

async function* asyncIterableGenerator() {
  yield Buffer.from('a');
  yield Buffer.from('b');
  yield Buffer.from('c');
}

const stream = ReadableStream.from(asyncIterableGenerator());

(async () => {
  await stream.pipeTo(createWritableStreamSomehow());
})();

Класс: ReadableStreamDefaultReader

Добавлено в: v16.5.0

По умолчанию вызов readableStream.getReader() без аргументов возвращает экземпляр ReadableStreamDefaultReader. Обычный читатель обрабатывает фрагменты данных как непрозрачные значения, поэтому ReadableStream может работать с любыми значениями JavaScript.

new ReadableStreamDefaultReader(stream)

Создаёт новый ReadableStreamDefaultReader, привязанный к заданному ReadableStream.

readableStreamDefaultReader.cancel([reason])

  • reason <any>
  • Возвращает: промис, выполняемый с undefined.

Отменяет ReadableStream и возвращает промис, выполняемый после отмены нижележащего потока.

readableStreamDefaultReader.closed

  • Тип: <Promise> Выполняется с undefined, когда связанный ReadableStream закрыт, или отклоняется при ошибке потока или снятии блокировки читателя до завершения закрытия.

readableStreamDefaultReader.read()

  • Возвращает: промис, выполняемый с объектом:
  • value <any>
  • done <boolean>

Запрашивает следующий фрагмент данных из нижележащего ReadableStream и возвращает промис, выполняемый, когда данные доступны.

readableStreamDefaultReader.releaseLock()

Снимает блокировку этого читателя с нижележащего ReadableStream.

Класс: ReadableStreamBYOBReader

Добавлено в: v16.5.0

ReadableStreamBYOBReader — альтернативный потребитель для байто-ориентированных ReadableStream (создаются с underlyingSource.type, равным 'bytes' при создании ReadableStream).

BYOB — сокращение от «bring your own buffer». Это шаблон более эффективного чтения байтовых данных без лишнего копирования.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import {
  open,
} from 'node:fs/promises';

import {
  ReadableStream,
} from 'node:stream/web';

import { Buffer } from 'node:buffer';

class Source {
  type = 'bytes';
  autoAllocateChunkSize = 1024;

  async start(controller) {
    this.file = await open(new URL(import.meta.url));
    this.controller = controller;
  }

  async pull(controller) {
    const view = controller.byobRequest?.view;
    const {
      bytesRead,
    } = await this.file.read({
      buffer: view,
      offset: view.byteOffset,
      length: view.byteLength,
    });

    if (bytesRead === 0) {
      await this.file.close();
      this.controller.close();
    }
    controller.byobRequest.respond(bytesRead);
  }
}

const stream = new ReadableStream(new Source());

async function read(stream) {
  const reader = stream.getReader({ mode: 'byob' });

  const chunks = [];
  let result;
  do {
    result = await reader.read(Buffer.alloc(100));
    if (result.value !== undefined)
      chunks.push(Buffer.from(result.value));
  } while (!result.done);

  return Buffer.concat(chunks);
}

const data = await read(stream);
console.log(Buffer.from(data).toString());

new ReadableStreamBYOBReader(stream)

Создаёт новый ReadableStreamBYOBReader, привязанный к заданному ReadableStream.

readableStreamBYOBReader.cancel([reason])

  • reason <any>
  • Возвращает: промис, выполняемый с undefined.

Отменяет ReadableStream и возвращает промис, выполняемый после отмены нижележащего потока.

readableStreamBYOBReader.closed

  • Тип: <Promise> Выполняется с undefined, когда связанный ReadableStream закрыт, или отклоняется при ошибке потока или снятии блокировки читателя до завершения закрытия.

readableStreamBYOBReader.read(view[, options])

Добавлено в: v16.5.0

  • view <Buffer> | <TypedArray> | <DataView>
  • options <Object>
  • min <number> Если задано, промис выполнится только когда доступно не менее min элементов. Если не задано, промис выполняется, когда доступен хотя бы один элемент.
  • Возвращает: промис, выполняемый с объектом:
  • value <TypedArray> | <DataView>
  • done <boolean>

Запрашивает следующий фрагмент данных из нижележащего ReadableStream и возвращает промис, выполняемый, когда данные доступны.

Не передавайте в этот метод pooled-экземпляр Buffer. Pooled-Buffer создаются через Buffer.allocUnsafe(), Buffer.from() или часто возвращаются колбэками модуля node:fs. Такие Buffer разделяют общий ArrayBuffer, в котором лежат данные всех pooled-Buffer. Когда в readableStreamBYOBReader.read() передаётся Buffer, TypedArray или DataView, у представления отсоединяется (detach) базовый ArrayBuffer, с чего снимается действительность всех существующих представлений на этом ArrayBuffer. Это может привести к серьёзным сбоям в приложении.

readableStreamBYOBReader.releaseLock()

Снимает блокировку этого читателя с нижележащего ReadableStream.

Класс: ReadableStreamDefaultController

У каждого ReadableStream есть контроллер, отвечающий за внутреннее состояние и очередь потока. ReadableStreamDefaultController — контроллер по умолчанию для не байто-ориентированных ReadableStream.

readableStreamDefaultController.close()

Закрывает ReadableStream, с которым связан этот контроллер.

readableStreamDefaultController.desiredSize

Возвращает объём данных, которого не хватает до заполнения очереди ReadableStream.

readableStreamDefaultController.enqueue([chunk])

Добавляет новый фрагмент данных в очередь ReadableStream.

readableStreamDefaultController.error([error])

Сообщает об ошибке: ReadableStream переходит в ошибку и закрывается.

Класс: ReadableByteStreamController

Добавлено в: v16.5.0

У каждого ReadableStream есть контроллер, отвечающий за внутреннее состояние и очередь потока. ReadableByteStreamController — для байто-ориентированных ReadableStream.

readableByteStreamController.byobRequest

readableByteStreamController.close()

Закрывает ReadableStream, с которым связан этот контроллер.

readableByteStreamController.desiredSize

Возвращает объём данных, которого не хватает до заполнения очереди ReadableStream.

readableByteStreamController.enqueue(chunk)

Добавляет новый фрагмент данных в очередь ReadableStream.

readableByteStreamController.error([error])

Сообщает об ошибке: ReadableStream переходит в ошибку и закрывается.

Класс: ReadableStreamBYOBRequest

Добавлено в: v16.5.0

При использовании ReadableByteStreamController в байто-ориентированных потоках и при использовании ReadableStreamBYOBReader свойство readableByteStreamController.byobRequest даёт доступ к экземпляру ReadableStreamBYOBRequest, соответствующему текущему запросу чтения. Объект нужен для доступа к ArrayBuffer/TypedArray, выделенным под заполнение при чтении, и содержит методы сигнализации о том, что данные уже записаны.

readableStreamBYOBRequest.respond(bytesWritten)

Сообщает, что в readableStreamBYOBRequest.view записано bytesWritten байт.

readableStreamBYOBRequest.respondWithNewView(view)

Сообщает, что запрос выполнен: данные записаны в новый Buffer, TypedArray или DataView.

readableStreamBYOBRequest.view

Класс: WritableStream

Добавлено в: v16.5.0

WritableStream — приёмник, в который отправляются данные потока.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import {
  WritableStream,
} from 'node:stream/web';

const stream = new WritableStream({
  write(chunk) {
    console.log(chunk);
  },
});

await stream.getWriter().write('Hello World');

new WritableStream([underlyingSink[, strategy]])

  • underlyingSink <Object>
  • start <Function> Пользовательская функция, вызываемая сразу при создании WritableStream.
  • write <Function> Пользовательская функция, вызываемая при записи фрагмента данных в WritableStream.
  • close <Function> Пользовательская функция, вызываемая при закрытии WritableStream.
    • Возвращает: промис, выполняемый с undefined.
  • abort <Function> Пользовательская функция для немедленного закрытия WritableStream.
    • reason <any>
    • Возвращает: промис, выполняемый с undefined.
  • type <any> Опция type зарезервирована и должна быть undefined.
  • strategy <Object>
  • highWaterMark <number> Максимальный размер внутренней очереди до срабатывания обратного давления.
  • size <Function> Пользовательская функция для определения размера каждого фрагмента данных.

writableStream.abort([reason])

  • reason <any>
  • Возвращает: промис, выполняемый с undefined.

Немедленно завершает WritableStream. Все запросы записи в очереди отменяются, связанные с ними промисы отклоняются.

writableStream.close()

  • Возвращает: промис, выполняемый с undefined.

Закрывает WritableStream, когда дальнейшая запись не ожидается.

writableStream.getWriter()

Создаёт и возвращает новый writer для записи данных в WritableStream.

writableStream.locked

Свойство writableStream.locked по умолчанию false и становится true, пока к этому WritableStream привязан активный writer.

Передача через postMessage()

Экземпляр WritableStream можно передать через MessagePort.

1
2
3
4
5
6
7
8
9
const stream = new WritableStream(getWritableSinkSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getWriter().write('hello');
};

port2.postMessage(stream, [stream]);

Класс: WritableStreamDefaultWriter

Добавлено в: v16.5.0

new WritableStreamDefaultWriter(stream)

Создаёт новый WritableStreamDefaultWriter, привязанный к заданному WritableStream.

writableStreamDefaultWriter.abort([reason])

  • reason <any>
  • Возвращает: промис, выполняемый с undefined.

Немедленно завершает WritableStream. Все запросы записи в очереди отменяются, связанные с ними промисы отклоняются.

writableStreamDefaultWriter.close()

  • Возвращает: промис, выполняемый с undefined.

Закрывает WritableStream, когда дальнейшая запись не ожидается.

writableStreamDefaultWriter.closed

  • Тип: <Promise> Выполняется с undefined, когда связанный WritableStream закрыт, или отклоняется при ошибке потока или снятии блокировки writer до завершения закрытия.

writableStreamDefaultWriter.desiredSize

Объём данных, необходимый для заполнения очереди WritableStream.

writableStreamDefaultWriter.ready

  • Тип: <Promise> Выполняется со значением undefined, когда writer готов к использованию.

writableStreamDefaultWriter.releaseLock()

Снимает блокировку этого writer с нижележащего WritableStream.

writableStreamDefaultWriter.write([chunk])

  • chunk <any>
  • Возвращает: промис, выполняемый с undefined.

Ставит в очередь новый фрагмент данных для записи в WritableStream.

Класс: WritableStreamDefaultController

Добавлено в: v16.5.0

WritableStreamDefaultController управляет внутренним состоянием WritableStream.

writableStreamDefaultController.error([error])

Вызывается из кода пользователя, чтобы сообщить об ошибке при обработке данных WritableStream. При вызове WritableStream прерывается, текущие запросы записи отменяются.

writableStreamDefaultController.signal

  • Тип: <AbortSignal> AbortSignal для отмены ожидающих операций записи или закрытия при прерывании WritableStream.

Класс: TransformStream

Добавлено в: v16.5.0

TransformStream объединяет ReadableStream и WritableStream, соединённые так, что данные, записанные в WritableStream, поступают и при необходимости преобразуются перед помещением в очередь ReadableStream.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import {
  TransformStream,
} from 'node:stream/web';

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

await Promise.all([
  transform.writable.getWriter().write('A'),
  transform.readable.getReader().read(),
]);

new TransformStream([transformer[, writableStrategy[, readableStrategy]]])

Добавлено в: v16.5.0

  • transformer <Object>
  • start <Function> Пользовательская функция, вызываемая сразу при создании TransformStream.
  • transform <Function> Пользовательская функция: получает и при необходимости изменяет фрагмент данных, записанный в transformStream.writable, затем передаёт его в transformStream.readable.
  • flush <Function> Пользовательская функция, вызываемая непосредственно перед закрытием записывающей стороны TransformStream, сигнализируя о завершении преобразования.
  • cancel <Function> Пользовательская функция при отмене читающей стороны TransformStream или прерывании записывающей.
    • reason <any>
    • Возвращает: промис, выполняемый с undefined.
  • readableType <any> опция readableType зарезервирована и должна быть undefined.
  • writableType <any> опция writableType зарезервирована и должна быть undefined.
  • writableStrategy <Object>
  • highWaterMark <number> Максимальный размер внутренней очереди до срабатывания обратного давления.
  • size <Function> Пользовательская функция для определения размера каждого фрагмента данных.
  • readableStrategy <Object>
  • highWaterMark <number> Максимальный размер внутренней очереди до срабатывания обратного давления.
  • size <Function> Пользовательская функция для определения размера каждого фрагмента данных.

transformStream.readable

transformStream.writable

Передача через postMessage()

Экземпляр TransformStream можно передать через MessagePort.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
const stream = new TransformStream();

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  const { writable, readable } = data;
  // ...
};

port2.postMessage(stream, [stream]);

Класс: TransformStreamDefaultController

Добавлено в: v16.5.0

TransformStreamDefaultController управляет внутренним состоянием TransformStream.

transformStreamDefaultController.desiredSize

Объём данных, необходимый для заполнения очереди читающей стороны.

transformStreamDefaultController.enqueue([chunk])

Добавляет фрагмент данных в очередь читающей стороны.

transformStreamDefaultController.error([reason])

Сообщает об ошибке на читающей и записывающей сторонах при обработке данных трансформации; обе стороны немедленно закрываются.

transformStreamDefaultController.terminate()

Закрывает читающую сторону и приводит к немедленному закрытию записывающей стороны с ошибкой.

Класс: ByteLengthQueuingStrategy

Добавлено в: v16.5.0

new ByteLengthQueuingStrategy(init)

byteLengthQueuingStrategy.highWaterMark

byteLengthQueuingStrategy.size

Класс: CountQueuingStrategy

Добавлено в: v16.5.0

new CountQueuingStrategy(init)

countQueuingStrategy.highWaterMark

countQueuingStrategy.size

Класс: TextEncoderStream

Добавлено в: v16.6.0

new TextEncoderStream()

Создаёт новый экземпляр TextEncoderStream.

textEncoderStream.encoding

Кодировка, поддерживаемая экземпляром TextEncoderStream.

textEncoderStream.readable

textEncoderStream.writable

Класс: TextDecoderStream

Добавлено в: v16.6.0

new TextDecoderStream([encoding[, options]])

  • encoding <string> Кодировка, которую поддерживает этот TextDecoder. По умолчанию: 'utf-8'.
  • options <Object>
  • fatal <boolean> true, если ошибки декодирования фатальны.
  • ignoreBOM <boolean> Если true, TextDecoderStream включает метку порядка байтов в результат. Если false, метка удаляется из вывода. Опция используется только при encoding 'utf-8', 'utf-16be' или 'utf-16le'. По умолчанию: false.

Создаёт новый экземпляр TextDecoderStream.

textDecoderStream.encoding

Кодировка, поддерживаемая экземпляром TextDecoderStream.

textDecoderStream.fatal

true, если при ошибках декодирования выбрасывается TypeError.

textDecoderStream.ignoreBOM

true, если в результат декодирования включается метка порядка байтов.

textDecoderStream.readable

textDecoderStream.writable

Класс: CompressionStream

Добавлено в: v17.0.0

new CompressionStream(format)

Добавлено в: v17.0.0

  • format <string> Одно из 'deflate', 'deflate-raw', 'gzip' или 'brotli'.

compressionStream.readable

compressionStream.writable

Класс: DecompressionStream

Добавлено в: v17.0.0

new DecompressionStream(format)

Добавлено в: v17.0.0

  • format <string> Одно из 'deflate', 'deflate-raw', 'gzip' или 'brotli'.

decompressionStream.readable

decompressionStream.writable

Утилиты-потребители

Вспомогательные функции-потребители задают общие варианты чтения потоков.

Импорт:

1
2
3
4
5
6
7
import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';
1
2
3
4
5
6
7
const {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} = require('node:stream/consumers');

streamConsumers.arrayBuffer(stream)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');

const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { TextEncoder } = require('node:util');

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
  console.log(`from readable: ${data.byteLength}`);
  // Prints: from readable: 76
});

streamConsumers.blob(stream)

1
2
3
4
5
6
7
8
import { blob } from 'node:stream/consumers';

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27
1
2
3
4
5
6
7
8
9
const { blob } = require('node:stream/consumers');

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
blob(readable).then((data) => {
  console.log(`from readable: ${data.size}`);
  // Prints: from readable: 27
});

streamConsumers.buffer(stream)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});

streamConsumers.bytes(stream)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import { bytes } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
const data = await bytes(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
const { bytes } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
bytes(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});

streamConsumers.json(stream)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 100
});

streamConsumers.text(stream)

1
2
3
4
5
6
7
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
1
2
3
4
5
6
7
8
const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});

Комментарии