深入探究node之Transform(2)

示例代码很简单,创建了一个可读流,向消费者提供a-z的小写字母;创建了一个转换流,在_transform函数中针对数据并不做处理仅作打点输出,并向回调函数传递数据至读缓冲区。我们的目的是通过transform输出26个小写字母,但是当前程序执行的结果并不让人满意:

执行结果:
push a
push b
transform a
push c
transform b
push d
push e
push f

tranform仅仅处理到字母b,readable也仅仅提供了a-f的数据便戛然而止,这是为何?

这一切都归结于transform对象。认真读过上文后我们知道,所有的Transform实例同时有两个缓冲区,其中写缓冲区用来接收生产者的数据进行转换操作,读缓冲区则缓存数据给消费者使用。而在当前的实现中,transform._transform函数输出了待处理数据,同时执行next(null, buf);。该函数上文已有分析,即afterTransform函数,第一个参数为Error实例,第二个则为存入读缓冲区的数据。在本例中,执行完_transform后将处理后的数据存入读缓冲区,等待后面的消费者消费读缓冲区的数据。可是,transform后面没有消费者了,因此transform在处理完字母b存入读缓冲区后,读缓冲区已经满了(设定highWaterMark为2,即读写缓冲区的最大值均为2字节)。当字母c、d也执行到tranform._write后,由于不满足执行transform._read的条件无法执行transform._transform函数,更无法执行afterTransform函数,导致无法刷新写缓冲区的数据,造成字母c、d贮存在写缓冲区。而字母e、f则由于transform的写缓冲区满(transform.write()返回false),只有存储在readable的读缓冲区中,等待消费。这就造成了死循环,readable和transform所有的缓冲区都满了,流也就停止了。

解决这个问题的方法很简单,有两种不同方案:

1.transform的读缓冲区保持为空

2.增加消费者消费transform的读缓冲区

其实本质上都是让transform的读缓冲区得到消耗。

第一种方案:

保证transform的读缓冲区为空:

const transform = stream.Transform({ highWaterMark: 2, transform: function (buf, enc, next) { console.log('transform', buf.toString()) next(null, null) } })

只需向next函数传入null即可,这样transform消费完数据后即宣告数据处理结束,读缓冲区始终为空。

第二种方案:

添加消费者:

const transform = stream.Transform({ highWaterMark: 2, transform: function (buf, enc, next) { console.log('transform', buf.toString()) next(null, buf) } }) readable.pipe(transform).pipe(process.stdout);

transform实现不变,只是添加了消费者process.stdout。这样也同时保证了transform的读缓冲区处于可添加状态,也给了afterTransform函数刷新写缓冲区的机会,开启新的数据处理流程。

through2的实现

through2的重头戏在于Transform流,使用through2的API可方便的创建一个Transform实例,完成数据流的处理。

function through2 (construct) { return function (options, transform, flush) { if (typeof options == 'function') { flush = transform transform = options options = {} } if (typeof transform != 'function') transform = noop if (typeof flush != 'function') flush = null return construct(options, transform, flush) } } module.exports = through2(function (options, transform, flush) { var t2 = new DestroyableTransform(options) t2._transform = transform if (flush) t2._flush = flush return t2 })

可见,through2模块仅仅是封装了Transform的构造函数,并封装了更为易用的objectMode模式。之所以建议使用through2创建Transform对象,不仅仅是因为其提供了方便的API,更主要的是为了兼容性。Transform对象是属于Stream2.0的特性,早先版本的node并没有实现,而通过through2创建的Transform实例在之前版本的node下仍可正常使用,这是由于through2并未引用node默认提供的stream模块,而是使用社区中较为流行的“readable-stream”模块。

总结

本文旨在深入through2中的使用的Transform流进行探究,并作为上一篇文章node中的stream的回顾和应用。通过文末简单的示例了解Transform在开发中可能出现的问题,学会随意切换Transform的生产者和消费者的身份,更好的指导实际开发。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wyjjgz.html