Node.js的进程管理的深入理解(7)

主进程与子进程通信

那么到底在哪里对端口进行了监听呢?

前面提到过,fork子进程的时候,对子进程进行了internalMessage事件的监听。

worker.process.on('internalMessage', internal(worker, onmessage));

子进程向master进程发送消息,一般使用process.send方法,会被监听的message事件所接收。这里是因为发送的message指定了cmd: 'NODE_CLUSTER',只要cmd字段以NODE_开头,这样消息就会认为是内部通信,被internalMessage事件所接收。

// child.js
function send(message, cb) {
 return sendHelper(process, message, null, cb);
}

// utils.js
function sendHelper(proc, message, handle, cb) {
 if (!proc.connected)
  return false;

 // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
 message = { cmd: 'NODE_CLUSTER', ...message, seq };

 if (typeof cb === 'function')
  callbacks.set(seq, cb);

 seq += 1;
 return proc.send(message, handle);
}

master进程接收到消息后,根据act的类型开始执行不同的方法,这里act为queryServer。queryServer方法会构造一个key,如果这个key(规则主要为地址+端口+文件描述符)之前不存在,则对RoundRobinHandle构造函数进行了实例化,RoundRobinHandle构造函数中启动了一个TCP服务,并对之前指定的端口进行了监听。

// master.js
const handles = new Map();

function onmessage(message, handle) {
 const worker = this;
 if (message.act === 'online')
  online(worker);
 else if (message.act === 'queryServer')
  queryServer(worker, message);
 // other act logic
}
function queryServer(worker, message) {
 // ...
 const key = `${message.address}:${message.port}:${message.addressType}:` +
       `${message.fd}:${message.index}`;
 var handle = handles.get(key);
 // 如果之前没有对该key进行实例化,则进行实例化
 if (handle === undefined) {
  let address = message.address;
  // const RoundRobinHandle = require('internal/cluster/round_robin_handle');
  var constructor = RoundRobinHandle;

  handle = new constructor(key,
               address,
               message.port,
               message.addressType,
               message.fd,
               message.flags);
  handles.set(key, handle);
 }
 // ...
}

// internal/cluster/round_robin_handle
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
 this.server = net.createServer(assert.fail);
 // 这里启动一个TCP服务器
 this.server.listen({ port, host });
 
 // TCP服务器启动时的事件
 this.server.once('listening', () => {
  this.handle = this.server._handle;
  this.handle.onconnection = (err, handle) => this.distribute(err, handle);
 });
 // ...
}

可以看到TCP服务启动后,立马对connection