Конвейеры stream в Node.js: ошибки, очистка и AbortSignal¶
Источник: theNodeBook — Stream Pipeline: Errors, Cleanup & AbortSignal
Конвейеры stream в Node.js соединяют несколько стадий в один путь данных. Сложность — в согласовании жизненного цикла. Одна стадия может упасть, закрыться раньше времени, включить backpressure или оставить файловый дескриптор открытым, пока другая ещё держит данные в буфере. source.pipe(dest) связывает поток данных и backpressure между двумя stream. stream.pipeline() добавляет согласованное распространение ошибок и очистку по всей цепочке.
Обработка ошибок конвейера stream в Node.js¶
Используйте pipeline(), когда сбой одной стадии должен остановить весь путь: функция уничтожает stream по необходимости и сообщает о завершении через callback или promise. finished() наблюдает за stream до успешного завершения или ошибки. Современные конвейеры также принимают стадии на async generator и AbortSignal для отмены.
Вы уже знаете, как работают отдельные stream — Readable производит данные, Writable потребляет, Transform обрабатывает между ними. У каждого типа свой буфер, свой механизм backpressure и свой жизненный цикл событий. В реальных приложениях stream редко живут изолированно. Их соединяют в конвейеры, где данные идут от источника через несколько стадий преобразования к финальному приёмнику.
И концепция проста — «прокачать» данные из одного stream в другой — но сделать это правильно сложно. При соединении stream появляются несколько источников ошибок, несколько сигналов backpressure и несколько сценариев очистки ресурсов. Если любой stream в конвейере падает, что происходит с остальными? Если backpressure возникает посередине, доходит ли он до источника? При завершении конвейера все ли ресурсы освобождены?
Эта глава отвечает на эти вопросы. Разберём классический pipe() — зачем он есть и почему для продакшена недостаточен. Затем подробно stream.pipeline() — современный рекомендуемый способ собирать конвейеры с корректной обработкой ошибок и очисткой. Посмотрим паттерны обработки ошибок, специфичные для streaming. Рассмотрим async iteration как альтернативу конвейеру. В конце — продвинутые паттерны композиции для переиспользуемых сегментов.
Глава про то, как правильно соединять stream: распространение ошибок, очистка ресурсов и backpressure на всех стадиях.
Метод pipe()¶
Кратко: pipe() соединяет Readable с Writable, автоматически обрабатывая backpressure — вызывает pause(), когда write() возвращает false, и resume() при событии drain у Writable. Этот паттерн подробно разбирался в главе про Writable stream.
Метод возвращает destination stream, что позволяет строить цепочки:
1 | |
Получается четырёхстадийный конвейер: readable → transform1 → transform2 → writable. Данные идут последовательно, backpressure распространяется назад от writable к readable. Если writable сигнализирует backpressure, вся цепочка ставится на паузу; при drain сигнал возобновления идёт вперёд.
Конкретный пример — сжатие лог-файла:
1 2 3 4 5 6 | |
Три stream, два вызова pipe(). Читатель отдаёт чанки, gzip сжимает, writer сохраняет результат. Память остаётся ограниченной, потому что каждая стадия уважает backpressure.
Но у pipe() есть проблема: обработка ошибок.
Когда в piped stream возникает ошибка, stream эмитит событие error. Ошибка остаётся на том stream, который её сгенерировал. Как вы знаете из глав про Readable и Writable, на каждый stream нужен свой обработчик error, иначе процесс упадёт. В конвейерах это особенно болезненно.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | |
Нужны три отдельных обработчика. Пропустите один — процесс упадёт. Утомительно, легко ошибиться и, честно говоря, нелепо для задачи, которая должна быть простой.
Хуже того: при ошибке посередине конвейера остальные stream не останавливаются автоматически. Допустим, transform падает на чанке. Transform эмитит error и перестаёт обрабатывать. Reader продолжает читать и писать в transform в сломанном состоянии. Writer ждёт данные, которых не будет, и может никогда не эмитить finish, потому что конвейер не завершился чисто.
Остаются висящие ресурсы: незакрытые файловые дескрипторы, сетевые соединения, неосвобождённые буферы. Конвейер в частично упавшем состоянии, и для очистки приходится вручную вызывать destroy() на каждом stream:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | |
Многословно, повторяется и хрупко. Добавили stream в конвейер — обновляйте все обработчики.
Ещё ограничение pipe(): сложно понять, когда весь конвейер завершился. Readable эмитит end, writable — finish. На что подписываться? При нескольких transform каждый эмитит свой end, финальный destination — finish. Нужно отслеживать правильное событие на правильном stream, и это зависит от структуры конвейера.
Для простых сценариев из двух stream pipe() подходит. Для продакшен-конвейеров с несколькими стадиями и требованиями к ошибкам — нет. Поэтому появился stream.pipeline().
Метод unpipe()¶
Перед pipeline() стоит упомянуть unpipe() — на практике он нужен редко. Метод отключает piped stream:
1 2 3 4 5 | |
После unpipe() readable перестаёт отправлять данные указанному writable. Без аргументов отключаются все получатели:
1 | |
Зачем это? В основном для динамической маршрутизации: перенаправить выход stream по условию во время выполнения. Например, читаете сокет, сначала пишете в файл, потом по данным переключаете destination:
1 2 3 4 5 6 | |
На практике unpipe() почти не нужен. Большинство конвейеров статичны — поток задаётся при старте и идёт до конца. Динамическую маршрутизацию лучше решать абстракциями уровнем выше — routing stream или условными transform.
Главное про unpipe(): destination не завершается автоматически. Метод снимает слушатели destination с source. Состояние flowing mode source зависит от оставшихся потребителей: если pipe и data-слушателей не осталось, stream переходит в paused mode; если есть — data продолжают идти. Чтобы закрыть destination, нужно вручную вызвать end().
stream.pipeline()¶
stream.pipeline() — современный способ собирать stream. Функция появилась в Node.js именно из‑за проблем pipe() с ошибками и очисткой. Базовое использование:
1 2 3 4 5 6 7 8 9 | |
Вместо цепочки pipe() передаёте все stream аргументами, затем callback на завершение или ошибку. Сигнатура:
1 | |
pipeline() делает три вещи, которых нет у pipe():
- Автоматическое распространение ошибок — при
errorна любом stream конвейер останавливается, callback получает эту ошибку. Один обработчик на границе конвейера покрывает всю цепочку. - Автоматическая очистка — при ошибке или успешном завершении вызывается
destroy()на всех stream: закрываются дескрипторы, освобождаются буферы, рвутся соединения. - Один callback завершения — одна точка для всего.
Практический пример:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | |
Любая стадия может упасть — чтение файла, повреждённые данные в gzip, диск переполнен при записи. Callback получит ошибку, все три stream будут уничтожены. При успехе err будет undefined.
Проще, чем эквивалент на pipe() с ручными обработчиками: без отдельных error, без ручного destroy, без угадывания, на каком stream слушать завершение.
Есть promise-версия в модуле stream/promises:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | |
Promise резолвится при успехе или реджектится при ошибке любого stream. Естественно вписывается в async/await: try/catch, как для любого rejected promise.
Это рекомендуемый паттерн для современного Node.js: stream/promises и async/await для читаемой композиции конвейеров.
Как pipeline() работает внутри¶
Внутреннее устройство объясняет поведение при сбое стадии. При вызове pipeline(s1, s2, s3, callback) функция выполняет четыре вида координации:
- Соединяет stream теми же механиками
pipe(), что в предыдущих главах — с автоматическим backpressure. - Вешает обработчики
errorна все stream для согласованной обработки. - Вызывает
destroy()на всех stream при ошибке или завершении. - Один раз вызывает callback с ошибкой или
undefined.
Ключевое отличие от ручного pipe() — координация ошибок и автоматическая очистка. Backpressure тот же (подробно в главе про Writable stream), но с продакшен-уровнем управления ошибками.
Упрощённая концептуальная модель (не реальная реализация Node.js):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | |
Это педагогическая модель «что делает» pipeline(), а не «как устроено внутри». Реальная реализация Node.js (на базе библиотеки pump) обрабатывает async iterables, генераторы, сложные ошибки, гарантию однократного callback, определение типов stream и другие краевые случаи.
pipeline() обрабатывает случай, когда stream эмитит ошибку после уничтожения — бывает в кастомных stream с асинхронными операциями. Реализация гарантирует один вызов callback, даже если несколько stream падают одновременно.
pipeline() с функциями-трансформами¶
В pipeline() можно передавать async generator functions — они трактуются как transform:
1 2 3 4 5 6 7 8 9 10 11 12 | |
Генератор посередине автоматически оборачивается в Transform. Для каждого чанка из source генератор преобразует (здесь — в верхний регистр) и отдаёт результат. Значения из yield становятся чанками выходного stream.
Удобно для простых преобразований: вместо класса Transform — inline-генератор. Читается как цикл: для каждого входного чанка — выходной чанк.
Можно использовать обычные async-функции, возвращающие async iterable:
1 2 3 4 5 6 7 8 9 10 11 | |
pipeline() распознаёт async iterables и внутри оборачивает их в Transform.
Можно цепочкой из нескольких генераторов:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | |
Первый генератор переводит буферы в строки и режет на строки с буфером на границах чанков. Второй отфильтровывает строки, начинающиеся с #. Каждый генератор — стадия, pipeline() связывает их.
Так в современном Node.js обычно строят конвейеры: простые преобразования — inline-генераторы; сложные stateful — класс Transform; при необходимости смешивают.
В генераторах конвейера в выход stream попадают только значения из yield. Значение из return не уходит в pipeline — оно доступно только коду, который напрямую потребляет генератор. В transform-стадиях всегда используйте yield для выходных чанков.
Обработка ошибок в конвейерах stream¶
В конвейерах ошибки сложнее, чем в коде с одним stream: сценарии, которых нет при работе с одним потоком. Stream эмитят error при сбоях чтения, записи, сети и т.д.
В конвейере ошибки могут прийти одновременно из разных мест:
- источник — файл не найден, нет прав, обрыв сети;
- transform — невалидные данные, ошибка парсинга или валидации;
- приёмник — диск полон, broken pipe, закрыт удалённый endpoint.
Каждая проявляется как error на соответствующем stream. С pipe() обрабатывали бы отдельно; с pipeline() всё попадает в callback или rejection promise.
Что с данными при ошибке посередине? Читаете 100 МБ, transform на 50 МБ натыкается на повреждённые данные. Первые 50 МБ уже могли быть записаны.
Зависит от поведения destination. Запись в файл — в файле частичный результат: файл есть, но неполный и, возможно, невалидный. pipeline() не откатывает уже записанное в underlying resource.
Нужна логика на уровне приложения. Паттерн: писать во временный файл и переименовывать только при успехе:
1 2 3 4 5 6 7 8 9 10 11 12 | |
Успех — temp переименовывается в финальное имя. Ошибка — temp удаляется. По финальному пути либо полный результат, либо файла нет.
Другой паттерн — транзакции в БД: все строки в транзакции, commit только после успешного конвейера:
1 2 3 4 5 6 7 8 9 10 11 12 13 | |
pipeline() отвечает только за очистку на уровне stream — destroy(). Очистка предметной области (удаление частичных файлов, rollback транзакций) — ваша ответственность.
При ошибке одной стадии pipeline() сразу вызывает destroy() на остальных: close, отмена pending-операций. Это правильно: упала одна стадия — весь конвейер останавливается.
Если нужно различать источник ошибки (read vs write), в callback приходит только первая ошибка. Можно помечать ошибки в кастомных stream:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | |
Свойство code помогает различать ошибки в общем обработчике.
Другой паттерн — stream.finished() для наблюдения за конкретным stream внутри большого конвейера:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | |
finished() вешает слушатели и вызывает callback, когда stream завершился, упал или был уничтожен.
stream.finished()¶
stream.finished() заслуживает отдельного внимания.
Функция принимает stream и callback, вызывая callback при завершении (успех или ошибка):
1 2 3 4 5 6 7 8 9 | |
«Завершён» для Readable — null отправлен или stream уничтожен. Для Writable — запись закончена, эмитирован finish или destroy. Для Duplex/Transform — завершены обе стороны.
Безопаснее, чем слушать только end или finish: finished() также слушает error, close, destroy и разбирает, действительно ли stream завершён.
Promise-версия:
1 2 3 4 5 6 7 8 | |
Promise резолвится при успехе или реджектится при ошибке.
stream.finished() намеренно оставляет висящие слушатели (error, end, finish, close) после вызова callback или settlement promise — чтобы ловить неожиданные ошибки от некорректных реализаций stream и не ронять процесс. Для короткоживущих stream GC обычно справляется. Для долгоживущих или чувствительных к памяти приложений есть опция cleanup:
1 2 3 | |
Зачем finished() вместо end/finish? Stream может завершиться по-разному: естественный end, destroy из‑за ошибки, явный destroy(). finished() покрывает все случаи одним callback или promise — «этот stream закончил работу, как бы то ни было».
Полезно, когда нужно знать о завершении конкретного stream, пока общий конвейер ещё идёт. Например, tee/broadcast — source в несколько destination:
1 2 3 4 5 6 7 8 9 10 11 12 | |
Ждём, пока оба destination допишут данные.
Восстановление после ошибок в конвейерах¶
Не каждая ошибка фатальна — часть можно повторить. Сетевой обрыв может быть временным; EACCES при чтении файла — нет.
Первый шаг — классификация: операционная ошибка или баг? Временная или постоянная?
Для временных ошибок — retry вокруг конвейера:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | |
До maxRetries попыток, exponential backoff на transient-ошибках, иначе throw.
source() — функция, создающая новый source. Stream после error и destroy() нельзя переиспользовать; каждая попытка — новые экземпляры.
Fallback: основной источник упал — пробуем запасной:
1 2 3 4 5 6 | |
Полезно для резервных источников (CDN → origin).
При сбое destination — запись в другое место:
1 2 3 4 5 6 | |
Снова source() для нового чтения после уничтожения первого source.
Принцип: заранее решите, какие ошибки восстанавливаемы, и реализуйте retry/fallback на уровне конвейера. Stream обрабатывают чанки и события; политику повторов координирует приложение.
Проблема частичных данных¶
При падении конвейера посередине уже записанные данные остаются. pipeline() их не убирает.
Это риск целостности: экспорт БД на 60% — файл на 60%; повтор может дублировать данные или перезаписать файл целиком — зависит от режима открытия.
Стратегии:
1. Временный файл и атомарное переименование
1 2 3 4 5 6 7 8 9 10 | |
Самый безопасный паттерн для файлов: финальный путь существует только после успеха.
2. Append и идемпотентность
Если выход поддерживает append (логи) и операции идемпотентны:
1 | |
Повтор дописывает данные; при дедупликации downstream это приемлемо.
3. Транзакционные приёмники
БД, очереди, часть облачных хранилищ — commit только при успехе конвейера (см. пример с beginTransaction выше).
4. Маркер завершения
1 2 | |
Перед обработкой output.dat проверяйте маркер; без маркера — файл неполный.
Выбор зависит от возможностей destination и требований к согласованности. Явно решите, что происходит при частичном сбое.
Уничтожение stream¶
stream.destroy() уже разбирался в главах про Readable и Writable. destroy() переводит stream в destroyed, эмитит close, при переданной ошибке — error.
В pipeline() уничтожение любого stream приводит к destroy() остальных и вызову callback с ошибкой:
1 2 3 4 5 6 7 8 9 10 11 12 13 | |
source.destroy() останавливает чтение, эмитит close и при ошибке — error. pipeline() видит ошибку, уничтожает dest, вызывает callback.
Автоочистка — ещё плюс перед ручным pipe().
Удобно для отмены по действию пользователя: destroy source — конвейер останавливается, ресурсы освобождаются, callback обрабатывает отмену.
Можно destroy без ошибки:
1 | |
Stream уничтожен без error; callback всё равно вызовется с err === null — остановка без трактовки как сбой.
destroy() идемпотентен — повторные вызовы игнорируются.
При destroy() буферизованные данные теряются: не сброшенные записи в Writable, не прочитанные в Readable. Destroy — «остановиться сейчас и выбросить состояние», не «дожать pending».
Для graceful shutdown writable используйте end():
1 | |
end() только для Writable. У Readable нет graceful stop — либо дочитать всё, либо destroy().
Конвейеры на async iteration¶
В главе про Readable stream разбирали for await...of с автоматическим backpressure. Это альтернатива pipe() и pipeline() для логики обработки.
При итерации по Readable протокол итератора реализует backpressure: следующий чанк не тянется, пока не завершена текущая итерация. Асинхронная обработка — stream ждёт:
1 2 3 | |
Механику backpressure в async iteration см. в главе про Readable stream; здесь — применение к сборке конвейера.
Можно читать source через for await...of, преобразовывать и писать в destination:
1 2 3 4 5 6 7 8 9 10 11 12 | |
Ручной конвейер: явный backpressure при write() === false — ждать drain. Забыли проверку — неограниченный рост памяти (см. главу про Writable stream).
Чище — stream.Readable.from() с async generator:
1 2 3 4 5 6 7 8 9 10 11 | |
Генератор оборачивается в Readable, pipeline() даёт устойчивость к ошибкам.
Цепочка генераторов:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | |
Особенно удобно для objectMode, когда чанк — объект:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | |
Каждая стадия — функция из async iterable в async iterable. pipeline() склеивает их — функциональная композиция, часто понятнее классов Transform.
Backpressure при async iteration в конвейерах¶
Ключевые моменты при for await...of в конвейерах:
- Await async-работы — итератор тянет по одному чанку; await передаёт backpressure от скорости обработки к source.
- Сохраняйте backpressure — избегайте
promises.push(processAsync(chunk)), когда весь stream уходит в память до обработки. - Ограниченный параллелизм — для параллельной обработки с верхней границей используйте, например,
p-limit.
Без await внутри цикла backpressure теряется:
1 2 3 4 5 6 7 8 9 | |
Плюс подхода — явный контроль потока. Минус — обработку ошибок и очистку нужно делать сами, без автоматики pipeline().
Подробная механика — раздел «Backpressure in Async Iteration» в главе про Readable stream.
Композируемые трансформы¶
В главе про Transform stream — кастомные transform. Здесь — переиспользуемые компоненты через фабрики:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | |
Каждый вызов createCSVParser() — новый экземпляр. Один stream нельзя переиспользовать в двух конвейерах после end/error; фабрику — можно.
Настраиваемые фабрики:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | |
Сложные конвейеры — композиция сегментов:
1 2 3 4 5 6 7 8 9 10 11 12 | |
createProcessingPipeline() инкапсулирует всю цепочку — higher-order function для stream.
Композиция генераторов:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | |
Функциональный стиль хорошо ложится на streaming в Node.js.
Сегменты конвейера¶
Сегмент конвейера — переиспользуемый кусок: один transform, цепочка transform или условная маршрутизация.
Сегмент валидации с отводом невалидных объектов:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | |
Валидные объекты идут downstream, невалидные — в error destination (лог, отдельный stream). Ветвление: один вход, два выхода.
1 2 3 4 5 6 7 | |
Конвейер продолжается при невалидных объектах — они просто уходят в другую ветку.
Условный сегмент:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | |
По результату condition объект отправляется в trueTransform или falseTransform — routing.
Ветвление, маршрутизация, условия — кирпичи для сложных потоков данных при фокусированных переиспользуемых сегментах.
Tee и broadcast¶
Иногда одни и те же данные нужны нескольким приёмникам — tee (как тройник) или broadcast.
Простейший способ — два pipe() с одного source:
1 2 | |
Оба destination получают одни чанки. Но backpressure общий: если dest1 медленный и ставит source на паузу, страдает и быстрый dest2. Source не может паузить для одного и продолжать для другого — все или никто. При tee через pipe() темп задаёт самый медленный приёмник.
Если это приемлемо — достаточно простого piping. Для независимого backpressure — сложнее.
PassThrough как промежуточные буферы:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | |
У dest1 и dest2 backpressure независим: медленный dest1 буферизуется в pass1, быстрый dest2 не ждёт. Но backpressure на уровне source ломается: оба PassThrough могут раздуваться в памяти без ограничения.
Для независимых destination с ограниченной памятью нужен fan-out, который ждёт drain у всех destination, сигнализировавших backpressure:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | |
Writable пересылает чанк всем destination. write() === false — буфер полон, ждём drain. Собираем promise для всех «затормозивших» destination и вызываем callback только после drain у всех — backpressure доходит до source.
1 2 3 | |
Паттерн редко нужен в приложениях: обычно либо соглашаются на темп самого медленного, либо на неограниченный буфер в PassThrough. Настоящий fan-out с независимым backpressure — для логирования, мониторинга и узких сценариев.
Интеграция AbortSignal¶
Stream в Node.js поддерживают AbortSignal. В promise-версии pipeline() можно передать signal — при abort конвейер уничтожается:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | |
controller.abort() немедленно рвёт конвейер: все stream уничтожены, promise реджектится с AbortError, pending-операции отменяются.
Полезно для отмены пользователем, таймаутов, очистки в долгих операциях.
Таймаут:
1 2 3 4 5 6 7 8 9 10 11 12 | |
Несколько источников отмены — AbortSignal.any():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | |
Композитный signal abort'ится при отмене пользователем или по таймауту.
AbortSignal делает отмену явной и стандартной: вместо ручного destroy() на каждом stream — abort() signal, очистку делает pipeline().
Примеры из практики¶
Несколько полных конвейеров, собирающих идеи главы.
1) Обработка лог-файла
Чтение большого лога, парсинг строк как JSON, фильтр по уровню, запись в отдельные файлы:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | |
Генератор parseLines решает типичную задачу построчной обработки: чанки не совпадают с границами строк. Чанк может оборвать строку посередине, разрезав {"level":"ERROR"... на два чанка. Решение — накопление в буфере:
1 2 3 4 5 6 7 | |
После split("\n") последний элемент массива — либо пустая строка (чанк закончился на \n), либо незавершённая строка. pop() сохраняет этот хвост; следующий чанк допишет его до полной строки. После цикла обрабатывается оставшийся буфер — в том числе для файлов без завершающего \n.
try/catch вокруг JSON.parse() не даёт битой строке убить весь импорт:
1 2 3 4 5 | |
Без обработки ошибок одна невалидная JSON-строка роняет весь конвейер и теряется весь прогресс. С обработкой конвейер логирует ошибку и продолжает. В реальных логах бывают повреждённые записи — конвейер должен переживать невалидные данные без остановки.
LevelSplitter и отводит данные в side channel, и пропускает всё дальше:
1 2 3 4 5 6 7 | |
Каждая запись идёт по основному конвейеру, а логи уровня ERROR дополнительно пишутся в errors.log. Получается ветвящийся конвейер: после parseLines все логи продолжают путь; ERROR дублируются в errors.log, WARN — в warnings.log, финальный поток — в all.log.
Подход экономичен по памяти: файл читается один раз, разделение идёт в потоке при постоянном объёме памяти. Два отдельных конвейера удвоили бы I/O и память.
Опция { objectMode: true } соответствует типу входа: transform получает объекты JavaScript из parseLines, а не буферы. В side destination пишем JSON-строки через JSON.stringify(log) + "\n". Парсим один раз, в конвейере работаем с объектами, сериализуем только при записи на диск.
Сплиттеры стоят последовательно:
1 | |
Каждый вызывает this.push(log) и передаёт объекты дальше. Финальный all.log тоже получает объекты; Writable по умолчанию вызовет toString() — для нормального формата в продакшене перед финальным destination добавьте serialize transform (в примере упрощено; в бою — например async function* serializeJSON(source) { for await (const obj of source) yield JSON.stringify(obj) + "\n"; }).
2) Импорт CSV с валидацией
Чтение CSV, разбор строк, валидация и пакетная вставка в БД:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 | |
Конвейер собирает переиспользуемые генераторы для преобразования данных и batching для операций с БД.
В отличие от parseLines, parseCSV хранит состояние между чанками — нужно помнить строку заголовков:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | |
Переменная headers живёт всё время работы генератора: первая строка становится заголовками, остальные превращаются в объекты. Сырой CSV:
1 2 3 | |
Становится структурированными объектами:
1 2 | |
Весь CSV в память не загружается — каждая строка обрабатывается по мере прохождения потока.
Генератор validate фильтрует без изменения валидных данных:
1 2 3 4 5 6 7 8 9 | |
Валидные строки идут дальше. Невалидные логируются и пропускаются — плохие данные не попадают в БД.
Ошибка на невалидной строке обрушила бы конвейер и уничтожила бы прогресс. Реальные данные грязные — логирование с продолжением позволяет разобрать ошибки после импорта.
Генератор batch группирует строки перед записью в БД:
1 2 3 4 5 6 7 8 9 10 11 12 13 | |
Поток отдельных элементов превращается в поток пакетов:
1 2 | |
Round-trip к БД дороги. Пакеты по 100 строк могут дать порядка 100× ускорения по сравнению с вставкой по одной. Размер batch — компромисс: слишком малый — много round-trip и низкая скорость; слишком большой — память, риск таймаута, сложнее восстановление после ошибки. Для большинства БД разумны пакеты 100–1000.
Проверка if (batch.length > 0) после цикла отдаёт финальный неполный пакет. Без неё хвостовые строки теряются молча.
DatabaseWriter обрабатывает асинхронный I/O:
1 2 3 4 5 6 7 8 | |
Метод _write может быть async, но callback вызывать обязательно: без аргументов при успехе, callback(err) при ошибке.
Пока выполняется await this.db.insertMany(batch), stream на паузе — следующий пакет не придёт, пока не завершится текущая вставка. Так не перегружают БД.
Стрелочные функции (source) => validate(source, mySchema) передают дополнительные аргументы в генераторы:
1 2 3 4 5 6 7 | |
Вы создаёте специализированные версии универсальных генераторов для конкретного конвейера.
Весь конвейер использует примерно постоянную память независимо от размера файла: при разборе CSV — только текущая строка, при валидации — одна строка, при batching — не более 100 строк, при записи — только текущий пакет. CSV на 10 ГБ и на 10 МБ потребляют сопоставимый объём памяти.
Связанное чтение¶
- Предыдущая: Transform stream в Node.js
- Далее: Zero-copy stream: scatter/gather