Hadoop源码分析之二(RPC机制之Call处理)

下面介绍在整个处理机制中怎么把具体的Request Call转换并调用到整体的实现逻辑。

主要以NameNode Client PRC Server作为例子来说明,整个转换通过Google Protocol Buffer RPC来实现。

final Call call = callQueue.take(); // pop the queue; maybe blocked here
        .....
          CurCall.set(call);
          try {
            // Make the call as the user via Subject.doAs, thus associating the call with the Subject
            if (call.connection.user == null) {
              value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
                          call.timestamp);
            } else {
              value =
                call.connection.user.doAs
                  (new PrivilegedExceptionAction<Writable>() {
                    @Override
                    public Writable run() throws Exception {
                      // make the call
                      return call(call.rpcKind, call.connection.protocolName,
                                  call.rpcRequest, call.timestamp);

}
                  }
                  );
            }
          } catch (Throwable e) {
            //process exception
          }
          CurCall.set(null);
          synchronized (call.connection.responseQueue) {
            // setupResponse() needs to be sync'ed together with responder.doResponse() since setupResponse may use
            // SASL to encrypt response data and SASL enforces
            // its own message ordering.
            setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS
                : RpcStatusProto.ERROR, value, errorClass, error);
           
            // Discard the large buf and reset it back to smaller size to free up heap
            if (buf.size() > maxRespSize) {
              LOG.warn("Large response size " + buf.size() + " for call "
                  + call.toString());
              buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
            }
            responder.doRespond(call);
          }

从上面可以看出从call queue中取出一个call,调用call方法来处理call的逻辑,处理之后把call的结果封装成response并通过responder的doRespond把call的response加到response queue中。那行call方法是怎么处理具体call的逻辑的呢,下面就来介绍,具体call方法的逻辑在RPC中实现具体如下:

public Writable call(RPC.RpcKind rpcKind, String protocol,
        Writable rpcRequest, long receiveTime) throws Exception {
      return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
          receiveTime);
 }

根据rpcKind来取到一个RPC Invoker实现,当然这里的Kind的Protocol Buffer,因此具体的实现在ProtobufRpcEngine中实现,具体如下:

public Writable call(RPC.Server server, String protocol,
          Writable writableRequest, long receiveTime) throws Exception {
//get RPC Request
        RpcRequestWritable request = (RpcRequestWritable) writableRequest;
        HadoopRpcRequestProto rpcRequest = request.message;
        String methodName = rpcRequest.getMethodName();//get need to invoke method name
        String protoName = rpcRequest.getDeclaringClassProtocolName();
        long clientVersion = rpcRequest.getClientProtocolVersion();
     
        ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
            clientVersion);
        BlockingService service = (BlockingService) protocolImpl.protocolImpl;
        MethodDescriptor methodDescriptor = service.getDescriptorForType()
            .findMethodByName(methodName);//find method define
        if (methodDescriptor == null) {
          String msg = "Unknown method " + methodName + " called on " + protocol
              + " protocol.";
          LOG.warn(msg);
          throw new RpcServerException(msg);
        }
        Message prototype = service.getRequestPrototype(methodDescriptor);
        Message param = prototype.newBuilderForType()
            .mergeFrom(rpcRequest.getRequest()).build();//build parameter message
        Message result;
        try {
          long startTime = Time.now();
          server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
    //call method
          result = service.callBlockingMethod(methodDescriptor, null, param);
        //do some metrics...
        } catch (ServiceException e) {
          throw (Exception) e.getCause();
        } catch (Exception e) {
          throw e;
        }
        return new RpcResponseWritable(result);
      }
    }

linux

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/0c5513c2e2a5868eca54594bbc358c35.html