大佬教程收集整理的这篇文章主要介绍了Node.js Stream中Readable类的内部实现,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
本次试图浅析探索Nodejs的Stream模块中对于Readable类的一部分实现(可写流也差不多)。其中会以可读流两种模式中的paused mode即暂停模式的表现形式来解读源码上的实现,为什么不分析flowing mode自然是因为这个模式是我们常用的其原理相比暂停模式下相对简单(其实是因为笔者总是喜欢关注一些边边角角的东西,不按套路出牌=。=),同时核心方法都是一样的,一通百通嘛,有兴趣的童鞋可以自己看下完整源码。
欢迎关注我的博客,不定期更新中——
首先先明确为什么Nodejs要实现一个stream,这就要清楚关于生产者消费者问题的概念。
简单来说就是内存问题。与前端不同,后端对于内存还是相当敏感的,比如读取文件这种操作,如果文件很小就算了,但如果这个文件一个g呢?难道全读出来?这肯定是不可取的。通过流的形式读一部分写一部分慢慢处理才是一个可取的方式。PS:有关为什么使用stream欢迎大家百(谷)度(歌)一下。
现在我们将自己实现一个可读流,以此来方便观察之后数据的流动过程:
const Readable = require('stream').Readable; // 实现一个可读流 class SubReadable extends Readable { constructor(datasource,options) { super(options); this.datasource = datasource; } // 文档提出必须通过_read方法调用push来实现对底层数据的读取 _read() { console.log('阈值规定大小:',arguments['0'] + ' bytes') const data = this.datasource.makeData() let result = this.push(data) if(data) console.log('添加数据大小:',data.toString().length + ' bytes') console.log('已缓存数据大小: ',subReadable._readableState.length + ' bytes') console.log('超过阈值限制或数据推送完毕:',!result) console.log('====================================') } } // 模拟资源池 const datasource = { data: new Array(1000000).fill('1'),// 每次读取时推送一定量数据 makeData() { if (!datasource.data.length) return null; return datasource.data.splice(datasource.data.length - 5000).reduce((a,b) => a + '' + b) } //每次向缓存推5000字节数据 }; const subReadable = new SubReadable(datasourcE);
先来看下整体的流程:
_read()
方式从资源读取数据到缓存池,同时设置了一个阈值highWaterMark
,标记数据到缓存池大小的一个上限,这个阈值是会浮动的,最小值也是默认值为16384。当消费者监听了readable
事件之后,就可以显式调用read()
方法来读取数据。
通过注册readable事件以此来触发暂停模式:
subReadable.on('readable',() => { console.log('缓存剩余数据大小: ',subReadable._readableState.length + ' byte') console.log('------------------------------------') })
readable
事件后可对流会从底层资源推送数据到缓存直到达到超过阈值或者底层数据全部加载完。
首先修改资源池大小data: new Array(10000).fill('1')
(方便打印数据),执行read(1000)每次读取1000字节资源读取资源:
subReadable.on('readable',() => { let chunk = subReadable.read(1000) if(chunk) console.log(`读取 ${Chunk.length} bytes数据`); console.log('缓存剩余数据大小: ',subReadable._readableState.length + ' byte') console.log('------------------------------------') })
无参调用read()
subReadable.on('readable',() => { let chunk = subReadable.read() if(chunk) console.log(`读取 ${Chunk.length} bytes数据`); console.log('缓存剩余数据大小: ',subReadable._readableState.length + ' byte') console.log('------------------------------------') })
直接调用read()
后,会逐步读取完全部资源,至于每次读取多少下文会统一探讨。
以上我们依次尝试了在实现可读流后触发暂停模式会发生的事情,接下来作者将会对以下几个可能有疑问的点进行探究:
_read()
方法并在其中调用push()
Readable.prototype._read = function(n) { this.emit('error',new errors.Error('ERR_STREAM_READ_NOT_IMPLEMENTED')); }; //只是定义接口 Readable.prototype.read = function(n) { ... var doRead = state.needReadable; if (doRead) { this._read(state.highWaterMark); } }
当我们调用subReadable.read()便会执行到上面的代码,可以发现,源码中
对于_read()
只是定义了一个接口,里面并没有具体实现,如果我们不自己定义那么就会报错。同时read()
中会执行它通过它调用push()
来从资源中读取数据,并且传入highWaterMark
,这个值你可以用也可以不用因为_read()
是我们自己实现的。
Readable.prototype.push = function(chunk,encoding) { ... return readableAddChunk(this,chunk,encoding,false,skipChunkcheck); };
从代码中可以看出,将底层资源推送到缓存中的核心操作是通过push,通过语义化也可以看出push方法中最后会进行添加新数据的操作。由于之后方法中嵌套很多,不一一展示,直接来看最后调用的方法:
// readableAddChunk最后会调用addChunk function addChunk(stream,state,addToFront) { ... state.buffer.push(chunk); //数据推送到buffer中 if (state.needReadablE)//判断此属性值来看是否触发readable事件 emitReadable(stream); maybeReadMore(stream,statE);//可能会推送更多数据到缓存 }
我们可以看出,方法调用的最后确实执行了资源数据推送到缓存的操作。与此同时在会判断needReadable属性值来看是否触发readable回调事件。而这也为之后我们来分析为什么注册了readable事件之后会执行一次回调埋下了伏笔。最后@L_74_13@maybeReadMore()则是蓄满缓存池的方法。
先来看下源码里是如何绑定的事件:
Readable.prototype.on = function(ev,fn) { if (ev === 'data') { ... } else if (ev === 'readable') { const state = this._readableState; state.needReadable = true;//设定属性为true,触发readable回调 ... process.nextTick(nReadingNextTick,this); } }; function nReadingNextTick(self) { self.read(0); } //之后执行read(0) => _read() => push() => addChunk() // => maybeReadMore()@H_68_6@maybeReadMore()中当缓存池存储大小小于阈值时则会一直调用read(0)不读取数据,但是会一直push底层资源到缓存:
function maybeReadMore_(stream,statE) { ... if (state.length < state.highWaterMark) { stream.read(0); } }
上文提到过,绑定事件后会开始推送数据至缓存池,最后会执行到addChunk()方法,内部通过needReadable属性来判断是否触发readable事件。当你第一次绑定事件时会执行state.needReadable = true;,从而在最后推送数据后会执行触发readable的操作。
read()
与传入特定数值的区别区别在执行read()方法的时候,会将参数n传入到下面这个函数中由它来计算现在应该应该读取多少数据:
function howMuchToRead(n,statE) { if (n <= 0 || (state.length === 0 && state.ended)) return 0; if (state.objectModE) return 1; if (n !== n) { // Only flow one buffer at a time if (state.flowing && state.length) return state.buffer.head.data.length; else return state.length; } // If we're asking for more than the current hwm,then raise the hwm. if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); if (n <= state.length) return n; // Don't have enough if (!state.ended) { //传输没有结束都是false state.needReadable = true; return 0; } return state.length; }
当直接调用read(),n参数则为NaN,当处于流动模式的时候n则为buffer头数据的长度,否则是整个缓存的数据长度。若为read(n)传入数字,大于当前的hwm时可以发现会重新计算一个hwm,与此同时如果已缓存的数据小于请求的数据量,那么将设置state.needReadable = true;
并返回0;
第一次试图梳理源码的思路,一路写下来发现有很多想说但是又不知道怎么连贯的理清楚=。= 既然代码细节也有些说不清,不过最后还是进行一个核心思路的提炼:
源码的边界情况比较多。作者如果哪里说错了请指正=。=
PS:源码地址
惯例po作者的博客,不定时更新中——有问题欢迎在issues下交流。
以上是大佬教程为你收集整理的Node.js Stream中Readable类的内部实现全部内容,希望文章能够帮你解决Node.js Stream中Readable类的内部实现所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。