格物致知-记一次Nodejs源码分析的经历

本文转载自微信公众号「编程杂技」,作者theanarkh。转载本文请联系编程杂技公众号。 

蒲江县ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联建站的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:028-86922220(备注:SSL证书合作)期待与您的合作!

昨天分析http模块相关的代码时,遇到了一个晦涩的逻辑,看了想,想了看还是没看懂。百度、谷歌了很多帖子也没看到合适的答案。突然看到一个题目有点相识的搜索结果,点进去是Stack Overflow上的帖子,但是已经404,最后还是通过快照功能成功看到内容。这个帖子[1]和我的疑惑不相关,但是突然给了我一些灵感。沿着这个灵感去看了代码,最后下载nodejs源码,加了一些log,编译了一夜(太久了,等不及编译完成,得睡觉了)。上午起来验证,终于揭开了疑惑。这个问题源于下面这段代码。

 
 
 
 
  1. function connectionListenerInternal(server, socket) { 
  2.   socket.server = server; 
  3.   // 分配一个http解析器 
  4.   const parser = parsers.alloc(); 
  5.   // 解析请求报文 
  6.   parser.initialize( 
  7.     HTTPParser.REQUEST, 
  8.     new HTTPServerAsyncResource('HTTPINCOMINGMESSAGE', socket), 
  9.     server.maxHeaderSize || 0, 
  10.     server.insecureHTTPParser === undefined ? 
  11.       isLenient() : server.insecureHTTPParser, 
  12.   ); 
  13.   parser.socket = socket; 
  14.   // 开始解析头部的开始时间 
  15.   parser.parsingHeadersStart = nowDate(); 
  16.   socket.parser = parser; 
  17.   const state = { 
  18.     onData: null, 
  19.     onEnd: null, 
  20.     onClose: null, 
  21.     onDrain: null, 
  22.     // 同一tcp连接上,请求和响应的的队列 
  23.     outgoing: [], 
  24.     incoming: [], 
  25.     outgoingData: 0, 
  26.     keepAliveTimeoutSet: false 
  27.   }; 
  28.   state.onData = socketOnData.bind(undefined, server, socket, parser, state); 
  29.   socket.on('data', state.onData); 
  30.  
  31.   if (socket._handle && socket._handle.isStreamBase && 
  32.       !socket._handle._consumed) { 
  33.     parser._consumed = true; 
  34.     socket._handle._consumed = true; 
  35.     parser.consume(socket._handle); 
  36.   } 
  37.   parser[kOnExecute] = 
  38.     onParserExecute.bind(undefined, server, socket, parser, state); 
  39.  
  40.   socket._paused = false; 

这段代码看起来很多,这是启动http服务器后,有新的tcp连接建立时执行的回调。问题在于tcp上有数据到来时,是怎么处理的,上面代码中nodejs监听了socket的data事件,同时注册了钩子kOnExecute。data事件我们都知道是流上有数据到来时触发的事件。我们看一下socketOnData做了什么事情。

 
 
 
 
  1. function socketOnData(server, socket, parser, state, d) { 
  2.   // 交给http解析器处理,返回已经解析的字节数 
  3.   const ret = parser.execute(d); 
  4.   onParserExecuteCommon(server, socket, parser, state, ret, d); 

这看起来没有问题,socket上有数据,然后交给http解析器处理。几乎所有http模块源码解析的文章也是这样分析的,我第一反应也觉得这个没问题,那kOnExecute是做什么的呢?kOnExecute钩子函数的值是onParserExecute,这个看起来也是解析tcp上的数据的,看起来和onSocketData是一样的作用,难道tcp上的数据有两个消费者?我们看一下kOnExecute什么时候被回调的。

 
 
 
 
  1. void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override { 
  2.  
  3.     Local ret = Execute(buf.base, nread); 
  4.     Local cb = 
  5.         object()->Get(env()->context(), kOnExecute).ToLocalChecked(); 
  6.     MakeCallback(cb.As(), 1, &ret); 
  7.   } 

在node_http_parser.cc中的OnStreamRead中被回调,那么OnStreamRead又是什么时候被回调的呢?OnStreamRead是nodejs中c++层流操作的通用函数,当流有数据的时候就会执行该回调。而且OnStreamRead中也会把数据交给http解析器解析。这看起来真的有两个消费者?这就很奇怪,为什么一份数据会交给http解析器处理两次?这时候我的想法就是这两个地方肯定是互斥的。但是我一直没有找到是哪里做了处理。最后在connectionListenerInternal的一段代码中找到了答案。

 
 
 
 
  1. if (socket._handle && socket._handle.isStreamBase && !socket._handle._consumed) { 
  2.     parser._consumed = true; 
  3.     socket._handle._consumed = true; 
  4.     parser.consume(socket._handle); 
  5.   } 

因为tcp流是继承StreamBase类的,所以if成立(后面会具体分析)。我们看一下consume的实现。

 
 
 
 
  1. static void Consume(const FunctionCallbackInfo& args) { 
  2.     Parser* parser; 
  3.     ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder()); 
  4.     CHECK(args[0]->IsObject()); 
  5.     StreamBase* stream = StreamBase::FromObjject(args[0].As()); 
  6.     CHECK_NOT_NULL(stream); 
  7.     stream->PushStreamListener(parser); 
  8.   } 
  9. http解析器把自己注册为tcp stream的一个listener。这里涉及到了c++层对流的设计。我们从头开始。看一下PushStreamListener做了什么事情。c++层中,流的操作由类StreamResource进行了封装。

     
     
     
     
    1. class StreamResource { 
    2.  public: 
    3.   virtual ~StreamResource(); 
    4.   virtual int ReadStart() = 0; 
    5.   virtual int ReadStop() = 0; 
    6.   virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; 
    7.   virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); 
    8.   virtual int DoWrite(WriteWrap* w, 
    9.                       uv_buf_t* bufs, 
    10.                       size_t count, 
    11.                       uv_stream_t* send_handle) = 0; 
    12.   void PushStreamListener(StreamListener* listener); 
    13.   void RemoveStreamListener(StreamListener* listener); 
    14.  
    15.  protected: 
    16.   uv_buf_t EmitAlloc(size_t suggested_size); 
    17.   void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0)); 
    18.  
    19.   StreamListener* listener_ = nullptr; 
    20.   uint64_t bytes_read_ = 0; 
    21.   uint64_t bytes_written_ = 0; 
    22.   friend class StreamListener; 
    23. }; 

    我们看到StreamResource是一个基类,定义了操作流的公共方法。其中有一个成员是StreamListener类的实例。我们看看StreamListener的实现。

     
     
     
     
    1. class StreamListener { 
    2.  public: 
    3.   virtual ~StreamListener(); 
    4.   virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0; 
    5.   virtual void OnStreamRead(ssize_t nread, 
    6.                             const uv_buf_t& buf) = 0; 
    7.   virtual void OnStreamDestroy() {} 
    8.   inline StreamResource* stream() { return stream_; } 
    9.  
    10.  protected: 
    11.   void PassReadErrorToPreviousListener(ssize_t nread); 
    12.  
    13.   StreamResource* stream_ = nullptr; 
    14.   StreamListener* previous_listener_ = nullptr; 
    15.   friend class StreamResource; 
    16. }; 

    StreamListener是一个负责消费流数据的类。StreamListener 和StreamResource类的关系如下。

    null我们看到一个流可以注册多个listener,多个listener形成一个链表。接着我们看一下创建一个c++层的tcp对象是怎样的。下面是TCPWrap的继承关系。

     
     
     
     
    1. class TCPWrap : public ConnectionWrap{} 
    2. class ConnectionWrap : public LibuvStreamWrap{} 
    3. class LibuvStreamWrap : public HandleWrap, public StreamBase{} 
    4. class StreamBase : public StreamResource {} 

    我们看到tcp流是继承于StreamResource的。新建一个tcp的c++的对象时(tcp_wrap.cc),会不断往上调用父类的构造函数,其中在StreamBase中有一个关键的操作。

     
     
     
     
    1. inline StreamBase::StreamBase(Environment* env) : env_(env) { 
    2.   PushStreamListener(&default_listener_); 
    3.  
    4. EmitToJSStreamListener default_listener_; 

    StreamBase会默认给流注册一个listener。我们看下EmitToJSStreamListener 具体的定义。

     
     
     
     
    1. class ReportWritesToJSStreamListener : public StreamListener { 
    2.  public: 
    3.   void OnStreamAfterWrite(WriteWrap* w, int status) override; 
    4.   void OnStreamAfterShutdown(ShutdownWrap* w, int status) override; 
    5.  
    6.  private: 
    7.   void OnStreamAfterReqFinished(StreamReq* req_wrap, int status); 
    8. }; 
    9.  
    10. class EmitToJSStreamListener : public ReportWritesToJSStreamListener { 
    11.  public: 
    12.   uv_buf_t OnStreamAlloc(size_t suggested_size) override; 
    13.   void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; 
    14. }; 

    EmitToJSStreamListener继承StreamListener ,定义了分配内存和读取接收数据的函数。接着我们看一下PushStreamListener做了什么事情。

     
     
     
     
    1. inline void StreamResource::PushStreamListener(StreamListener* listener) { 
    2.   // 头插法  
    3.   listener->previous_listener_ = listener_; 
    4.   listener->stream_ = this; 
    5.   listener_ = listener; 

    PushStreamListener就是构造出上图的结构。对应到创建一个c++层的tcp对象中,如下图。

    然后我们看一下对于流来说,读取数据的整个链路。首先是js层调用readStart

     
     
     
     
    1. function tryReadStart(socket) { 
    2.   socket._handle.reading = true; 
    3.   const err = socket._handle.readStart(); 
    4.   if (err) 
    5.     socket.destroy(errnoException(err, 'read')); 
    6.  
    7. // 注册等待读事件 
    8. Socket.prototype._read = function(n) { 
    9.   tryReadStart(this); 
    10. }; 

    我们看看readStart

     
     
     
     
    1. int LibuvStreamWrap::ReadStart() { 
    2.   return uv_read_start(stream(), [](uv_handle_t* handle, 
    3.                                     size_t suggested_size, 
    4.                                     uv_buf_t* buf) { 
    5.     static_cast(handle->data)->OnUvAlloc(suggested_size, buf); 
    6.   }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { 
    7.     static_cast(stream->data)->OnUvRead(nread, buf); 
    8.   }); 

    ReadStart调用libuv的uv_read_start注册等待可读事件,并且注册了两个回调函数OnUvAlloc和OnUvRead。

     
     
     
     
    1. void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) { 
    2.    EmitRead(nread, *buf); 
    3.  
    4. inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) { 
    5.   // bytes_read_表示已读的字节数 
    6.   if (nread > 0) 
    7.     bytes_read_ += static_cast(nread); 
    8.   listener_->OnStreamRead(nread, buf); 

    通过层层调用最后会调用listener_的OnStreamRead。我们看看tcp的OnStreamRead

     
     
     
     
    1. void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { 
    2.   StreamBase* stream = static_cast(stream_); 
    3.   Environment* env = stream->stream_env(); 
    4.   HandleScope handle_scope(env->isolate()); 
    5.   Context::Scope context_scope(env->context()); 
    6.   AllocatedBuffer buf(env, buf_); 
    7.   stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer()); 

    继续回调CallJSOnreadMethod

     
     
     
     
    1. MaybeLocal StreamBase::CallJSOnreadMethod(ssize_t nread, 
    2.                                                  Local ab, 
    3.                                                  size_t offset, 
    4.                                                  StreamBaseJSChecks checks) { 
    5.   Environment* env = env_; 
    6.   // ... 
    7.   AsyncWrap* wrap = GetAsyncWrap(); 
    8.   CHECK_NOT_NULL(wrap); 
    9.   Local onread = wrap->object()->GetInternalField(kOnReadFunctionField); 
    10.   CHECK(onread->IsFunction()); 
    11.   return wrap->MakeCallback(onread.As(), arraysize(argv), argv); 

    CallJSOnreadMethod会回调js层的onread回调函数。onread会把数据push到流中,然后触发data事件。这是tcp里默认的数据读取过程。而文章开头讲到的parser.consume打破了这个默认行为。stream->PushStreamListener(parser);修改了tcp流的listener链,http parser把自己作为数据的接收者。所以这时候tcp流上的数据是直接由node_http_parser.cc的OnStreamRead消费的。而不是触发socket的data事件,最后通过在nodejs源码中加log,重新编译验证的确如文中所述。最后提一个这个过程中还有一个关键的地方是调用consume函数的前提是socket._handle.isStreamBase为true。isStreamBase是在StreamBase::AddMethods中定义为true的,而tcp对象创建的过程中,调用了这个方法,所以tcp的isStreamBase是true,才会执行consume,才会执行kOnExecute回调。

    References

    [1] 帖子: http://cache.baiducontent.com/c?m=rZy2XovtTdJJuXWLM-s8wgpaz8NFubewtolyiC19iAKFJrbGdx2EFnArzlAIDisNP70zWWsCPv-4jwMHTGNcLaUsMVr-lvLqYmmHD-w_fUYz6a5K6OQRC9kZmLYN5RXsb34OdINb8xHIJsdyClaEWOtCGKMQ2saYK7ed7OG8v0E1pRKR4K46phl0rCBrw6amXE3QpPo62dMhvu_VASYYqq&p=cb77c64ad49111a05bee9e264d5693&newp=882a9646dc9712a05ab7cc374f0ccc231615d70e3ad3d501298ffe0cc4241a1a1a3aecbf2d29170ed6c27f630bae4856ecf630723d0834f1f689df08d2ecce7e7b&s=cfcd208495d565ef&user=baidu&fm=sc&query=onParserExecute&qid=869f73bc002e44f5&p1=11

    文章名称:格物致知-记一次Nodejs源码分析的经历
    网页链接:http://www.shufengxianlan.com/qtweb/news8/157658.html

    网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

    广告

    声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联