write() 方法有三个参数
- chunk {String| Buffer},表示要写入的数据
- encoding 当写入的数据是字符串的时候可以设置编码
- callback 数据被写入之后的回调函数
drain事件
如果调用 stream.write(chunk)方法返回false,表示当前缓存区已满,流将在适当的时机(缓存区清空后)触发drain事件。
const fs = require('fs'); const rs = fs.createReadStream(sourcePath); const ws = fs.createWriteStream(destPath); rs.setEncoding('utf-8'); // 设置编码格式 rs.on('data', chunk => { let flag = ws.write(chunk); // 写入数据 if (!flag) { // 如果缓存区已满暂停读取 rs.pause(); } }); ws.on('drain', () => { rs.resume(); // 缓存区已清空 继续读取写入 });
fs.createWriteStream(path[, options])源码实现
// 文件 WriteStream.js let fs = require('fs'); let EventEmitter = require('events'); class WriteStream extends EventEmitter { constructor(path, options = {}) { super(); this.path = path; this.flags = options.flags || 'w'; this.encoding = options.encoding || 'utf8'; this.start = options.start || 0; this.pos = this.start; this.mode = options.mode || 0o666; this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark || 16 * 1024; this.open(); // fd 异步的 //触发一个open事件,当触发open事件后fd肯定就存在了 // 写文件的时候 需要的参数有哪些 // 第一次写入是真的往文件里写 this.writing = false; // 默认第一次就不是正在写入 // 用简单的数组来模拟一下缓存 this.cache = []; // 维护一个变量,表示缓存的长度 this.len = 0; // 是否触发drain事件 this.needDrain = false; } clearBuffer() { let buffer = this.cache.shift(); if (buffer) { // 如果缓存里有 this._write(buffer.chunk, buffer.encoding, () => this.clearBuffer()); } else {// 如果缓存里没有了 if (this.needDrain) { // 需要触发drain事件 this.writing = false; // 告诉下次直接写就可以了 不需要写到内存中了 this.needDrain = false; this.emit('drain'); } } } _write(chunk, encoding, clearBuffer) { // 因为write方法是同步调用的此时fd还没有获取到,所以等待获取到再执行write操作 if (typeof this.fd != 'number') { return this.once('open', () => this._write(chunk, encoding, clearBuffer)); } fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, byteWritten) => { this.pos += byteWritten; this.len -= byteWritten; // 每次写入后就要在内存中减少一下 clearBuffer(); // 第一次就写完了 }) } write(chunk, encoding = this.encoding) { // 客户调用的是write方法去写入内容 // 要判断 chunk必须是buffer或者字符串 为了统一,如果传递的是字符串也要转成buffer chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding); this.len += chunk.length; // 维护缓存的长度 3 let ret = this.len < this.highWaterMark; if (!ret) { this.needDrain = true; // 表示需要触发drain事件 } if (this.writing) { // 表示正在写入,应该放到内存中 this.cache.push({ chunk, encoding, }); } else { // 第一次 this.writing = true; this._write(chunk, encoding, () => this.clearBuffer()); // 专门实现写的方法 } return ret; // 能不能继续写了,false表示下次的写的时候就要占用更多内存了 } destroy() { if (typeof this.fd != 'number') { this.emit('close'); } else { fs.close(this.fd, () => { this.emit('close'); }); } } open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { this.emit('error', err); if (this.autoClose) { this.destroy(); // 如果自动关闭就销毁文件描述符 } return; } this.fd = fd; this.emit('open', this.fd); }); } } module.exports = WriteStream;
内容版权声明:除非注明,否则皆为本站原创文章。