node.js中的流 stream 是处理流式数据的抽象接口。node.js 提供了很多流对象,像http中的request和response,和 process.stdout 都是流的实例。
流可以是 可读的,可写的,或是可读可写的。所有流都是 events 的实例。
一、流的类型
node.js中有四种基本流类型:
1、Writable 可写流 (例:fs.createWriteStream() )
2、Readable 可读流 (例:fs.createReadStream() )
3、Duplex 可读又可写流 (例:net.Socket )
4、Transform 读写过程中可修改或转换数据的 Duplex 流 (例:zlib.createDeflate() )
二、流中的数据有两种模式
1、二进制模式,都是 string字符串 和 Buffer。
2、对象模式,流内部处理的是一系统普通对象。
三、可读流的两种模式
1、流动模式 ( flowing ) ,数据自动从系统底层读取,并通过事件,尽可能快地提供给应用程序。
2、暂停模式 ( paused ),必须显式的调用 read() 读取数据。
可读流 都开始于暂停模式,可以通过如下方法切换到流动模式:
1、添加 'data' 事件回调。
2、调用 resume()。
3、调用 pipe()。
可读流通过如下方法切换回暂停模式:
1、如果没有管道目标,调用 pause()。
2、如果有管道目标,移除所有管道目标,调用 unpipe() 移除多个管道目标。
四、创建可读流,并监听事件
const fs = require('fs'); //创建一个文件可读流 let rs = fs.createReadStream('./1.txt', { //文件系统标志 flags: 'r', //数据编码,如果调置了该参数,则读取的数据会自动解析 //如果没调置,则读取的数据会是 Buffer //也可以通过 rs.setEncoding() 进行设置 encoding: 'utf8', //文件描述符,默认为null fd: null, //文件权限 mode: 0o666, //文件读取的开始位置 start: 0, //文件读取的结束位置(包括结束位置) end: Infinity, //读取缓冲区的大小,默认64K highWaterMark: 3 }); //文件被打开时触发 rs.on('open', function () { console.log('文件打开'); }); //监听data事件,会让当前流切换到流动模式 //当流中将数据传给消费者后触发 //由于我们在上面配置了 highWaterMark 为 3字节,所以下面会打印多次。 rs.on('data', function (data) { console.log(data); }); //流中没有数据可供消费者时触发 rs.on('end', function () { console.log('数据读取完毕'); }); //读取数据出错时触发 rs.on('error', function () { console.log('读取错误'); }); //当文件被关闭时触发 rs.on('close', function () { console.log('文件关闭'); });
注意,'open' 和 'close' 事件并不是所有流都会触发。
当们监听'data'事件后,系统会尽可能快的读取出数据。但有时候,我们需要暂停一下流的读取,操作其他事情。
这时候就需要用到 pause() 和 resume() 方法。
const fs = require('fs'); //创建一个文件可读流 let rs = fs.createReadStream('./1.txt', { highWaterMark: 3 }); rs.on('data', function (data) { console.log(`读取了 ${data.length} 字节数据 : ${data.toString()}`); //使流动模式的流停止触发'data'事件,切换出流动模式,数据都会保留在内部缓存中。 rs.pause(); //等待3秒后,再恢复触发'data'事件,将流切换回流动模式。 setTimeout(function () { rs.resume(); }, 3000); });
可读流的 'readable' 事件,当流中有数据可供读取时就触发。
注意当监听 'readable' 事件后,会导致流停止流动,需调用 read() 方法读取数据。
注意 on('data'),on('readable'),pipe() 不要混合使用,会导致不明确的行为。
const fs = require('fs'); let rs = fs.createReadStream('./1.txt', { highWaterMark: 1 }); //当流中有数据可供读取时就触发 rs.on('readable', function () { let data; //循环读取数据 //参数表示要读取的字节数 //如果可读的数据不足字节数,则返回缓冲区剩余数据 //如是没有指定字节数,则返回缓冲区中所有数据 while (data = rs.read()) { console.log(`读取到 ${data.length} 字节数据`); console.log(data.toString()); } });
五、创建可写流,并监听事件
const fs = require('fs'); //创建一个文件可写流 let ws = fs.createWriteStream('./1.txt', { highWaterMark: 3 }); //往流中写入数据 //参数一表示要写入的数据 //参数二表示编码方式 //参数三表示写入成功的回调 //缓冲区满时返回false,未满时返回true。 //由于上面我们设置的缓冲区大小为 3字节,所以到写入第3个时,就返回了false。 console.log(ws.write('1', 'utf8')); console.log(ws.write('2', 'utf8')); console.log(ws.write('3', 'utf8')); console.log(ws.write('4', 'utf8')); function writeData() { let cnt = 9; return function () { let flag = true; while (cnt && flag) { flag = ws.write(`${cnt}`); console.log('缓冲区中写入的字节数', ws.writableLength); cnt--; } }; } let wd = writeData(); wd(); //当缓冲区中的数据满的时候,应停止写入数据, //一旦缓冲区中的数据写入文件了,并清空了,则会触发 'drain' 事件,告诉生产者可以继续写数据了。 ws.on('drain', function () { console.log('可以继续写数据了'); console.log('缓冲区中写入的字节数', ws.writableLength); wd(); }); //当流或底层资源关闭时触发 ws.on('close', function () { console.log('文件被关闭'); }); //当写入数据出错时触发 ws.on('error', function () { console.log('写入数据错误'); });
写入流的 end() 方法 和 'finish' 事件监听