Node.js中你不可不精的Stream(流)(5)


write() 方法有三个参数

  • chunk {String| Buffer},表示要写入的数据
  • encoding 当写入的数据是字符串的时候可以设置编码
  • callback 数据被写入之后的回调函数

drain事件

如果调用 stream.write(chunk)方法返回false,表示当前缓存区已满,流将在适当的时机(缓存区清空后)触发drain事件。

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);
 
rs.setEncoding('utf-8'); // 设置编码格式
rs.on('data', chunk => {
let flag = ws.write(chunk); // 写入数据
if (!flag) { // 如果缓存区已满暂停读取
rs.pause();
}
});
 
ws.on('drain', () => {
rs.resume(); // 缓存区已清空 继续读取写入
});

fs.createWriteStream(path[, options])源码实现

// 文件 WriteStream.js
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter {
 constructor(path, options = {}) {
 super();
 this.path = path;
 this.flags = options.flags || 'w';
 this.encoding = options.encoding || 'utf8';
 this.start = options.start || 0;
 this.pos = this.start;
 this.mode = options.mode || 0o666;
 this.autoClose = options.autoClose || true;
 this.highWaterMark = options.highWaterMark || 16 * 1024;
 this.open(); // fd 异步的 //触发一个open事件,当触发open事件后fd肯定就存在了

 // 写文件的时候 需要的参数有哪些
 // 第一次写入是真的往文件里写
 this.writing = false; // 默认第一次就不是正在写入
 // 用简单的数组来模拟一下缓存
 this.cache = [];
 // 维护一个变量,表示缓存的长度
 this.len = 0;
 // 是否触发drain事件
 this.needDrain = false;
 }
 clearBuffer() {
 let buffer = this.cache.shift();
 if (buffer) { // 如果缓存里有
  this._write(buffer.chunk, buffer.encoding, () => this.clearBuffer());
 } else {// 如果缓存里没有了
  if (this.needDrain) { // 需要触发drain事件
  this.writing = false; // 告诉下次直接写就可以了 不需要写到内存中了
  this.needDrain = false;
  this.emit('drain');
  }
 }
 }
 _write(chunk, encoding, clearBuffer) { // 因为write方法是同步调用的此时fd还没有获取到,所以等待获取到再执行write操作
 if (typeof this.fd != 'number') {
  return this.once('open', () => this._write(chunk, encoding, clearBuffer));
 }
 fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, byteWritten) => {
  this.pos += byteWritten;
  this.len -= byteWritten; // 每次写入后就要在内存中减少一下
  clearBuffer(); // 第一次就写完了
 })
 }
 write(chunk, encoding = this.encoding) { // 客户调用的是write方法去写入内容
 // 要判断 chunk必须是buffer或者字符串 为了统一,如果传递的是字符串也要转成buffer
 chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
 this.len += chunk.length; // 维护缓存的长度 3
 let ret = this.len < this.highWaterMark;
 if (!ret) {
  this.needDrain = true; // 表示需要触发drain事件
 }
 if (this.writing) { // 表示正在写入,应该放到内存中
  this.cache.push({
  chunk,
  encoding,
  });
 } else { // 第一次
  this.writing = true;
  this._write(chunk, encoding, () => this.clearBuffer()); // 专门实现写的方法
 }
 return ret; // 能不能继续写了,false表示下次的写的时候就要占用更多内存了
 }
 destroy() {
 if (typeof this.fd != 'number') {
  this.emit('close');
 } else {
  fs.close(this.fd, () => {
  this.emit('close');
  });
 }
 }
 open() {
 fs.open(this.path, this.flags, this.mode, (err, fd) => {
  if (err) {
  this.emit('error', err);
  if (this.autoClose) {
   this.destroy(); // 如果自动关闭就销毁文件描述符
  }
  return;
  }
  this.fd = fd;
  this.emit('open', this.fd);
 });
 }
}
module.exports = WriteStream;
      

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/68.html