【Dubbo源码阅读系列】服务暴露之远程暴露 (3)

optimizeSerialization(url) 序列化操作,本文不做具体分析
DubboProtocol.export() 返回的对象为 DubboExporter。值得我们注意是后面的 openServer() 方法!

openServer()

private void openServer(URL url) { // find server. String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override server.reset(url); } } }

openServer() 光从方法名看起来像是开启服务连接的。方法比较简单,取 url 的 address 作为 key,尝试从 serverMap 获取对应的 value 值。如果 value 值为 null 则调用 createServer(url) 方法创建 server 后添加到 serverMap 中。
createServer() 方法的流程比较冗长,我们这里通过一张时序图来给出该方法内部调用流程:

【Dubbo源码阅读系列】服务暴露之远程暴露

上图省略了从 ServiceConfigRegistryProtocol 以及从 RegistryProtocolDubboProtocol 的转换过程。这部分内容涉及到 Dubbo SPI 机制,如有疑问可以详见:【Dubbo源码阅读系列】之 Dubbo SPI 机制。这里给出简单的转换流程

ServiceConfig 到 RegistryProtocol
Protocol$Adaptive ==》ProtocolFilterWrapper ==》ProtocolListenerWrapper ==》RegistryProtocol

RegistryProtocol 到 DubboProtocol
Protocol$Adaptive ==》ProtocolFilterWrapper ==》ProtocolListenerWrapper ==》DubboProtocol

最后重点关注下 NettyServer 的 doOpen() 方法:

protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }

可以看到这段代码是比较经典的 Netty 服务端启动代码...也就是说 openServer() 方法用于 Netty 服务端启动。
我们知道 Netty 常用于客户端和服务端之间的通讯。在这里我们开启了服务端,那么在何处会开启对应的客户端呢?他们之间到底会进行什么交互呢?这个疑问我们先留着待后续文章讲解。

服务的暴露

上面讲了这么多,感觉还是和服务远程暴露没有沾多大的边?到底我们的服务是如何被其它机器感知的?别人是怎么知道我们某某台机器提供了短信服务的?其实揭秘的序幕已经拉开了!让我们继续娓娓道来!
回顾一下 RegistryProtocol.export() 方法:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); URL registryUrl = getRegistryUrl(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); //to judge to delay publish whether or not boolean register = registeredProviderUrl.getParameter("register", true); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); if (register) { register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyxywf.html