上一篇文章讲了如何设计和实现高并发高性能的应用,从根本上说明了一些道理。且以rocketmq的mappedFile的实现作为一个突破点,讲解了rocketmq是如何具体实现高性能的。从中我们也知道,mappedFile只是其利用的操作系统的一个特性小点。
今天,我们就来说说,rockmq实现高性能的第二个小:线程池的应用。
1. rocketmq的线程模型概述
谈到多线程,一般我们都会谈到锁和线程模型。
锁我们就不多说了,基本原理都差不多,大家可以参考网上资料或查看我之前的文章。
而线程模型,则可能各有不同,它会根据各自应用的特性,玩出自己的花样。下面我们就来数数rocketmq中的线程模型吧!
事实上,rocketmq中,存在多种不同的角色组件,而线程模型也往往会根据组件的特性变得不一样。最好我们都能了解一点,以不至于一叶障目。
2. 通用的netty线程模型
首先,整个rockmq都是基于netty进行的网络通信,所以其最基础的线程模型既是netty的线程模型,即reactor模型,(注:以下netty线程模型来源于网络)
rocketmq中具体应用的通用的netty线程如下:
1个acceptor -> n个event.processor.selector -> serverWorkerThreads.worker -> 业务处理线程
其调用netty的应用实现代码如下:
// org.apache.rocketmq.remoting.netty.NettyRemotingServer#start @Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); prepareSharableHandlers(); // netty 核心参数 eventLoopGroupBoss, eventLoopGroupSelector ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) .addLast(defaultEventExecutorGroup, encoder, new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), connectionManageHandler, serverHandler ); } }); if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); }