nodejs stream module
stream 有四种基本的类型:
- 可读流
- 可写流
- 双工流
- 转换流
Object mode
由 Node.js API 创建的所有流仅对字符串和 Buffer (或 Uint8Array)对象进行操作。但是,流实现可以与其他类型的 JavaScript 值一起工作(null 除外,它在流中有特殊用途)。这些流被认为是在“对象模式”下操作的。
在创建流时,使用 objectMode
选项将流实例切换到对象模式。尝试将现有流切换到对象模式是不安全的。
buffering
Writable 和 Readable 流都将数据存储在内部缓冲区中。
可能缓冲的数据量取决于传递到流的构造函数中的 highWaterMark
选项。对于普通流,highWaterMark
选项指定总字节数。对于在对象模式下运行的流,highWaterMark
指定对象的总数。
当实现调用 stream.push(chunk)
时,数据在 Readable 流中缓冲。如果 Stream 的使用者没有调用 Stream.read()
,那么数据将一直保存在内部队列中,直到被使用为止。
一旦内部读取缓冲区的总大小达到 highWaterMark
指定的阈值,流将暂时停止从底层资源读取数据,直到当前缓冲的数据可以被使用(也就是说,流将停止调用用于填充读取缓冲区的内部 readable._read()
方法)。
当重复调用 Writable.write(chunk)
方法时,数据在 Writable 流中缓冲。虽然内部写缓冲区的总大小低于 highWaterMark
设置的阈值,但是对 writable.write()
的调用将返回 true。一旦内部缓冲区的大小达到或超过 highWaterMark
,将返回 false。
流 API 的一个关键目标,特别是 stream.pipe()
方法,是将数据缓冲限制在可接受的级别,这样不同速度的源和目标就不会占用可用内存。
HighWaterMark
选项是一个阈值,而不是一个限制: 它指定流在停止请求更多数据之前缓冲的数据量。一般来说,它不会强制执行严格的内存限制。特定的流实现可以选择强制执行更严格的限制,但这样做是可选的。
因为双工和转换流都是可读和可写的,每个都维护两个用于读和写的独立内部缓冲区,允许每一方独立操作,同时维护适当和有效的数据流。例如,net.Socket
实例是双工流,其可读端允许使用从套接字接收的数据,其可写端允许将数据写入套接字。因为数据写入套接字的速度可能比接收数据的速度更快或更慢,所以每一端都应该独立于另一端进行操作(和缓冲)。
内部缓冲的机制是一个内部实现细节,可以随时更改。但是,对于某些高级实现,可以使用 writable.writableBuffer
或 readable.readableBuffer
检索内部缓冲区。不鼓励使用这些未记录的属性。
API for stream consumers(为流的消费者提供的 API)
几乎所有 Node.js 应用程序,无论多么简单,都以某种方式使用流。
const http = require('node:http');
const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a readable stream.
// `res` is an http.ServerResponse, which is a writable stream.
let body = '';
// Get the data as utf8 strings.
// If an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// Readable streams emit 'data' events once a listener is added.
req.on('data', (chunk) => {
body += chunk;
});
// The 'end' event indicates that the entire body has been received.
req.on('end', () => {
try {
const data = JSON.parse(body);
// Write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
server.listen(1337);
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
可写流(例如示例中的 res)公开 write() 和 end() 等方法,用于将数据写入流中。
可读流使用 EventEmitter API 通知应用程序代码何时可以从流中读取数据。这些可用的数据可以用多种方式从流中读取。
可写流和可读流都以不同的方式使用 EventEmitter API 来传达流的当前状态。
Duplex 和 Transform 流都即是可写流也是可读流。
要么向流写入数据,要么从流中消耗数据的应用程序不需要直接实现流接口,并且通常没有理由调用require(’node:stream’)。
希望开发人员实现新类型的流时,请参考流实现人员 API 部分。
可写流 API
可写流是一种抽象,用于将数据写入目的地。它们提供了一种将数据写入远程资源的方法,例如文件、网络套接字或另一个进程。
可写流的示例包括:
- 客户端的 HTTP 请求 (HTTP requests, on the client)
- 服务器上的 HTTP 响应 (HTTP responses, on the server)
- 文件系统写入流 (fs write streams)
- 压缩流 (zlib streams)
- 加密流 (crypto streams)
- TCP 套接字 (TCP sockets)
- 子进程的标准输入 (child process stdin)
- 进程的标准输出、标准错误 (process.stdout, process.stderr)
其中,有些示例实际上是实现了可写接口的双工流。
所有可写流都实现了 stream.Writable 类定义的接口。
const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');
Class:stream.Writable
event
close
close
事件在流及其底层资源(例如文件描述符)关闭时被触发。该事件表示不会再发出任何事件,也不会进行进一步的计算。
如果使用 emitClose
选项创建了一个可写流,它将始终发出 close
事件。
drain
如果对 stream.write(chunk)
的调用返回 false
,则会在合适恢复向流中写入数据时发出 drain
事件。
// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Last time!
writer.write(data, encoding, callback);
} else {
// See if we should continue, or wait.
// Don't pass the callback, because we're not done yet.
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// Had to stop early!
// Write some more once it drains.
writer.once('drain', write);
}
}
}
error
error
事件在写入或传输数据时发生错误时被触发。当调用时,监听器回调会传递一个 Error 参数。
除非在创建流时将 autoDestroy
选项设置为 false
,否则在触发 error
事件时流会被关闭。
在 error
事件之后,除了 close
事件外不应再触发其他事件,包括 error
事件本身。
finish
finish
事件在调用了 stream.end()
方法并且所有数据已经刷新到底层系统之后被触发。
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
console.log('All writes are now complete.');
});
writer.end('This is the end\n');
pipe
- src <stream.Readable> 是正在导向此可写流的源流。
pipe
事件在对可读流调用 stream.pipe()
方法时被触发,将此可写流添加到其目标集合中。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
console.log('Something is piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
unpipe
- src <stream.Readable> 是取消导向此可写流的源流。
unpipe
事件在对可读流调用 stream.unpipe()
方法时被触发,从其目标集合中移除此可写流。
此事件还在可读流导向此可写流时,如果此可写流发出错误,也会触发。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
console.log('Something has stopped piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
writable.cork()
可写流方法 writable.cork()
强制将所有写入的数据缓存在内存中。缓存在内存中的数据将在调用 stream.uncork()
或 stream.end()
方法时刷新。
writable.cork()
的主要目的是为了适应一种情况,即在短时间内将多个小的块写入流中。writable.cork()
不会立即将这些块转发到底层目的地,而是将其全部缓冲起来,直到调用 writable.uncork()
,writable.uncork()
将将其全部传递给 writable._writev()
(如果存在)。这可以防止在等待第一个小块被处理时数据被缓冲而导致的头阻塞情况。但是,如果不实现 writable._writev() 就使用 writable.cork()
,可能会对吞吐量产生负面影响。
writable.destroy([error])
- error
<Error>
(可选):用于发出 ’error’ 事件的错误。 - Returns:
<this>
销毁流。可选择发出 'error'
事件,并发出 'close'
事件(除非设置 emitClose
为 false
)。在此调用之后,可写流已经结束,后续对 write()
或 end()
的调用将导致 ERR_STREAM_DESTROYED
错误。这是一种破坏性和立即销毁流的方式。以前的 write()
调用可能尚未排空,并可能触发 ERR_STREAM_DESTROYED
错误。如果数据应在关闭之前刷新,请使用 end()
而不是 destroy
,或者在销毁流之前等待 'drain'
事件。
const { Writable } = require('node:stream');
const myStream = new Writable();
const fooErr = new Error('foo error');
myStream.destroy(fooErr);
myStream.on('error', (fooErr) => console.error(fooErr.message)); // foo error
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.on('error', function wontHappen() {});
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.write('foo', (error) => console.error(error.code));
// ERR_STREAM_DESTROYED
一旦调用了 destroy()
方法,进一步的调用将成为无操作,并且除了来自 _destroy()
的错误外,不会触发其他错误事件。
实现者不应该覆盖此方法,而是应该实现 writable._destroy()
。
writable.closed
- boolean
在 close
事件之后,这个值是 true
。
writable.destroyed
- boolean
writable.destroy()
被调用之后,值为 true
。
const { Writable } = require('node:stream');
const myStream = new Writable();
console.log(myStream.destroyed); // false
myStream.destroy();
console.log(myStream.destroyed); // true
writable.end([chunk[, encoding]][, callback])
- chunk
<string> | <Buffer> | <Uint8Array> | <any>
(可选):要写入的数据。对于不在对象模式下运行的流,chunk 必须是字符串、Buffer 或 Uint8Array。对于对象模式流,chunk 可以是除了 null 之外的任何 JavaScript 值。 - encoding
<string>
:如果 chunk 是字符串时的编码。 - callback
<Function>
:当流完成时的回调函数。 - 返回:`
``
调用 writable.end()
方法表示不会再向可写流写入更多数据。可选的 chunk
和 encoding
参数允许在关闭流之前立即写入最后一个附加数据块。
在调用 stream.end()
后再调用 stream.write()
方法将会引发错误。
writable.setDefaultEncoding(encoding)
- encoding
<string>
The new default encoding - Returns:
<this>
writable.setDefaultEncoding()
方法用于设置可写流的默认编码。
writable.uncork()
writable.uncork()
方法会刷新(flushes)自从调用 stream.cork()
后缓冲的所有数据。
当使用 writable.cork()
和 writable.uncork()
来管理对流的写入缓冲时,应使用 process.nextTick()
推迟调用 writable.uncork()
。这样做允许将在给定的 Node.js 事件循环阶段内发生的所有 writable.write()
调用进行批处理。
stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());
如果在流上多次调用 writable.cork() 方法,必须调用相同数量的 writable.uncork() 方法来刷新缓冲的数据。
stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
stream.uncork();
// The data will not be flushed until uncork() is called a second time.
stream.uncork();
});
writable.writable
- boolean
在可以安全调用 writable.write()
时,值为 true
。这意味着流尚未被销毁、发生错误或结束。
Here is a summary of the writable-related properties:
writable.writableEnded
<boolean>
在调用 writable.end()
后为 true
。此属性不指示数据是否已刷新,请使用 writable.writableFinished
。
writable.writableCorked
<integer>
需要调用 writable.uncork()
的次数,以完全取消流的封堵。
writable.errored
<Error>
如果流已发生错误,返回错误信息。
writable.writableFinished
<boolean>
在发出 ‘finish’ 事件之前立即设置为 true
。
writable.writableHighWaterMark
<number>
返回创建此可写流时传递的 highWaterMark 值。
writable.writableLength
<number>
此属性包含队列中待写入的字节数(或对象数)。该值提供关于 highWaterMark 状态的内省数据。
writable.writableNeedDrain
<boolean>
如果流的缓冲区已满且流将发出 ‘drain’,则为 true
。
writable.writableObjectMode
<boolean>
给定可写流的 objectMode
属性的获取器。
writable.write(chunk[, encoding][, callback])
chunk
<string> | <Buffer> | <Uint8Array> | <any>
:可选的要写入的数据。对于非对象模式的流,chunk
必须是字符串、Buffer 或 Uint8Array。对于对象模式流,chunk
可以是除了null
之外的任何 JavaScript 值。encoding
<string> | <null>
:如果chunk
是字符串,则为其编码。默认值:‘utf8’。callback
<Function>
:数据块刷新时的回调函数。- Returns:
<boolean>
- 如果流希望调用代码在继续写入额外数据之前等待 ‘drain’ 事件触发,则返回false
;否则返回true
。
writable.write()
方法将一些数据写入流,并在数据完全处理后调用提供的回调函数。如果发生错误,回调函数将被异步调用,并将错误作为其第一个参数。在触发 ’error’ 事件之前,回调函数将被调用。
如果在流创建后调用 chunk
参数后,内部缓冲区的大小小于配置的 highWaterMark 时,返回值为 true。如果返回 false,则应停止进一步尝试将数据写入流,直到触发 ‘drain’ 事件。
当流不处于排空状态时,调用 write()
将缓冲 chunk
,并返回 false。一旦所有当前缓冲的 chunk
(已被操作系统接受以便传递)都被排空,将触发 ‘drain’ 事件。一旦 write()
返回 false,请勿再写入更多的 chunk
,直到触发 ‘drain’ 事件。虽然允许在不处于排空状态的流上调用 write()
,但是 Node.js 将缓冲所有写入的 chunk
,直到发生最大内存使用,此时将无条件终止。即使在终止之前,高内存使用量也会导致垃圾回收性能不佳和高 RSS(即使在不再需要内存后,通常也不会释放回系统)。由于TCP套接字可能永远不会排空,如果远程对等方不读取数据,则写入不处于排空状态的套接字可能导致远程可利用的漏洞。
在流不处于排空状态时写入数据对于转换(Transform)而言尤为棘手,因为默认情况下,转换流会暂停,直到它们被管道化或添加了 ‘data’ 或 ‘readable’ 事件处理程序。
如果要写入的数据可以按需生成或获取,建议将逻辑封装到一个 Readable 流中,并使用 stream.pipe()
。然而,如果更喜欢调用 write()
,可以通过使用 ‘drain’ 事件来遵守背压并避免内存问题:
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb);
} else {
process.nextTick(cb);
}
}
// Wait for cb to be called before doing any other write.
write('hello', () => {
console.log('Write completed, do more writes now.');
});
在对象模式下的可写流始终会忽略编码参数。
Readable streams(可读流)
可读流是一个用于消费数据的抽象表示。
可读流的示例包括:
- 在客户端的HTTP响应(HTTP responses, on the client)
- 在服务器端的HTTP请求(HTTP requests, on the server)
- 文件系统读取流(fs read streams)
- 压缩流(zlib streams)
- 加密流(crypto streams)
- TCP套接字(TCP sockets)
- 子进程的标准输出和标准错误(child process stdout and stderr)
- 进程的标准输入(process.stdin)
所有可读流都实现了由stream.Readable
类定义的接口。
两种读取模式
可读流有效地运行在以下两种模式之一:流动模式和暂停模式。这些模式与对象模式是分开的。一个可读流可以是对象模式或非对象模式,而不管它是在流动模式还是暂停模式下。
-
在流动模式下,数据会自动从底层系统读取,并通过 EventEmitter 接口的事件尽可能快地提供给应用程序。
-
在暂停模式下,必须显式调用 stream.read() 方法来从流中读取数据块。
所有可读流开始时都处于暂停模式,但可以通过以下方式之一切换到流动模式:
- 添加 ‘data’ 事件处理程序。
- 调用 stream.resume() 方法。
- 调用 stream.pipe() 方法将数据发送到可写流(Writable)。
可读流可以通过以下方式切换回暂停模式:
- 如果没有流目标,通过调用 stream.pause() 方法。
- 如果存在流目标,通过移除所有流目标。可以通过调用 stream.unpipe() 方法来移除多个流目标。
重要的概念是,可读流不会生成数据,直到提供了一种消费或忽略该数据的机制。如果消费机制被禁用或被移除,可读流将尝试停止生成数据。
出于向后兼容性原因,移除 ‘data’ 事件处理程序不会自动暂停流。此外,如果存在流目标,则调用 stream.pause() 不会保证在这些目标排空并请求更多数据后流仍然保持暂停状态。
如果将可读流切换到流动模式而没有可用的消费者来处理数据,那么数据将会丢失。这可能发生,例如,当调用 readable.resume() 方法时,没有附加到 ‘data’ 事件的监听器,或者当从流中移除 ‘data’ 事件处理程序时。
添加 ‘readable’ 事件处理程序会自动使流停止流动,数据必须通过 readable.read() 来消费。如果移除 ‘readable’ 事件处理程序,那么如果存在 ‘data’ 事件处理程序,流将会重新开始流动。
三种状态
对于可读流,“两种模式"的操作是对可读流内部状态管理的简化抽象。在可读流实现中,每个可读流在任何时刻都处于以下三种可能的状态之一:
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
当 readable.readableFlowing
为 null
时,没有提供消费流数据的机制。因此,流不会生成数据。在此状态下,附加 ‘data’ 事件的监听器、调用 readable.pipe()
方法或调用 readable.resume()
方法将会将 readable.readableFlowing
切换为 true
,使可读流开始主动发出事件,即在生成数据时触发事件。
调用 readable.pause()
、readable.unpipe()
或遇到背压将导致 readable.readableFlowing
被设置为 false
,暂时停止事件的流动,但不会停止数据的生成。在此状态下,附加 ‘data’ 事件的监听器将不会将 readable.readableFlowing
切换为 true
。
const { PassThrough, Writable } = require('node:stream');
const pass = new PassThrough();
const writable = new Writable();
pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false.
pass.on('data', (chunk) => { console.log(chunk.toString()); });
// readableFlowing is still false.
pass.write('ok'); // Will not emit 'data'.
pass.resume(); // Must be called to make stream emit 'data'.
// readableFlowing is now true.
在 readable.readableFlowing
为 false
时,数据可能会在流的内部缓冲区中累积。
选择一种API风格
可读流(Readable stream)的API在多个Node.js版本中不断演化,并提供了多种消费流数据的方法。一般来说,开发人员应选择一种数据消费方法,不应在单个流上使用多种不同的方法来消费数据。具体而言,同时使用 on('data')
、on('readable')
、pipe()
或异步迭代器的组合可能导致不直观的行为。因此,在编写代码时应选择一种风格来一致地消费流数据。
Class: stream.Readable
Event: ‘close’
当流及其底层资源(例如文件描述符)已关闭时,将触发’close’事件。该事件表示不会再发出任何事件,也不会进行进一步的计算。
如果使用emitClose选项创建了一个可读流,它将始终触发’close’事件。
Event:‘data’
chunk
<Buffer> | <string> | <any>
数据块。对于不以对象模式运行的流,数据块可以是字符串或缓冲区。对于以对象模式运行的流,数据块可以是除了 null 以外的任何JavaScript值。
每当流将数据块释放给消费者时,将触发 data
事件。这可能发生在通过调用 readable.pipe()
、readable.resume()
或将监听器回调附加到 data
事件时将流切换到流动模式时。当调用 readable.read()
方法并且有可用的数据块可返回时,也会触发 data
事件。
如果将 data
事件监听器附加到未明确暂停的流上,流将切换到流动模式。然后,数据将尽快传递。
如果使用 readable.setEncoding()
方法为流指定了默认编码,则监听器回调将将数据块作为字符串传递;否则,数据将作为缓冲区传递。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
Event:’end’
’end’ 事件在流没有更多可被消费的数据时触发。
只有在数据完全被消费之后,’end’ 事件才会被触发。可以通过将流切换到流动模式,或者通过重复调用 stream.read()
直到所有数据都被消费来实现这一点。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
console.log('There will be no more data.');
});
Event:’error’
<Error>
错误对象
’error’ 事件可能会在任何时候被可读实现触发。通常情况下,这可能发生在底层流由于底层内部故障而无法生成数据时,或者当流实现尝试推送无效的数据块时。
监听器回调将传递一个单独的错误对象。
Event:‘pause’
‘pause’ 事件在调用 stream.pause()
且 readableFlowing
不为 false
时触发。
Event:‘readable’
‘readable’ 事件在可以从流中读取数据时或已到达流的末尾时触发。实际上,‘readable’ 事件表示流有新信息可用。如果有可用数据,stream.read()
将返回该数据。
const readable = getReadableStreamSomehow();
readable.on('readable', function() {
// There is some data to read now.
let data;
while ((data = this.read()) !== null) {
console.log(data);
}
});
如果已经到达流的末尾,调用 stream.read()
将返回 null
并触发 ’end’ 事件。即使在没有可读数据的情况下,这种行为也是一致的,例如在空文件 ‘foo.txt’ 的情况下:
const fs = require('node:fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
console.log('end');
});
运行此脚本的输出是:
$ node test.js
readable: null
end
在某些情况下,附加 ‘readable’ 事件的监听器会导致一定量的数据被读入内部缓冲区。
总体来说,readable.pipe()
和 ‘data’ 事件机制比 ‘readable’ 事件更容易理解。然而,处理 ‘readable’ 可能会导致增加吞吐量。
如果同时使用 ‘readable’ 和 ‘data’,‘readable’ 优先控制流,即只有在调用 stream.read()
时才会触发 ‘data’ 事件。readableFlowing
属性将变为 false
。如果在移除 ‘readable’ 时存在 ‘data’ 监听器,则流将开始流动,即将触发 ‘data’ 事件而无需调用 .resume()
。
Event:‘resume’
‘resume’ 事件在调用 stream.resume()
且 readableFlowing
不为 true
时触发。
readable.destroy([error])
error
<Error>
将作为 ’error’ 事件的负载传递的错误
返回值: <this>
销毁流。可选择触发 ’error’ 事件,并触发 ‘close’ 事件(除非设置 emitClose 为 false)。调用此方法后,可读流将释放任何内部资源,并忽略后续对 push()
的调用。
一旦调用了 destroy(),任何进一步的调用都将成为无操作,并且除了来自 _destroy() 的错误外,不会触发其他错误。
实现者不应覆盖此方法,而应实现 readable._destroy()
。
readable.closed
<boolean>
在 ‘close’ 事件触发后为 true。
readable.destroyed
<boolean>
在调用readable.destroy()
后为 true。
readable.isPaused()
返回值: <boolean>
readable.isPaused()
方法返回可读流的当前操作状态。这主要由底层支持 readable.pipe()
方法的机制使用。在大多数典型情况下,通常不需要直接使用此方法。
const readable = new stream.Readable();
readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
readable.pause()
返回值: <this>
readable.pause()
方法将导致处于流动模式的流停止发出 ‘data’ 事件,从流动模式切换出来。任何可用的数据将保留在内部缓冲区中。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
readable.pause();
console.log('There will be no additional data for 1 second.');
setTimeout(() => {
console.log('Now data will start flowing again.');
readable.resume();
}, 1000);
});
如果存在 ‘readable’ 事件监听器,则 readable.pause()
方法不会产生任何效果。
readable.pipe(destination[, options])
destination
<stream.Writable>
用于写入数据的目标流options
<Object>
管道选项end
<boolean>
当读取器结束时是否结束写入器。默认值为true
。
返回值: <stream.Writable>
目标流,如果它是双工流(Duplex)或转换流(Transform),则允许进行链式管道操作。
readable.pipe()
方法将可写流(Writable stream)附加到可读流(Readable),导致它自动切换到流动模式并将所有数据推送到附加的可写流。数据的流动将被自动管理,以确保目标可写流不会被更快的可读流压倒。
以下示例将可读流中的所有数据传输到名为 ‘file.txt’ 的文件中:
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'.
readable.pipe(writable);
可以将多个可写流附加到单个可读流上。
readable.pipe()
方法返回对目标流的引用,从而可以设置一系列连续的管道流:
const fs = require('node:fs');
const zlib = require('node:zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
默认情况下,当源可读流触发 ’end’ 事件时,会调用 stream.end()
来关闭目标可写流,使其不再可写。要禁用此默认行为,可以将 end
选项传递为 false
,从而使目标流保持打开状态:
reader.pipe(writer, { end: false });
reader.on('end', () => {
writer.end('Goodbye\n');
});
有一个重要的注意事项,如果在处理过程中可读流触发了错误,可写目标不会自动关闭。如果发生错误,必须手动关闭每个流以防止内存泄漏。
但是,无论指定的选项如何,process.stderr
和 process.stdout
可写流直到 Node.js 进程退出时才会关闭。
readable.read([size])
- size
<number>
可选参数,用于指定要读取的数据量。 - 返回:
<string> | <Buffer> | <null> | <any>
readable.read()
方法从内部缓冲区中读取数据并返回。如果没有可读取的数据,将返回 null
。默认情况下,数据以Buffer对象的形式返回,除非使用readable.setEncoding()方法指定了编码或流处于对象模式。
可选的 size
参数指定要读取的字节数。如果没有 size
字节可供读取,除非流已结束,否则将返回 null
,此时将返回内部缓冲区中剩余的所有数据。——这里指的是给定了 size
的情况下,流结束了,最后的 chunk
不足 size
的数,就返回剩下的全部。如果流还在,没有结束,因为某些原因,比如错误之类,读取不到 size
数量的 chunk
,就会返回 null
。
如果未指定 size
参数,将返回内部缓冲区中包含的所有数据。
size
参数必须小于或等于1 GiB。
readable.read()
方法只应在处于暂停模式的可读流上调用。在流动模式下,会自动调用 readable.read()
,直到内部缓冲区完全耗尽。
const readable = getReadableStreamSomehow();
// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
let chunk;
console.log('Stream is readable (new data received in buffer)');
// Use a loop to make sure we read all currently available data
while (null !== (chunk = readable.read())) {
console.log(`Read ${chunk.length} bytes of data...`);
}
});
// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
console.log('Reached end of stream.');
});
每次调用readable.read()都会返回一个数据块或null。这些数据块不会连接在一起。需要使用while循环来消费当前缓冲区中的所有数据。当读取一个大文件时,.read()可能会返回null,表示已经消费了到目前为止缓冲的所有内容,但还有更多的数据尚未缓冲。在这种情况下,当缓冲区中有更多数据时,将触发新的’readable’事件。最后,当没有更多数据时,将触发’end’事件。
因此,要从可读流中读取文件的全部内容,需要在多个’readable’事件中收集数据块:
const chunks = [];
readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read())) {
chunks.push(chunk);
}
});
readable.on('end', () => {
const content = chunks.join('');
});
对象模式下的可读流无论 size
参数的值如何,始终从 readable.read(size)
的调用中返回单个项。
如果 readable.read()
方法返回一个数据块,还会触发 data
事件。
在触发 end
事件之后调用 stream.read([size])
将返回 null
。不会引发运行时错误。
readable.readable
<boolean>
如果可以安全调用 readable.read()
,则为 true
,这意味着流尚未被销毁或触发 error
或 end
事件。
readable.readableEncoding
<null> | <string>
用于获取给定可读流的 encoding
属性的 getter
方法。可以使用 readable.setEncoding()
方法设置 encoding
属性。
readable.readableEnded
<boolean>
当触发 end
事件时,该属性变为 true
。
readable.errored
<Error>
如果流已被错误销毁,则返回错误。
readable.readableFlowing
<boolean>
此属性反映了可读流的当前状态,如“三个状态”部分所述。
readable.readableHighWaterMark
<number>
返回创建此可读流时传递的 highWaterMark
的值。
readable.readableLength
<number>
此属性包含队列中准备读取的字节数(或对象)的数量。该值提供了关于 highWaterMark
状态的内省数据。
readable.readableObjectMode
<boolean>
用于获取给定可读流的 objectMode
属性的 getter
方法。
readable.resume()
- Returns:
readable.resume()
方法会导致明确暂停的可读流(Readable stream)恢复发出 ‘data’ 事件,将流切换到流动模式。
readable.resume()
方法可用于完全消耗流中的数据,而不实际处理任何数据:
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Reached the end, but did not read anything.');
});
如果存在 ‘readable’ 事件监听器,则 readable.resume()
方法不会产生任何影响。
readable.setEncoding(encoding)
- encoding
The encoding to use. - Returns:
readable.setEncoding(encoding)
方法用于设置从可读流(Readable stream)读取的数据的字符编码。
默认情况下,不会分配任何编码,流数据将作为 Buffer 对象返回。设置编码会导致流数据以指定编码的字符串形式返回,而不是作为 Buffer 对象。例如,调用 readable.setEncoding('utf8')
会使输出数据被解释为 UTF-8 数据并以字符串形式传递。调用 readable.setEncoding('hex')
会导致数据以十六进制字符串格式编码。
可读流将正确处理通过流传递的多字节字符,否则如果仅将其从流中拉出为 Buffer 对象,则可能会出现解码不正确的情况。
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('Got %d characters of string data:', chunk.length);
});
readable.unpipe([destination])
destination
<stream.Writable>
Optional specific stream to unpipe- Returns:
<this>
readable.unpipe()
方法用于分离之前通过 stream.pipe()
方法连接的可写流(Writable stream)。
如果未指定目标流,则会分离所有的管道连接。
如果指定了目标流,但没有为它设置管道连接,则此方法不会产生任何效果。
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
console.log('Stop writing to file.txt.');
readable.unpipe(writable);
console.log('Manually close the file stream.');
writable.end();
}, 1000);
readable.unshift(chunk[, encoding])
-
chunk <Buffer> | <Uint8Array> | <string> | <null> | <any>
:要添加到读取队列的数据块。对于不在对象模式下运行的流,chunk
必须是字符串、Buffer、Uint8Array 或 null 中的一种。对于对象模式的流,chunk
可以是任何 JavaScript 值。 -
encoding <string>
:字符串块的编码。必须是有效的 Buffer 编码,例如 ‘utf8’ 或 ‘ascii’。
将 chunk
设置为 null 表示流的结束(EOF),并且与 readable.push(null)
的行为相同,之后将无法再写入更多数据。EOF 信号被放置在缓冲区的末尾,任何已缓冲的数据仍然会被刷新。
readable.unshift()
方法将一块数据推回内部缓冲区。这在某些情况下非常有用,其中一个流被代码消耗,而该代码需要“取消消耗”它已经从源中乐观地拉取出的某些数据,以便数据可以传递给其他方。
在触发 ’end’ 事件后不能调用 stream.unshift(chunk)
方法,否则会引发运行时错误。
经常使用 stream.unshift()
的开发人员应考虑切换到使用 Transform 流。有关更多信息,请参阅流实现者部分的 API 文档。
// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('node:string_decoder');
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
const decoder = new StringDecoder('utf8');
let header = '';
function onReadable() {
let chunk;
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk);
if (str.includes('\n\n')) {
// Found the header boundary.
const split = str.split(/\n\n/);
header += split.shift();
const remaining = split.join('\n\n');
const buf = Buffer.from(remaining, 'utf8');
stream.removeListener('error', callback);
// Remove the 'readable' listener before unshifting.
stream.removeListener('readable', onReadable);
if (buf.length)
stream.unshift(buf);
// Now the body of the message can be read from the stream.
callback(null, header, stream);
return;
}
// Still reading the header.
header += str;
}
}
}
与 stream.push(chunk)
不同,stream.unshift(chunk)
不会通过重置流的内部读取状态来结束读取过程。如果在读取过程中调用了 readable.unshift()
,这可能导致意外的结果(例如,在自定义流的 stream._read()
实现中)。在调用 readable.unshift()
后立即跟上 stream.push('')
将适当地重置读取状态,但最好在执行读取过程中避免调用 readable.unshift()
。
readable[Symbol.asyncIterator]()
Returns
:<AsyncIterator>
to fully consume the stream.
const fs = require('node:fs');
async function print(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const chunk of readable) {
data += chunk;
}
console.log(data);
}
print(fs.createReadStream('file')).catch(console.error);
如果循环由于 break、return 或 throw 而终止,流将被销毁。换句话说,迭代流将完全消耗流。流将按照 highWaterMark 选项的大小以块的形式读取。在上面的代码示例中,如果文件的数据小于 64 KiB,数据将在一个单独的块中,因为没有提供 highWaterMark 选项给 fs.createReadStream()。
Duplex and transform streams
以下是有关 stream.Duplex
和 stream.Transform
类的信息的翻译:
Class: stream.Duplex
Duplex(双工)流是同时实现了可读(Readable)和可写(Writable)接口的流。
Duplex 流的示例包括:
- TCP sockets(TCP 套接字)
- zlib streams (zlib 流)
- crypto streams (crypto 流)
duplex.allowHalfOpen
<boolean>
如果为 false
,则当可读端结束时,流会自动结束可写端。最初由 allowHalfOpen
构造函数选项设置,其默认值为 true
。
可以手动更改此属性以更改现有 Duplex 流实例的半开放行为,但必须在 ’end’ 事件被触发之前进行更改。
Class: stream.Transform
Transform(转换)流是输出与输入在某种方式上相关的 Duplex(双工)流。与所有 Duplex 流一样,Transform 流实现了可读和可写接口。
Transform 流的示例包括:
- zlib streams (zlib 流)
- crypto streams (crypto 流)
transform.destroy([error])
error <Error>
- 返回:
<this>
销毁流,并可选择发出 ’error’ 事件。在此调用之后,转换流将释放任何内部资源。实现者不应该覆盖此方法,而应该实现 readable._destroy()
。Transform 的 _destroy()
的默认实现也会在 emitClose
设置为 false
时发出 ‘close’ 事件。
一旦调用了 destroy()
,任何进一步的调用都将成为无操作,除非来自 _destroy()
的错误以外,不会再触发其他错误事件。
stream.finished(stream[, options], callback)
stream <Stream> | <ReadableStream> | <WritableStream>
可读和/或可写的流/网络流options
对象error <boolean>
:如果设置为false
,则对emit('error', err)
的调用不会被视为已完成。默认值:true
。readable <boolean>
:当设置为false
时,即使流仍然可读,也会在流结束时调用回调函数。默认值:true
。writable <boolean>
:当设置为false
时,即使流仍然可写,也会在流结束时调用回调函数。默认值:true
。signal <AbortSignal>
:允许中止等待流完成。如果信号被中止,底层流不会被中止。回调函数将以 AbortError 被调用。由此函数添加的所有已注册的监听器也将被移除。cleanup <boolean>
:移除所有已注册的流监听器。默认值:false
。callback <Function>
:一个回调函数,可以接受一个可选的错误参数。
- 返回值: 返回一个清理函数,用于移除所有已注册的监听器。
一个函数,用于在流不再可读、可写,或者发生错误或意外关闭事件时获得通知。
const { finished } = require('node:stream');
const fs = require('node:fs');
const rs = fs.createReadStream('archive.tar');
finished(rs, (err) => {
if (err) {
console.error('Stream failed.', err);
} else {
console.log('Stream is done reading.');
}
});
rs.resume(); // Drain the stream.
这在处理错误情况时特别有用,例如流被意外销毁(比如中止的HTTP请求),并且不会发出 ’end’ 或 ‘finish’ 事件。
finished API 提供了 Promise 版本。
stream.finished() 在回调被调用后会保留悬空的事件侦听器(特别是 ’error’、’end’、‘finish’ 和 ‘close’),原因是为了防止意外的 ’error’ 事件(由于不正确的流实现而导致)引发意外崩溃。如果不希望这种行为,那么在回调中需要调用返回的清理函数:
const cleanup = finished(rs, (err) => {
cleanup();
// ...
});
stream.pipeline(source[, …transforms], destination, callback)
stream.pipeline(streams, callback)
- streams 是一个包含各种类型的流对象的数组。
- source 是一个表示数据源的参数,可以是不同类型的流或函数。
- source
<AsyncIterable>
- Returns:
<AsyncIterable>
- source
- transforms 是一个表示转换操作的参数,可以是不同类型的流或函数。
- source
<AsyncIterable>
- Returns:
<AsyncIterable> | <Promise>
- source
- destination 是一个表示目标位置的参数,可以是不同类型的流或函数。
source <AsyncIterable>
Returns: <AsyncIterable> | <Promise>
- callback 是在管道操作完成时调用的函数。
err 是一个表示错误的参数。
val 是 Promise 返回的值。
此模块方法用于在不同类型的流和生成器之间进行数据传输,同时处理错误,进行适当的清理,并在管道完成时提供回调。函数返回一个流对象。
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge tar file efficiently:
pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
},
);
Pipeline API提供了Promise版本。
stream.pipeline()会在所有流上调用stream.destroy(err),除非:
- 可读流已发出 ’end’ 或 ‘close’ 事件。
- 可写流已发出 ‘finish’ 或 ‘close’ 事件。
在回调被调用后,stream.pipeline()会在流上保留悬挂的事件侦听器。在流失败后重新使用的情况下,这可能导致事件侦听器泄漏和错误被吞噬。如果最后一个流是可读流,则悬挂的事件侦听器将被移除,以便稍后可以消耗最后一个流。
当引发错误时,stream.pipeline()会关闭所有流。使用pipeline的IncomingRequest可能会导致意外行为,因为它会在不发送预期响应的情况下销毁套接字。请参考下面的示例:
const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt');
pipeline(fileStream, res, (err) => {
if (err) {
console.log(err); // No such file
// this message can't be sent once `pipeline` already destroyed the socket
return res.end('error!!!');
}
});
});
stream.Readable.from(iterable[, options])
iterable:<Iterable>
是一个实现了Symbol.asyncIterator或Symbol.iterator可迭代协议的对象。如果传递了null值,它会发出一个’error’事件。options:<Object>
是一个对象,用于传递给stream.Readable([options])
的选项。默认情况下,Readable.from()
会将options.objectMode
设置为true,除非通过将options.objectMode
显式设置为false来选择不启用该选项。- 该函数返回一个
stream.Readable
对象。
这是一个用于从迭代器创建可读流的实用方法。
const { Readable } = require('node:stream');
async function * generate() {
yield 'hello';
yield 'streams';
}
const readable = Readable.from(generate());
readable.on('data', (chunk) => {
console.log(chunk);
});
如果调用 Readable.from(string)
或 Readable.from(buffer)
,字符串或缓冲区将不会被迭代以匹配其他流的语义,这是为了提高性能。
如果作为参数传递了一个包含 promises 的可迭代对象,可能会导致未处理的拒绝(unhandled rejection)情况。
const { Readable } = require('node:stream');
Readable.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]);
stream.Duplex.from(src)
以下是有关创建双工流的实用方法的描述,以及支持的输入类型和返回类型:
src
可以是多种类型,包括流、Blob、ArrayBuffer、字符串、可迭代对象、异步可迭代对象、异步生成器函数、异步函数、Promise、对象、可读流和可写流。
这个实用方法将不同的输入类型转换成可读和可写的双工流(Duplex)。具体的转换方式如下:
- 流(Stream):将可写流转换成可读和可写的双工流。
- Blob:转换成可读的双工流。
- 字符串(string):转换成可读的双工流。
- ArrayBuffer:转换成可读的双工流。
- 异步可迭代对象(AsyncIterable):转换成可读的双工流,但不能产生 null 值。
- 异步生成器函数(AsyncGeneratorFunction):转换成可读和可写的转换型双工流。必须将源异步可迭代对象作为第一个参数,并且不能产生 null 值。
- 异步函数(AsyncFunction):转换成可写的双工流,必须返回 null 或 undefined。
- 对象({ writable, readable }):将可写和可读转换成流,然后将它们组合成双工流,其中双工流将写入可写部分并从可读部分读取。
- Promise:转换成可读的双工流,忽略值为 null 的情况。
- 可读流(ReadableStream):转换成可读的双工流。
- 可写流(WritableStream):转换成可写的双工流。
这个方法返回一个 stream.Duplex
对象。需要注意,如果传递包含 promises 的可迭代对象作为参数,可能会导致未处理的拒绝(unhandled rejection)。
const { Duplex } = require('node:stream');
Duplex.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]);
stream.addAbortSignal(signal, stream)
signal
<AbortSignal>
表示可能的取消信号stream
<Stream> | <ReadableStream> | <WritableStream>
可以附加信号的流
将一个 AbortSignal 附加到一个可读或可写流上。这使得代码可以使用 AbortController 控制流的销毁。
调用与传递的 AbortSignal 对应的 AbortController 上的 abort 将与在流上调用 .destroy(new AbortError()) 以及对于 Web 流调用 controller.error(new AbortError()) 的行为相同。
const fs = require('node:fs');
const controller = new AbortController();
const read = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
// Later, abort the operation closing the stream
controller.abort();
或者将 AbortSignal 与可读流一起使用作为异步可迭代对象:
const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // set a timeout
const stream = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
(async () => {
try {
for await (const chunk of stream) {
await process(chunk);
}
} catch (e) {
if (e.name === 'AbortError') {
// The operation was cancelled
} else {
throw e;
}
}
})();
或者使用 AbortSignal 与 ReadableStream:
const controller = new AbortController();
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.enqueue('world');
controller.close();
},
});
addAbortSignal(controller.signal, rs);
finished(rs, (err) => {
if (err) {
if (err.name === 'AbortError') {
// The operation was cancelled
}
}
});
const reader = rs.getReader();
reader.read().then(({ value, done }) => {
console.log(value); // hello
console.log(done); // false
controller.abort();
});
stream.getDefaultHighWaterMark(objectMode)
objectMode
<boolean>
:表示是否启用对象模式。- 返回:默认高水位标记,以整数形式返回。默认值为 16384(16 KiB),或者在启用对象模式时为 16。
stream.setDefaultHighWaterMark(objectMode, value)
objectMode
<boolean>
:表示是否启用对象模式。value
<integer>
:高水位标记的新值。
设置流使用的默认高水位标记。
API for stream implementers
这个 node:stream
模块 API 已经设计成可以使用 JavaScript 的原型继承模型轻松实现流。
首先,流开发人员会声明一个新的 JavaScript 类,该类扩展了四个基本流类之一(stream.Writable
、stream.Readable
、stream.Duplex
或 stream.Transform
),确保调用适当的父类构造函数:
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark });
// ...
}
}
在扩展流时,请牢记用户可以和应该在将这些选项转发到基类构造函数之前提供哪些选项。例如,如果实现对于 autoDestroy 和 emitClose 选项有假设,不要允许用户覆盖这些选项。明确指出要转发的选项,而不是隐式转发所有选项。
然后,新的流类必须实现一个或多个特定方法,具体取决于要创建的流类型,详见下表:
Use-case | Class | Method(s) to implement |
---|---|---|
Reading only | Readable | _read() |
Writing only | Writable | _write(), _writev(), _final() |
Reading and writing | Duplex | _read(), _write(), _writev(), _final() |
Operate on written data, then read the result | Transform | _transform(), _flush(), _final() |
流的实现代码不应该调用那些旨在供消费者使用的流的 “公共” 方法(如流的 API 部分所描述)。这样做可能会导致应用程序中消耗流的代码出现不良的副作用。
避免覆盖公共方法,如 write()、end()、cork()、uncork()、read() 和 destroy(),或通过 .emit() 发出内部事件,如 ’error’、‘data’、’end’、‘finish’ 和 ‘close’。这样做可能会破坏当前和未来的流不变性,导致与其他流、流实用程序和用户期望的行为和/或兼容性问题。
Simplified construction
对于许多简单情况,可以在不依赖继承的情况下创建流。这可以通过直接创建stream.Writable
、stream.Readable
、stream.Duplex
或stream.Transform
对象的实例,并将适当的方法作为构造函数选项传递来实现。
const { Writable } = require('node:stream');
const myWritable = new Writable({
construct(callback) {
// Initialize state and load resources...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Free resources...
},
});
Implementing a writable stream
stream.Writable
类被扩展以实现可写流。
自定义可写流必须调用新的 stream.Writable([options])
构造函数并实现 writable._write()
和/或 writable._writev()
方法。
new stream.Writable([options])
- `options` <Object>
* `highWaterMark` <number>:当流的 `write()` 开始返回 false 时的缓冲区水位标记。**默认值**:16384(16 KiB),或 16 对于 objectMode 流。
* `decodeStrings` <boolean>:在将传递给 `stream.write()` 的字符串编码为缓冲区之前(使用在 `stream.write()` 调用中指定的编码)是否转换它们为 Buffers。其他类型的数据不会被转换(即,不会将 Buffers 解码为字符串)。将其设置为 false 将阻止字符串的转换。**默认值**:true。
* `defaultEncoding` <string>:当没有将编码指定为传递给 `stream.write()` 的参数时使用的默认编码。**默认值**:'utf8'。
* `objectMode` <boolean>:流的 `write(anyObj)` 是否是有效操作。当设置时,如果流实现支持,可以写入除字符串、Buffer 或 Uint8Array 之外的 JavaScript 值。**默认值**:false。
* `emitClose` <boolean>:流销毁后是否应该发出 'close' 事件。**默认值**:true。
* `write` <Function>:`stream._write()` 方法的实现。
* `writev` <Function>:`stream._writev()` 方法的实现。
* `destroy` <Function>:`stream._destroy()` 方法的实现。
* `final` <Function>:`stream._final()` 方法的实现。
* `construct` <Function>:`stream._construct()` 方法的实现。
* `autoDestroy` <boolean>:该流是否在结束后自动调用 `.destroy()`。**默认值**:true。
* `signal` <AbortSignal>:表示可能取消的信号。
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor(options) {
// Calls the stream.Writable() constructor.
super(options);
// ...
}
}
或者,当使用ES6之前的构造函数风格时:
const { Writable } = require('node:stream');
const util = require('node:util');
function MyWritable(options) {
if (!(this instanceof MyWritable))
return new MyWritable(options);
Writable.call(this, options);
}
util.inherits(MyWritable, Writable);
或者,使用简化的构造函数方法:
const { Writable } = require('node:stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
});
在传递的AbortSignal对应的AbortController上调用abort将与在可写流上调用.destroy(new AbortError())
的行为相同。
const { Writable } = require('node:stream');
const controller = new AbortController();
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort();
writable._construct(callback)
- callback
当流初始化完成时,调用此函数(可选带有错误参数)。
不得直接调用_construct()方法。它可以由子类实现,如果是这样,只会被内部Writable类方法调用。
这个可选的函数将在流构造函数返回后的一个刻度后调用,延迟任何_write()、_final()和_destroy()的调用,直到调用回调函数。这对于在流可以使用之前初始化状态或异步初始化资源非常有用。
const { Writable } = require('node:stream');
const fs = require('node:fs');
class WriteStream extends Writable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
流在创建后是处于暂停状态的,可要注册可读流的data事件和可写流的drain事件
在 Node.js 中从本地读取一个大文件,并在中间使用 ‘drain’ 事件来控制数据流的写入是一个常见的操作,特别是当您需要处理大文件时。以下是一段示例代码,演示了如何实现这个功能:
首先,您可以使用 Node.js 的 fs 模块来创建可读流和可写流,然后通过监听 ‘data’ 事件来读取文件,并监听 ‘drain’ 事件来控制写入。这里假设您要将文件内容从 ./cc.txt 复制到另一个文件 ./output.txt。
const fs = require('fs');
// 创建可读流和可写流
const readStream = fs.createReadStream('./cc.txt');
const writeStream = fs.createWriteStream('./output.txt');
// 监听可读流的 'data' 事件,将数据写入可写流
readStream.on('data', (chunk) => {
// 尝试将数据写入可写流
if (!writeStream.write(chunk)) {
// 如果返回 false,表示缓冲区已满,需要等待 'drain' 事件
readStream.pause();
}
});
// 监听可写流的 'drain' 事件,继续读取并写入数据
writeStream.on('drain', () => {
// 当可写流的缓冲区排空时,继续读取并写入数据
readStream.resume();
});
// 当可读流读取完成时,关闭可写流
readStream.on('end', () => {
writeStream.end();
});
// 处理错误
readStream.on('error', (err) => {
console.error('读取文件出错:', err);
});
writeStream.on('error', (err) => {
console.error('写入文件出错:', err);
});
这段代码会逐块读取大文件并写入到另一个文件中,当可写流的缓冲区满时,会暂停可读流的读取操作,直到 ‘drain’ 事件触发后继续读取。这样可以有效控制内存使用,处理大文件时尤为重要。同时,处理了文件读取和写入可能出现的错误情况。