Итерируемые потоки¶
Стабильность: 1 – Экспериментальная
Эта возможность не подпадает под правила семантического версионирования. Несовместимые назад изменения или удаление могут произойти в любом будущем релизе. Использовать такую возможность в production-окружении не рекомендуется.
Модуль node:stream/iter предоставляет потоковый API на основе итерируемых объектов вместо событийной иерархии классов Readable/Writable/Transform или интерфейсов Web Streams ReadableStream/WritableStream/TransformStream.
Модуль доступен только при включённом флаге CLI --experimental-stream-iter.
Потоки представлены как AsyncIterable<Uint8Array[]> (асинхронно) или Iterable<Uint8Array[]> (синхронно). Базовых классов для наследования нет — любой объект с протоколом итератора может участвовать. Преобразования — обычные функции или объекты с методом transform.
Данные передаются пакетами (Uint8Array[] за одну итерацию), чтобы амортизировать стоимость асинхронных операций.
1 2 3 4 5 6 7 | |
1 2 3 4 5 6 7 8 9 10 11 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | |
Основные понятия¶
Байтовые потоки¶
Все данные в этом API представлены как байты Uint8Array. Строки при передаче в from(), push() или pipeTo() автоматически кодируются в UTF-8. Это устраняет неоднозначность кодировок и позволяет передавать данные без копирования между потоками и нативным кодом.
Пакетирование¶
Каждая итерация выдаёт пакет — массив фрагментов Uint8Array (Uint8Array[]). Пакетирование амортизирует стоимость await и создания Promise на нескольких фрагментах. Потребитель, обрабатывающий по одному фрагменту, может просто обойти внутренний массив:
1 2 3 4 5 | |
1 2 3 4 5 6 7 | |
Преобразования¶
Преобразования бывают двух видов:
-
Без состояния — функция
(chunks, options) => result, вызываемая один раз на пакет. ПринимаетUint8Array[](илиnullкак сигнал сброса) и объектoptions. ВозвращаетUint8Array[],nullили итерируемое фрагментов. -
С состоянием — объект
{ transform(source, options) }, гдеtransform— генератор (синхронный или асинхронный), получающий весь восходящий итерируемый поток и объектoptions, и выдающий выход. Так делают сжатие, шифрование и любые преобразования, которым нужен буфер между пакетами.
В обоих случаях передаётся параметр options со свойством:
options.signalСигнал прерывания при отмене конвейера, ошибке или остановке чтения потребителем. Преобразования могут проверять signal.abortedили слушать событие'abort'для досрочной очистки.
Сигнал сброса (null) посылается после окончания источника, чтобы преобразования могли выдать хвостовые данные (например, подписи сжатия).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | |
Pull и push¶
API поддерживает две модели:
-
Pull — данные идут по требованию.
pull()иpullSync()создают ленивые конвейеры: источник читается только когда потребитель итерирует. -
Push — данные записываются явно.
push()создаёт пару writer/readable с обратным давлением. Writer записывает данные; readable потребляется как async iterable.
Обратное давление¶
У pull-потоков обратное давление естественное — темп задаёт потребитель, источник не читается быстрее, чем успевает обработка. Push-потокам нужно явное обратное давление: производитель и потребитель работают независимо. Параметры highWaterMark и backpressure у push(), broadcast() и share() задают поведение.
Двухбуферная модель¶
Push-потоки используют двухчастную буферизацию. Представьте ведро (slots), заполняемое через шланг (ожидающие записи), с поплавковым клапаном, который закрывается, когда ведро полно:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | |
-
Слоты (ведро) — данные, готовые потребителю, не больше
highWaterMark. Когда потребитель читает, он за раз опустошает все слоты в один пакет. -
Ожидающие записи (шланг) — записи, ждущие места в слотах. После того как потребитель опустошил буфер, ожидающие записи попадают в освободившиеся слоты и их промисы завершаются.
Как политики используют буферы:
| Политика | Лимит слотов | Лимит ожидающих записей |
|---|---|---|
'strict' | highWaterMark | highWaterMark |
'block' | highWaterMark | Без ограничения |
'drop-oldest' | highWaterMark | Н/д (никогда не ждёт) |
'drop-newest' | highWaterMark | Н/д (никогда не ждёт) |
Strict (по умолчанию)¶
Режим strict отсекает сценарии «записал и забыл», когда производитель вызывает write() без await, что вело бы к неограниченному росту памяти. Ограничиваются и буфер слотов, и очередь ожидающих записей значением highWaterMark.
Если каждая запись ожидается через await, одновременно может быть не больше одной ожидающей записи (вашей), лимит очереди не достигается. Неожидаемые записи накапливаются в очереди и при переполнении вызывают исключение:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | |
Забытый await в итоге приведёт к исключению:
1 2 3 4 5 | |
Block¶
В режиме block слоты ограничены highWaterMark, а очередь ожидающих записей не ограничена. Записи с await блокируются, пока потребитель не освободит место, как в strict. Отличие: неожидаемые записи бессрочно ставятся в очередь без исключения — возможна утечка памяти, если производитель забывает await.
Так по умолчанию ведут себя классические потоки Node.js и Web Streams. Используйте, когда контролируете производителя и он корректно ожидает записи, или при переносе кода с этих API.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | |
Drop-oldest¶
Записи никогда не ждут. Когда буфер слотов полон, самый старый фрагмент вытесняется, чтобы освободить место новой записи. Потребитель всегда видит наиболее свежие данные. Удобно для live-лент, телеметрии и сценариев, где устаревшие данные менее важны текущих.
1 2 3 4 5 6 7 | |
1 2 3 4 5 6 7 | |
Drop-newest¶
Записи никогда не ждут. Когда буфер слотов полон, входящая запись тихо отбрасывается. Потребитель обрабатывает уже буферизованное без лавины новых данных. Удобно для ограничения скорости или сброса нагрузки под давлением.
1 2 3 4 5 6 7 | |
1 2 3 4 5 6 7 | |
Интерфейс Writer¶
Writer — любой объект, соответствующий интерфейсу Writer. Обязателен только write(); остальные методы необязательны.
У каждого асинхронного метода есть синхронный вариант *Sync для схемы try-fallback: сначала быстрый синхронный путь, при необходимости — асинхронная версия, если синхронный вызов не смог завершиться:
1 2 3 4 | |
writer.desiredSize¶
- number | null
Число свободных слотов буфера до достижения high water mark. Возвращает null, если writer закрыт или потребитель отключился.
Значение всегда неотрицательно.