Writable streams Node.js: backpressure и drain¶
Источник: theNodeBook — Writable Streams
Writable stream в Node.js — это конечный автомат на стороне потребителя для данных по частям (chunk). Механика строится вокруг обратного давления (backpressure). write(chunk) принимает данные, складывает ожидающие записи во внутреннюю очередь и в итоге передаёт каждый chunk нижележащему приёмнику — файлу, сокету, потоку сжатия или кастомному назначению. Возвращаемое значение сообщает производителю, ниже ли буфер порога.
Что такое Writable stream в Node.js¶
write(), возвращающий false, означает: вызывающий код должен дождаться drain, прежде чем слать ещё. Поток сохраняет порядок записей, отслеживает колбэки и координирует финальные записи через end(). Кастомные Writable реализуют _write() или _writev() и вызывают переданный колбэк, когда приёмник принял chunk.
Вы уже видели, как работают Readable streams: внутренние буферы, переключение режимов, доставка данных потребителю. Теперь нужно развернуть перспективу и посмотреть на другую сторону уравнения потоков: куда уходят данные после того, как их произвели?
Это область Writable streams. Если Readable — про извлечение данных из источника, Writable — про запись в назначение. Файлы на диске, сетевые сокеты, HTTP‑ответы, алгоритмы сжатия, подключения к БД — везде, где вы отправляете данные по частям, вы работаете с той или иной формой Writable stream.
Запись в Writable stream имеет обратную связь. write() принимает данные и возвращает boolean о состоянии буфера на стороне writable. Если игнорировать этот сигнал, процесс начнёт потреблять больше памяти, чем ожидаете. Этот сигнал — backpressure, и продакшен‑код, который обрабатывает потоки данных, обязан его учитывать.
Ниже — Writable streams с нуля: класс Writable (опции, события, смысл возвращаемого значения write()), затем backpressure (зачем он нужен, как внутренняя буферизация его создаёт и что будет, если его игнорировать), реализация кастомных Writable и практические паттерны записи в реальных приложениях.
Класс Writable stream¶
Создавая или получая Writable stream, вы работаете с объектом, расширяющим EventEmitter, как и Readable. Потоки в Node.js общаются через события: асинхронные I/O завершаются или падают позже.
Задача Writable stream в концепции проста: принимать chunks через write() и отправлять их в нижележащее назначение. Это может быть что угодно — файловый дескриптор ОС, TCP‑сокет, массив в памяти. Writable stream не волнует конкретика: он даёт интерфейс и логику буферизации. Назначение абстрагировано во внутренний метод _write(), который реализуют подклассы.
Опции конфигурации задают поведение Writable stream под нагрузкой.
Опция highWaterMark похожа на Readable, но смысл чуть другой. Для Writable highWaterMark — максимальное число байт (или объектов в objectMode), которое поток буферизует внутри, прежде чем начнёт сигнализировать backpressure. По умолчанию 16384 байта — те же 16 КБ, что у Readable.
При вызове write() данные обычно попадают во внутренний буфер, пока назначение их не примет. Если назначение быстрое (запись в /dev/null или в сокет с запасом пропускной способности), буфер почти пуст и записи завершаются быстро. Если назначение медленное (механический диск под нагрузкой, перегруженная сеть), буфер заполняется.
Когда объём буфера достигает или превышает highWaterMark, write() возвращает false. Это сигнал backpressure: «я буферизую слишком много, замедлитесь или остановитесь, пока снова не буду готов». Если приложение игнорирует сигнал и продолжает write(), буфер растёт, память растёт, пока процесс не исчерпает её.
Пример конфигурации:
1 2 3 4 5 | |
Поток сигнализирует backpressure, когда внутренний буфер достигает 8 КБ. Записи после сигнала всё ещё принимаются; false означает, что нужно приостановиться.
Опция objectMode, как у Readable, переводит поток с байтов на произвольные объекты JavaScript. В objectMode highWaterMark — число объектов в буфере, а не байты. По умолчанию в objectMode — 16 объектов.
1 2 3 4 | |
Полезно в конвейерах обработки, где chunk — логическая единица (строка БД, разобранная запись лога, JSON‑документ), а не кусок байтов.
Опция decodeStrings задаёт, преобразовывать ли строки в Buffer перед передачей в _write(). По умолчанию true. При false строки идут как есть — нужно, если Writable специально обрабатывает строки иначе, чем буферы.
1 2 3 | |
Есть defaultEncoding — кодировка при преобразовании строк в буферы (если decodeStrings: true). По умолчанию 'utf8' — почти всегда то, что нужно для текста.
Наконец, emitClose управляет событием close при destroy(). По умолчанию true. Без веской причины лучше не трогать.
Чтобы эффективно работать с Writable, нужно понимать события и что они означают.
События Writable streams¶
Writable stream испускает события при смене состояния. Каждое — в определённой точке жизненного цикла.
Главное для backpressure — drain. Оно срабатывает, когда внутренний буфер был полон (write() возвращал false) и опустился ниже highWaterMark. drain — сигнал возобновить запись.
Типичный паттерн:
1 2 3 4 5 6 7 8 | |
write() вернул false — буфер полон. Вешаем одноразовый слушатель drain и останавливаем логику записи. Когда приходит drain, снова есть место.
Блокировка в write() убила бы смысл асинхронной модели I/O Node.js. Если бы write() блокировал, event loop замер бы в ожидании завершения записи. Событийный сигнал оставляет loop свободным для другой работы, пока буфер потока опустошается.
Событие finish срабатывает после end(), когда все буферизованные данные успешно записаны в назначение. Поток завершил работу. finish — до close: запись закончена, ресурсы назначения могут ещё быть открыты.
1 2 3 4 5 6 | |
end() говорит: «больше данных не будет». После end() вызов write() бросит ошибку. Поток обрабатывает оставшийся буфер и при успехе испускает finish.
Событие close — когда поток и нижележащие ресурсы закрыты. После finish. Не все потоки испускают close — зависит от реализации ресурса. У файловых потоков close — при закрытии дескриптора; у сокетов — при закрытии сокета.
1 2 3 | |
Событие error — при сбое записи: диск заполнен, сеть оборвалась, ошибка внутри назначения. Без обработчика error исключение может уронить процесс — как у Readable.
1 2 3 | |
Событие pipe — когда Readable подключён через pipe(). В обработчик передаётся источник — в основном для логирования и отладки.
1 2 3 | |
Есть unpipe — когда Readable отключают от этого Writable.
Эти события — поверхность API Writable streams. Чтобы использовать их правильно, нужно разобрать backpressure — ядро управления потоком в потоковых системах.
Понимание backpressure¶
Backpressure звучит абстрактно, пока не увидишь последствия без него. Конкретный сценарий.
Программа читает большой файл и пишет в другое место. Наивный вариант:
1 2 3 4 5 6 7 8 | |
Читаем chunks из input.dat и пишем в output.dat. Просто? Но есть скрытая проблема. Что если Readable отдаёт данные быстрее, чем Writable успевает их принять?
Диск читает 100 МБ/с, пишет 50 МБ/с. Readable даёт 100 МБ/с chunks, вы вызываете write() на каждый. Writable обрабатывает 50 МБ/с — остальные 50 МБ/с копятся во внутреннем буфере. За секунду — 50 МБ в буфере, за две — 100 МБ, за десять — 500 МБ. Буфер растёт, пока процесс не упадёт от нехватки памяти.
Эту проблему решает backpressure. Writable говорит производителю «замедлись» — возвращая false из write(). Производитель должен паузу до drain, когда поток снова готов.
Правильный вариант:
1 2 3 4 5 6 7 8 9 | |
write() вернул false — ставим Readable на паузу, он перестаёт испускать data. Буфер Writable опустошается по мере записи в назначение. Когда буфер ниже highWaterMark, срабатывает drain — возобновляем Readable. Поток данных регулируется скоростью потребителя, а не производителя.
Паттерн настолько частый, что в Node.js есть pipe() с таким flow control из коробки. Подробнее — в отдельной главе; здесь важно, что управление backpressure — часть модели потоков.
Что происходит внутри при write()?
writable.write(chunk) проверяет, идёт ли уже запись в назначение. Если да — chunk в буфер. Если назначение свободно — chunk сразу в _write(), который делает I/O.
Внутренний буфер — связный список запросов на запись. Каждый запрос: chunk, кодировка (для строки), колбэк по завершении. Повторные write() дополняют список.
После добавления (или передачи в _write()) поток считает размер буфера: для байтовых потоков — сумма длин chunks; в objectMode — число объектов. Если сумма ≥ highWaterMark, write() возвращает false, иначе true.
Упрощённая модель:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | |
Идея: write() добавляет в буфер и возвращает boolean — ниже ли порог.
Скорость опустошения буфера зависит от _write() и производительности назначения. Быстрый SSD — буфер быстро пустеет; медленная сеть — медленно. Writable измеряет давление в очереди и сигнализирует переполнение.
Поэтому highWaterMark — параметр настройки. Слишком низкий — частые сигналы backpressure и паузы, ниже throughput. Слишком высокий — много данных в памяти; нормально при запасе RAM, плохо при тысячах потоков или жёстких лимитах.
16 КБ по умолчанию — разумный компромисс: реже backpressure при типичных записях и не даёт медленным приёмникам накопить огромный буфер.
Что если игнорировать backpressure? Миллион вызовов write() по 1 КБ без проверки возврата — буфер ~1 ГБ только на этот поток.
Несколько файлов или HTTP‑запросов одновременно — гигабайты буферизованных записей. В какой‑то момент ОС не выделяет память — out-of-memory.
В продакшене это частая причина утечек памяти: быстрый источник, медленное назначение, возврат write() не проверяют.
Исправление простое, но требует дисциплины: проверять возврат, паузить производителя, ждать drain, возобновлять.
Внутренняя буферизация Writable streams¶
Структура и управление внутренним буфером определяют память и производительность потокового кода.
Writable хранит объект _writableState (приватный, с подчёркиванием), но понимание полей помогает предсказывать поведение.
bufferedRequestCount — число запросов на запись в буфере. Каждый write(), пока поток уже пишет, увеличивает счётчик; по завершении записи в назначение — уменьшает.
length — суммарный размер буферизованных данных (байты или число объектов в objectMode). Сравнивается с highWaterMark, чтобы решить, вернуть ли false.
Флаг writing — идёт ли сейчас операция записи. При вызове _write() — true; после колбэка _write() — false. Пока writing === true, новые chunks буферизуются, а не идут сразу в _write().
Буфер (в старых версиях Node — связный список запросов; в новых — более эффективная структура) концептуально остаётся очередью FIFO: сначала самый старый chunk.
При write() и writing === false chunk сразу в _write() с колбэком. По завершении — следующий из буфера, пока буфер не пуст.
Если размер буфера падает ниже highWaterMark после того, как был ≥ highWaterMark, испускается drain — backpressure снят.
Более детальная модель:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | |
Упрощённо: write() буферизует и возвращает false при превышении highWaterMark; _doWrite() обрабатывает по одному chunk; при пустом буфере и needDrain — drain.
Буфер — очередь между производителем (write()) и назначением (_write()). Производитель может добавлять быстро; медленное назначение раздувает очередь. Backpressure (false и drain) — обратная связь «замедлись».
Поток сигнализирует backpressure через возврат write(). Если после false продолжать write(), буфер растёт до OOM. Дизайн даёт приложению выбор: жёсткий real-time — иногда сбрасывают данные; есть запас RAM — буферизуют агрессивнее. Корректное поведение по умолчанию — пауза при false.
Память буфера — сверх памяти самих chunks: каждый запрос — объект с chunk, кодировкой, колбэком, метаданными. Миллионы мелких write() дают заметный overhead объектов — одна из причин батчить мелкие записи в крупные chunks.
Методы cork() и uncork() оптимизируют буферизацию при множестве мелких записей.
writable.cork() переводит поток в закупоренное состояние: все записи буферизуются, _write() не вызывается. Идея — серия мелких write() и одна пачка на выходе.
writable.uncork() сбрасывает буфер. Если есть _writev(), вызывается он со всеми chunks; иначе — _write() по очереди.
1 2 3 4 5 | |
Без cork() каждый write() может сразу вызвать _write() — три отдельные I/O. С cork() — одна операция (при _writev()) или последовательность без пауз.
Полезно при множестве мелких записей: три syscall по 8 байт медленнее одного на 24 байта.
В типичном прикладном коде cork()/uncork() редко нужны — чаще в библиотеках и обработчиках протоколов с мелкими фреймами.
cork() можно вызывать несколько раз — внутренний счётчик; столько же раз нужен uncork() для сброса (вложенный cork).
Реализация кастомных Writable streams¶
Пользоваться Writable — одно; понимать внутренности — другое. Лучший способ закрепить модель — реализовать свой Writable stream.
Паттерн прост: наследуете Writable и реализуете _write(). Три аргумента: chunk, кодировка (для строки), колбэк по завершении записи.
Минимальный пример:
1 2 3 4 5 6 7 | |
Как /dev/null: данные принимаются и отбрасываются. _write() сразу вызывает колбэк — успех.
Writable, пишущий в массив:
1 2 3 4 5 6 7 8 9 10 11 | |
Chunks накапливаются в data.
Если запись асинхронная — БД, сеть — колбэк важен: его вызывают после завершения асинхронной операции.
Имитация async‑записи:
1 2 3 4 5 6 7 8 | |
setTimeout имитирует I/O 100 мс. До callback() следующий буферизованный chunk не обработают. Так поток подстраивается под скорость назначения.
При ошибке передают её в колбэк:
1 2 3 4 5 6 7 8 9 | |
Ошибка в колбэке → событие error, буферизованные записи отбрасываются, поток в состоянии ошибки.
_writev() — опциональная оптимизация для пакетной записи. При буферизации или cork поток может вызвать _writev() с массивом chunks вместо многократного _write().
Сигнатура _writev():
1 2 3 4 5 6 7 8 9 10 11 12 13 | |
chunks — массив объектов с chunk и encoding. Обработали пачку — один колбэк.
_writev() не обязателен: без него — _write() на каждый chunk. Имеет смысл, если назначение умеет batch (многстрочный SQL, протокол с объединением сообщений).
Хук _final() вызывается при завершении (после end() и обработки буфера), до finish. Для финального flush или закрытия дескриптора.
1 2 3 4 5 6 7 8 9 10 11 12 | |
Колбэк _final() обязателен — после него finish.
Реалистичнее — лог в файл с форматированием:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | |
Файл открывается в конструкторе; _write() добавляет timestamp; _final() закрывает дескриптор. Если файл ещё не открыт — ждут open и повторяют _write(). Частый паттерн при асинхронной инициализации ресурса.
Кастомный Writable даёт полный контроль над тем, куда и как уходят данные: БД, внешние API, сжатие — интерфейс гибкий для любого назначения.
Корректная запись в Writable streams¶
Зная внутренности, ниже — практические паттерны.
Главное правило: всегда проверяйте возврат write(). При false — пауза источника и ожидание drain.
Чтение из одного потока и запись в другой:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | |
При false читатель на паузе; на drain — снова читает. Буфер writer ограничен: reader ждёт, пока назначение не догонит.
По окончании вызывайте end() — больше данных не будет. Можно передать финальный chunk:
1 | |
Эквивалентно:
1 2 | |
После end() — оставшийся буфер, при наличии _final(), затем finish. Дальнейший write() бросит ошибку.
Это ERR_STREAM_WRITE_AFTER_END:
1 2 | |
Частая ошибка: асинхронный код ещё пишет, а другая ветка уже вызвала end(). Нужно дождаться всех записей перед end().
cork()/uncork() в приложениях — при множестве мелких записей подряд:
1 2 3 4 5 | |
На практике редко нужно вручную: pipe() и pipeline() сами управляют backpressure; внутренняя буферизация потока уже даёт некоторый батчинг. Cork в основном для библиотечного кода — HTTP/2, протоколы БД.
Ручной backpressure неизбежен, когда данные из непотокового источника — например, массив:
1 2 3 4 5 6 7 8 9 10 11 | |
При false ждут drain — цикл не кормит поток, пока буфер выше порога, даже если массив огромен, а поток медленный.
Есть stream.Writable.toWeb() для WHATWG WritableStream и async iteration — отдельная тема веб‑API.
Встроенный flow control через возврат write() и drain. Учитывать его не опционально — разница между «работает под лёгкой нагрузкой» и стабильностью в продакшене.
Полный пример: большой CSV → парсинг строк → БД с корректным backpressure:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | |
pipeline() сам управляет backpressure. DatabaseWriter пишет строки в БД; async _write() допустим — Node ждёт завершения перед следующим chunk.
Здесь не проверяют write() и не слушают drain — это делает pipeline(). Рекомендуемый паттерн: pipeline() или pipe(), backpressure на Node, вы — в логике преобразования.
Когда pipeline() не подходит — несколько источников/приёмников, интеграция с непотоковым API — backpressure вручную: проверка write(), пауза при false, возобновление на drain.
Механика переполнения буфера¶
Пошаговая цепочка до исчерпания памяти объясняет, зачем нужен backpressure.
Копирование 1 ГБ с SSD на сетевой ресурс: SSD ~500 МБ/с, сеть ~10 МБ/с — разница скоростей ~50×.
Код:
1 2 3 | |
Readable отдаёт chunks по 64 КБ с максимальной скоростью SSD. Каждые 128 микросекунд новый chunk (64 КБ при 500 МБ/с). Сеть принимает chunk 64 КБ примерно раз в 6,4 мс (10 МБ/с).
За первые 6,4 мс Readable отдал ~50 chunks, Writable отправил 1 — 49 chunks в буфере (~3,1 МБ).
За 64 мс — 500 chunks против 10 отправленных, ~490 в буфере (~30,6 МБ).
За секунду — сотни мегабайт в буфере; за две — почти гигабайт. Потом OOM.
Это не плавное замедление: процесс жив до внезапной смерти. Event loop отзывчив, пока аллокатор не откажет — V8 бросит out-of-memory.
Код с учётом backpressure:
1 2 3 4 5 6 7 8 9 10 | |
При достижении highWaterMark (16 КБ по умолчанию) write() → false, Readable на паузе. Writable продолжает слать буфер в сеть. Ниже порога — drain, Readable снова читает.
Буфер колеблется между 0 и highWaterMark, не растёт безгранично. Память ограничена highWaterMark плюс размер одного chunk от Readable — при дефолтах порядка ~32 КБ независимо от размера файла и скорости сети.
В этом сила backpressure: скорости производителя и потребителя развязаны при ограниченной памяти.
Нюанс: highWaterMark не жёсткий потолок. Буфер может быть больше. highWaterMark задаёт, когда write() вернёт false. Chunk 10 МБ в пустой буфер даст 10 МБ в буфере при highWaterMark 16 КБ — и false на этом вызове.
Пиковая память ≈ highWaterMark + размер крупнейшего chunk. Chunks 64 КБ и highWaterMark 16 КБ → ~80 КБ; chunks 1 МБ → ~1 МБ. Размер chunk важен в условиях жёстких лимитов памяти.
highWaterMark — порог сигнала backpressure, а не жёсткий лимит размера буфера. Один большой chunk может временно раздуть буфер выше порога.
Другой случай: несколько производителей в один Writable — десять операций пишут в один лог. Без backpressure у каждого своя скорость, буфер растёт на всех. Десять потоков × 16 КБ легко превращаются в сотни килобайт и больше; на загруженном сервере — утечка памяти.
Решение то же: каждый проверяет write(), паузит при false; один drain — все возобновляются; буфер ограничен.
Обработка ошибок Writable streams¶
Счастливый путь: данные текут, backpressure соблюдается, end() чистый. Что при сбоях?
Ошибки возможны на этапе назначения (диск полон, сеть оборвалась, права), в данных, в неверном состоянии потока.
В _write(), _writev(), _final() ошибку передают в колбэк → событие error → поток в состоянии ошибки, буфер сбрасывается, дальнейшие write() бросают.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | |
Первый write() — error; поток уничтожен, следующий write() — исключение.
Без обработчика error исключение может уронить процесс. На каждый Writable, который создаёте или получаете, вешайте обработчик ошибок — особенно в продакшене.
Паттерны для конвейеров — в главе про pipeline. Пока — обработчик на каждый Writable.
write() после end() — ошибка программирования, ERR_STREAM_WRITE_AFTER_END. Часто при параллельных асинхронных путях: одна ветка вызвала end(), другая ещё пишет. Координируйте: end() только после всех записей (Promise.all(), счётчик pending writes).
Проблема:
1 2 3 4 5 6 7 | |
Исправление:
1 2 3 4 5 6 7 8 9 | |
end() только после завершения async‑записи.
destroy(err) — принудительное закрытие, опционально с ошибкой. Буфер отбрасывается, затем error (если передан err) и close.
1 | |
Полезно при отмене загрузки пользователем. Немедленное завершение, записи в буфере могут не дойти.
Свойство destroyed:
1 2 3 | |
Проверка перед записью в уже уничтоженный поток.
Свойства и интроспекция¶
Writable expose свойства для отладки, мониторинга и решений о flow control.
writableLength — сколько байт (или объектов в objectMode) сейчас в буфере:
1 | |
Близко к writableHighWaterMark — скоро backpressure. Можно для мягкого rate limiting или прогресса загрузки.
writableHighWaterMark — значение порога:
1 | |
writable — можно ли вызывать write():
1 2 3 | |
false после destroy() или end().
writableEnded — вызывали ли end():
1 | |
true после end(), даже до finish.
writableFinished — был ли finish:
1 | |
true после обработки всех записей и finish.
writableCorked — сколько раз cork() без uncork():
1 2 3 | |
Для отладки cork/uncork.
writableObjectMode — режим объектов:
1 | |
Задаётся при создании, не меняется. Для универсальных утилит над byte‑ и object‑потоками.
В прикладном коде редко нужны; при отладке потоков и generic‑утилитах — незаменимы.
Очередь write request подробнее¶
Как управляются запросы на запись в очереди — для оценки производительности и памяти.
write() оборачивает chunk и метаданные в объект запроса на запись:
- сам chunk (
Buffer, строка или объект); - кодировка (для строки);
- колбэк по завершении (опционально);
- ссылка на следующий запрос.
Объекты образуют связный список: голова — обрабатываемый запрос, хвост — последний добавленный. Новый write() — в хвост.
По колбэку _write() текущий снимается с головы, следующий становится головой; если есть — снова _write(); иначе очередь пуста, ждут write().
Каждый объект даёт overhead (указатели, метаданные, замыкания). Миллион write() по 1 байту — миллион объектов по ~50–100 байт overhead при данных 1 МБ. Батчинг мелких записей (1000 × 1 КБ вместо миллиона × 1 байт) сильно снижает overhead очереди.
cork()/uncork() и очередь: при cork запросы создаются и копятся, _write() ждёт; при uncork() — _writev() на всю пачку или цикл _write().
С cork:
1 2 3 4 5 | |
Без cork:
1 2 3 4 | |
При эффективном _writev() три chunk — одна I/O вместо трёх. Для сокетов и файлов это часто быстрее; для массива в памяти — без разницы. _writev() опционален — оптимизация там, где batch выгоден.
Writable streams в objectMode¶
В основном говорили о байтах; objectMode меняет контракт в конвейерах обработки.
В objectMode highWaterMark — число объектов, не байты. По умолчанию 16. Каждый объект в буфере +1 к счётчику независимо от размера в памяти.
В objectMode highWaterMark не лимит памяти, а лимит числа единиц в полёте. Шестнадцать объектов по 10 МБ — 160 МБ при highWaterMark: 16. Это намеренно: логические записи (строки БД), а не байты.
Реализация как у byte stream с objectMode: true:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | |
Каждый write() — объект строки; encoding в objectMode игнорируется (передаётся 'buffer' для совместимости сигнатуры).
Частый паттерн: byte stream → objectMode через Transform, например JSON Lines:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | |
Transform накапливает байты в строки и пушит объекты — выход objectMode, вход байтовый.
objectMode удобен в ETL, обработке логов, импорте/экспорте — везде, где этап работает с записями, а не сырыми chunks.
Хук _final() подробнее¶
_final() часто недопонимают. Это хук нормального завершения: после всех write(), до finish.
Задача _final() — финальная запись или очистка: footer сжатия, flush накопленного batch.
Writable с батчингом:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | |
_write() наполняет batch; при размере — flush. _final() сбрасывает остаток при end().
Без _final() неполный batch теряется: finish есть, последние chunks не дошли до назначения. Частый баг в Writable с батчингом.
Колбэк _final() обязателен; без него finish не придёт — поток зависнет. Ошибка в колбэке → error вместо finish.
Async:
1 2 3 4 5 6 7 8 | |
Или promise без колбэка:
1 2 3 | |
Promise из _final() Node ждёт до finish или error при reject.
Кастомный Writable с ограничением скорости¶
Пример rate limit: backpressure, тайминг, очередь.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | |
Token bucket: токены (байты), пополнение bytesPerSecond в секунду. Хватает токенов — запись сразу; иначе setTimeout до накопления.
Паттерн для API rate limit, throttle логов, pacing экспорта.
Колбэк при отложенной записи вызывается позже — очередь Writable заблокирована до завершения. Корректно: backpressure естественно, пока колбэк не вызван.
Backpressure при нескольких writers¶
Несколько производителей в один Writable усложняют картину.
Каждый вызывает write() и видит возврат. drain — на всех слушателей. Один паузится на false; при drain все возобновляются.
Возможен thundering herd: 100 паузенных на drain; все резюмируют и снова заливают буфер — снова false у всех. Крайний случай, но реальный: координация backpressure между производителями нетривиальна.
Решение — очередь уровнем выше: производители кладут в очередь, один consumer пишет в поток и обрабатывает backpressure.
Или семафор: не более N писателей одновременно.
На практике проще не плодить много concurrent writers в один поток — лог‑библиотека с внутренней координацией или мультиплексор потоков.
Backpressure — сигнал на поток, не на производителя. Несколько производителей требуют координации уровнем выше, иначе возможны патологические осцилляции буфера.
Профилирование памяти Writable stream¶
Память растёт — подозрение на потоки. Как диагностировать?
Проверьте backpressure — лог при write():
1 2 3 4 5 6 7 | |
«Backpressure!» без паузы — игнорируете сигнал.
Если паузите, а память растёт — периодически writableLength:
1 2 3 | |
Стабильный рост — назначение медленнее производителя (ожидаемо или зависло).
Снимок кучи Node:
1 2 3 4 5 | |
В Chrome DevTools ищите миллионы Buffer и объектов запросов записи — типичная утечка при неограниченном буфере.
node --trace-gc app.js — частый GC при высокой аллокации быстрее, чем сбор — как при растущем буфере.
В продакшене метрика writable.writableLength: стабильно у writableHighWaterMark — узкое место в конвейере.
Запись в несколько Writable одновременно¶
Один chunk — в файл и БД, или на несколько endpoint'ов.
Вручную:
1 2 3 4 5 6 7 8 9 | |
Backpressure сложен: один false, другие true — паузить всех или только медленного? Ждать drain у всех — тормозите быстрые; не ждать — буфер медленного растёт.
Лучше fan-out внутри одного Writable:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | |
Пишет во все назначения параллельно; колбэк _write() — когда все завершили (или первая ошибка). Медленное назначение задерживает колбэк → буфер FanOutWritable растёт → backpressure производителю.
Полезно для мульти‑логов, репликации, broadcast событий.
Выбор highWaterMark для Writable streams¶
16 КБ по умолчанию — баланс для большинства сценариев.
Крупные chunks — поднимайте highWaterMark (2–4 МБ при chunks 1 МБ), иначе backpressure на каждой записи.
Жёсткие лимиты памяти (Docker, embedded) — снижайте до 4–8 КБ.
Много одновременных потоков: highWaterMark × число потоков — оценка буферной памяти (1000 HTTP‑ответов × 16 КБ ≈ 16 МБ только буферы).
В objectMode — число объектов в полёте (100–1000 для БД, 10–50 для парсинга файлов) — зависит от размера объекта и throughput.
Делайте highWaterMark настраиваемым, меряйте throughput и память, подстраивайте.
Помните: порог сигнала, буфер может быть больше из‑за крупных chunks.
Связанное чтение¶
- Предыдущая: Readable streams Node.js
- Далее: Transform streams Node.js