fs.createReadStream(path[, options])源码实现
//文件名 ReadStream.js
let fs = require('fs');//读取文件
let EventEmitter = require('events');
class ReadStream extends EventEmitter {//流操作都是基于事件的
constructor(path, options = {}) {
super();
//需要的参数
this.path = path;//读取文件的路径
this.highWaterMark = options.highWaterMark || 64 * 1024;//缓冲区大小,默认64KB
this.autoClose = options.autoClose || true;//是否需要自动关闭文件描述符,默认为true
this.start = options.start || 0; //options 可以包括 start 和 end 值,使其可以从文件读取一定范围的字节而不是整个文件
this.pos = this.start; // 从文件的那个位置开始读取内容,pos会随着读取的位置而改变
this.end = options.end || null; // null表示没传递
this.encoding = options.encoding || null;
this.flags = options.flags || 'r';//以何种方式操作文件
// 参数的问题
this.flowing = null; // 默认为非流动模式
// 建一个buffer存放读出来的数据
this.buffer = Buffer.alloc(this.highWaterMark);
this.open();
// {newListener:[fn]}
// 次方法默认同步调用的
this.on('newListener', (type) => { // 等待着 它监听data事件
if (type === 'data') {//当监听到data事件时,把流设置为流动模式
this.flowing = true;
this.read();// 开始读取 客户已经监听了data事件
}
})
}
pause(){//将流从flowing模式切换为paused模式
this.flowing = false;
}
resume(){//将流从paused模式切换为flowing模式
this.flowing =true;
this.read();//将流从paused模式切换为flowing模式后,继续读取文件内容
}
read(){ // 默认第一次调用read方法时还没有获取fd,文件的打开是异步的,所以不能直接读
if(typeof this.fd !== 'number'){ //如果fd不是number类型,证明文件还没有打开,此时需要监听一次open事件,因为文件一打开,就会触发open事件,这个在this.open()里写了
return this.once('open',() => this.read()); // 等待着触发open事件后fd肯定拿到了,拿到以后再去执行read方法
}
// 当获取到fd时 开始读取文件了
// 第一次应该读2个 第二次应该读2个
// 第二次pos的值是4 end是4
// 读取文件里一共4有个数为123 4,我们读取里面的123 4
let howMuchToRead = this.end?Math.min(this.end-this.pos+1,this.highWaterMark): this.highWaterMark;//规定每次读取多少个字节
fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (error, byteRead) => { // byteRead为真实的读到了几个字节的内容
// 读取完毕
this.pos += byteRead; // 读出来两个,pos位置就往后移两位
// this.buffer默认就是三个
let b = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);//对读出来的内容进行编码
this.emit('data', b);//触发data事件,将读到的内容输出给用户
if ((byteRead === this.highWaterMark)&&this.flowing){
return this.read(); // 继续读
}
// 这里就是没有更多的逻辑了
if (byteRead < this.highWaterMark){
// 没有更多了
this.emit('end'); // 读取完毕
this.destroy(); // 销毁即可
}
});
}
// 打开文件用的
destroy() {
if (typeof this.fd != 'number') { return this.emit('close'); } //如果文件还没打开,直接触发close事件
fs.close(this.fd, () => {
// 如果文件打开过了 那就关闭文件并且触发close事件
this.emit('close');
});
}
open() {
fs.open(this.path, this.flags, (err, fd) => { //fd是文件描述符,它标识的就是当前this.path这个文件,从3开始(number类型)
if (err) {
if (this.autoClose) { // 如果需要自动关闭我再去销毁fd
this.destroy(); // 销毁(关闭文件,触发关闭事件)
}
this.emit('error', err); // 如果有错误触发error事件
return;
}
this.fd = fd; // 保存文件描述符
this.emit('open', this.fd); // 文件被打开了,触发文件被打开的方法
});
}
pipe(dest){//管道流的实现 pipe()方法是ReadStream下的方法,它里面的参数是WritableStream
this.on('data',(data)=>{
let flag = dest.write(data);
if(!flag){//这个flag就是每次调用ws.write()后返回的读状态值
this.pause();// 已经不能继续写了,等他写完了再恢复
}
});
dest.on('drain',()=>{//当读取缓存区清空后
console.log('写一下停一下')
this.resume();//继续往dest写入数据
});
}
}
module.exports = ReadStream;//导出可读流
内容版权声明:除非注明,否则皆为本站原创文章。

