曹工杂谈:花了两天时间,写了一个netty实现的http客户端,支持同步转异步和连接池(1)--核心逻辑讲解 (3)

说完了netty client,我们再说说调用的过程:

public NettyHttpResponse doPost(String url, Object body) { NettyHttpRequest request = new NettyHttpRequest(url, body); return doHttpRequest(request); } private static final DefaultEventLoop NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP = new DefaultEventLoop(null, new NamedThreadFactory("NettyResponsePromiseNotify")); private NettyHttpResponse doHttpRequest(NettyHttpRequest request) { // 1 Promise<NettyHttpResponse> defaultPromise = NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP.newPromise(); // 2 NettyHttpRequestContext context = new NettyHttpRequestContext(request, defaultPromise); channel.attr(CURRENT_REQ_BOUND_WITH_THE_CHANNEL).set(context); // 3 ChannelFuture channelFuture = channel.writeAndFlush(request); channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { System.out.println(Thread.currentThread().getName() + " 请求发送完成"); } }); // 4 return get(defaultPromise); }

上面我已经标注了几个数字,分别讲一下:

新建一个promise,可以理解为一把可以我们手动完成的锁(一般主线程在这个锁上等待,在另一个线程去完成)

把锁和其他请求信息,一起放到channle里

使用channle发送数据

同步等待

第四步等待的get方法如下:

public <V> V get(Promise<V> future) { // 1. if (!future.isDone()) { CountDownLatch l = new CountDownLatch(1); future.addListener(new GenericFutureListener<Future<? super V>>() { @Override public void operationComplete(Future<? super V> future) throws Exception { log.info("received response,listener is invoked"); if (future.isDone()) { // 2 // promise的线程池,会回调该listener l.countDown(); } } }); boolean interrupted = false; if (!future.isDone()) { try { // 3 l.await(4, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error("e:{}", e); interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } //4 if (future.isSuccess()) { return future.getNow(); } log.error("wait result time out "); return null; }

如果promise的状态还是没有完成,则我们new了一个闭锁

加了一个listner在promise上面,别人操作这个promise,这个listener会被回调,回调逻辑:将闭锁打开

主线程,在闭锁上等待

主线程,走到这里,说明已经等待超时,或者已经完成,可以获取结果并返回

什么地方会修改promise

前面我们提到了,在response的handler中:

/** * http请求响应的处理器 */ @Slf4j public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse fullHttpResponse) throws Exception { String s = fullHttpResponse.content().toString(CharsetUtil.UTF_8); NettyHttpResponse nettyHttpResponse = NettyHttpResponse.successResponse(s); // 1. NettyHttpRequestContext nettyHttpRequestContext = (NettyHttpRequestContext) ctx.channel().attr(NettyClient.CURRENT_REQ_BOUND_WITH_THE_CHANNEL).get(); log.info("req url:{},params:{},resp:{}", nettyHttpRequestContext.getNettyHttpRequest().getFullUrl(), nettyHttpRequestContext.getNettyHttpRequest().getBody(), nettyHttpResponse); // 2. Promise<NettyHttpResponse> promise = nettyHttpRequestContext.getDefaultPromise(); promise.setSuccess(nettyHttpResponse); } }

其中,2处,修改promise,此时就会回调前面说的那个listenr,打开闭锁,主线程也因此得以继续执行:

public <V> V get(Promise<V> future) { if (!future.isDone()) { CountDownLatch l = new CountDownLatch(1); future.addListener(new GenericFutureListener<Future<? super V>>() { @Override public void operationComplete(Future<? super V> future) throws Exception { log.info("received response,listener is invoked"); if (future.isDone()) { // io线程会回调该listener l.countDown(); } } }); ..... } 总结

本篇的大致思路差不多就是这样了,主要逻辑在于同步转异步那一块。

还有些没讲到的,后面再讲,大概还有2个部分。

断线重连

commons pool实现连接池。

代码我放在:

https://gitee.com/ckl111/pooled-netty-http-client.git

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyzsgy.html