Перейти к содержанию

Итерируемые потоки

latest

Стабильность: 1 – Экспериментальная

Эта возможность не подпадает под правила семантического версионирования. Несовместимые назад изменения или удаление могут произойти в любом будущем релизе. Использовать такую возможность в production-окружении не рекомендуется.

Модуль node:stream/iter предоставляет потоковый API на основе итерируемых объектов вместо событийной иерархии классов Readable/Writable/Transform или интерфейсов Web Streams ReadableStream/WritableStream/TransformStream.

Модуль доступен только при включённом флаге CLI --experimental-stream-iter.

Потоки представлены как AsyncIterable<Uint8Array[]> (асинхронно) или Iterable<Uint8Array[]> (синхронно). Базовых классов для наследования нет — любой объект с протоколом итератора может участвовать. Преобразования — обычные функции или объекты с методом transform.

Данные передаются пакетами (Uint8Array[] за одну итерацию), чтобы амортизировать стоимость асинхронных операций.

1
2
3
4
5
6
7
import { from, pull, text } from 'node:stream/iter';
import { compressGzip, decompressGzip } from 'node:zlib/iter';

// Сжать и распаковать строку
const compressed = pull(from('Hello, world!'), compressGzip());
const result = await text(pull(compressed, decompressGzip()));
console.log(result); // 'Hello, world!'
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
const { from, pull, text } = require('node:stream/iter');
const { compressGzip, decompressGzip } = require('node:zlib/iter');

async function run() {
  // Сжать и распаковать строку
  const compressed = pull(from('Hello, world!'), compressGzip());
  const result = await text(pull(compressed, decompressGzip()));
  console.log(result); // 'Hello, world!'
}

run().catch(console.error);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import { open } from 'node:fs/promises';
import { text, pipeTo } from 'node:stream/iter';
import { compressGzip, decompressGzip } from 'node:zlib/iter';

// Прочитать файл, сжать, записать в другой файл
const src = await open('input.txt', 'r');
const dst = await open('output.gz', 'w');
await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true }));
await src.close();

// Прочитать обратно
const gz = await open('output.gz', 'r');
console.log(await text(gz.pull(decompressGzip(), { autoClose: true })));
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
const { open } = require('node:fs/promises');
const { text, pipeTo } = require('node:stream/iter');
const { compressGzip, decompressGzip } = require('node:zlib/iter');

async function run() {
  // Прочитать файл, сжать, записать в другой файл
  const src = await open('input.txt', 'r');
  const dst = await open('output.gz', 'w');
  await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true }));
  await src.close();

  // Прочитать обратно
  const gz = await open('output.gz', 'r');
  console.log(await text(gz.pull(decompressGzip(), { autoClose: true })));
}

run().catch(console.error);

Основные понятия

Байтовые потоки

Все данные в этом API представлены как байты Uint8Array. Строки при передаче в from(), push() или pipeTo() автоматически кодируются в UTF-8. Это устраняет неоднозначность кодировок и позволяет передавать данные без копирования между потоками и нативным кодом.

Пакетирование

Каждая итерация выдаёт пакет — массив фрагментов Uint8Array (Uint8Array[]). Пакетирование амортизирует стоимость await и создания Promise на нескольких фрагментах. Потребитель, обрабатывающий по одному фрагменту, может просто обойти внутренний массив:

1
2
3
4
5
for await (const batch of source) {
  for (const chunk of batch) {
    handle(chunk);
  }
}
1
2
3
4
5
6
7
async function run() {
  for await (const batch of source) {
    for (const chunk of batch) {
      handle(chunk);
    }
  }
}

Преобразования

Преобразования бывают двух видов:

  • Без состояния — функция (chunks, options) => result, вызываемая один раз на пакет. Принимает Uint8Array[] (или null как сигнал сброса) и объект options. Возвращает Uint8Array[], null или итерируемое фрагментов.

  • С состоянием — объект { transform(source, options) }, где transform — генератор (синхронный или асинхронный), получающий весь восходящий итерируемый поток и объект options, и выдающий выход. Так делают сжатие, шифрование и любые преобразования, которым нужен буфер между пакетами.

В обоих случаях передаётся параметр options со свойством:

  • options.signal Сигнал прерывания при отмене конвейера, ошибке или остановке чтения потребителем. Преобразования могут проверять signal.aborted или слушать событие 'abort' для досрочной очистки.

Сигнал сброса (null) посылается после окончания источника, чтобы преобразования могли выдать хвостовые данные (например, подписи сжатия).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Без состояния: преобразование в верхний регистр
const upper = (chunks) => {
  if (chunks === null) return null; // flush
  return chunks.map((c) => new TextEncoder().encode(
    new TextDecoder().decode(c).toUpperCase(),
  ));
};

// С состоянием: разбиение на строки
const lines = {
  transform: async function*(source) {
    let partial = '';
    for await (const chunks of source) {
      if (chunks === null) {
        if (partial) yield [new TextEncoder().encode(partial)];
        continue;
      }
      for (const chunk of chunks) {
        const str = partial + new TextDecoder().decode(chunk);
        const parts = str.split('\n');
        partial = parts.pop();
        for (const line of parts) {
          yield [new TextEncoder().encode(`${line}\n`)];
        }
      }
    }
  },
};

Pull и push

API поддерживает две модели:

  • Pull — данные идут по требованию. pull() и pullSync() создают ленивые конвейеры: источник читается только когда потребитель итерирует.

  • Push — данные записываются явно. push() создаёт пару writer/readable с обратным давлением. Writer записывает данные; readable потребляется как async iterable.

Обратное давление

У pull-потоков обратное давление естественное — темп задаёт потребитель, источник не читается быстрее, чем успевает обработка. Push-потокам нужно явное обратное давление: производитель и потребитель работают независимо. Параметры highWaterMark и backpressure у push(), broadcast() и share() задают поведение.

Двухбуферная модель

Push-потоки используют двухчастную буферизацию. Представьте ведро (slots), заполняемое через шланг (ожидающие записи), с поплавковым клапаном, который закрывается, когда ведро полно:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
                          highWaterMark (напр., 3)
                                 |
    Производитель                v
       |                    +---------+
       v                    |         |
  [ write() ] ----+    +--->| слоты   |---> Потребитель читает
  [ write() ]     |    |    | (ведро) |     for await (...)
  [ write() ]     v    |    +---------+
              +--------+         ^
              | ожидающие|        |
              | записи   |   поплавковый клапан
              | (шланг)  |   (обратное давление)
              +--------+
                   ^
                   |
          режим 'strict' ограничивает и это!
  • Слоты (ведро) — данные, готовые потребителю, не больше highWaterMark. Когда потребитель читает, он за раз опустошает все слоты в один пакет.

  • Ожидающие записи (шланг) — записи, ждущие места в слотах. После того как потребитель опустошил буфер, ожидающие записи попадают в освободившиеся слоты и их промисы завершаются.

Как политики используют буферы:

Политика Лимит слотов Лимит ожидающих записей
'strict' highWaterMark highWaterMark
'block' highWaterMark Без ограничения
'drop-oldest' highWaterMark Н/д (никогда не ждёт)
'drop-newest' highWaterMark Н/д (никогда не ждёт)

Strict (по умолчанию)

Режим strict отсекает сценарии «записал и забыл», когда производитель вызывает write() без await, что вело бы к неограниченному росту памяти. Ограничиваются и буфер слотов, и очередь ожидающих записей значением highWaterMark.

Если каждая запись ожидается через await, одновременно может быть не больше одной ожидающей записи (вашей), лимит очереди не достигается. Неожидаемые записи накапливаются в очереди и при переполнении вызывают исключение:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import { push, text } from 'node:stream/iter';

const { writer, readable } = push({ highWaterMark: 16 });

// Потребитель должен работать параллельно — иначе первая запись,
// заполнившая буфер, навсегда заблокирует производителя.
const consuming = text(readable);

// ХОРОШО: await у записей. Производитель ждёт, пока потребитель
// освободит место, когда буфер полон.
for (const item of dataset) {
  await writer.write(item);
}
await writer.end();
console.log(await consuming);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
const { push, text } = require('node:stream/iter');

async function run() {
  const { writer, readable } = push({ highWaterMark: 16 });

  // Потребитель должен работать параллельно — иначе первая запись,
  // заполнившая буфер, навсегда заблокирует производителя.
  const consuming = text(readable);

  // ХОРОШО: await у записей. Производитель ждёт, пока потребитель
  // освободит место, когда буфер полон.
  for (const item of dataset) {
    await writer.write(item);
  }
  await writer.end();
  console.log(await consuming);
}

run().catch(console.error);

Забытый await в итоге приведёт к исключению:

1
2
3
4
5
// ПЛОХО: fire-and-forget. В strict оба буфера переполняются.
for (const item of dataset) {
  writer.write(item); // без await — очередь без границ
}
// --> выбрасывается "Backpressure violation: too many pending writes"

Block

В режиме block слоты ограничены highWaterMark, а очередь ожидающих записей не ограничена. Записи с await блокируются, пока потребитель не освободит место, как в strict. Отличие: неожидаемые записи бессрочно ставятся в очередь без исключения — возможна утечка памяти, если производитель забывает await.

Так по умолчанию ведут себя классические потоки Node.js и Web Streams. Используйте, когда контролируете производителя и он корректно ожидает записи, или при переносе кода с этих API.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import { push, text } from 'node:stream/iter';

const { writer, readable } = push({
  highWaterMark: 16,
  backpressure: 'block',
});

const consuming = text(readable);

// Безопасно — await блокирует до чтения потребителем.
for (const item of dataset) {
  await writer.write(item);
}
await writer.end();
console.log(await consuming);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
const { push, text } = require('node:stream/iter');

async function run() {
  const { writer, readable } = push({
    highWaterMark: 16,
    backpressure: 'block',
  });

  const consuming = text(readable);

  // Безопасно — await блокирует до чтения потребителем.
  for (const item of dataset) {
    await writer.write(item);
  }
  await writer.end();
  console.log(await consuming);
}

run().catch(console.error);

Drop-oldest

Записи никогда не ждут. Когда буфер слотов полон, самый старый фрагмент вытесняется, чтобы освободить место новой записи. Потребитель всегда видит наиболее свежие данные. Удобно для live-лент, телеметрии и сценариев, где устаревшие данные менее важны текущих.

1
2
3
4
5
6
7
import { push } from 'node:stream/iter';

// Хранить только 5 последних измерений
const { writer, readable } = push({
  highWaterMark: 5,
  backpressure: 'drop-oldest',
});
1
2
3
4
5
6
7
const { push } = require('node:stream/iter');

// Хранить только 5 последних измерений
const { writer, readable } = push({
  highWaterMark: 5,
  backpressure: 'drop-oldest',
});

Drop-newest

Записи никогда не ждут. Когда буфер слотов полон, входящая запись тихо отбрасывается. Потребитель обрабатывает уже буферизованное без лавины новых данных. Удобно для ограничения скорости или сброса нагрузки под давлением.

1
2
3
4
5
6
7
import { push } from 'node:stream/iter';

// До 10 элементов в буфере; остальное отбрасывается
const { writer, readable } = push({
  highWaterMark: 10,
  backpressure: 'drop-newest',
});
1
2
3
4
5
6
7
const { push } = require('node:stream/iter');

// До 10 элементов в буфере; остальное отбрасывается
const { writer, readable } = push({
  highWaterMark: 10,
  backpressure: 'drop-newest',
});

Интерфейс Writer

Writer — любой объект, соответствующий интерфейсу Writer. Обязателен только write(); остальные методы необязательны.

У каждого асинхронного метода есть синхронный вариант *Sync для схемы try-fallback: сначала быстрый синхронный путь, при необходимости — асинхронная версия, если синхронный вызов не смог завершиться:

1
2
3
4
if (!writer.writeSync(chunk)) await writer.write(chunk);
if (!writer.writevSync(chunks)) await writer.writev(chunks);
if (writer.endSync() < 0) await writer.end();
writer.fail(err);  // всегда синхронно, без запасного пути

writer.desiredSize

Число свободных слотов буфера до достижения high water mark. Возвращает null, если writer закрыт или потребитель отключился.

Значение всегда неотрицательно.

writer.end([options])

Комментарии