Cao Gong's Tech Notes: A Spring Boot App, Replacing the Underlying Tomcat Container with Netty by Hand (2)

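The fix is to set this field (config) on the dispatcherServlet once startup has completed, and to initialize our servlet at the same time (a side note: remember the servlet lifecycle? This is its init step):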

    import org.springframework.mock.web.MockServletConfig;

    /**
     * Fetch the DispatcherServlet from the Spring context and set its "config" field to a MockServletConfig
     */
    DispatcherServlet dispatcherServlet = applicationContext.getBean(DispatcherServlet.class);
    MockServletConfig myServletConfig = new MockServletConfig();
    MyReflectionUtils.setFieldValue(dispatcherServlet, "config", myServletConfig);

    /**
     * Initialize the servlet
     */
    try {
        dispatcherServlet.init();
    } catch (ServletException e) {
        // Pass the exception as the last argument so the stack trace is logged
        log.error("Failed to initialize DispatcherServlet", e);
    }

Netty processing

Overall flow

Here is the overall flowchart once more:

[Figure: overall flowchart; the image is not available here, only its caption, which repeats the article title]

The three handlers in the middle are our own. What each of them does is spelled out in the comments; see com.ceiec.router.netty.DispatcherServletChannelInitializer below:

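    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.DefaultEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.stream.ChunkedWriteHandler;

    public class DispatcherServletChannelInitializer extends ChannelInitializer<SocketChannel> {
        // A separate thread pool can be used to process business requests
        private static DefaultEventLoopGroup eventExecutors =
                new DefaultEventLoopGroup(4, new NamedThreadFactory("business_servlet"));

        @Override
        public void initChannel(SocketChannel channel) throws Exception {
            ChannelPipeline pipeline = channel.pipeline();
            // Encode and decode the HTTP traffic
            pipeline.addLast(new HttpServerCodec());
            // Aggregate the parts of one HTTP message into a single FullHttpRequest (at most 64 KB)
            pipeline.addLast(new HttpObjectAggregator(65536));
            // Supports writing large data streams
            pipeline.addLast(new ChunkedWriteHandler());

            /**
             * Builds the request object used by the servlet
             */
            pipeline.addLast("GenerateServletRequestHandler", new GenerateServletRequestHandler());

            /**
             * Filter handler, simulating the servlet filter chain
             */
            FilterNettyHandler filterNettyHandler =
                    SpringContextUtils.getApplicationContext().getBean(FilterNettyHandler.class);
            pipeline.addLast("FilterNettyHandler", filterNettyHandler);

            /**
             * The real business handler, which hands the request to Spring MVC's dispatcherServlet
             */
            DispatcherServletHandler dispatcherServletHandler =
                    SpringContextUtils.getApplicationContext().getBean(DispatcherServletHandler.class);
            //pipeline.addLast("dispatcherServletHandler", dispatcherServletHandler);
            // Use the overload whose first argument is a thread pool, so the business logic runs
            // asynchronously. This is how it should normally be done, to avoid blocking the IO
            // thread for a long time.
            // Note: the original listing passed new ServletNettyHandler(dispatcherServlet) here, but
            // neither name is defined in this class; the bean fetched above is assumed to be intended.
            pipeline.addLast(eventExecutors, "handler", dispatcherServletHandler);
        }
    }

Converting the raw Netty HTTP request into a servlet HTTP request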

GenerateServletRequestHandler does this part of the work. What it passes to the next handler is a MockHttpServletRequest:

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
        if (!fullHttpRequest.decoderResult().isSuccess()) {
            sendError(channelHandlerContext, BAD_REQUEST);
            return;
        }

        // Set the session id for this request
        String token = UUID.randomUUID().toString().replace("-", "");
        MDC.put(SESSION_KEY, token);

        String remoteIP = getRemoteIP(fullHttpRequest, channelHandlerContext);
        MockHttpServletRequest servletRequest = createServletRequest(fullHttpRequest);

        String s = fullHttpRequest.content().toString(CharsetUtil.UTF_8);
        log.info("{},request:{},param:{}", remoteIP, fullHttpRequest.uri(), s);

        try {
            channelHandlerContext.fireChannelRead(servletRequest);
        } finally {
            // Remove the session id
            MDC.remove(SESSION_KEY);
        }
    }

Simulating the servlet filter chain to process the request

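A note on why FilterNettyHandler is managed by Spring, with prototype scope: com.ceiec.router.netty.DispatcherServletChannelInitializer#initChannel runs for every new connection (and since the connection is closed after each response here, in effect for every request). That method fetches the FilterNettyHandler from the Spring context, so each pipeline gets its own instance. The relevant part of initChannel and the handler class itself are listed below.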
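To make the prototype-scope argument concrete, here is a minimal sketch in plain Java with no Netty dependency. Handler, Pipeline and openConnections are hypothetical stand-ins, not classes from the article. The added check only imitates the rule Netty applies to handlers that are not annotated @Sharable: the same instance may not be added to a pipeline more than once. Under that assumption, a single shared instance fails on the second connection, while a fresh instance per connection (which is what a prototype bean gives you) does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class HandlerScopeSketch {

    /** Hypothetical stand-in for a Netty handler that is NOT marked @Sharable. */
    static class Handler {
        boolean added; // imitates the "already added" flag kept per handler instance
    }

    /** Hypothetical stand-in for the per-connection ChannelPipeline. */
    static class Pipeline {
        final List<Handler> handlers = new ArrayList<>();

        void addLast(Handler handler) {
            if (handler.added) {
                throw new IllegalStateException("non-sharable handler added a second time");
            }
            handler.added = true;
            handlers.add(handler);
        }
    }

    /** Simulates initChannel for several connections; returns how many pipelines were built. */
    static int openConnections(int count, Supplier<Handler> handlerSource) {
        int opened = 0;
        try {
            for (int i = 0; i < count; i++) {
                new Pipeline().addLast(handlerSource.get());
                opened++;
            }
        } catch (IllegalStateException e) {
            // The shared instance was rejected; stop opening connections.
        }
        return opened;
    }

    public static void main(String[] args) {
        Handler singleton = new Handler();
        // Singleton-like: the same instance is returned every time
        System.out.println(openConnections(3, () -> singleton)); // prints 1
        // Prototype-like: a new instance for every connection
        System.out.println(openConnections(3, Handler::new));    // prints 3
    }
}
```

In real Netty a handler can be shared across pipelines if it is annotated @Sharable and keeps no per-connection state, so prototype scope is one way to satisfy the rule rather than the only one.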

    @Override
    public void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        ...
        /**
         * Filter handler, simulating the servlet filter chain
         */
        FilterNettyHandler filterNettyHandler =
                SpringContextUtils.getApplicationContext().getBean(FilterNettyHandler.class);
        pipeline.addLast("FilterNettyHandler", filterNettyHandler);
    }

    package com.ceiec.router.netty.handler;

    import com.ceiec.router.netty.DispatcherServletChannelInitializer;
    import com.ceiec.router.netty.filter.ApplicationFilterChain;
    import com.ceiec.router.netty.filter.ApplicationFilterFactory;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Scope;
    import org.springframework.mock.web.MockHttpServletRequest;
    import org.springframework.mock.web.MockHttpServletResponse;
    import org.springframework.stereotype.Component;

    /**
     * desc: simulates the servlet filter chain
     * The netty handler chain is initialized in {@link DispatcherServletChannelInitializer#initChannel(io.netty.channel.socket.SocketChannel)}
     * @author: ckl
     * creat_date: 2019/12/10 0010
     * creat_time: 10:14
     **/
    @Slf4j
    @Component
    @Scope(scopeName = "prototype")
    public class FilterNettyHandler extends SimpleChannelInboundHandler<MockHttpServletRequest> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MockHttpServletRequest httpServletRequest) throws Exception {
            MockHttpServletResponse httpServletResponse = new MockHttpServletResponse();

            ApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(ctx, httpServletRequest);
            if (filterChain == null) {
                return;
            }

            filterChain.doFilter(httpServletRequest, httpServletResponse);
        }
    }

The last leg of the handler relay: handing the request to dispatcherServlet

    package com.ceiec.router.netty.handler;

    import com.ceiec.router.netty.DispatcherServletChannelInitializer;
    import com.ceiec.router.netty.filter.RequestResponseWrapper;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.DefaultHttpResponse;
    import io.netty.handler.codec.http.HttpResponse;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.stream.ChunkedStream;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Scope;
    import org.springframework.mock.web.MockHttpServletRequest;
    import org.springframework.mock.web.MockHttpServletResponse;
    import org.springframework.stereotype.Component;
    import org.springframework.web.servlet.DispatcherServlet;

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;

    import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

    /**
     * desc:
     * Hands the request to Spring's dispatcherServlet
     * The netty handler chain is initialized in {@link DispatcherServletChannelInitializer#initChannel(io.netty.channel.socket.SocketChannel)}
     * @author: caokunliang
     * creat_date: 2019/8/21 0021
     * creat_time: 15:46
     **/
    @Slf4j
    @Component
    @Scope(scopeName = "prototype")
    public class DispatcherServletHandler extends SimpleChannelInboundHandler<RequestResponseWrapper> {

        @Autowired
        private DispatcherServlet dispatcherServlet;

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, RequestResponseWrapper requestResponseWrapper) throws Exception {
            MockHttpServletRequest servletRequest = (MockHttpServletRequest) requestResponseWrapper.getServletRequest();
            MockHttpServletResponse servletResponse = (MockHttpServletResponse) requestResponseWrapper.getServletResponse();

            // Calling service() on the dispatcherServlet ends up invoking the controller method;
            // the response body is written into servletResponse
            dispatcherServlet.service(servletRequest, servletResponse);

            HttpResponseStatus status = HttpResponseStatus.valueOf(servletResponse.getStatus());
            HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);

            // Copy the headers set by Spring MVC onto the Netty response
            for (String name : servletResponse.getHeaderNames()) {
                response.headers().add(name, servletResponse.getHeader(name));
            }
            response.headers().add("Content-Type", "application/json;charset=UTF-8");

            // Write the initial line and the header.
            channelHandlerContext.write(response);

            // Stream the body, then close the connection once the write completes
            InputStream contentStream = new ByteArrayInputStream(servletResponse.getContentAsByteArray());
            ChunkedStream stream = new ChunkedStream(contentStream);

            ChannelFuture writeFuture = channelHandlerContext.writeAndFlush(stream);
            writeFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

Summary

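That is roughly all of it. Overall there is a lot left to optimize. My own command of Netty is only passable, and many of the finer details have not been considered.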

For example:

I crudely close the connection after every request.

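The request id is lost when the request is handed from the worker thread to the business thread that runs dispatcherServlet (mainly because the thread pool is created directly through Netty's API, which makes it hard to control).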

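The microservice where I use this technique does not have high QPS. Whether serious problems show up at higher load is unknown for now and needs further testing, but I have been busy lately and time is limited.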

The channel handlers are currently prototype beans. Whether switching them to singleton beans would cause problems under high concurrency remains to be verified.
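On the lost request id: MDC values live in a thread-local, so they do not follow a task onto another thread unless they are copied explicitly. The following is a minimal sketch of that copy in plain Java. CONTEXT is a hypothetical stand-in for slf4j's MDC, and withCallerContext and readSessionIdOnWorker are illustrative names that do not appear in the article. With the real MDC, the snapshot and restore steps would presumably use MDC.getCopyOfContextMap() and MDC.setContextMap(...).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagationSketch {

    /** Stand-in for slf4j's MDC: a per-thread map of diagnostic values. */
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    /** Wraps a task so that it runs with a snapshot of the submitting thread's context. */
    static <T> Callable<T> withCallerContext(Callable<T> task) {
        // Taken on the caller (IO) thread, before the hand-off
        Map<String, String> snapshot = new HashMap<>(CONTEXT.get());
        return () -> {
            Map<String, String> previous = CONTEXT.get();
            CONTEXT.set(snapshot);
            try {
                return task.call();
            } finally {
                // Restore, so the id does not leak into the next task on this pooled thread
                CONTEXT.set(previous);
            }
        };
    }

    /** Sets a session id on the calling thread, then reads it back on a separate "business" thread. */
    static String readSessionIdOnWorker(boolean propagate) {
        ExecutorService business = Executors.newSingleThreadExecutor();
        CONTEXT.get().put("sessionId", "abc123");
        try {
            Callable<String> lookup = () -> CONTEXT.get().get("sessionId");
            Callable<String> task = propagate ? withCallerContext(lookup) : lookup;
            return business.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            business.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(readSessionIdOnWorker(false)); // prints null: the id is lost
        System.out.println(readSessionIdOnWorker(true));  // prints abc123
    }
}
```

In the pipeline from this article the hand-off to the business thread pool is done by Netty itself, so there is no submit call to wrap. One option under that constraint would be to carry the id along with the message (for example as a request attribute) and put it back into MDC at the start of the business handler.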


Please credit the source when reposting: https://www.heiqu.com/zwwdpf.html