浅谈手写node可读流之流动模式(2)

constructor(path, options = {}) { super(); this.path = path; //指定要读取的文件地址 this.highWaterMark = options.highWaterMark || 64 * 1024; this.autoClose = options.autoClose || true; //是否自动关闭文件 this.start = options.start || 0; //从文件哪个位置开始读取 this.pos = this.start; // pos会随着读取的位置改变 this.end = options.end || null; // null表示没传递 this.encoding = options.encoding || null;// buffer编码 this.flags = options.flags || 'r'; this.flowing = null; // 模式开关 this.buffer = Buffer.alloc(this.highWaterMark);// 根据设置创建一个buffer存储读出来的数 this.open(); }

通常配置的原则是以用户配置的参数为准,如果用户没有对这个参数进行设置的话,就采用默认的配置。

实现可读流第三步:打开文件

这里原理是使用node模块fs中的open方法。首先我们来回顾下fs.open()方法的使用。

fs.open(filename,flags,[mode],callback); //实例 fs.open('./1,txt','r',function(err,fd){});

这里需要说明下,回调函数callback中有2个参数:

第一个是error,node中异步回调都会返回的一个参数,用来说明具体的错误信息

第二个参数是fd,是文件描述符,用来标识文件,等价于open函数的第一个参数

好了,现在我们来看看我们自己的可读流的open方法该如何实现吧:

open() { fs.open(this.path, this.flags, (err, fd) => { //fd标识的就是当前this.path这个文件,从3开始(number类型) if (err) { if (this.autoClose) { // 如果需要自动关闭则去关闭文件 this.destroy(); // 销毁(关闭文件,触发关闭事件) } this.emit('error', err); // 如果有错误触发error事件 return; } this.fd = fd; // 保存文件描述符 this.emit('open', this.fd); // 触发文件的打开的方法 }); }

从代码上我们可以看出:

fs.open函数是异步函数,也就是说callback是异步执行的,在成功打开文件的情况下,fd这个属性也是异步获取到的,这点需要注意。

另外重要的一点是,如果在打开文件发生错误时,则表明打开文件失败,那么此时就需要将文件关闭。

实现可读流第四步:读取文件内容

上面我们详细说过,可读流自身定义了一个"开关",当我们要读取文件中的内容的时候,我们需要将这个"开关"打开,那么node可读流本身是如何来打开这个"开关"的呢?

监听data事件

node可读流通过监听data事件来实现这个"开关"的开启:

rs.on("data", data => { console.log(data); });

当用户监听data事件的时候,"开关"开启,不停的从文件中读取内容。那么node是怎么监听data事件的呢?
答案就是 事件模块的newListener

这是因为node可读流是基于事件的,而事件中,服务器就可以通过newListener事件监听到从用户这边过来的所有事件,每个事件都有对应的类型,当用户监听的是data事件的时候,我们就可以获取到,然后就可以去读取文件中的内容了,那我们自己的可读流该如何实现呢?

// 监听newListener事件,看是否监听了data事件,如果监听了data事件的话,就开始启动流动模式,读取文件中的内容 this.on("newListener", type => { if (type === "data") { // 开启流动模式,开始读取文件中的内容 this.flowing = true; this.read(); } });

好了,知道了这个"开关"是如何打开的,那么这个时候就到了真正读取文件中内容的关键时候了,先上代码先:

read() { // 第一次读取文件的话,有可能文件是还没有打开的,此时this.fd可能还没有值 if (typeof this.fd !== "number") { // 如果此时文件还是没有打开的话,就触发一次open事件,这样文件就真的打开了,然后再读取 return this.once("open", () => this.read()); } // 具体每次读取多少个字符,需要进行计算,因为最后一次读取倒的可能比highWaterMark小 let howMuchRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark; fs.read(this.fd, this.buffer, 0, howMuchRead, this.pos, (err, byteRead) => { // this.pos 是每次读取文件读取的位置,是一个偏移量,每次读取会发生变化 this.pos += byteRead; // 将读取到的内容转换成字符串串,然后通过data事件,将内容发布出去 let srr = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead); // 将内容通过data事件发布出去 this.emit("data", srr); // 当读取到到内容长度和设置的highWaterMark一致的话,并且还是流动模式的话,就继续读取 if ((byteRead === this.highWaterMark) && this.flowing) { return this.read(); } // 没有更多的内容了,此时表示文件中的内容已经读取完毕 if (byteRead < this.highWaterMark) { // 读取完成,发布end方法,并关闭文件 this.emit("end"); this.destory(); } }); }

这里我们特别要注意的是:

文件是否已经打开,是否获取到fd,如果没有打开的话,则再次触发open方法

分批次读取文件内容,每次读取的内容是变化的,所以位置和偏移量是要动态计算的

控制读取停止的条件。

浅谈手写node可读流之流动模式

实现可读流第五步:关闭文件

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/dc13eed59dbd9252daded3ed2c115db0.html