这里初始化连接是直接同步等待的,如果失败,直接抛异常。第5步里,主要是把新的channel赋值给当前对象的一个field,同时,关闭旧的channel之类的。
/**
 * Switches the enclosing {@code NettyClient} over to a newly connected channel.
 *
 * <p>Requests a close of {@code oldChannel} when one is given, then stores
 * {@code future.channel()} in the client's {@code channel} field, sets
 * {@code bIsConnectionOk} to {@code true}, and cancels {@code scheduledFuture} if it is set
 * (presumably the pending reconnect task - confirm against the code that schedules it).
 *
 * @param future     connect future whose channel becomes the client's current channel
 * @param oldChannel channel being replaced; may be {@code null}
 */
private void cleanOldChannelAndCancelReconnect(ChannelFuture future, Channel oldChannel) {
    try {
        if (oldChannel != null) {
            try {
                log.info("Close old netty channel {}", oldChannel);
                oldChannel.close();
            } catch (Exception e) {
                // Best effort: failing to close the stale channel must not prevent the switch-over.
                // The throwable is passed as the trailing argument so its stack trace is logged.
                log.error("failed to close old netty channel {}", oldChannel, e);
            }
        }
    } finally {
        // Publish the new channel even if closing the old one went wrong.
        NettyClient.this.channel = future.channel();
        NettyClient.this.bIsConnectionOk = true;
        log.info("connection is ok,new channel:{}", NettyClient.this.channel);
        if (NettyClient.this.scheduledFuture != null) {
            log.info("cancel scheduledFuture");
            NettyClient.this.scheduledFuture.cancel(true);
        }
    }
}

netty client中,涉及的出站handler
这里说下前面的bootstrap的构造,如下:
/**
 * Creates the client {@link Bootstrap} used to open connections.
 *
 * <p>The bootstrap uses {@code NioSocketChannel} on the shared {@code NIO_EVENT_LOOP_GROUP},
 * installs a {@code CustomChannelInitializer} as its handler, and sets a 2000 ms connect
 * timeout, SO_KEEPALIVE, TCP_NODELAY and the pooled buffer allocator.
 *
 * @param config configuration handed on to the {@code CustomChannelInitializer}
 * @return the configured bootstrap
 */
private Bootstrap createBootstrap(HostAndPortConfig config) {
    Bootstrap clientBootstrap = new Bootstrap();
    clientBootstrap.channel(NioSocketChannel.class);
    clientBootstrap.group(NIO_EVENT_LOOP_GROUP);

    // The initializer receives the bootstrap itself, so it is installed before the options,
    // exactly as in the original statement order.
    CustomChannelInitializer initializer = new CustomChannelInitializer(clientBootstrap, config, this);
    clientBootstrap.handler(initializer);

    clientBootstrap
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    return clientBootstrap;
}

handler 链,主要在CustomChannelInitializer类中。
protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // http客户端编解码器,包括了客户端http请求编码,http响应的解码 pipeline.addLast(new HttpClientCodec()); // 把多个HTTP请求中的数据组装成一个 pipeline.addLast(new HttpObjectAggregator(65536)); // 用于处理大数据流 pipeline.addLast(new ChunkedWriteHandler()); /** * 重连handler */ pipeline.addLast(new ReconnectHandler(nettyClient)); /** * 发送业务数据前,进行json编码 */ pipeline.addLast(new HttpJsonRequestEncoder()); pipeline.addLast(new HttpResponseHandler()); }其中,出站时(即客户端向外部write时),涉及的handler如下:
HttpJsonRequestEncoder,把业务对象,转变为httpRequest
HttpClientCodec,把第一步传给我们的httpRequest,编码为bytebuf,交给channel发送
简单说下HttpJsonRequestEncoder,这个是我自定义的:
/** * http请求发送前,使用该编码器进行编码 * * 本来是打算在这里编码body为json,感觉没必要,直接上移到工具类了 */ public class HttpJsonRequestEncoder extends MessageToMessageEncoder<NettyHttpRequest> { final static String CHARSET_NAME = "UTF-8"; final static Charset UTF_8 = Charset.forName(CHARSET_NAME); @Override protected void encode(ChannelHandlerContext ctx, NettyHttpRequest nettyHttpRequest, List<Object> out) { // 1. 这个就是要最终传递出去的对象 FullHttpRequest request = null; if (nettyHttpRequest.getHttpMethod() == HttpMethod.POST) { ByteBuf encodeBuf = Unpooled.copiedBuffer((CharSequence) nettyHttpRequest.getBody(), UTF_8); request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, nettyHttpRequest.getUri(), encodeBuf); HttpUtil.setContentLength(request, encodeBuf.readableBytes()); } else if (nettyHttpRequest.getHttpMethod() == HttpMethod.GET) { request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, nettyHttpRequest.getUri()); } else { throw new RuntimeException(); } //2. 填充header populateHeaders(ctx, request); out.add(request); } private void populateHeaders(ChannelHandlerContext ctx, FullHttpRequest request) { /** * headers 设置 */ HttpHeaders headers = request.headers(); headers.set(HttpHeaderNames.HOST, ctx.channel().remoteAddress().toString().substring(1)); headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); headers.set(HttpHeaderNames.CONTENT_TYPE, "application/json"); /** * 设置我方可以接收的 */ headers.set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP.toString() + ',' + HttpHeaderValues.DEFLATE.toString()); headers.set(HttpHeaderNames.ACCEPT_CHARSET, "utf-8,ISO-8859-1;q=0.7,*;q=0.7"); headers.set(HttpHeaderNames.ACCEPT_LANGUAGE, "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"); headers.set(HttpHeaderNames.ACCEPT, "*/*"); /** * 设置agent */ headers.set(HttpHeaderNames.USER_AGENT, "Netty xml Http Client side"); } } netty client涉及的入站handlerHttpClientCodec和HttpObjectAggregator,主要是将bytebuf,转变为io.netty.handler.codec.http.FullHttpResponse 类型的对象
HttpResponseHandler,我们的业务handler
/** * http请求响应的处理器 */ @Slf4j public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse fullHttpResponse) throws Exception { String s = fullHttpResponse.content().toString(CharsetUtil.UTF_8); NettyHttpResponse nettyHttpResponse = NettyHttpResponse.successResponse(s); // 1. NettyHttpRequestContext nettyHttpRequestContext = (NettyHttpRequestContext) ctx.channel().attr(NettyClient.CURRENT_REQ_BOUND_WITH_THE_CHANNEL).get(); log.info("req url:{},params:{},resp:{}", nettyHttpRequestContext.getNettyHttpRequest().getFullUrl(), nettyHttpRequestContext.getNettyHttpRequest().getBody(), nettyHttpResponse); // 2. Promise<NettyHttpResponse> promise = nettyHttpRequestContext.getDefaultPromise(); promise.setSuccess(nettyHttpResponse); } }1处代码,主要从channel中,根据key,获取当前的请求相关信息
2处代码,从当前请求中,拿到promise,设置结果,此时,会唤醒主线程。
netty client 发起http post调用