Node Module Stream

这个模块是介绍nodejs的stream,node的很多api都实现了stream

stream 有四种基本的类型:

  1. 可读流
  2. 可写流
  3. 双工流
  4. 转换流

Object mode

由 Node.js API 创建的所有流仅对字符串和 Buffer (或 Uint8Array)对象进行操作。但是,流实现可以与其他类型的 JavaScript 值一起工作(null 除外,它在流中有特殊用途)。这些流被认为是在“对象模式”下操作的。

在创建流时,使用 objectMode 选项将流实例切换到对象模式。尝试将现有流切换到对象模式是不安全的。

buffering

Writable 和 Readable 流都将数据存储在内部缓冲区中。

可能缓冲的数据量取决于传递到流的构造函数中的 highWaterMark 选项。对于普通流,highWaterMark 选项指定总字节数。对于在对象模式下运行的流,highWaterMark 指定对象的总数。

当实现调用 stream.push(chunk)时,数据在 Readable 流中缓冲。如果 Stream 的使用者没有调用 Stream.read() ,那么数据将一直保存在内部队列中,直到被使用为止。

一旦内部读取缓冲区的总大小达到 highWaterMark 指定的阈值,流将暂时停止从底层资源读取数据,直到当前缓冲的数据可以被使用(也就是说,流将停止调用用于填充读取缓冲区的内部 readable._read()方法)。

当重复调用 Writable.write(chunk)方法时,数据在 Writable 流中缓冲。虽然内部写缓冲区的总大小低于 highWaterMark 设置的阈值,但是对 writable.write()的调用将返回 true。一旦内部缓冲区的大小达到或超过 highWaterMark,将返回 false。

流 API 的一个关键目标,特别是 stream.pipe()方法,是将数据缓冲限制在可接受的级别,这样不同速度的源和目标就不会占用可用内存。

HighWaterMark 选项是一个阈值,而不是一个限制: 它指定流在停止请求更多数据之前缓冲的数据量。一般来说,它不会强制执行严格的内存限制。特定的流实现可以选择强制执行更严格的限制,但这样做是可选的。

因为双工和转换流都是可读和可写的,每个都维护两个用于读和写的独立内部缓冲区,允许每一方独立操作,同时维护适当和有效的数据流。例如,net.Socket 实例是双工流,其可读端允许使用从套接字接收的数据,其可写端允许将数据写入套接字。因为数据写入套接字的速度可能比接收数据的速度更快或更慢,所以每一端都应该独立于另一端进行操作(和缓冲)。

内部缓冲的机制是一个内部实现细节,可以随时更改。但是,对于某些高级实现,可以使用 writable.writableBufferreadable.readableBuffer 检索内部缓冲区。不鼓励使用这些未记录的属性。

API for stream implementers

这个 node:stream 模块 API 已经设计成可以使用 JavaScript 的原型继承模型轻松实现流。

首先,流开发人员会声明一个新的 JavaScript 类,该类扩展了四个基本流类之一(stream.Writablestream.Readablestream.Duplexstream.Transform),确保调用适当的父类构造函数:

const { Writable } = require('node:stream');

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark });
    // ...
  }
}

在扩展流时,请牢记用户可以和应该在将这些选项转发到基类构造函数之前提供哪些选项。例如,如果实现对于 autoDestroy 和 emitClose 选项有假设,不要允许用户覆盖这些选项。明确指出要转发的选项,而不是隐式转发所有选项。

然后,新的流类必须实现一个或多个特定方法,具体取决于要创建的流类型,详见下表:

Use-case Class Method(s) to implement
Reading only Readable _read()
Writing only Writable _write(), _writev(), _final()
Reading and writing Duplex _read(), _write(), _writev(), _final()
Operate on written data, then read the result Transform _transform(), _flush(), _final()

流的实现代码不应该调用那些旨在供消费者使用的流的 “公共” 方法(如流的 API 部分所描述)。这样做可能会导致应用程序中消耗流的代码出现不良的副作用。

避免覆盖公共方法,如 write()、end()、cork()、uncork()、read() 和 destroy(),或通过 .emit() 发出内部事件,如 ’error’、‘data’、’end’、‘finish’ 和 ‘close’。这样做可能会破坏当前和未来的流不变性,导致与其他流、流实用程序和用户期望的行为和/或兼容性问题。

Simplified construction

对于许多简单情况,可以在不依赖继承的情况下创建流。这可以通过直接创建stream.Writablestream.Readablestream.Duplexstream.Transform对象的实例,并将适当的方法作为构造函数选项传递来实现。

const { Writable } = require('node:stream');

const myWritable = new Writable({
  construct(callback) {
    // Initialize state and load resources...
  },
  write(chunk, encoding, callback) {
    // ...
  },
  destroy() {
    // Free resources...
  },
});
Implementing a writable stream

stream.Writable 类被扩展以实现可写流。

自定义可写流必须调用新的 stream.Writable([options]) 构造函数并实现 writable._write() 和/或 writable._writev() 方法。

new stream.Writable([options])

  • options
    • highWaterMark :当流的 write() 开始返回 false 时的缓冲区水位标记。默认值:16384(16 KiB),或 16 对于 objectMode 流。
    • decodeStrings :在将传递给 stream.write() 的字符串编码为缓冲区之前(使用在 stream.write() 调用中指定的编码)是否转换它们为 Buffers。其他类型的数据不会被转换(即,不会将 Buffers 解码为字符串)。将其设置为 false 将阻止字符串的转换。默认值:true。
    • defaultEncoding :当没有将编码指定为传递给 stream.write() 的参数时使用的默认编码。默认值:‘utf8’。
    • objectMode :流的 write(anyObj) 是否是有效操作。当设置时,如果流实现支持,可以写入除字符串、Buffer 或 Uint8Array 之外的 JavaScript 值。默认值:false。
    • emitClose :流销毁后是否应该发出 ‘close’ 事件。默认值:true。
    • write stream._write() 方法的实现。
    • writev stream._writev() 方法的实现。
    • destroy stream._destroy() 方法的实现。
    • final stream._final() 方法的实现。
    • construct stream._construct() 方法的实现。
    • autoDestroy :该流是否在结束后自动调用 .destroy()默认值:true。
    • signal :表示可能取消的信号。
    const { Writable } = require('node:stream');
    
    class MyWritable extends Writable {
      constructor(options) {
        // Calls the stream.Writable() constructor.
        super(options);
        // ...
      }
    }
    

    或者,当使用ES6之前的构造函数风格时:

    const { Writable } = require('node:stream');
    const util = require('node:util');
    
    function MyWritable(options) {
      if (!(this instanceof MyWritable))
        return new MyWritable(options);
      Writable.call(this, options);
    }
    util.inherits(MyWritable, Writable);
    

    或者,使用简化的构造函数方法:

    const { Writable } = require('node:stream');
    
    const myWritable = new Writable({
      write(chunk, encoding, callback) {
        // ...
      },
      writev(chunks, callback) {
        // ...
      },
    });
    

    在传递的AbortSignal对应的AbortController上调用abort将与在可写流上调用.destroy(new AbortError())的行为相同。

    const { Writable } = require('node:stream');
    
    const controller = new AbortController();
    const myWritable = new Writable({
      write(chunk, encoding, callback) {
        // ...
      },
      writev(chunks, callback) {
        // ...
      },
      signal: controller.signal,
    });
    // Later, abort the operation closing the stream
    controller.abort();
    
    writable._construct(callback)
    • callback 当流初始化完成时,调用此函数(可选带有错误参数)。
      不得直接调用_construct()方法。它可以由子类实现,如果是这样,只会被内部Writable类方法调用。

    这个可选的函数将在流构造函数返回后的一个刻度后调用,延迟任何_write()、_final()和_destroy()的调用,直到调用回调函数。这对于在流可以使用之前初始化状态或异步初始化资源非常有用。

    const { Writable } = require('node:stream');
    const fs = require('node:fs');
    
    class WriteStream extends Writable {
      constructor(filename) {
        super();
        this.filename = filename;
        this.fd = null;
      }
      _construct(callback) {
        fs.open(this.filename, (err, fd) => {
          if (err) {
            callback(err);
          } else {
            this.fd = fd;
            callback();
          }
        });
      }
      _write(chunk, encoding, callback) {
        fs.write(this.fd, chunk, callback);
      }
      _destroy(err, callback) {
        if (this.fd) {
          fs.close(this.fd, (er) => callback(er || err));
        } else {
          callback(err);
        }
      }
    }
    

    (未完待续)