Node.js中你不可不精的Stream(流)(3)

fs.createReadStream(path[, options])源码实现

//文件名 ReadStream.js
let fs = require('fs');//读取文件
let EventEmitter = require('events');
class ReadStream extends EventEmitter {//流操作都是基于事件的
 constructor(path, options = {}) {
 super();
 //需要的参数
 this.path = path;//读取文件的路径
 this.highWaterMark = options.highWaterMark || 64 * 1024;//缓冲区大小,默认64KB
 this.autoClose = options.autoClose || true;//是否需要自动关闭文件描述符,默认为true
 this.start = options.start || 0; //options 可以包括 start 和 end 值,使其可以从文件读取一定范围的字节而不是整个文件
 this.pos = this.start; // 从文件的那个位置开始读取内容,pos会随着读取的位置而改变
 this.end = options.end || null; // null表示没传递
 this.encoding = options.encoding || null;
 this.flags = options.flags || 'r';//以何种方式操作文件

 // 参数的问题
 this.flowing = null; // 默认为非流动模式
 // 建一个buffer存放读出来的数据
 this.buffer = Buffer.alloc(this.highWaterMark);
 this.open(); 
 // {newListener:[fn]}
 // 次方法默认同步调用的
 this.on('newListener', (type) => { // 等待着 它监听data事件
  if (type === 'data') {//当监听到data事件时,把流设置为流动模式
  this.flowing = true;
  this.read();// 开始读取 客户已经监听了data事件
  }
 })
 }
 pause(){//将流从flowing模式切换为paused模式
 this.flowing = false;
 }
 resume(){//将流从paused模式切换为flowing模式
 this.flowing =true;
 this.read();//将流从paused模式切换为flowing模式后,继续读取文件内容
 }
 read(){ // 默认第一次调用read方法时还没有获取fd,文件的打开是异步的,所以不能直接读
 if(typeof this.fd !== 'number'){ //如果fd不是number类型,证明文件还没有打开,此时需要监听一次open事件,因为文件一打开,就会触发open事件,这个在this.open()里写了
  return this.once('open',() => this.read()); // 等待着触发open事件后fd肯定拿到了,拿到以后再去执行read方法
 }
 // 当获取到fd时 开始读取文件了
 // 第一次应该读2个 第二次应该读2个
 // 第二次pos的值是4 end是4
 // 读取文件里一共4有个数为123 4,我们读取里面的123 4
 let howMuchToRead = this.end?Math.min(this.end-this.pos+1,this.highWaterMark): this.highWaterMark;//规定每次读取多少个字节
 fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (error, byteRead) => { // byteRead为真实的读到了几个字节的内容
  // 读取完毕
  this.pos += byteRead; // 读出来两个,pos位置就往后移两位
  // this.buffer默认就是三个
  let b = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);//对读出来的内容进行编码
  this.emit('data', b);//触发data事件,将读到的内容输出给用户
  if ((byteRead === this.highWaterMark)&&this.flowing){
  return this.read(); // 继续读
  }
  // 这里就是没有更多的逻辑了
  if (byteRead < this.highWaterMark){
  // 没有更多了
  this.emit('end'); // 读取完毕
  this.destroy(); // 销毁即可
  }
 });
 }
 // 打开文件用的
 destroy() {
 if (typeof this.fd != 'number') { return this.emit('close'); } //如果文件还没打开,直接触发close事件
 fs.close(this.fd, () => {
  // 如果文件打开过了 那就关闭文件并且触发close事件
  this.emit('close');
 });
 }
 open() {
 fs.open(this.path, this.flags, (err, fd) => { //fd是文件描述符,它标识的就是当前this.path这个文件,从3开始(number类型)
  if (err) {
  if (this.autoClose) { // 如果需要自动关闭我再去销毁fd
   this.destroy(); // 销毁(关闭文件,触发关闭事件)
  }
  this.emit('error', err); // 如果有错误触发error事件
  return;
  }
  this.fd = fd; // 保存文件描述符
  this.emit('open', this.fd); // 文件被打开了,触发文件被打开的方法
 });
 }
 pipe(dest){//管道流的实现 pipe()方法是ReadStream下的方法,它里面的参数是WritableStream
 this.on('data',(data)=>{
  let flag = dest.write(data);
  if(!flag){//这个flag就是每次调用ws.write()后返回的读状态值
  this.pause();// 已经不能继续写了,等他写完了再恢复
  }
 });
 dest.on('drain',()=>{//当读取缓存区清空后
  console.log('写一下停一下')
  this.resume();//继续往dest写入数据
 });
 }
}
module.exports = ReadStream;//导出可读流
      

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/68.html