fs.createReadStream(path[, options])源码实现
//文件名 ReadStream.js let fs = require('fs');//读取文件 let EventEmitter = require('events'); class ReadStream extends EventEmitter {//流操作都是基于事件的 constructor(path, options = {}) { super(); //需要的参数 this.path = path;//读取文件的路径 this.highWaterMark = options.highWaterMark || 64 * 1024;//缓冲区大小,默认64KB this.autoClose = options.autoClose || true;//是否需要自动关闭文件描述符,默认为true this.start = options.start || 0; //options 可以包括 start 和 end 值,使其可以从文件读取一定范围的字节而不是整个文件 this.pos = this.start; // 从文件的那个位置开始读取内容,pos会随着读取的位置而改变 this.end = options.end || null; // null表示没传递 this.encoding = options.encoding || null; this.flags = options.flags || 'r';//以何种方式操作文件 // 参数的问题 this.flowing = null; // 默认为非流动模式 // 建一个buffer存放读出来的数据 this.buffer = Buffer.alloc(this.highWaterMark); this.open(); // {newListener:[fn]} // 次方法默认同步调用的 this.on('newListener', (type) => { // 等待着 它监听data事件 if (type === 'data') {//当监听到data事件时,把流设置为流动模式 this.flowing = true; this.read();// 开始读取 客户已经监听了data事件 } }) } pause(){//将流从flowing模式切换为paused模式 this.flowing = false; } resume(){//将流从paused模式切换为flowing模式 this.flowing =true; this.read();//将流从paused模式切换为flowing模式后,继续读取文件内容 } read(){ // 默认第一次调用read方法时还没有获取fd,文件的打开是异步的,所以不能直接读 if(typeof this.fd !== 'number'){ //如果fd不是number类型,证明文件还没有打开,此时需要监听一次open事件,因为文件一打开,就会触发open事件,这个在this.open()里写了 return this.once('open',() => this.read()); // 等待着触发open事件后fd肯定拿到了,拿到以后再去执行read方法 } // 当获取到fd时 开始读取文件了 // 第一次应该读2个 第二次应该读2个 // 第二次pos的值是4 end是4 // 读取文件里一共4有个数为123 4,我们读取里面的123 4 let howMuchToRead = this.end?Math.min(this.end-this.pos+1,this.highWaterMark): this.highWaterMark;//规定每次读取多少个字节 fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (error, byteRead) => { // byteRead为真实的读到了几个字节的内容 // 读取完毕 this.pos += byteRead; // 读出来两个,pos位置就往后移两位 // this.buffer默认就是三个 let b = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);//对读出来的内容进行编码 this.emit('data', b);//触发data事件,将读到的内容输出给用户 if ((byteRead === this.highWaterMark)&&this.flowing){ return this.read(); // 继续读 } // 这里就是没有更多的逻辑了 if (byteRead < this.highWaterMark){ // 没有更多了 this.emit('end'); // 读取完毕 this.destroy(); // 销毁即可 } }); } // 打开文件用的 destroy() { if (typeof this.fd != 'number') { return this.emit('close'); } //如果文件还没打开,直接触发close事件 fs.close(this.fd, () => { // 如果文件打开过了 那就关闭文件并且触发close事件 this.emit('close'); }); } open() { fs.open(this.path, this.flags, (err, fd) => { //fd是文件描述符,它标识的就是当前this.path这个文件,从3开始(number类型) if (err) { if (this.autoClose) { // 如果需要自动关闭我再去销毁fd this.destroy(); // 销毁(关闭文件,触发关闭事件) } this.emit('error', err); // 如果有错误触发error事件 return; } this.fd = fd; // 保存文件描述符 this.emit('open', this.fd); // 文件被打开了,触发文件被打开的方法 }); } pipe(dest){//管道流的实现 pipe()方法是ReadStream下的方法,它里面的参数是WritableStream this.on('data',(data)=>{ let flag = dest.write(data); if(!flag){//这个flag就是每次调用ws.write()后返回的读状态值 this.pause();// 已经不能继续写了,等他写完了再恢复 } }); dest.on('drain',()=>{//当读取缓存区清空后 console.log('写一下停一下') this.resume();//继续往dest写入数据 }); } } module.exports = ReadStream;//导出可读流
内容版权声明:除非注明,否则皆为本站原创文章。