说完了netty client,我们再说说调用的过程:
public NettyHttpResponse doPost(String url, Object body) { NettyHttpRequest request = new NettyHttpRequest(url, body); return doHttpRequest(request); } private static final DefaultEventLoop NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP = new DefaultEventLoop(null, new NamedThreadFactory("NettyResponsePromiseNotify")); private NettyHttpResponse doHttpRequest(NettyHttpRequest request) { // 1 Promise<NettyHttpResponse> defaultPromise = NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP.newPromise(); // 2 NettyHttpRequestContext context = new NettyHttpRequestContext(request, defaultPromise); channel.attr(CURRENT_REQ_BOUND_WITH_THE_CHANNEL).set(context); // 3 ChannelFuture channelFuture = channel.writeAndFlush(request); channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { System.out.println(Thread.currentThread().getName() + " 请求发送完成"); } }); // 4 return get(defaultPromise); }上面我已经标注了几个数字,分别讲一下:
新建一个promise,可以理解为一把可以我们手动完成的锁(一般主线程在这个锁上等待,在另一个线程去完成)
把锁和其他请求信息,一起放到channle里
使用channle发送数据
同步等待
第四步等待的get方法如下:
public <V> V get(Promise<V> future) { // 1. if (!future.isDone()) { CountDownLatch l = new CountDownLatch(1); future.addListener(new GenericFutureListener<Future<? super V>>() { @Override public void operationComplete(Future<? super V> future) throws Exception { log.info("received response,listener is invoked"); if (future.isDone()) { // 2 // promise的线程池,会回调该listener l.countDown(); } } }); boolean interrupted = false; if (!future.isDone()) { try { // 3 l.await(4, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error("e:{}", e); interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } //4 if (future.isSuccess()) { return future.getNow(); } log.error("wait result time out "); return null; }如果promise的状态还是没有完成,则我们new了一个闭锁
加了一个listner在promise上面,别人操作这个promise,这个listener会被回调,回调逻辑:将闭锁打开
主线程,在闭锁上等待
主线程,走到这里,说明已经等待超时,或者已经完成,可以获取结果并返回
什么地方会修改promise前面我们提到了,在response的handler中:
/** * http请求响应的处理器 */ @Slf4j public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse fullHttpResponse) throws Exception { String s = fullHttpResponse.content().toString(CharsetUtil.UTF_8); NettyHttpResponse nettyHttpResponse = NettyHttpResponse.successResponse(s); // 1. NettyHttpRequestContext nettyHttpRequestContext = (NettyHttpRequestContext) ctx.channel().attr(NettyClient.CURRENT_REQ_BOUND_WITH_THE_CHANNEL).get(); log.info("req url:{},params:{},resp:{}", nettyHttpRequestContext.getNettyHttpRequest().getFullUrl(), nettyHttpRequestContext.getNettyHttpRequest().getBody(), nettyHttpResponse); // 2. Promise<NettyHttpResponse> promise = nettyHttpRequestContext.getDefaultPromise(); promise.setSuccess(nettyHttpResponse); } }其中,2处,修改promise,此时就会回调前面说的那个listenr,打开闭锁,主线程也因此得以继续执行:
public <V> V get(Promise<V> future) { if (!future.isDone()) { CountDownLatch l = new CountDownLatch(1); future.addListener(new GenericFutureListener<Future<? super V>>() { @Override public void operationComplete(Future<? super V> future) throws Exception { log.info("received response,listener is invoked"); if (future.isDone()) { // io线程会回调该listener l.countDown(); } } }); ..... } 总结本篇的大致思路差不多就是这样了,主要逻辑在于同步转异步那一块。
还有些没讲到的,后面再讲,大概还有2个部分。
断线重连
commons pool实现连接池。
代码我放在:
https://gitee.com/ckl111/pooled-netty-http-client.git