You Won't Know Netty's True Face Until You've Read This Guide (10)

io.netty.util.internal.SocketUtils

    public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
            throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
                @Override
                public Boolean run() throws IOException {
                    return socketChannel.connect(remoteAddress);
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }

Here we finally see the familiar JDK call socketChannel.connect(remoteAddress).
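For a non-blocking channel, connect() may return false while the handshake is still in flight, which is why processSelectedKey() later reacts to OP_CONNECT by calling finishConnect(). The following is a minimal JDK-only sketch of that sequence, not Netty's own code. The class name ConnectSketch, the throwaway loopback server, and the 5-second deadline are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class ConnectSketch {

    /** Connects a non-blocking client to a throwaway loopback server; true once connected. */
    static boolean connectNonBlocking() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open();
             Selector selector = Selector.open()) {
            // Port 0 lets the OS pick a free port (illustrative setup only).
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            client.configureBlocking(false);

            // May return false in non-blocking mode: the connection is not established yet.
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                client.register(selector, SelectionKey.OP_CONNECT);
                long deadline = System.nanoTime() + 5_000_000_000L;
                while (!connected && System.nanoTime() < deadline) {
                    selector.select(500);
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isConnectable()) {
                            // Drop OP_CONNECT first, otherwise select() keeps returning immediately.
                            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
                            connected = client.finishConnect();
                        }
                    }
                    selector.selectedKeys().clear();
                }
            }
            return client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On a loopback interface connect() can also succeed immediately, so the sketch handles both outcomes.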

Communication Between Server and Client

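The previous sections walked through the startup of the Server and of the Client in detail, and the Client has now sent its connection request to the Server. Let us turn back to the Server side.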

When the Server detects an I/O event, the run() method of io.netty.channel.nio.NioEventLoop calls processSelectedKeys(), and each individual I/O event is ultimately handled by processSelectedKey().

io.netty.channel.nio.NioEventLoop

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

In this case the ready operation is SelectionKey.OP_ACCEPT, so execution takes the unsafe.read() branch.
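The branch selection is plain bit masking on readyOps. As a rough illustration, the sketch below reproduces only the three conditions and records which action each bitmask would trigger. The class ReadyOpsSketch and the string labels are hypothetical and stand in for the real unsafe calls.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

final class ReadyOpsSketch {

    /** Returns, in order, the actions the three conditions would take for a readyOps bitmask. */
    static List<String> dispatch(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // OP_READ and OP_ACCEPT share one branch; readyOps == 0 is included as a workaround.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }
}
```

With this sketch, dispatch(SelectionKey.OP_ACCEPT) yields only "read", while dispatch(SelectionKey.OP_WRITE | SelectionKey.OP_READ) yields "forceFlush" followed by "read", matching the order of the checks.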

io.netty.channel.nio.AbstractNioMessageChannel

    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
            } finally {
                // ...
            }
        }
    }

Two methods matter most here: doReadMessages(readBuf) and pipeline.fireChannelRead().

io.netty.channel.socket.nio.NioServerSocketChannel

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            // ...
        }

        return 0;
    }

io.netty.util.internal.SocketUtils

    public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }

Once the serverSocketChannel has accepted the Client's connection, the resulting socketChannel (wrapped in a NioSocketChannel) is added to the List.

The List is then traversed, and each socketChannel is passed to pipeline.fireChannelRead().
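The accept-into-a-list, then dispatch-each pattern can be sketched with the JDK alone. The example below is a simplified analogue, not Netty's implementation: the names AcceptLoopSketch, readOne, and acceptAll are hypothetical, a counter stands in for pipeline.fireChannelRead(), and the inner loop simply drains until accept() returns null, whereas Netty bounds it with allocHandle.continueReading().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

final class AcceptLoopSketch {

    /** Analogue of doReadMessages(): accept at most one pending connection into buf. */
    static int readOne(ServerSocketChannel server, List<SocketChannel> buf) throws IOException {
        SocketChannel ch = server.accept(); // null in non-blocking mode when nothing is pending
        if (ch != null) {
            buf.add(ch);
            return 1;
        }
        return 0;
    }

    /** Connects the given number of loopback clients and returns how many were dispatched. */
    static int acceptAll(int clients) {
        List<SocketChannel> opened = new ArrayList<>();
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            for (int i = 0; i < clients; i++) {
                opened.add(SocketChannel.open(server.getLocalAddress())); // blocking connect
            }

            int dispatched = 0;
            long deadline = System.nanoTime() + 5_000_000_000L;
            while (dispatched < clients && System.nanoTime() < deadline) {
                selector.select(500);            // wait for OP_ACCEPT
                selector.selectedKeys().clear();

                List<SocketChannel> readBuf = new ArrayList<>();
                while (readOne(server, readBuf) > 0) {
                    // keep accepting until nothing is pending
                }
                for (SocketChannel ch : readBuf) {
                    dispatched++;                // stand-in for pipeline.fireChannelRead(ch)
                    opened.add(ch);
                }
            }
            return dispatched;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (SocketChannel ch : opened) {
                try {
                    ch.close();
                } catch (IOException ignored) {
                    // best-effort cleanup
                }
            }
        }
    }
}
```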

Do you still remember which ChannelHandlers are in the ChannelPipeline of the current serverSocketChannel?

head --> ServerBootstrapAcceptor --> tail
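To make the traversal order concrete, here is a toy model of an inbound handler chain. It is a deliberately simplified sketch: InboundHandler, Context, and PipelineSketch are hypothetical types that only imitate the shape of Netty's ChannelPipeline, and the middle handler does not forward the message because the channelRead() of ServerBootstrapAcceptor shown in this article never calls ctx.fireChannelRead().

```java
import java.util.ArrayList;
import java.util.List;

final class PipelineSketch {

    /** Hypothetical, simplified inbound handler. */
    interface InboundHandler {
        void channelRead(Context ctx, Object msg);
    }

    /** Hypothetical context: remembers a position and forwards to the next handler. */
    static final class Context {
        private final List<InboundHandler> handlers;
        private final int index;

        Context(List<InboundHandler> handlers, int index) {
            this.handlers = handlers;
            this.index = index;
        }

        void fireChannelRead(Object msg) {
            int next = index + 1;
            if (next < handlers.size()) {
                handlers.get(next).channelRead(new Context(handlers, next), msg);
            }
        }
    }

    /** Sends one message through head, acceptor, tail and returns the handlers that saw it. */
    static List<String> run(Object msg) {
        List<String> trace = new ArrayList<>();
        List<InboundHandler> handlers = new ArrayList<>();
        handlers.add((ctx, m) -> {
            trace.add("head");
            ctx.fireChannelRead(m);            // head passes the message on
        });
        handlers.add((ctx, m) -> {
            trace.add("ServerBootstrapAcceptor"); // consumes the child channel, no forwarding
        });
        handlers.add((ctx, m) -> trace.add("tail"));

        new Context(handlers, -1).fireChannelRead(msg);
        return trace;
    }
}
```

In this model run(...) returns [head, ServerBootstrapAcceptor]: the accepted channel stops at the acceptor and never reaches tail.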

Next, we need to look closely at the logic of ServerBootstrapAcceptor.

io.netty.bootstrap.ServerBootstrap#ServerBootstrapAcceptor

    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;
        private final Runnable enableAutoReadTask;

        ServerBootstrapAcceptor(
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            // Stored so that channelRead() can apply them to every accepted child channel.
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;

            enableAutoReadTask = new Runnable() {
                @Override
                public void run() {
                    channel.config().setAutoRead(true);
                }
            };
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // msg is the NioSocketChannel created in doReadMessages().
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            // Hand the child channel over to an EventLoop of the child group.
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        }
    }
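In JDK terms, channelRead() above amounts to three steps for each accepted connection: configure the child channel, apply its options, and register it with a selector other than the one that accepted it. The sketch below imitates that hand-off on a single thread. It is an assumption-laden illustration rather than Netty's behavior: ChildRegistrationSketch, the two selectors named bossSelector and childSelector, and the choice of TCP_NODELAY as the sample option are all hypothetical, and real Netty performs the registration on an EventLoop thread of the child group.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

final class ChildRegistrationSketch {

    /** Accepts one loopback client, hands it to a separate selector, and reads what it sent. */
    static String acceptAndRead(String payload) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector bossSelector = Selector.open();
             Selector childSelector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(bossSelector, SelectionKey.OP_ACCEPT);

            client.connect(server.getLocalAddress());   // blocking connect
            client.write(ByteBuffer.wrap(bytes));

            // "Boss" side: wait for OP_ACCEPT, then accept the child channel.
            long deadline = System.nanoTime() + 5_000_000_000L;
            SocketChannel child = null;
            while (child == null && System.nanoTime() < deadline) {
                bossSelector.select(500);
                bossSelector.selectedKeys().clear();
                child = server.accept();
            }
            if (child == null) {
                throw new IOException("no connection accepted before the deadline");
            }

            try (SocketChannel accepted = child) {
                accepted.configureBlocking(false);
                accepted.setOption(StandardSocketOptions.TCP_NODELAY, true); // childOptions analogue
                accepted.register(childSelector, SelectionKey.OP_READ);      // childGroup.register analogue

                // "Child" side: from now on reads are driven by the second selector.
                ByteBuffer buf = ByteBuffer.allocate(bytes.length);
                while (buf.hasRemaining() && System.nanoTime() < deadline) {
                    childSelector.select(500);
                    childSelector.selectedKeys().clear();
                    if (accepted.read(buf) < 0) {
                        break;                           // peer closed the connection
                    }
                }
                buf.flip();
                return StandardCharsets.UTF_8.decode(buf).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of the split is that the accepting selector only ever sees OP_ACCEPT, while all reads on accepted connections are served by a different selector.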
