Stream在node.js中是一个抽象的接口,基于EventEmitter,也是一种Buffer的高级封装,用来处理流数据。流模块便是提供各种API让我们可以很简单的使用Stream。
流分为四种类型,如下所示:
Readable,可读流
Writable,可写流
Duplex,读写流
Transform,扩展的Duplex,可修改写入的数据
1、Readable可读流
通过stream.Readable可创建一个可读流,它有两种模式:暂停和流动。
在流动模式下,将自动从下游系统读取数据并使用data事件输出;暂停模式下,必须显示调用stream.read()方法读取数据,并触发data事件。
所有的可读流最开始都是暂停模式,可以通过以下方法切换到流动模式:
监听'data'事件
调用stream.resume()方法
调用stream.pipe()方法将数据输出到一个可写流Writable
同样地,也可以切换到暂停模式,有两种方法:
如果没有设置pipe目标,调用stream.pause()方法即可。
如果设置了pipe目标,则需要移除所有的data监听和调用stream.unpipe()方法
在Readable对象中有一个_readableSate的对象,通过该对象可以得知流当前处于什么模式,如下所示:
readable._readableState.flowing = null,没有数据消费者,流不产生数据
readable._readableState.flowing = true,处于流动模式
readable._readableState.flowing = false,处于暂停模式
为什么使用流取数据
对于小文件,使用fs.readFile()方法读取数据更方便,但需要读取大文件的时候,比如几G大小的文件,使用该方法将消耗大量的内存,甚至使程序崩溃。这种情况下,使用流来处理是更合适的,采用分段读取,便不会造成内存的'爆仓'问题。
data事件
在stream提供数据块给消费者时触发,有可能是切换到流动模式的时候,也有可能是调用readable.read()方法且有有效数据块的时候,使用如下所示:
const fs = require('fs'); const rs = fs.createReadStream('./appbak.js'); var chunkArr = [], chunkLen = 0; rs.on('data',(chunk)=>{ chunkArr.push(chunk); chunkLen+=chunk.length; }); rs.on('end',(chunk)=>{ console.log(Buffer.concat(chunkArr,chunkLen).toString()); });
readable事件
当流中有可用数据能被读取时触发,分为两种,新的可用的数据和到达流的末尾,前者stream.read()方法返回可用数据,后者返回null,如下所示:
const rs = fs.createReadStream('./appbak.js'); var chunkArr = [], chunkLen = 0; rs.on('readable',()=>{ var chunk = null; //这里需要判断是否到了流的末尾 if((chunk = rs.read()) !== null){ chunkArr.push(chunk); chunkLen+=chunk.length; } }); rs.on('end',(chunk)=>{ console.log(Buffer.concat(chunkArr,chunkLen).toString()); });
pause和resume方法
stream.pause()方法让流进入暂停模式,并停止'data'事件触发,stream.resume()方法使流进入流动模式,并恢复'data'事件触发,也可以用来消费所有数据,如下所示:
const rs = fs.createReadStream('./下载.png'); rs.on('data',(chunk)=>{ console.log(`接收到${chunk.length}字节数据...`); rs.pause(); console.log(`数据接收将暂停1.5秒.`); setTimeout(()=>{ rs.resume(); },1000); }); rs.on('end',(chunk)=>{ console.log(`数据接收完毕`); });
pipe(destination[, options])方法
pipe()方法绑定一个可写流到可读流上,并自动切换到流动模式,将所有数据输出到可写流,以及做好了数据流的管理,不会发生数据丢失的问题,使用如下所示:
const rs = fs.createReadStream('./app.js'); rs.pipe(process.stdout);
以上介绍了多种可读流的数据消费的方法,但对于一个可读流,最好只选择其中的一种,推荐使用pipe()方法。
2、Writable可写流
所有的可写流都是基于stream.Writable类创建的,创建之后便可将数据写入该流中。
write(chunk[, encoding][, callback])方法
write()方法向可写流中写入数据,参数含义:
chunk,字符串或buffer
encoding,若chunk为字符串,则是chunk的编码
callback,当前chunk数据写入磁盘时的回调函数
该方法的返回值为布尔值,如果为false,则表示需要写入的数据块被缓存并且此时缓存的大小超出highWaterMark阀值,否则为true。
使用如下所示:
const ws = fs.createWriteStream('./test.txt'); ws.write('nihao','utf8',()=>{process.stdout.write('this chunk is flushed.');}); ws.end('done.')
背压机制
如果可写流的写入速度跟不上可读流的读取速度,write方法添加的数据将被缓存,逐渐增多,导致占用大量内存。我们希望的是消耗一个数据,再去读取一个数据,这样内存就维持在一个水平上。如何做到这一点?可以利用write方法的返回值来判断可写流的缓存状态和'drain'事件,及时切换可读流的模式,如下所示: