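This section describes how, within the overall processing mechanism, a concrete request Call is converted and dispatched to the actual implementation logic.
The NameNode client RPC server serves as the main example; the whole conversion is implemented with Google Protocol Buffer RPC.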
final Call call = callQueue.take(); // pop the queue; maybe blocked here
.....
CurCall.set(call);
try {
  // Make the call as the user via Subject.doAs, thus associating the call with the Subject
  if (call.connection.user == null) {
    value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
        call.timestamp);
  } else {
    value =
        call.connection.user.doAs
            (new PrivilegedExceptionAction<Writable>() {
              @Override
              public Writable run() throws Exception {
                // make the call
                return call(call.rpcKind, call.connection.protocolName,
                    call.rpcRequest, call.timestamp);
              }
            }
        );
  }
} catch (Throwable e) {
  //process exception
}
CurCall.set(null);
synchronized (call.connection.responseQueue) {
  // setupResponse() needs to be sync'ed together with responder.doResponse() since setupResponse may use
  // SASL to encrypt response data and SASL enforces
  // its own message ordering.
  setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS
      : RpcStatusProto.ERROR, value, errorClass, error);
  // Discard the large buf and reset it back to smaller size to free up heap
  if (buf.size() > maxRespSize) {
    LOG.warn("Large response size " + buf.size() + " for call "
        + call.toString());
    buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
  }
  responder.doRespond(call);
}
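The take / invoke / respond pattern in the excerpt can be pictured with a small standalone sketch. This is not Hadoop code: the Call class, the process method, and the string-based SUCCESS/ERROR statuses are hypothetical stand-ins chosen only to show the control flow, and the sketch leaves out doAs, SASL, and buffer management.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class HandlerLoopSketch {
    // Hypothetical stand-in for the server's Call: a request plus a slot for its response.
    static final class Call {
        final String request;
        String response;
        Call(String request) { this.request = request; }
    }

    static final BlockingQueue<Call> callQueue = new LinkedBlockingQueue<>();
    static final BlockingQueue<Call> responseQueue = new LinkedBlockingQueue<>();

    // Hypothetical business logic; the real server delegates to an RPC invoker here.
    static String process(String request) {
        if (request.isEmpty()) {
            throw new IllegalArgumentException("empty request");
        }
        return "echo:" + request;
    }

    // Process one call: invoke the logic, record success or error, queue the response.
    static Call handle(Call call) {
        try {
            call.response = "SUCCESS " + process(call.request);
        } catch (RuntimeException e) {
            call.response = "ERROR " + e.getClass().getSimpleName() + ": " + e.getMessage();
        }
        responseQueue.add(call); // plays the role of responder.doRespond(call)
        return call;
    }

    public static void main(String[] args) throws InterruptedException {
        callQueue.put(new Call("ping"));
        callQueue.put(new Call(""));
        for (int i = 0; i < 2; i++) {
            Call call = callQueue.take(); // blocks until a call is available
            handle(call);
        }
        Call done;
        while ((done = responseQueue.poll()) != null) {
            System.out.println(done.response);
        }
    }
}
```

Note that a failing call still produces a response; the error is recorded and returned to the caller rather than terminating the handler.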
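As the server code above shows, a call is taken from the call queue and the call method is invoked to process it. The result is then wrapped in a response and, through the responder's doRespond, added to the response queue. So how does the call method handle the logic of a specific call? Its logic is implemented in RPC, as follows: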
public Writable call(RPC.RpcKind rpcKind, String protocol,
    Writable rpcRequest, long receiveTime) throws Exception {
  return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
      receiveTime);
}
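The getRpcInvoker(rpcKind) lookup can be pictured as a registry keyed by RPC kind. The following is a minimal sketch under that assumption, not Hadoop's actual code: the two-value RpcKind enum, the string-based RpcInvoker interface, and the registerInvoker helper are simplified stand-ins introduced for illustration.

```java
import java.util.EnumMap;
import java.util.Map;

public class InvokerRegistrySketch {
    // Simplified stand-in for the RPC kinds a server may support.
    enum RpcKind { RPC_WRITABLE, RPC_PROTOCOL_BUFFER }

    // Simplified stand-in for an invoker: turns a request into a response.
    interface RpcInvoker {
        String call(String protocol, String request, long receiveTime);
    }

    private static final Map<RpcKind, RpcInvoker> INVOKERS = new EnumMap<>(RpcKind.class);

    // Hypothetical helper: each RPC engine registers its invoker once at startup.
    static void registerInvoker(RpcKind kind, RpcInvoker invoker) {
        INVOKERS.put(kind, invoker);
    }

    static RpcInvoker getRpcInvoker(RpcKind kind) {
        RpcInvoker invoker = INVOKERS.get(kind);
        if (invoker == null) {
            throw new IllegalArgumentException("No invoker registered for " + kind);
        }
        return invoker;
    }

    // Mirrors the one-line delegation: pick the invoker by kind, then call it.
    static String call(RpcKind kind, String protocol, String request, long receiveTime) {
        return getRpcInvoker(kind).call(protocol, request, receiveTime);
    }

    public static void main(String[] args) {
        registerInvoker(RpcKind.RPC_PROTOCOL_BUFFER,
            (protocol, request, receiveTime) -> "pb:" + protocol + ":" + request);
        System.out.println(call(RpcKind.RPC_PROTOCOL_BUFFER, "ClientProtocol", "req",
            System.currentTimeMillis()));
    }
}
```

Keeping the server's call method this thin means the server does not need to know how a given wire format decodes its requests; that knowledge stays inside each invoker.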
An RPC invoker implementation is looked up by rpcKind. The kind here is Protocol Buffer, so the concrete implementation lives in ProtobufRpcEngine, as follows:
public Writable call(RPC.Server server, String protocol,
    Writable writableRequest, long receiveTime) throws Exception {
  // get the RPC request
  RpcRequestWritable request = (RpcRequestWritable) writableRequest;
  HadoopRpcRequestProto rpcRequest = request.message;
  String methodName = rpcRequest.getMethodName(); // name of the method to invoke
  String protoName = rpcRequest.getDeclaringClassProtocolName();
  long clientVersion = rpcRequest.getClientProtocolVersion();
  ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
      clientVersion);
  BlockingService service = (BlockingService) protocolImpl.protocolImpl;
  MethodDescriptor methodDescriptor = service.getDescriptorForType()
      .findMethodByName(methodName); // look up the method definition
  if (methodDescriptor == null) {
    String msg = "Unknown method " + methodName + " called on " + protocol
        + " protocol.";
    LOG.warn(msg);
    throw new RpcServerException(msg);
  }
  Message prototype = service.getRequestPrototype(methodDescriptor);
  Message param = prototype.newBuilderForType()
      .mergeFrom(rpcRequest.getRequest()).build(); // build the parameter message
  Message result;
  try {
    long startTime = Time.now();
    server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
    // call the method
    result = service.callBlockingMethod(methodDescriptor, null, param);
    // do some metrics...
  } catch (ServiceException e) {
    throw (Exception) e.getCause();
  } catch (Exception e) {
    throw e;
  }
  return new RpcResponseWritable(result);
}
}
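The core of the invoker above is a lookup by method name followed by a call on the decoded request. A minimal sketch of that dispatch step, without the protobuf dependency, might look as follows. The MethodDispatchSketch class, its register method, and the use of raw byte arrays in place of protobuf messages are assumptions made for illustration; only the "Unknown method" message format mirrors the excerpt.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class MethodDispatchSketch {
    // Hypothetical stand-in for a BlockingService: method name -> handler on raw bytes.
    private final Map<String, Function<byte[], byte[]>> methods = new HashMap<>();

    void register(String methodName, Function<byte[], byte[]> handler) {
        methods.put(methodName, handler);
    }

    // Find the handler by name (the analogue of findMethodByName), then invoke it.
    byte[] call(String protocol, String methodName, byte[] requestBytes) {
        Function<byte[], byte[]> handler = methods.get(methodName);
        if (handler == null) {
            throw new IllegalArgumentException("Unknown method " + methodName
                + " called on " + protocol + " protocol.");
        }
        return handler.apply(requestBytes);
    }

    public static void main(String[] args) {
        MethodDispatchSketch service = new MethodDispatchSketch();
        service.register("echo", bytes -> bytes);
        byte[] out = service.call("ClientProtocol", "echo",
            "hi".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(out, StandardCharsets.UTF_8));
    }
}
```

In the real invoker the handler table is not hand-built: the service descriptor generated from the .proto file supplies the method definitions, and the request prototype supplies the decoder for the parameter bytes.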