Поток¶
Стабильность: 2 – Стабильная
АПИ является удовлетворительным. Совместимость с NPM имеет высший приоритет и не будет нарушена кроме случаев явной необходимости.
Поток - это абстрактный интерфейс для работы с потоковыми данными в Node.js. Модуль node:stream
предоставляет API для реализации интерфейса потока.
В Node.js существует множество объектов потока. Например, запрос к HTTP-серверу и process.stdout
являются экземплярами потока.
Потоки могут быть доступны для чтения, записи или и для того, и для другого. Все потоки являются экземплярами EventEmitter
.
Чтобы получить доступ к модулю node:stream
:
1 |
|
Модуль node:stream
полезен для создания новых типов экземпляров потоков. Обычно нет необходимости использовать модуль node:stream
для потребления потоков.
Организация данного документа¶
Этот документ содержит два основных раздела и третий раздел для примечаний. В первом разделе объясняется, как использовать существующие потоки в приложении. Во втором разделе объясняется, как создавать новые типы потоков.
Типы потоков¶
В Node.js существует четыре основных типа потоков:
Writable
: потоки, в которые можно записывать данные (например,fs.createWriteStream()
).Readable
: потоки, из которых можно читать данные (например,fs.createReadStream()
).Duplex
: потоки, которые являются одновременноReadable
иWritable
(например,net.Socket
).Transform
:дуплексные
потоки, которые могут изменять или преобразовывать данные по мере их записи и чтения (например,zlib.createDeflate()
).
Кроме того, в этот модуль входят служебные функции stream.pipeline()
, stream.finished()
, stream.Readable.from()
и stream.addAbortSignal()
.
Promise API потоков¶
API stream/promises
предоставляет альтернативный набор асинхронных служебных функций для потоков, которые возвращают объекты Promise
, а не используют обратные вызовы. API доступен через require('node:stream/promises')
или require('node:stream').promises
.
stream.pipeline¶
1 |
|
1 |
|
streams
{Stream[]} | {Iterable[]} | {AsyncIterable[]} | {Function[]}source
<Stream>
|<Iterable>
|<AsyncIterable>
|<Function>
- Возвращает:
<Promise>
|<AsyncIterable>
- Возвращает:
...transforms
<Stream>
|<Function>
source
<AsyncIterable>
- Возвращает:
<Promise>
|<AsyncIterable>
destination
<Stream>
|<Function>
source
<AsyncIterable>
- Возвращает:
<Promise>
|<AsyncIterable>
options
<Object>
сигнал
<AbortSignal>
end
<boolean>
- Возвращает:
<Promise>
Выполняется, когда конвейер завершен.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
|
Чтобы использовать AbortSignal
, передайте его внутри объекта options в качестве последнего аргумента. Когда сигнал будет прерван, на базовом конвейере будет вызвана команда destroy
с сообщением AbortError
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
|
API pipeline
также поддерживает асинхронные генераторы:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
|
1 2 3 4 5 6 7 |
|
API pipeline
предоставляет версию обратного вызова:
stream.finished¶
1 |
|
stream
<Stream>
options
<Object>
error
<boolean>
|<undefined>
readable
<boolean>
|<undefined>
writable
<boolean>
|<undefined>
сигнал
:<AbortSignal>
|<undefined>
- Возвращает:
<Promise>
Выполняется, когда поток больше не доступен для чтения или записи.
1 2 3 4 5 6 7 8 9 10 11 12 |
|
1 2 3 4 5 6 7 8 9 10 11 12 |
|
API finished
предоставляет версию обратного вызова:
Объектный режим¶
Все потоки, создаваемые API Node.js, работают исключительно со строками и объектами Buffer
(или Uint8Array
). Однако, возможно, что реализация потоков может работать с другими типами значений JavaScript (за исключением null
, который служит специальной цели в потоках). Такие потоки считаются работающими в "объектном режиме".
Экземпляры потоков переводятся в объектный режим с помощью опции objectMode
при создании потока. Попытка переключить существующий поток в объектный режим небезопасна.
Буферизация¶
Потоки Writable
и Readable
будут хранить данные во внутреннем буфере.
Объем потенциально буферизуемых данных зависит от параметра highWaterMark
, передаваемого в конструктор потока. Для обычных потоков опция highWaterMark
определяет общее количество байт. Для потоков, работающих в объектном режиме, параметр highWaterMark
указывает общее количество объектов.
Данные буферизуются в потоках Readable
, когда реализация вызывает stream.push(chunk)
. Если потребитель потока не вызывает stream.read()
, данные будут находиться во внутренней очереди, пока не будут потреблены.
Когда общий размер внутреннего буфера чтения достигнет порога, заданного параметром highWaterMark
, поток временно прекратит чтение данных из базового ресурса, пока данные, находящиеся в буфере, не будут потреблены (то есть поток перестанет вызывать внутренний метод readable._read()
, который используется для заполнения буфера чтения).
Буферизация данных в потоках Writable
происходит при многократном вызове метода writable.write(chunk)
. Пока общий размер внутреннего буфера записи ниже порога, установленного highWaterMark
, вызовы writable.write()
будут возвращать true
. Как только размер внутреннего буфера достигнет или превысит highWaterMark
, будет возвращена false
.
Ключевой целью API stream
, в частности метода stream.pipe()
, является ограничение буферизации данных до приемлемого уровня, чтобы источники и пункты назначения с разной скоростью не перегружали доступную память.
Опция highWaterMark
- это порог, а не предел: она определяет количество данных, которое поток буферизирует, прежде чем перестанет запрашивать больше данных. Она не обеспечивает жесткого ограничения памяти в целом. Конкретные реализации потоков могут выбрать более строгие ограничения, но это необязательно.
Поскольку потоки Duplex
и Transform
являются одновременно Readable
и Writable
, каждый из них поддерживает два отдельных внутренних буфера, используемых для чтения и записи, что позволяет каждой стороне работать независимо от другой, поддерживая при этом соответствующий и эффективный поток данных. Например, экземпляры net.Socket
представляют собой Duplex
потоки, чья Readable
сторона позволяет потреблять данные, полученные из сокета, и чья Writable
сторона позволяет записывать данные в сокет. Поскольку данные могут записываться в сокет быстрее или медленнее, чем приниматься, каждая сторона должна работать (и буферизироваться) независимо от другой.
Механика внутренней буферизации является внутренней деталью реализации и может быть изменена в любое время. Однако для некоторых продвинутых реализаций внутренние буферы могут быть r
API для потребителей потоков¶
Почти все приложения Node.js, независимо от того, насколько они просты, в той или иной мере используют потоки. Ниже приведен пример использования потоков в приложении Node.js, реализующем HTTP-сервер:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
|
Writable
потоки (такие как res
в примере) раскрывают такие методы как write()
и end()
, которые используются для записи данных в поток.
Потоки Readable
используют API EventEmitter
для уведомления кода приложения, когда данные доступны для чтения из потока. Эти доступные данные могут быть считаны из потока несколькими способами.
Потоки Writable
и Readable
используют API EventEmitter
различными способами для передачи текущего состояния потока.
Потоки Duplex
и Transform
являются одновременно Writable
и Readable
.
Приложениям, которые записывают данные в поток или потребляют данные из потока, не требуется реализовывать интерфейсы потоков напрямую, и у них, как правило, нет причин вызывать require('node:stream')
.
Разработчики, желающие реализовать новые типы потоков, должны обратиться к разделу "API для реализаторов потоков".
Записываемые потоки¶
Записываемые потоки - это абстракция для назначения, в которое записываются данные.
Примеры Writable
потоков включают:
- HTTP-запросы, на клиенте
- HTTP ответы, на сервере
- потоки записи fs
- потоки zlib
- crypto streams
- TCP сокеты
- stdin дочернего процесса
process.stdout
,process.stderr
Некоторые из этих примеров на самом деле являются потоками Duplex
, которые реализуют интерфейс Writable
.
Все потоки Writable
реализуют интерфейс, определенный классом stream.Writable
.
Хотя конкретные экземпляры потоков Writable
могут различаться различными способами, все потоки Writable
следуют одной и той же основной схеме использования, как показано в примере ниже:
1 2 3 4 |
|
stream.Writable¶
Событие: close¶
Событие 'close'
генерируется, когда поток и любой из его базовых ресурсов (например, дескриптор файла) закрыты. Это событие указывает на то, что больше не будет испускаться никаких событий, и никаких дальнейших вычислений не будет.
Поток Writable
всегда будет испускать событие close
, если он создан с опцией emitClose
.
Событие: drain¶
Если вызов stream.write(chunk)
возвращает false
, событие 'drain'
будет выдано, когда будет уместно возобновить запись данных в поток.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
|
Событие: error¶
Событие 'error'
генерируется, если во время записи или передачи данных произошла ошибка. При вызове обратного вызова слушателя ему передается единственный аргумент Error
.
Поток закрывается при возникновении события 'error'
, если только опция autoDestroy
не была установлена в false
при создании потока.
После error
больше не должно происходить никаких событий, кроме close
(включая события error
).
Событие: finish¶
Событие finish
возникает после вызова метода stream.end()
, и все данные были переданы в базовую систему.
1 2 3 4 5 6 7 8 |
|
Событие: pipe¶
src
<stream.Readable>
исходный поток, который передается по трубопроводу в этот объект записи
Событие pipe
возникает, когда метод stream.pipe()
вызывается на потоке readable, добавляя этот writable к его набору пунктов назначения.
1 2 3 4 5 6 7 |
|
Событие: unpipe¶
src
<stream.Readable>
Исходный поток, который unpipeed этот writable
Событие unpipe
испускается, когда метод stream.unpipe()
вызывается на потоке Readable
, удаляя этот Writable
из его набора пунктов назначения.
Это также происходит в случае, если этот поток Writable
выдает ошибку при передаче в него потока Readable
.
1 2 3 4 5 6 7 8 |
|
writable.cork¶
1 |
|
Метод writable.cork()
заставляет все записанные данные буферизироваться в памяти. Буферизованные данные будут удалены при вызове методов stream.uncork()
или stream.end()
.
Основная цель метода writable.cork()
- приспособиться к ситуации, когда в поток записывается несколько небольших фрагментов в быстрой последовательности. Вместо того, чтобы немедленно пересылать их по назначению, writable.cork()
буферизирует все куски до вызова writable.uncork()
, который передаст их все в writable._writev()
, если таковой имеется. Это предотвращает ситуацию блокировки в голове строки, когда данные буферизируются в ожидании обработки первого небольшого фрагмента. Однако использование writable.cork()
без реализации writable._writev()
может негативно сказаться на пропускной способности.
См. также: writable.uncork()
, writable._writev()
.
writable.destroy¶
1 |
|
Уничтожить поток. Опционально выдает событие 'error'
и выдает событие 'close'
(если emitClose
не установлено в false
). После этого вызова поток, доступный для записи, завершен, и последующие вызовы write()
или end()
приведут к ошибке ERR_STREAM_DESTROYED
. Это деструктивный и немедленный способ уничтожения потока. Предыдущие вызовы write()
могут не уничтожить поток и вызвать ошибку ERR_STREAM_DESTROYED
. Используйте end()
вместо destroy, если данные должны быть удалены до закрытия, или дождитесь события 'drain'
перед уничтожением потока.
1 2 3 4 5 6 7 8 9 |
|
1 2 3 4 5 6 |
|
1 2 3 4 5 6 7 |
|
После вызова destroy()
любые дальнейшие вызовы будут бесполезны, и никакие другие ошибки, кроме _destroy()
, не могут быть выданы как 'error'
.
Реализаторы не должны переопределять этот метод, а вместо этого реализовать writable._destroy()
.
writable.closed¶
Является true
после испускания close
.
writable.destroyed¶
Является true
после вызова writable.destroy()
.
1 2 3 4 5 6 7 |
|
writable.end¶
1 |
|
chunk
<string>
|<Buffer>
|<Uint8Array>
|<any>
Необязательные данные для записи. Для потоков, не работающих в объектном режиме,chunk
должен быть строкой,Buffer
илиUint8Array
. Для потоков, работающих в объектном режиме,chunk
может быть любым значением JavaScript, кромеnull
.encoding
<string>
Кодировка, еслиchunk
является строкой.callback
<Function>
Обратный вызов для завершения потока.- Возвращает:
<this>
Вызов метода writable.end()
сигнализирует о том, что данные больше не будут записываться в Writable
. Необязательные аргументы chunk
и encoding
позволяют записать последний дополнительный фрагмент данных непосредственно перед закрытием потока.
Вызов метода stream.write()
после вызова stream.end()
приведет к ошибке.
1 2 3 4 5 6 |
|
writable.setDefaultEncoding¶
1 |
|
Метод writable.setDefaultEncoding()
устанавливает кодировку по умолчанию
для потока Writable
.
writable.uncork¶
1 |
|
Метод writable.uncork()
очищает все данные, буферизованные с момента вызова stream.cork()
.
При использовании writable.cork()
и writable.uncork()
для управления буферизацией записей в поток, отложите вызов writable.uncork()
с помощью process.nextTick()
. Это позволяет выполнять пакетную обработку всех вызовов writable.write()
, которые происходят в данной фазе цикла событий Node.js.
1 2 3 4 |
|
Если метод writable.cork()
вызывается несколько раз на потоке, то такое же количество вызовов writable.uncork()
должно быть вызвано для промывки буферизованных данных.
1 2 3 4 5 6 7 8 9 |
|
См. также: writable.cork()
.
writable.writable¶
Является true
, если безопасно вызывать writable.write()
, что означает, что поток не был уничтожен, ошибочен или завершен.
writable.writableAborted¶
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
Возвращает, был ли поток уничтожен или ошибочен перед выдачей 'finish'
.
writable.writableEnded¶
Является true
после вызова writable.end()
. Это свойство не указывает, были ли данные выгружены, для этого используйте writable.writableFinished
.
writable.writableCorked¶
Количество раз, которое необходимо вызвать writable.uncork()
, чтобы полностью откупорить поток.
writable.errored¶
Возвращает ошибку, если поток был уничтожен с ошибкой.
writable.writableFinished¶
Устанавливается в true
непосредственно перед испусканием события 'finish'
.
writable.writableHighWaterMark¶
Возвращает значение highWaterMark
, переданное при создании этой writable
.
writable.writableLength¶
Это свойство содержит количество байтов (или объектов) в очереди, готовых к записи. Значение предоставляет данные интроспекции относительно состояния highWaterMark
.
writable.writableNeedDrain¶
Является true
, если буфер потока был заполнен и поток будет издавать сигнал 'drain'
.
writable.writableObjectMode¶
Получатель для свойства objectMode
данного потока Writable
.
writable.write¶
1 |
|
chunk
<string>
|<Buffer>
|<Uint8Array>
|<any>
Необязательные данные для записи. Для потоков, не работающих в объектном режиме,chunk
должен быть строкой,Buffer
илиUint8Array
. Для потоков, работающих в объектном режиме,chunk
может быть любым значением JavaScript, кромеnull
.encoding
<string>
|<null>
Кодировка, еслиchunk
является строкой. По умолчанию:'utf8'
.callback
<Function>
Обратный вызов, когда этот фрагмент данных будет удален.- Возвращает:
<boolean>
false
, если поток желает, чтобы вызывающий код дождался события'drain'
, прежде чем продолжить запись дополнительных данных; иначеtrue
.
Метод writable.write()
записывает некоторые данные в поток и вызывает предоставленный callback
, когда данные будут полностью обработаны. Если произошла ошибка, будет вызван callback
с ошибкой в качестве первого аргумента. Вызов callback
происходит асинхронно и до того, как будет выдана 'error'
.
Возвращаемое значение равно true
, если внутренний буфер меньше, чем highWaterMark
, настроенный при создании потока после приема chunk
. Если возвращается false
, дальнейшие попытки записи данных в поток должны быть прекращены до тех пор, пока не произойдет событие 'drain'
.
Пока поток не осушен, вызовы write()
будут буферизировать chunk
и возвращать false
. Когда все текущие буферизованные фрагменты будут осушены (приняты операционной системой для доставки), произойдет событие 'drain'
. Если write()
возвращает false
, не записывайте больше чанков, пока не произойдет событие 'drain'
. Хотя вызов write()
на потоке, который не осушается, разрешен, Node.js будет буферизировать все записанные фрагменты до тех пор, пока не произойдет максимальное использование памяти, после чего произойдет безусловное прерывание. Даже до прерывания, высокое использование памяти приведет к плохой работе сборщика мусора и высокой RSS (которая обычно не возвращается в систему, даже после того, как память больше не требуется). Поскольку TCP-сокеты могут никогда не разряжаться, если удаленный пир не читает данные, запись в сокет, который не разряжается, может привести к уязвимости, которую можно использовать удаленно.
Запись данных, пока поток не иссякает, особенно проблематична для Transform
, поскольку потоки Transform
по умолчанию приостанавливаются до тех пор, пока они не будут переданы по трубопроводу или не будет добавлен обработчик событий data
или readable
.
Если данные для записи могут быть сгенерированы или получены по требованию, рекомендуется инкапсулировать логику в Readable
и использовать stream.pipe()
. Однако, если вызов write()
предпочтительнее, можно соблюсти обратное давление и избежать проблем с памятью, используя событие 'drain'
:
1 2 3 4 5 6 7 8 9 10 11 12 |
|
Поток Writable
в объектном режиме всегда будет игнорировать аргумент encoding
.
Читаемые потоки¶
Читаемые потоки - это абстракция для источника, из которого потребляются данные.
Примеры Readable
потоков включают:
- HTTP ответы, на клиенте
- HTTP-запросы, на сервере
- потоки чтения fs
- потоки zlib
- crypto streams
- TCP сокеты
- stdout и stderr дочернего процесса
process.stdin
Все потоки Readable
реализуют интерфейс, определенный классом stream.Readable
.
Два режима чтения¶
Потоки Readable
эффективно работают в одном из двух режимов: текущем и приостановленном. Эти режимы отличаются от объектного режима. Поток Readable
может быть в объектном режиме или нет, независимо от того, находится ли он в потоковом режиме или в режиме паузы.
- В режиме потока данные считываются из базовой системы автоматически и предоставляются приложению как можно быстрее с помощью событий через интерфейс
EventEmitter
. - В режиме паузы для чтения фрагментов данных из потока необходимо явно вызывать метод
stream.read()
.
Все потоки Readable
начинаются в режиме паузы, но могут быть переключены в режим потока одним из следующих способов:
- Добавление обработчика события
'data'
. - Вызов метода
stream.resume()
. - Вызов метода
stream.pipe()
для отправки данных наWritable
.
Readable
может переключиться обратно в режим паузы, используя одно из следующих действий:
- Если нет мест назначения, вызвав метод
stream.pause()
. - Если есть места назначения труб, то путем удаления всех мест назначения труб. Несколько мест назначения труб можно удалить, вызвав метод
stream.unpipe()
.
Важно помнить, что Readable
не будет генерировать данные, пока не будет предоставлен механизм для потребления или игнорирования этих данных. Если механизм потребления отключен или убран, Readable
будет пытаться прекратить генерировать данные.
По причинам обратной совместимости, удаление обработчиков событий 'data'
не будет автоматически приостанавливать поток. Кроме того, если есть конечные пункты назначения, то вызов stream.pause()
не гарантирует, что поток останется приостановленным, когда эти пункты назначения иссякнут и запросят больше данных.
Если Readable
переключается в режим потока и нет потребителей, доступных для обработки данных, эти данные будут потеряны. Это может произойти, например, когда метод readable.resume()
вызывается без слушателя, присоединенного к событию 'data'
, или когда обработчик события 'data'
удаляется из потока.
Добавление обработчика события 'readable'
автоматически прекращает поток, и данные должны быть потреблены через readable.read()
. Если обработчик события 'readable'
удален, то поток снова начнет течь, если есть обработчик события 'data'
.
Три состояния¶
Два режима работы потока Readable
- это упрощенная абстракция для более сложного внутреннего управления состояниями, которое происходит в реализации потока Readable
.
В частности, в любой момент времени каждый Readable
находится в одном из трех возможных состояний:
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
.
Когда readable.readableFlowing
имеет значение null
, механизм потребления данных потока не предусмотрен. Поэтому поток не будет генерировать данные. В этом состоянии прикрепление слушателя для события 'data'
, вызов метода readable.pipe()
или вызов метода readable.resume()
переключит readable.readableFlowing
в true
, заставляя Readable
начать активно генерировать события по мере генерации данных.
Вызов readable.pause()
, readable.unpipe()
или получение обратного давления приведет к тому, что readable.readableFlowing
будет установлен как false
, временно останавливая поток событий, но не останавливая генерацию данных. Находясь в этом состоянии, прикрепление слушателя для события 'data'
не переключит readable.readableFlowing
в true
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
|
Пока readable.readableFlowing
имеет значение false
, данные могут накапливаться во внутреннем буфере потока.
Выберите один стиль API¶
API потока Readable
развивался на протяжении нескольких версий Node.js и предоставляет несколько методов потребления данных потока. В целом, разработчики должны выбрать один из методов потребления данных и никогда не должны использовать несколько методов для потребления данных из одного потока. В частности, использование комбинации on('data')
, on('readable')
, pipe()
или асинхронных итераторов может привести к неинтуитивному поведению.
stream.Readable¶
Событие: close¶
Событие close
генерируется, когда поток и любой из его базовых ресурсов (например, дескриптор файла) закрыты. Это событие указывает на то, что больше не будет испускаться никаких событий, и никаких дальнейших вычислений не будет.
Поток Readable
всегда будет испускать событие close
, если он создан с опцией emitClose
.
Событие: data¶
chunk
<Buffer>
|<string>
|<any>
Кусок данных. Для потоков, не работающих в объектном режиме, чанк будет либо строкой, либобуфером
. Для потоков, работающих в объектном режиме, чанк может быть любым значением JavaScript, кромеnull
.
Событие 'data'
генерируется всякий раз, когда поток передает право собственности на кусок данных потребителю. Это может происходить всякий раз, когда поток переключается в режим потока, вызывая readable.pipe()
, readable.resume()
или присоединяя обратный вызов слушателя к событию 'data'
. Событие 'data'
также будет возникать всякий раз, когда вызывается метод readable.read()
и фрагмент данных доступен для возврата.
Прикрепление слушателя события 'data'
к потоку, который не был явно приостановлен, переключит поток в режим потока. Данные будут передаваться, как только они станут доступны.
В обратный вызов слушателя будет передан фрагмент данных в виде строки, если для потока была задана кодировка по умолчанию с помощью метода readable.setEncoding()
; в противном случае данные будут переданы в виде Buffer
.
1 2 3 4 |
|
Событие: end¶
Событие 'end'
происходит, когда больше нет данных для потребления из потока.
Событие 'end'
не будет вызвано, пока данные не будут полностью израсходованы. Этого можно добиться, переключив поток в режим потока, или вызывая stream.read()
несколько раз, пока все данные не будут потреблены.
1 2 3 4 5 6 7 |
|
Событие: error¶
Событие 'error'
может быть вызвано реализацией Readable
в любое время. Как правило, это может произойти, если базовый поток не может генерировать данные из-за внутреннего сбоя или когда реализация потока пытается передать недопустимый фрагмент данных.
Обратному вызову слушателя будет передан единственный объект Error
.
Событие: pause¶
Событие pause
происходит, когда вызывается stream.pause()
и readableFlowing
не равно false
.
Событие: readable¶
Событие 'readable'
генерируется, когда из потока доступны данные для чтения или когда достигнут конец потока. По сути, событие 'readable'
указывает на то, что в потоке есть новая информация. Если данные доступны, stream.read()
вернет эти данные.
1 2 3 4 5 6 7 8 9 |
|
Если достигнут конец потока, вызов stream.read()
вернет null
и вызовет событие 'end'
. Это также верно, если никогда не было данных для чтения. Например, в следующем примере foo.txt
является пустым файлом:
1 2 3 4 5 6 7 8 |
|
Результатом выполнения этого скрипта будет:
1 2 3 |
|
В некоторых случаях прикрепление слушателя для события 'readable'
приведет к считыванию некоторого количества данных во внутренний буфер.
В целом, механизмы событий readable.pipe()
и 'data'
проще для понимания, чем событие 'readable'
. Однако обработка 'readable'
может привести к увеличению пропускной способности.
Если одновременно используются readable
и 'data'
, 'readable'
имеет приоритет в управлении потоком, т. е. 'data'
будет выдаваться только при вызове stream.read()
. Свойство readableFlowing
станет false
. Если есть слушатели 'data'
, когда 'readable'
будет удалено, поток начнет течь, т. е. события 'data'
будут испускаться без вызова .resume()
.
Событие: resume¶
Событие 'resume'
происходит, когда вызывается stream.resume()
и readableFlowing
не является true
.
readable.destroy¶
1 |
|
error
<Error>
Ошибка, которая будет передана в качестве полезной нагрузки в событии'error'
.- Возвращает:
<this>
Уничтожить поток. Опционально испускает событие 'error'
и испускает событие 'close'
(если emitClose
не установлено в false
). После этого вызова читаемый поток освободит все внутренние ресурсы, и последующие вызовы push()
будут игнорироваться.
После вызова destroy()
все последующие вызовы будут бесполезны, и никакие другие ошибки, кроме _destroy()
, не могут быть выданы как 'error'
.
Реализаторы не должны переопределять этот метод, а вместо этого реализовать readable._destroy()
.
readable.closed¶
Является true
после испускания close
.
readable.destroyed¶
Является true
после вызова readable.destroy()
.
readable.isPaused¶
1 |
|
- Возвращает:
<boolean>
Метод readable.isPaused()
возвращает текущее рабочее состояние Readable
. Он используется в основном механизмом, который лежит в основе метода readable.pipe()
. В большинстве типичных случаев нет причин использовать этот метод напрямую.
1 2 3 4 5 6 7 |
|
readable.pause¶
1 |
|
- Возвращает:
<this>
Метод readable.pause()
заставит поток в режиме потока прекратить испускать события 'data'
, переходя из режима потока. Любые данные, которые становятся доступными, остаются во внутреннем буфере.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
|
Метод readable.pause()
не имеет эффекта, если существует слушатель событий 'readable'
.
readable.pipe¶
1 |
|
destination
<stream.Writable>
Место назначения для записи данныхoptions
<Object>
Опции трубыend
<boolean>
Завершить запись при завершении чтения. По умолчанию:true
.
- Возвращает:
<stream.Writable>
конечный пункт, позволяющий создавать цепочку труб, если это потокDuplex
илиTransform
.
Метод readable.pipe()
присоединяет поток Writable
к readable
, заставляя его автоматически переключаться в режим потока и передавать все свои данные в присоединенный Writable
. Поток данных будет автоматически управляться таким образом, чтобы конечный поток Writable
не был перегружен более быстрым потоком Readable
.
В следующем примере все данные из readable
передаются в файл с именем file.txt
:
1 2 3 4 5 |
|
Можно присоединить несколько потоков Writable
к одному потоку Readable
.
Метод readable.pipe()
возвращает ссылку на поток назначения, что позволяет создавать цепочки потоков, передаваемых по трубопроводу:
1 2 3 4 5 6 |
|
По умолчанию, stream.end()
вызывается на конечном Writable
потоке, когда исходный Readable
поток испускает 'end'
, так что конечный поток больше не доступен для записи. Чтобы отключить это поведение по умолчанию, опцию end
можно передать как false
, в результате чего поток назначения останется открытым:
1 2 3 4 |
|
Важной оговоркой является то, что если поток Readable
выдает ошибку во время обработки, направление Writable
не закрывается автоматически. Если произойдет ошибка, необходимо будет ручно закрыть каждый поток, чтобы предотвратить утечку памяти.
Потоки process.stderr
и process.stdout
Writable
никогда не закрываются до выхода процесса Node.js, независимо от указанных опций.
readable.read¶
1 |
|
size
<number>
Необязательный аргумент, указывающий, сколько данных нужно прочитать.- Возвращает:
<string>
|<Buffer>
|<null>
|<any>
Метод readable.read()
считывает данные из внутреннего буфера и возвращает их. Если данные не доступны для чтения, возвращается null
. По умолчанию данные возвращаются в виде объекта Buffer
, если только кодировка не была указана с помощью метода readable.setEncoding()
или поток работает в объектном режиме.
Необязательный аргумент size
задает определенное количество байт для чтения. Если size
байт недоступен для чтения, будет возвращен null
, если только поток не завершился, в этом случае будут возвращены все данные, оставшиеся во внутреннем буфере.
Если аргумент size
не указан, будут возвращены все данные, содержащиеся во внутреннем буфере.
Аргумент size
должен быть меньше или равен 1 GiB.
Метод readable.read()
следует вызывать только на потоках Readable
, работающих в приостановленном режиме. В потоковом режиме readable.read()
вызывается автоматически, пока внутренний буфер не будет полностью опустошен.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
|
Каждый вызов readable.read()
возвращает фрагмент данных или null
. Куски не конкатенируются. Цикл while
необходим для потребления всех данных, находящихся в буфере. При чтении большого файла .read()
может вернуть null
, израсходовав все буферизованное содержимое, но есть еще больше данных, которые еще не буферизованы. В этом случае новое событие 'readable'
будет выдано, когда в буфере будет больше данных. Наконец, событие 'end'
будет вызвано, когда больше не будет данных.
Поэтому, чтобы прочитать все содержимое файла из readable
, необходимо собирать фрагменты в несколько событий 'readable'
:
1 2 3 4 5 6 7 8 9 10 11 12 |
|
Поток Readable
в объектном режиме всегда будет возвращать один элемент из вызова readable.read(size)
, независимо от значения аргумента size
.
Если метод readable.read()
возвращает фрагмент данных, также будет выдано событие 'data'
.
Вызов stream.read([size])
после того, как было выдано событие 'end'
, вернет null
. Никакой ошибки во время выполнения не возникнет.
readable.readable¶
Является true
, если безопасно вызывать readable.read()
, что означает, что поток не был уничтожен или выдал 'error'
или 'end'
.
readable.readableAborted¶
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
Возвращает, был ли поток уничтожен или ошибочен перед выдачей 'end'
.
readable.readableDidRead¶
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
Возвращает, были ли испущены данные
.
readable.readableEncoding¶
Получатель свойства encoding
для данного потока Readable
. Свойство encoding
может быть установлено с помощью метода readable.setEncoding()
.
readable.readableEnded¶
Становится true
, когда испускается событие 'end'
.
readable.errored¶
Возвращает ошибку, если поток был уничтожен с ошибкой.
readable.readableFlowing¶
Это свойство отражает текущее состояние потока Readable
, как описано в разделе Три состояния.
readable.readableHighWaterMark¶
Возвращает значение highWaterMark
, переданное при создании этого Readable
.
readable.readableLength¶
Это свойство содержит количество байтов (или объектов) в очереди, готовых к чтению. Значение предоставляет данные интроспекции относительно состояния highWaterMark
.
readable.readableObjectMode¶
Получатель для свойства objectMode
данного потока Readable
.
readable.resume¶
1 |
|
- Возвращает:
<this>
Метод readable.resume()
заставляет явно приостановленный поток Readable
возобновить испускание событий 'data'
, переводя поток в режим потока.
Метод readable.resume()
можно использовать для полного потребления данных из потока без фактической обработки этих данных:
1 2 3 4 5 |
|
Метод readable.resume()
не имеет эффекта, если существует слушатель события 'readable'
.
readable.setEncoding¶
1 |
|
Метод readable.setEncoding()
устанавливает кодировку символов для данных, считываемых из потока Readable
.
По умолчанию кодировка не задается, и данные потока будут возвращаться в виде объектов Buffer
. Установка кодировки приводит к тому, что данные потока будут возвращаться в виде строк указанной кодировки, а не в виде объектов Buffer
. Например, вызов readable.setEncoding('utf8')
приведет к тому, что выходные данные будут интерпретированы как данные UTF-8 и переданы как строки. Вызов readable.setEncoding('hex')
приведет к тому, что данные будут закодированы в шестнадцатеричном формате строк.
Поток Readable
будет правильно обрабатывать многобайтовые символы, передаваемые через поток, которые в противном случае были бы неправильно декодированы, если бы просто извлекались из потока как объекты Buffer
.
1 2 3 4 5 6 7 8 9 |
|
readable.unpipe¶
1 |
|
destination
<stream.Writable>
Необязательный конкретный поток для распайки- Возвращает:
<this>
Метод readable.unpipe()
отсоединяет поток Writable
, ранее присоединенный с помощью метода stream.pipe()
.
Если destination
не указан, то отсоединяются все трубы.
Если назначение
указано, но для него не установлена труба, то метод ничего не делает.
1 2 3 4 5 6 7 8 9 10 11 12 |
|
readable.unshift¶
1 |
|
chunk
<Buffer>
|<Uint8Array>
|<string>
|<null>
|<any>
Кусок данных для выгрузки в очередь чтения. Для потоков, не работающих в объектном режиме,chunk
должен быть строкой,Buffer
,Uint8Array
илиnull
. Для потоков, работающих в объектном режиме,chunk
может быть любым значением JavaScript.encoding
<string>
Кодировка кусков строки. Должна быть правильной кодировкойBuffer
, такой как'utf8
или'ascii
.
Передача chunk
как null
сигнализирует о конце потока (EOF) и ведет себя так же, как readable.push(null)
, после чего данные больше не могут быть записаны. Сигнал EOF ставится в конце буфера, и все буферизованные данные все равно будут смыты.
Метод readable.unshift()
выталкивает фрагмент данных обратно во внутренний буфер. Это полезно в некоторых ситуациях, когда поток потребляется кодом, которому нужно "отменить потребление" некоторого количества данных, которые он оптимистично извлек из источника, чтобы эти данные могли быть переданы другой стороне.
Метод stream.unshift(chunk)
не может быть вызван после того, как произошло событие 'end'
, иначе будет выдана ошибка времени выполнения.
Разработчикам, часто использующим stream.unshift()
, следует рассмотреть возможность перехода на использование потока Transform
вместо этого. Дополнительную информацию смотрите в разделе "API для реализаторов потоков".
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
|
В отличие от stream.push(chunk)
, stream.unshift(chunk)
не завершает процесс чтения, сбрасывая внутреннее состояние потока. Это может привести к неожиданным результатам, если readable.unshift()
вызывается во время чтения (например, из реализации stream._read()
на пользовательском потоке). После вызова readable.unshift()
с немедленным stream.push('')
будет сброшен параметр
readable.wrap¶
1 |
|
До версии Node.js 0.10 потоки не реализовывали весь API модуля node:stream
, как он определен в настоящее время. (Более подробную информацию смотрите в "Совместимость").
При использовании старой библиотеки Node.js, которая испускает события 'data'
и имеет метод stream.pause()
, который является только рекомендательным, метод readable.wrap()
можно использовать для создания потока Readable
, который использует старый поток в качестве источника данных.
Использование readable.wrap()
потребуется редко, но метод был предоставлен в качестве удобства для взаимодействия со старыми приложениями и библиотеками Node.js.
1 2 3 4 5 6 7 8 |
|
readable[Symbol.asyncIterator]
¶
1 |
|
- Возвращает:
<AsyncIterator>
для полного потребления потока.
1 2 3 4 5 6 7 8 9 10 11 12 |
|
Если цикл завершится с break
, return
или throw
, поток будет уничтожен. Другими словами, итерация над потоком будет полностью его потреблять. Поток будет считываться кусками размером, равным параметру highWaterMark
. В приведенном выше примере данные будут в одном куске, если файл имеет размер менее 64 KiB, потому что опция highWaterMark
не предоставляется в fs.createReadStream()
.
readable.compose¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
stream
<Stream>
|<Iterable>
|<AsyncIterable>
|<Function>
options
<Object>
signal
<AbortSignal>
позволяет уничтожить поток, если сигнал прерван.
- Возвращает: {Duplex} поток, составленный с потоком
stream
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
|
Дополнительную информацию смотрите в stream.compose
.
readable.iterator¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
options
<Object>
destroyOnReturn
<boolean>
Если установлено значениеfalse
, вызовreturn
на асинхронном итераторе или завершение итерацииfor await...of
с помощьюbreak
,return
илиthrow
не будет уничтожать поток. По умолчанию:true
.
- Возвращает:
<AsyncIterator>
для потребления потока.
Итератор, созданный этим методом, дает пользователям возможность отменить уничтожение потока, если цикл for await...of
будет завершен return
, break
или throw
, или если итератор должен уничтожить поток, если поток выдал ошибку во время итерации.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
|
readable.map¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
fn
<Function>
| {AsyncFunction} функция для отображения каждого куска данных в потоке.data
<any>
фрагмент данных из потока.options
<Object>
signal
<AbortSignal>
прерывается, если поток уничтожается, позволяя прервать вызовfn
раньше времени.
options
<Object>
concurrency
<number>
максимальное количество одновременных вызововfn
для потока. По умолчанию:1
.signal
<AbortSignal>
позволяет уничтожить поток, если сигнал прерван.
- Возвращает: {Readable} поток, отображенный с помощью функции
fn
.
Этот метод позволяет выполнять отображение над потоком. Функция fn
будет вызываться для каждого чанка в потоке. Если функция fn
возвращает обещание - это обещание будет ожидаться
перед передачей в поток результатов.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
|
readable.filter¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
fn
<Function>
| {AsyncFunction} функция для фильтрации фрагментов из потока.data
<any>
кусок данных из потока.options
<Object>
signal
<AbortSignal>
прерывается, если поток уничтожается, позволяя прервать вызовfn
раньше времени.
options
<Object>
concurrency
<number>
максимальное количество одновременных вызововfn
для потока. По умолчанию:1
.signal
<AbortSignal>
позволяет уничтожить поток, если сигнал прерван.
- Возвращает: {Readable} поток, отфильтрованный с помощью предиката
fn
.
Этот метод позволяет фильтровать поток. Для каждого куска в потоке будет вызвана функция fn
, и если она вернет истинное значение, то кусок будет передан в поток результатов. Если функция fn
возвращает обещание - это обещание будет ожидаться
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
|
readable.forEach¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
fn
<Function>
| {AsyncFunction} функция для вызова на каждом фрагменте потока.data
<any>
фрагмент данных из потока.options
<Object>
signal
<AbortSignal>
прерывается, если поток уничтожается, позволяя прервать вызовfn
раньше времени.
options
<Object>
concurrency
<number>
максимальное количество одновременных вызововfn
для потока. По умолчанию:1
.signal
<AbortSignal>
позволяет уничтожить поток, если сигнал прерван.
- Возвращает:
<Promise>
обещание о завершении потока.
Этот метод позволяет итерировать поток. Для каждого куска в потоке будет вызвана функция fn
. Если функция fn
возвращает обещание - это обещание будет await
.
Этот метод отличается от циклов for await...of
тем, что он может обрабатывать фрагменты одновременно. Кроме того, итерацию forEach
можно остановить только передав опцию signal
и прервав соответствующий AbortController
, в то время как for await...of
можно остановить с помощьюbreak
илиreturn
. В любом случае поток будет уничтожен.
Этот метод отличается от прослушивания события 'data'
тем, что он использует событие readable
в базовой машине и может ограничить количество одновременных вызовов fn
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
|
readable.toArray¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
options
<Object>
signal
<AbortSignal>
позволяет отменить операцию toArray, если сигнал прерван.
- Возвращает:
<Promise>
обещание, содержащее массив с содержимым потока.
Этот метод позволяет легко получить содержимое потока.
Поскольку этот метод считывает весь поток в память, он сводит на нет преимущества потоков. Он предназначен для совместимости и удобства, а не как основной способ потребления потоков.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
|
readable.some¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
fn
<Function>
| {AsyncFunction} функция для вызова на каждом фрагменте потока.data
<any>
фрагмент данных из потока.options
<Object>
signal
<AbortSignal>
прерывается, если поток уничтожается, позволяя прервать вызовfn
раньше времени.
options
<Object>
concurrency
<number>
максимальное количество одновременных вызововfn
для потока. По умолчанию:1
.signal
<AbortSignal>
позволяет уничтожить поток, если сигнал прерван.
- Возвращает:
<Promise>
обещание, оценивающееtrue
, еслиfn
вернул истинное значение хотя бы для одного из чанков.
Этот метод похож на Array.prototype.some
и вызывает fn
на каждом куске в потоке, пока ожидаемое возвращаемое значение не станет true
(или любым истинным значением). Как только вызов fn
на куске, ожидающем возврата значения, становится истинным, поток уничтожается и обещание выполняется с true
. Если ни один из вызовов fn
на чанках не возвращает истинное значение, обещание выполняется с false
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
|
readable.find¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
fn
<Function>
| {AsyncFunction} функция для вызова на каждом фрагменте потока.data
<any>
фрагмент данных из потока.options
<Object>
signal
<AbortSignal>
прерывается, если поток уничтожается, позволяя прервать вызовfn
раньше времени.
options
<Object>
concurrency
<number>
максимальное количество одновременных вызововfn
для потока. По умолчанию:1
.signal
<AbortSignal>
позволяет уничтожить поток, если сигнал прерван.
- Возвращает:
<Promise>
обещание, оценивающее первый чанк, для которогоfn
имеет истинностное значение, илиundefined
, если элемент не был найден.
Этот метод похож на Array.prototype.find
и вызывает fn
на каждом куске в потоке, чтобы найти кусок с истинностным значением для fn
. Как только ожидаемое возвращаемое значение вызова fn
становится истинным, поток уничтожается, а обещание выполняется значением, для которого fn
вернул истинное значение. Если все вызовы fn
в чанках возвращают ложное значение, обещание выполняется с undefined
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
|
readable.every¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
fn
<Function>
| {AsyncFunction} функция для вызова на каждом куске потока.data
<any>
фрагмент данных из потока.options
<Object>
signal
<AbortSignal>
прерывается, если поток уничтожается, позволяя прервать вызовfn
раньше времени.
options
<Object>
concurrency
<number>
максимальное количество одновременных вызововfn
для потока. По умолчанию:1
.signal
<AbortSignal>
позволяет уничтожить поток, если сигнал прерван.
- Возвращает:
<Promise>
обещание, оценивающееtrue
, еслиfn
вернул истинное значение для всех чанков.
Этот метод похож на Array.prototype.every
и вызывает fn
на каждом куске в потоке, чтобы проверить, являются ли все ожидаемые возвращаемые значения истинным значением для fn
. Как только вызов fn
на чанке, ожидающем возврата значения, оказывается ложным, поток уничтожается, а обещание выполняется с false
. Если все вызовы fn
на чанках возвращают истинное значение, обещание выполняется с true
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
|
readable.flatMap¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
fn
{Function|AsyncGeneratorFunction|AsyncFunction} функция для отображения каждого куска в потоке.data
<any>
фрагмент данных из потока.options
<Object>
signal
<AbortSignal>
прерывается, если поток уничтожается, позволяя прервать вызовfn
раньше времени.
options
<Object>
concurrency
<number>
максимальное количество одновременных вызововfn
для потока. По умолчанию:1
.signal
<AbortSignal>
позволяет уничтожить поток, если сигнал прерван.
- Возвращает: {Readable} поток, отображенный с помощью функции
fn
.
Этот метод возвращает новый поток, применяя заданный обратный вызов к каждому фрагменту потока и затем сглаживая результат.
Можно вернуть поток или другую итерабельную или асинхронную итерабельную функцию из fn
, и потоки результатов будут объединены (сплющены) в возвращаемый поток.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
|
readable.drop¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
limit
<number>
количество кусков, которые нужно отбросить из читаемого файла.options
<Object>
signal
<AbortSignal>
позволяет уничтожить поток, если сигнал прерван.
- Возвращает: {Readable} поток с
лимитом
отброшенных чанков.
Этот метод возвращает новый поток с первым лимитом
отброшенных кусков.
1 2 3 |
|
readable.take¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
limit
<number>
количество кусков, которые нужно взять из читаемого файла.options
<Object>
signal
<AbortSignal>
позволяет уничтожить поток, если сигнал прерван.
- Возвращает: {Readable} поток с
лимитом
занятых фрагментов.
Этот метод возвращает новый поток с первыми лимитными
чанками.
1 2 3 |
|
readable.asIndexedPairs¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
options
<Object>
signal
<AbortSignal>
позволяет уничтожить поток, если сигнал прерван.
- Возвращает: {Readable} поток индексированных пар.
Этот метод возвращает новый поток с фрагментами базового потока в паре со счетчиком в виде [index, chunk]
. Первое значение индекса равно 0, и оно увеличивается на 1 для каждого полученного куска.
1 2 3 4 5 6 |
|
readable.reduce¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
fn
<Function>
| {AsyncFunction} функция редуктора для вызова над каждым куском в потоке.previous
<any>
значение, полученное от последнего вызоваfn
илиinitial
, если указано, или первый чанк потока в противном случае.data
<any>
фрагмент данных из потока.options
<Object>
signal
<AbortSignal>
прерывается, если поток уничтожается, позволяя прервать вызовfn
раньше времени.
initial
<any>
начальное значение для использования в сокращении.options
<Object>
signal
<AbortSignal>
позволяет уничтожить поток, если сигнал прерван.
- Возвращает:
<Promise>
обещание конечного значения редукции.
Этот метод вызывает fn
на каждом куске потока по порядку, передавая ему результат вычисления на предыдущем элементе. Он возвращает обещание конечного значения редукции.
Функция reducer итерирует поток элемент за элементом, что означает отсутствие параметра concurrency
или параллелизма. Чтобы выполнить reduce
параллельно, его можно подключить к методу readable.map
.
Если значение initial
не указано, то в качестве начального значения используется первый кусок потока. Если поток пуст, обещание отклоняется с TypeError
со свойством кода ERR_INVALID_ARGS
.
1 2 3 4 5 6 7 8 |
|
Дуплекс и преобразование потоков¶
stream.Duplex¶
Двусторонние потоки - это потоки, которые реализуют оба интерфейса Readable
и Writable
.
Примерами дуплексных
потоков являются:
duplex.allowHalfOpen¶
Если false
, то поток будет автоматически завершать записываемую сторону, когда заканчивается читаемая сторона. Изначально устанавливается опцией конструктора allowHalfOpen
, которая по умолчанию имеет значение true
.
Этот параметр можно изменить вручную, чтобы изменить поведение полуоткрытия существующего экземпляра потока Duplex
, но он должен быть изменен до того, как будет вызвано событие 'end'
.
stream.Transform¶
Потоки Transform
- это потоки Duplex
, в которых выход каким-то образом связан с входом. Как и все потоки Duplex
, потоки Transform
реализуют интерфейсы Readable
и Writable
.
Примеры потоков Transform
включают:
transform.destroy¶
1 |
|
Уничтожить поток и, по желанию, выдать событие 'error'
. После этого вызова поток преобразования освободит все внутренние ресурсы. Реализаторы не должны переопределять этот метод, а вместо этого реализовать readable._destroy()
. Реализация по умолчанию _destroy()
для Transform
также испускает 'close'
, если emitClose
не установлен в false.
После вызова destroy()
любые дальнейшие вызовы будут бесполезны, и никакие другие ошибки, кроме _destroy()
, не могут быть выданы как 'error'
.
stream.finished¶
1 |
|
stream
<Stream>
|<ReadableStream>
| {WritableStream}
Читаемый и/или записываемый поток/вебстрим.
options
<Object>
error
<boolean>
Если установлено значениеfalse
, то вызовemit('error', err)
не рассматривается как завершенный. По умолчанию:true
.readable
<boolean>
Если установлено значениеfalse
, обратный вызов будет вызван, когда поток завершится, даже если поток все еще может быть доступен для чтения. По умолчанию:true
.writable
<boolean>
Если установлено значениеfalse
, обратный вызов будет вызван при завершении потока, даже если поток может быть доступен для записи. По умолчанию:true
.signal
<AbortSignal>
позволяет прервать ожидание завершения потока. Основной поток не будет прерван, если сигнал прерван. Обратный вызов будет вызван с сообщениемAbortError
. Все зарегистрированные слушатели, добавленные этой функцией, также будут удалены.cleanup
<boolean>
удалить все зарегистрированные слушатели потока. По умолчанию:false
.
callback
<Function>
Функция обратного вызова, принимающая необязательный аргумент ошибки.- Возвращает:
<Function>
Функция очистки, которая удаляет всех зарегистрированных слушателей.
Функция для получения уведомления, когда поток больше не доступен для чтения, записи или произошла ошибка или событие преждевременного закрытия.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
|
Особенно полезен в сценариях обработки ошибок, когда поток уничтожается преждевременно (например, прерванный HTTP-запрос), и не выдает 'end'
или 'finish'
.
API finished
предоставляет промис версию.
Функция stream.finished()
оставляет висящие слушатели событий (в частности, 'error'
, 'end'
, 'finish'
и 'close'
) после вызова callback
. Это делается для того, чтобы неожиданные события ошибки
(из-за неправильной реализации потока) не приводили к неожиданным сбоям. Если это нежелательное поведение, то возвращаемая функция очистки должна быть вызвана в обратном вызове:
1 2 3 4 |
|
stream.pipeline¶
1 2 3 |
|
streams
{Stream[]} | {Iterable[]} | {AsyncIterable[]} | {Function[]} | {ReadableStream[]} | {WritableStream[]} | {TransformStream[]}source
<Stream>
|<Iterable>
|<AsyncIterable>
|<Function>
|<ReadableStream>
- Возвращает:
<Iterable>
|<AsyncIterable>
- Возвращает:
...transforms
<Stream>
|<Function>
| {TransformStream}source
<AsyncIterable>
- Возвращает:
<AsyncIterable>
destination
<Stream>
|<Function>
| {WritableStream}source
<AsyncIterable>
- Возвращает:
<AsyncIterable>
|<Promise>
callback
<Function>
Вызывается, когда конвейер полностью завершен.err
<Error>
val
Разрешенное значениеPromise
, возвращенноеdestination
.
- Возвращает:
<Stream>
Метод модуля для передачи данных между потоками и генераторами, пересылающими ошибки и должным образом очищающими их, а также предоставляющими обратный вызов, когда конвейер завершен.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
|
API pipeline
предоставляет промис версию.
stream.pipeline()
будет вызывать stream.destroy(err)
для всех потоков, кроме:
Readable
потоков, которые выдали команду'end'
или'close'
.Writable
потоков, которые выдали'finish'
или'close'
.
stream.pipeline()
оставляет висящие слушатели событий на потоках после вызова callback
. В случае повторного использования потоков после сбоя это может привести к утечке слушателей событий и проглоченным ошибкам. Если последний поток доступен для чтения, висячие слушатели событий будут удалены, чтобы последний поток мог быть использован позже.
stream.pipeline()
закрывает все потоки при возникновении ошибки. Использование IncomingRequest
с pipeline
может привести к неожиданному поведению, когда сокет будет уничтожен без отправки ожидаемого ответа. Смотрите пример ниже:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
|
stream.compose¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
stream.compose
является экспериментальным.
streams
{Stream[]} | {Iterable[]} | {AsyncIterable[]} | {Function[]} | {ReadableStream[]} | {WritableStream[]} | {TransformStream[]}- Возвращает: {stream.Duplex}
Объединяет два или более потоков в поток Duplex
, который пишет в первый поток и читает из последнего. Каждый предоставленный поток передается в следующий, используя stream.pipeline
. Если какой-либо из потоков ошибается, то все они уничтожаются, включая внешний поток Duplex
.
Поскольку stream.compose
возвращает новый поток, который, в свою очередь, может (и должен) передаваться в другие потоки, он обеспечивает композицию. Напротив, при передаче потоков в stream.pipeline
, обычно первый поток является потоком для чтения, а последний - потоком для записи, образуя замкнутую цепь.
Если передается Function
, то это должен быть фабричный метод, принимающий Iterable
источника.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
|
stream.compose
можно использовать для преобразования асинхронных итерабельных, генераторов и функций в потоки.
AsyncIterable
преобразуется в читаемыйDuplex
. Не может выдатьnull
.AsyncGeneratorFunction
преобразует в читаемое/записываемое преобразованиеDuplex
. В качестве первого параметра должна принимать исходныйAsyncIterable
. Не может выдаватьnull
.AsyncFunction
преобразует в записываемоеDuplex
. Должна возвращать либоnull
, либоundefined
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
|
См. readable.compose(stream)
для stream.compose
как оператора.
stream.Readable.from¶
1 |
|
iterable
<Iterable>
Объект, реализующий протокол итератораSymbol.asyncIterator
илиSymbol.iterator
. Выдает событие 'error', если передано нулевое значение.options
<Object>
Параметры, предоставляемыеnew stream.Readable([options])
. По умолчанию,Readable.from()
будет устанавливатьoptions.objectMode
вtrue
, если это не будет явно отклонено установкойoptions.objectMode
вfalse
.- Возвращает:
<stream.Readable>
Метод утилиты для создания читаемых потоков из итераторов.
1 2 3 4 5 6 7 8 9 10 11 12 |
|
При вызове Readable.from(string)
или Readable.from(buffer)
строки или буферы не будут итерироваться в соответствии с семантикой других потоков по причинам производительности.
Если в качестве аргумента передается объект Iterable
, содержащий обещания, это может привести к необработанному отказу.
1 2 3 4 5 6 7 8 9 10 |
|
stream.Readable.fromWeb¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
readableStream
<ReadableStream>
options
<Object>
encoding
<string>
highWaterMark
<number>
objectMode
<boolean>
сигнал
<AbortSignal>
- Возвращает:
<stream.Readable>
stream.Readable.isDisturbed¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
stream
<stream.Readable>
|<ReadableStream>
- Возвращает:
boolean
.
Возвращает, был ли поток прочитан или отменен.
stream.isErrored¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
stream
{Readable} | {Writable} | {Duplex} | {WritableStream} |<ReadableStream>
- Возвращает:
<boolean>
Возвращает, столкнулся ли поток с ошибкой.
stream.isReadable¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
stream
{Readable} | {Duplex} |<ReadableStream>
- Возвращает:
<boolean>
Возвращает, является ли поток читаемым.
stream.Readable.toWeb¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
streamReadable
<stream.Readable>
options
<Object>
стратегия
<Object>
highWaterMark
<number>
Максимальный размер внутренней очереди (созданногоReadableStream
) перед применением обратного давления при чтении из данногоstream.Readable
. Если значение не указано, оно будет взято из данногоstream.Readable
.size
<Function>
Функция, определяющая размер заданного куска данных. Если значение не указано, размер будет равен1
для всех чанков.
- Возвращает:
<ReadableStream>
stream.Writable.fromWeb¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
writableStream
{WritableStream}options
<Object>
decodeStrings
<boolean>
highWaterMark
<number>
objectMode
<boolean>
signal
<AbortSignal>
- Возвращает:
<stream.Writable>
stream.Writable.toWeb¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
streamWritable
<stream.Writable>
- Возвращает: {WritableStream}
stream.Duplex.from¶
1 |
|
src
<Stream>
|<Blob>
|<ArrayBuffer>
|<string>
|<Iterable>
|<AsyncIterable>
| {AsyncGeneratorFunction} | {AsyncFunction} |<Promise>
|<Object>
|<ReadableStream>
| {WritableStream}
Утилита для создания дуплексных потоков.
Stream
преобразует записываемый поток в записываемыйDuplex
и читаемый поток вDuplex
.Blob
преобразует читаемый поток в читаемыйDuplex
.string
преобразует в читаемыйDuplex
.ArrayBuffer
преобразуется в читаемыйDuplex
.AsyncIterable
преобразуется в читаемыйDuplex
. Не может выдатьnull
.AsyncGeneratorFunction
преобразует в читаемое/записываемое преобразованиеDuplex
. В качестве первого параметра должна принимать исходныйAsyncIterable
. Не может выдаватьnull
.AsyncFunction
преобразует в записываемоеDuplex
. Должна возвращать либоnull
, либоundefined
.Object ({ writable, readable })
преобразуетreadable
иwritable
вStream
и затем объединяет их вDuplex
, гдеDuplex
будет писать вwritable
и читать изreadable
.Promise
преобразуется в читаемыйDuplex
. Значениеnull
игнорируется.ReadableStream
преобразуется в читаемыйDuplex
.WritableStream
преобразуется в записываемыйDuplex
.- Возвращает: {stream.Duplex}
Если в качестве аргумента передан объект Iterable
, содержащий обещания, это может привести к необработанному отказу.
1 2 3 4 5 6 7 8 9 10 |
|
stream.Duplex.fromWeb¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
pair
<Object>
readable
<ReadableStream>
writable
{WritableStream}
options
<Object>
- Возвращает: {stream.Duplex}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
|
stream.Duplex.toWeb¶
1 |
|
Стабильность: 1 – Экспериментальная
Фича изменяется и не допускается флагом командной строки. Может быть изменена или удалена в последующих версиях.
streamDuplex
{stream.Duplex}- Возвращает:
<Object>
читаемый
<ReadableStream>
записываемый
{WritableStream}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
|
stream.addAbortSignal¶
1 |
|
signal
<AbortSignal>
Сигнал, представляющий возможную отменуstream
<Stream>
|<ReadableStream>
| {WritableStream}
Поток, к которому нужно прикрепить сигнал.
Прикрепляет сигнал AbortSignal к читаемому или записываемому потоку. Это позволяет коду управлять уничтожением потока с помощью AbortController
.
Вызов abort
на AbortController
, соответствующем переданному AbortSignal
, будет вести себя так же, как вызов .destroy(new AbortError())
для потока, и controller.error(new AbortError())
для веб-потоков.
1 2 3 4 5 6 7 8 9 |
|
Или используя AbortSignal
с читаемым потоком в качестве асинхронного итерабельного:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
|
Или используя AbortSignal
с ReadableStream:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
|
API для реализаторов потоков¶
API модуля node:stream
был разработан для того, чтобы сделать возможной простую реализацию потоков с использованием прототипной модели наследования JavaScript.
Сначала разработчик потоков объявляет новый класс JavaScript, который расширяет один из четырех базовых классов потоков (stream.Writable
, stream.Readable
, stream.Duplex
или stream.Transform
), убеждаясь, что он вызывает соответствующий конструктор родительского класса:
1 2 3 4 5 6 7 8 |
|
При расширении потоков следует помнить о том, какие опции может и должен предоставлять пользователь, прежде чем передавать их базовому конструктору. Например, если реализация делает предположения относительно опций autoDestroy
и emitClose
, не позволяйте пользователю переопределять их. Явно указывайте, какие опции передаются, вместо того, чтобы неявно передавать все опции.
Затем новый класс потока должен реализовать один или несколько специфических методов, в зависимости от типа создаваемого потока, как показано на следующей схеме:
Use-case | Class | Method(s) to implement |
---|---|---|
Только чтение | Readable | _read() |
Только запись | Writable | _write() , _writev() , _final() |
Чтение и запись | Duplex | _read() , _write() , _writev() , _final() |
Оперировать с записанными данными, затем читать результат | Transform | _transform() , _flush() , _final() |
Код реализации потока никогда не должен вызывать "публичные" методы потока, предназначенные для использования потребителями (как описано в разделе "API для потребителей потоков"). Это может привести к неблагоприятным побочным эффектам в приложении.
Упрощенное построение¶
Для многих простых случаев можно создать поток, не полагаясь на наследование. Это можно сделать, непосредственно создавая экземпляры объектов stream.Writable
, stream.Readable
, stream.Duplex
или stream.Transform
и передавая соответствующие методы в качестве опций конструктора.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
|
Реализация потока с возможностью записи¶
Класс stream.Writable
расширен для реализации потока Writable
.
Пользовательские потоки Writable
должны вызывать конструктор new stream.Writable([options])
и реализовывать метод writable._write()
и/или writable._writev()
.
new stream.Writable¶
1 |
|
options
<Object>
highWaterMark
<number>
Уровень буфера, когдаstream.write()
начинает возвращатьfalse
. По умолчанию:16384
(16 KiB), или16
для потоковobjectMode
.decodeStrings
<boolean>
Кодировать листроки
, переданные вstream.write()
вбуфер
(с кодировкой, указанной в вызовеstream.write()
) перед передачей их вstream._write()
. Другие типы данных не преобразуются (т.е.буфер
не декодируется встроку
). Установка значения false предотвращает преобразованиестрок
. По умолчанию:true
.defaultEncoding
<string>
Кодировка по умолчанию, которая используется, если в качестве аргументаstream.write()
не указана кодировка. По умолчанию:'utf8'
.objectMode
<boolean>
Является лиstream.write(anyObj)
допустимой операцией. Если установлено, становится возможной запись JavaScript-значений, отличных от string,Buffer
илиUint8Array
, если это поддерживается реализацией потока. По умолчанию:false
.emitClose
<boolean>
Должен ли поток издавать сигнал'close'
после его уничтожения. По умолчанию:true
.write
<Function>
Реализация методаstream._write()
.writev
<Function>
Реализация для методаstream._writev()
.destroy
<Function>
Реализация для методаstream._destroy()
.final
<Function>
Реализация методаstream._final()
.construct
<Function>
Реализация для методаstream._construct()
.autoDestroy
<boolean>
Должен ли этот поток автоматически вызывать.destroy()
на себя после завершения. По умолчанию:true
.signal
<AbortSignal>
Сигнал, представляющий возможную отмену.
1 2 3 4 5 6 7 8 9 |
|
Или, при использовании конструкторов в стиле доES6:
1 2 3 4 5 6 7 8 9 |
|
Или, используя упрощенный подход с конструктором:
1 2 3 4 5 6 7 8 9 10 11 |
|
Вызов abort
на AbortController
, соответствующем переданному AbortSignal
, будет вести себя так же, как вызов .destroy(new AbortError())
на записываемом потоке.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
|
writable._construct¶
1 |
|
callback
<Function>
Вызовите эту функцию (опционально с аргументом об ошибке), когда поток закончит инициализацию.
Метод _construct()
НЕ ДОЛЖЕН вызываться напрямую. Он может быть реализован дочерними классами, и если это так, то будет вызываться только внутренними методами класса Writable
.
Эта необязательная функция будет вызвана в тике после возврата конструктора потока, откладывая любые вызовы _write()
, _final()
и _destroy()
до вызова callback
. Это полезно для инициализации состояния или асинхронной инициализации ресурсов до того, как поток может быть использован.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
|
writable._write¶
1 |
|
chunk
<Buffer>
|<string>
|<any>
Записываемыйбуфер
, преобразованный изстроки
, переданной вstream.write()
. Если опция потокаdecodeStrings
равнаfalse
или поток работает в объектном режиме, чанк не будет преобразован и будет тем, что было передано вstream.write()
.encoding
<string>
Если чанк является строкой, тоencoding
- это кодировка символов этой строки. Если чанк являетсябуфером
, или если поток работает в объектном режиме,encoding
может быть проигнорирован.callback
<Function>
Вызвать эту функцию (опционально с аргументом об ошибке), когда обработка будет завершена для предоставленного чанка.
Все реализации потока Writable
должны предоставлять метод writable._write()
и/или writable._writev()
для отправки данных на базовый ресурс.
Потоки Transform
предоставляют собственную реализацию метода writable._write()
.
Эта функция НЕ ДОЛЖНА вызываться кодом приложения напрямую. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Writable
.
Функция callback
должна вызываться синхронно внутри writable._write()
или асинхронно (т.е. разными тиками), чтобы сигнализировать либо об успешном завершении записи, либо об ошибке. Первым аргументом, передаваемым в callback
, должен быть объект Error
, если вызов не удался, или null
, если запись прошла успешно.
Все вызовы writable.write()
, которые происходят между вызовом writable._write()
и вызовом callback
, приводят к буферизации записанных данных. Когда вызывается callback
, поток может выдать событие 'drain'
. Если реализация потока способна обрабатывать несколько порций данных одновременно, следует реализовать метод writable._writev()
.
Если свойство decodeStrings
явно установлено в false
в опциях конструктора, то chunk
останется тем же объектом, который передается в .write()
, и может быть строкой, а не Buffer
. Это сделано для поддержки реализаций, оптимизированных для работы с определенными кодировками строковых данных. В этом случае аргумент encoding
будет указывать на кодировку символов строки. В противном случае аргумент encoding
можно смело игнорировать.
Метод writable._write()
помечен знаком подчеркивания, поскольку он является внутренним для класса, который его определяет, и никогда не должен вызываться напрямую пользовательскими программами.
writable._writev¶
1 |
|
куски
{Object[]} Данные, которые должны быть записаны. Значение представляет собой массив<Object>
, каждый из которых представляет собой отдельный фрагмент данных для записи. Свойствами этих объектов являются:chunk
<Buffer>
|<string>
Экземпляр буфера или строка, содержащая данные для записи. Объектchunk
будет строкой, еслиWritable
был создан с опциейdecodeStrings
, установленной вfalse
, и строка была передана вwrite()
.encoding
<string>
Кодировка символов дляchunk
. Еслиchunk
являетсябуфером
, тоencoding
будет'buffer'
.
callback
<Function>
Функция обратного вызова (опционально с аргументом ошибки), которая будет вызвана, когда обработка будет завершена для предоставленных чанков.
Эта функция НЕ ДОЛЖНА вызываться кодом приложения напрямую. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Writable
.
Метод writable._writev()
может быть реализован в дополнение или альтернативно к writable._write()
в реализациях потоков, способных обрабатывать несколько кусков данных одновременно. В случае реализации и при наличии буферизованных данных от предыдущих записей, _writev()
будет вызван вместо _write()
.
Метод writable._writev()
снабжен символом подчеркивания, поскольку он является внутренним для класса, который его определяет, и никогда не должен вызываться напрямую пользовательскими программами.
writable._destroy¶
1 |
|
err
<Error>
Возможная ошибка.callback
<Function>
Функция обратного вызова, принимающая необязательный аргумент ошибки.
Метод _destroy()
вызывается writable.destroy()
. Он может быть переопределен дочерними классами, но не должен вызываться напрямую. Кроме того, callback
не следует смешивать с async/await, поскольку он выполняется при разрешении обещания.
writable._final¶
1 |
|
callback
<Function>
Вызов этой функции (опционально с аргументом об ошибке) при завершении записи оставшихся данных.
Метод _final()
не должен вызываться напрямую. Он может быть реализован дочерними классами, и если это так, то будет вызываться только внутренними методами класса Writable
.
Эта необязательная функция будет вызвана до закрытия потока, откладывая событие 'finish'
до вызова callback
. Это полезно для закрытия ресурсов или записи буферизованных данных перед завершением потока.
Ошибки при записи¶
Ошибки, возникающие во время обработки методов writable._write()
, writable._writev()
и writable._final()
, должны распространяться путем вызова обратного вызова и передачи ошибки в качестве первого аргумента. Выброс Error
из этих методов или ручное создание события 'error'
приводит к неопределенному поведению.
Если поток Readable
передается в поток Writable
, когда Writable
выдает ошибку, поток Readable
будет распакован.
1 2 3 4 5 6 7 8 9 10 11 |
|
Пример потока с возможностью записи¶
Ниже показана довольно упрощенная (и в некоторой степени бессмысленная) реализация пользовательского потока Writable
. Хотя этот конкретный экземпляр потока Writable
не представляет особой пользы, пример иллюстрирует каждый из необходимых элементов пользовательского Writable
экземпляра потока:
1 2 3 4 5 6 7 8 9 10 11 |
|
Декодирование буферов в потоке с возможностью записи¶
Декодирование буферов является распространенной задачей, например, при использовании трансформаторов, входными данными которых является строка. Это нетривиальный процесс при использовании многобайтовой кодировки символов, такой как UTF-8. Следующий пример показывает, как декодировать многобайтовые строки с помощью StringDecoder
и Writable
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
|
Реализация читаемого потока¶
Класс stream.Readable
расширен для реализации потока Readable
.
Пользовательские потоки Readable
должны вызывать конструктор new stream.Readable([options])
и реализовывать метод readable._read()
.
new stream.Readable¶
1 |
|
options
<Object>
highWaterMark
<number>
Максимальное количество байт для хранения во внутреннем буфере перед прекращением чтения из базового ресурса. По умолчанию:16384
(16 KiB), или16
для потоковobjectMode
.encoding
<string>
Если указано, то буферы будут декодированы в строки с использованием указанной кодировки. По умолчанию:null
.objectMode
<boolean>
Должен ли этот поток вести себя как поток объектов. Это означает, чтоstream.read(n)
возвращает одно значение вместоBuffer
размеромn
. По умолчанию:false
.emitClose
<boolean>
Должен ли поток издавать сигнал'close'
после его уничтожения. По умолчанию:true
.read
<Function>
Реализация методаstream._read()
.destroy
<Function>
Реализация для методаstream._destroy()
.construct
<Function>
Реализация методаstream._construct()
.autoDestroy
<boolean>
Должен ли этот поток автоматически вызывать.destroy()
на себя после завершения. По умолчанию:true
.signal
<AbortSignal>
Сигнал, представляющий возможную отмену.
1 2 3 4 5 6 7 8 9 |
|
Или, при использовании конструкторов в стиле до ES6:
1 2 3 4 5 6 7 8 9 |
|
Или, используя упрощенный подход с конструктором:
1 2 3 4 5 6 7 |
|
Вызов abort
на AbortController
, соответствующем переданному AbortSignal
, будет вести себя так же, как вызов .destroy(new AbortError())
на созданном readable.
1 2 3 4 5 6 7 8 9 10 |
|
readable._construct¶
1 |
|
callback
<Function>
Вызовите эту функцию (опционально с аргументом об ошибке), когда поток закончит инициализацию.
Метод _construct()
НЕ ДОЛЖЕН вызываться напрямую. Он может быть реализован дочерними классами, и если это так, то будет вызываться только внутренними методами класса Readable
.
Эта необязательная функция будет запланирована на следующий такт конструктором потока, откладывая любые вызовы _read()
и _destroy()
до вызова callback
. Это полезно для инициализации состояния или асинхронной инициализации ресурсов перед использованием потока.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
|
readable._read¶
1 |
|
size
<number>
Количество байт для асинхронного чтения
Эта функция НЕ ДОЛЖНА вызываться непосредственно кодом приложения. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Readable
.
Все реализации потока Readable
должны предоставлять реализацию метода readable._read()
для получения данных из базового ресурса.
Когда вызывается readable._read()
, если данные доступны из ресурса, реализация должна начать проталкивать эти данные в очередь чтения, используя метод this.push(dataChunk)
. _read()
будет вызываться снова после каждого вызова this.push(dataChunk)
, когда поток будет готов принять больше данных. _read()
может продолжать читать из ресурса и проталкивать данные, пока readable.push()
не вернет false
. Только когда _read()
снова вызывается после остановки, он должен возобновить проталкивание дополнительных данных в очередь.
После вызова метода readable._read()
, он не будет вызван снова, пока больше данных не будет протолкнуто через метод readable.push()
. Пустые данные, такие как пустые буферы и строки, не вызовут метод readable._read()
.
Аргумент size
является рекомендательным. В тех реализациях, где "чтение" - это одна операция, возвращающая данные, аргумент size
может использоваться для определения того, сколько данных нужно получить. Другие реализации могут игнорировать этот аргумент и просто предоставлять данные всякий раз, когда они становятся доступными. Нет необходимости "ждать", пока size
байт станет доступен перед вызовом stream.push(chunk)
.
Метод readable._read()
помечен знаком подчеркивания, потому что он является внутренним для класса, который его определяет, и никогда не должен вызываться напрямую пользовательскими программами.
readable._destroy¶
1 |
|
err
<Error>
Возможная ошибка.callback
<Function>
Функция обратного вызова, принимающая необязательный аргумент ошибки.
Метод _destroy()
вызывается readable.destroy()
. Он может быть переопределен дочерними классами, но не должен вызываться напрямую.
readable.push¶
1 |
|
chunk
<Buffer>
|<Uint8Array>
|<string>
|<null>
|<any>
Кусок данных для передачи в очередь чтения. Для потоков, не работающих в объектном режиме,chunk
должен быть строкой,Buffer
илиUint8Array
. Для потоков, работающих в объектном режиме,chunk
может быть любым значением JavaScript.encoding
<string>
Кодировка кусков строки. Должна быть правильной кодировкойBuffer
, такой как'utf8
или'ascii
.- Возвращает:
<boolean>
true
, если можно продолжать проталкивать дополнительные куски данных;false
в противном случае.
Если chunk
является буфером
, Uint8Array
или строкой
, то кусок
данных будет добавлен во внутреннюю очередь для потребления пользователями потока. Передача chunk
как null
сигнализирует о конце потока (EOF), после чего данные больше не могут быть записаны.
Когда Readable
работает в режиме паузы, данные, добавленные с помощью readable.push()
, могут быть считаны путем вызова метода readable.read()
при появлении события 'readable'
.
Когда Readable
работает в потоковом режиме, данные, добавленные с помощью readable.push()
, будут доставлены путем испускания события 'data'
.
Метод readable.push()
разработан так, чтобы быть как можно более гибким. Например, при обертывании низкоуровневого источника, который обеспечивает некоторую форму механизма паузы/возобновления и обратного вызова данных, низкоуровневый источник может быть обернут пользовательским экземпляром Readable
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
|
Метод readable.push()
используется для проталкивания содержимого во внутренний буфер. Он может быть вызван методом readable._read()
.
Для потоков, не работающих в объектном режиме, если параметр chunk
метода readable.push()
имеет значение undefined
, он будет рассматриваться как пустая строка или буфер. Дополнительную информацию смотрите в readable.push('')
.
Ошибки при чтении¶
Ошибки, возникающие при обработке readable._read()
, должны передаваться через метод readable.destroy(err)
. Бросок Error
изнутри readable._read()
или ручное создание события 'error'
приводит к неопределенному поведению.
1 2 3 4 5 6 7 8 9 10 11 12 |
|
Пример счетного потока¶
Ниже приведен базовый пример потока Readable
, который выдает цифры от 1
до 1 000 000
в порядке возрастания, а затем завершается.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
|
Реализация дуплексного потока¶
Поток Duplex
- это поток, который реализует как Readable
, так и Writable
, например, соединение TCP сокета.
Поскольку JavaScript не поддерживает множественное наследование, класс stream.Duplex
расширяется для реализации потока Duplex
(в отличие от расширения классов stream.Readable
и stream.Writable
).
Класс stream.Duplex
прототипически наследуется от stream.Readable
и паразитно от stream.Writable
, но instanceof
будет работать правильно для обоих базовых классов благодаря переопределению Symbol.hasInstance
для stream.Writable
.
Пользовательские потоки Duplex
должны вызывать конструктор new stream.Duplex([options])
и реализовывать обои методы readable._read()
и writable._write()
.
new stream.Duplex¶
1 |
|
options
<Object>
Передается конструкторамWritable
иReadable
. Также имеет следующие поля:allowHalfOpen
<boolean>
Если установлено значениеfalse
, то поток будет автоматически завершать записываемую сторону, когда завершается читаемая сторона. По умолчанию:true
.readable
<boolean>
Устанавливает, должен лиDuplex
быть доступен для чтения. По умолчанию:true
.writable
<boolean>
Устанавливает, должен лиDuplex
быть доступен для записи. По умолчанию:true
.readableObjectMode
<boolean>
УстанавливаетobjectMode
для читаемой стороны потока. Не имеет эффекта, еслиobjectMode
равенtrue
. По умолчанию:false
.writableObjectMode
<boolean>
УстанавливаетobjectMode
для записываемой стороны потока. Не имеет эффекта, еслиobjectMode
равенtrue
. По умолчанию:false
.readableHighWaterMark
<number>
УстанавливаетhighWaterMark
для читаемой стороны потока. Не имеет эффекта, еслиhighWaterMark
предоставлен.writableHighWaterMark
<number>
УстанавливаетhighWaterMark
для записываемой стороны потока. Не имеет эффекта, если заданаhighWaterMark
.
1 2 3 4 5 6 7 8 |
|
Или, при использовании конструкторов в стиле до ES6:
1 2 3 4 5 6 7 8 9 |
|
Или, используя упрощенный подход с конструктором:
1 2 3 4 5 6 7 8 9 10 |
|
При использовании конвейера:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
|
Пример дуплексного потока¶
Ниже показан простой пример потока Duplex
, который оборачивает гипотетический объект-источник нижнего уровня, в который могут быть записаны данные и из которого могут быть прочитаны данные, хотя и с использованием API, не совместимого с потоками Node.js. Ниже показан простой пример потока Duplex
, который буферизирует входящие записанные данные через интерфейс Writable
, которые считываются обратно через интерфейс Readable
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
|
Наиболее важным аспектом дуплексного потока является то, что стороны Readable
и Writable
работают независимо друг от друга, несмотря на сосуществование в одном экземпляре объекта.
Дуплексные потоки с объектным режимом¶
Для потоков Duplex
режим objectMode
может быть установлен исключительно для стороны Readable
или Writable
с помощью опций readableObjectMode
и writableObjectMode
соответственно.
Например, в следующем примере создается новый поток Transform
(который является типом потока Duplex
) с объектным режимом Writable
на стороне, принимающей числа JavaScript, которые преобразуются в шестнадцатеричные строки на стороне Readable
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
|
Реализация потока преобразования¶
Поток Transform
- это поток Duplex
, в котором выходной поток каким-то образом вычисляется из входного. Примерами могут служить потоки zlib или crypto, которые сжимают, шифруют или расшифровывают данные.
Нет требования, чтобы выходные данные были того же размера, что и входные, имели то же количество фрагментов или приходили в то же время. Например, поток Hash
будет иметь только один выходной фрагмент, который будет предоставлен после завершения ввода. Поток zlib
будет производить вывод, который либо намного меньше, либо намного больше, чем его вход.
Класс stream.Transform
расширен для реализации потока Transform
.
Класс stream.Transform
прототипически наследуется от stream.Duplex
и реализует свои собственные версии методов writable._write()
и readable._read()
. Пользовательские реализации Transform
должны реализовывать метод transform._transform()
и могут также реализовывать метод transform._flush()
.
При использовании потоков Transform
следует соблюдать осторожность, так как данные, записанные в поток, могут вызвать приостановку потока со стороны Writable
, если вывод на стороне Readable
не будет использован.
new stream.Transform¶
1 |
|
options
<Object>
Передается конструкторамWritable
иReadable
. Также имеет следующие поля:transform
<Function>
Реализация методаstream._transform()
.flush
<Function>
Реализация для методаstream._flush()
.
1 2 3 4 5 6 7 8 |
|
Или, при использовании конструкторов в стиле до ES6:
1 2 3 4 5 6 7 8 9 |
|
Или, используя упрощенный подход конструктора:
1 2 3 4 5 6 7 |
|
Событие: end¶
Событие 'end'
относится к классу stream.Readable
. Событие 'end'
испускается после вывода всех данных, что происходит после вызова обратного вызова в transform._flush()
. В случае ошибки событие 'end'
не должно выдаваться.
Событие: finish¶
Событие 'finish'
относится к классу stream.Writable
. Событие 'finish'
происходит после вызова stream.end()
и обработки всех кусков stream._transform()
. В случае ошибки, 'finish'
не должен выдаваться.
transform._flush¶
1 |
|
callback
<Function>
Функция обратного вызова (опционально с аргументом ошибки и данными), которая будет вызвана, когда оставшиеся данные будут удалены.
Эта функция НЕ ДОЛЖНА вызываться непосредственно кодом приложения. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Readable
.
В некоторых случаях операция преобразования может потребовать выдать дополнительный бит данных в конце потока. Например, поток сжатия zlib
будет хранить некоторое количество внутреннего состояния, используемого для оптимального сжатия выходных данных. Однако, когда поток заканчивается, эти дополнительные данные должны быть удалены, чтобы сжатые данные были полными.
Пользовательские реализации Transform
могут реализовать метод transform._flush()
. Он будет вызван, когда больше нет записанных данных для потребления, но до того, как произойдет событие 'end'
, сигнализирующее о завершении потока Readable
.
В рамках реализации transform._flush()
метод transform.push()
может быть вызван ноль или более раз, в зависимости от ситуации. Функция callback
должна быть вызвана после завершения операции flush.
Метод transform._flush()
снабжен символом подчеркивания, поскольку он является внутренним для класса, который его определяет, и никогда не должен вызываться напрямую пользовательскими программами.
transform._transform¶
1 |
|
chunk
<Buffer>
|<string>
|<any>
Преобразуемыйбуфер
, преобразованный изстроки
, переданной вstream.write()
. Если опция потокаdecodeStrings
равнаfalse
или поток работает в объектном режиме, чанк не будет преобразован и будет тем, что было передано вstream.write()
.encoding
<string>
Если чанк является строкой, то это тип кодировки. Если чанк является буфером, то это специальное значение `'buffer''. В этом случае игнорируйте его.callback
<Function>
Функция обратного вызова (опционально с аргументом ошибки и данными), которая будет вызвана после обработки предоставленногочанка
.
Эта функция НЕ ДОЛЖНА вызываться непосредственно кодом приложения. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Readable
.
Все реализации потока Transform
должны предоставлять метод _transform()
для приема входных данных и получения выходных. Реализация transform._transform()
обрабатывает записываемые байты, вычисляет выход, затем передает этот выход в читаемую часть с помощью метода transform.push()
.
Метод transform.push()
может быть вызван ноль или более раз для генерации вывода из одного входного чанка, в зависимости от того, какой объем должен быть выведен в результате работы чанка.
Возможно, что из любого заданного куска входных данных не будет сгенерирован выход.
Функция callback
должна быть вызвана только тогда, когда текущий чанк полностью потреблен. Первым аргументом, передаваемым в callback
, должен быть объект Error
, если при обработке входных данных произошла ошибка, или null
в противном случае. Если в callback
передан второй аргумент, он будет передан методу transform.push()
. Другими словами, следующие команды эквивалентны:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
|
Метод transform._transform()
снабжен символом подчеркивания, поскольку он является внутренним для класса, который его определяет, и никогда не должен вызываться напрямую пользовательскими программами.
Метод transform._transform()
никогда не вызывается параллельно; потоки реализуют механизм очереди, и для получения следующего куска необходимо вызвать callback
, либо синхронно, либо асинхронно.
stream.PassThrough¶
Класс stream.PassThrough
- это тривиальная реализация потока Transform
, который просто передает входные байты на выход. Он предназначен в основном для примеров и тестирования, но есть некоторые случаи использования, когда stream.PassThrough
полезен как строительный блок для новых видов потоков.
Дополнительные примечания¶
Совместимость потоков с асинхронными генераторами и асинхронными итераторами¶
С поддержкой асинхронных генераторов и итераторов в JavaScript, асинхронные генераторы фактически являются первоклассной конструкцией потока на уровне языка на данный момент.
Ниже приведены некоторые распространенные случаи взаимодействия потоков Node.js с генераторами async и итераторами async.
Потребление читаемых потоков с помощью асинхронных итераторов¶
1 2 3 4 5 |
|
Асинхронные итераторы регистрируют постоянный обработчик ошибок на потоке, чтобы предотвратить любые необработанные ошибки после уничтожения.
Создание читаемых потоков с помощью асинхронных генераторов¶
Читаемый поток Node.js может быть создан из асинхронного генератора с помощью метода Readable.from()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
|
Передача данных в записываемые потоки из асинхронных итераторов¶
При записи в записываемый поток из асинхронного итератора необходимо обеспечить правильную обработку обратного давления и ошибок. stream.pipeline()
абстрагирует обработку обратного давления и ошибок, связанных с обратным давлением:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
|
Совместимость со старыми версиями Node.js¶
До версии Node.js 0.10 интерфейс потока Readable
был более простым, но также менее мощным и менее полезным.
- Вместо того чтобы ждать вызова метода
stream.read()
, события'data'
начинали испускаться немедленно. Приложения, которые должны были выполнить определенный объем работы, чтобы решить, как обрабатывать данные, должны были хранить прочитанные данные в буферах, чтобы данные не были потеряны. - Метод
stream.pause()
был рекомендательным, а не гарантированным. Это означало, что необходимо быть готовым к получению событийdata
даже когда поток находится в состоянии паузы.
В Node.js 0.10 был добавлен класс Readable
. Для обратной совместимости со старыми программами Node.js, потоки Readable
переходят в "текущий режим" при добавлении обработчика события 'data'
или при вызове метода stream.resume()
. В результате, даже если не использовать новый метод stream.read()
и событие 'readable'
, больше не нужно беспокоиться о потере кусков 'data'
.
Хотя большинство приложений будут продолжать нормально функционировать, это вводит крайний случай в следующих условиях:
- Не добавлен слушатель событий
'data'
. - Метод
stream.resume()
никогда не вызывается. - Поток не передается ни в одно записываемое место назначения.
Например, рассмотрим следующий код:
1 2 3 4 5 6 7 8 9 10 |
|
До Node.js 0.10 данные входящего сообщения просто отбрасывались. Однако в Node.js 0.10 и последующих версиях сокет остается приостановленным навсегда.
Обходным решением в этой ситуации является вызов метода stream.resume()
, чтобы начать поток данных:
1 2 3 4 5 6 7 8 9 10 11 |
|
В дополнение к новым потокам Readable
, переходящим в режим потока, потоки в стиле pre-0.10 могут быть обернуты в класс Readable
с помощью метода readable.wrap()
.
readable.read(0)¶
Бывают случаи, когда необходимо вызвать обновление базовых механизмов потока readable, не потребляя при этом никаких данных. В таких случаях можно вызвать readable.read(0)
, который всегда будет возвращать null
.
Если внутренний буфер чтения находится ниже highWaterMark
, а поток в данный момент не читает, то вызов stream.read(0)
вызовет низкоуровневый вызов stream._read()
.
Хотя большинству приложений это почти никогда не понадобится, в Node.js есть ситуации, когда это делается, особенно во внутреннем интерфейсе класса потока Readable
.
readable.push('')¶
Использование readable.push('')
не рекомендуется.
Передача строки с нулевым байтом, Buffer
или Uint8Array
в поток, который не находится в объектном режиме, имеет интересный побочный эффект. Поскольку это является вызовом readable.push()
, вызов завершает процесс чтения. Однако, поскольку аргументом является пустая строка, никакие данные не добавляются в буфер readable, поэтому пользователю нечего потреблять.
highWaterMark
несоответствие после вызова readable.setEncoding()
¶
Использование readable.setEncoding()
изменит поведение того, как highWaterMark
работает в безобъектном режиме.
Обычно размер текущего буфера измеряется относительно highWaterMark
в байтах. Однако после вызова setEncoding()
функция сравнения начнет измерять размер буфера в символах.
Это не является проблемой в обычных случаях с latin1
или ascii
. Но рекомендуется помнить о таком поведении при работе со строками, которые могут содержать многобайтовые символы.