先说下写这个的目的,其实是好奇,dubbo是怎么实现同步转异步的,然后了解到,其依赖了请求中携带的请求id来完成这个连接复用;然后我又发现,redisson这个redis客户端,底层也是用的netty,那就比较好奇了:netty是异步的,上层是同步的,要拿结果的,同时呢,redis协议也不可能按照redisson的要求,在请求和响应里携带请求id,那,它是怎么实现同步转异步的呢,异步结果回来后,又是怎么把结果对应上的呢?
对redisson debug调试了long long time之后(你们知道的,多线程不好调试),大概理清了思路,基本就是:连接池 的思路。比如,我要访问redis:
我会先去连接池里拿一个连接(其实是一个netty的socketChannel),然后用这个连接,去发起请求。
上层新建一个promise(可写的future,熟悉completablefuture的可以秒懂,不熟悉的话,可以理解为一个阻塞队列,你去取东西,取不到,阻塞;生产者往队列放一个东西,你就不再阻塞了,且拿到了东西),把发送请求的任务交给下层的netty channel后,将promise设置为netty channel的一个attribute,然后在这个promise上阻塞等待
下层的netty channel向redis 服务器发起请求
netty接收到redis 服务器的响应后,从channel中取到第二步设置的attribute,获取到promise,此时,相当于拿到了锁,然后打开锁,并把结果设置到promise中
主线程被第四步唤醒后,拿到结果并返回。
其实问题的关键是,第二步的promise传递,要设置为channel的一个attribute,不然的话,响应回来后,也不知道把响应给谁。
理清了redisson的基本思路后,我想到了很早之前,面试oppo,二面的面试官就问了我一个问题:写过类似代理的中间件没有?(因为当时面试的是中间件部门)
然后我说没有,然后基本就凉了。
其实,中间件最主要的要求,尤其是代理这种,一方面接收请求,一方面还得作为客户端去发起请求,发起请求这一步,很容易变成性能瓶颈,不少实现里,这一步都是直接使用http client这类同步请求的工具(也是支持异步的,只是同步更常见),所以我也一直想写一个netty这种异步的客户端,同时还能同步转异步的,不能同步转异步,应用场景就比较受限了。
实现思路源码给懒得看文字的同学:
https://gitee.com/ckl111/pooled-netty-http-client.git
扯了这么多,我说下我这个http client的思路,和上面那个redisson的差不多,我这边的场景也是作为一个中间件,要访问的后端服务就几个,比如要访问:8080下的若干服务,我这边是启动时候,就会去建一个连接池(直接配置commons pool2的池化参数,我这里配置的是,2个连接),连接池好了后,netty 的channel已经是ok的了,如下所示:
这每一个长连接,是包在我们的一个核心的数据结构里的,叫NettyClient。
核心的属性,其实主要下面两个:
//要连接的host和端口 private HostAndPortConfig config; /** * 当前使用的channel */ Channel channel; NettyClient的初始化 构造函数构造函数如下:
public NettyClient(HostAndPortConfig config) { this.config = config; } @Data @AllArgsConstructor @NoArgsConstructor public class HostAndPortConfig { private String host; private Integer port; }够简单吧,先不考虑连接池,最开始测试的时候,我就是这样,直接new对象的。
public static void main(String[] args) { HostAndPortConfig config = new HostAndPortConfig("192.168.19.102", 8080); NettyClient client = new NettyClient(config); client.initConnection(); NettyHttpResponse response = client.doPost("http://192.168.19.102:8080/BOL_WebService/xxxxx.do", JSONObject.toJSONString(new Object())); if (response == null) { return; } System.out.println(response.getBody()); } 初始化连接上面的测试代码,new完对象后,开始初始化连接。
public void initConnection() { log.info("initConnection starts..."); Bootstrap bootstrap; //1.创建netty所需的bootstrap配置 bootstrap = createBootstrap(config); //2.发起连接 ChannelFuture future = bootstrap.connect(config.getHost(), config.getPort()); log.info("current thread:{}", Thread.currentThread().getName()); //3.等待连接成功 boolean ret = future.awaitUninterruptibly(2000, MILLISECONDS); boolean bIsSuccess = ret && future.isSuccess(); if (!bIsSuccess) { //4.不成功抛异常 bIsConnectionOk = false; log.error("host config:{}",config); throw new RuntimeException("连接失败"); } //5.走到这里,说明成功了,新的channle赋值给field cleanOldChannelAndCancelReconnect(future, channel); bIsConnectionOk = true; }