这里的 ZookeeperTransporter.connect() 经过 SPI 转换实际调用为 CuratorZookeeperTransporter.connect()。
public class CuratorZookeeperTransporter implements ZookeeperTransporter { @Override public ZookeeperClient connect(URL url) { return new CuratorZookeeperClient(url); } } public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> { private final CuratorFramework client; public CuratorZookeeperClient(URL url) { super(url); try { int timeout = url.getParameter(Constants.TIMEOUT_KEY, 5000); CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) .retryPolicy(new RetryNTimes(1, 1000)) .connectionTimeoutMs(timeout); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { builder = builder.authorization("digest", authority.getBytes()); } client = builder.build(); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState state) { if (state == ConnectionState.LOST) { CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED); } else if (state == ConnectionState.CONNECTED) { CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED); } else if (state == ConnectionState.RECONNECTED) { CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED); } } }); client.start(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } }上面这段代码使用 CuratorFrameworkFactory 工厂类创建了一个 CuratorFramework 实例,并启动该实例创建了一个与 zookeeper 的连接。
再回到 RegistryProtocol 中的 getRegistry() 方法。我们发现它通过层层调用最终创建了一个到 ZookeeperRegistry 实例。这个实例中的 ziClient 对象建立了到 zookeeper 的连接。
我们知道 ZooKeeper 经常被用作注册中心Ok。那我们现在已经连接上了 ZooKeeper 了,是不是该往 Zookeeper 上写点啥了?继续往下看,好戏要来啦!!~
在这里 register() 方法最终会调用 FailbackRegistry 类的 register() 方法(不想再赘述为什么!!!!)。
public abstract class FailbackRegistry extends AbstractRegistry { ... public void register(URL url) { super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { // Sending a registration request to the server side doRegister(url); } catch (Exception e) { // ... } } ... } public class ZookeeperRegistry extends FailbackRegistry { protected void doRegister(URL url) { try { String str = toUrlPath(url); zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } }划重点啊筒子们!!! doRegister() 方法!!这里调用链也比较长。画个简图总结下:
小结:总之最后的目的是在 ZooKeeper 上创建通过 url 解析生成的 path 节点。大概长这个样子:dubbo%3A%2F%2F10.137.32.54%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D4264%26side%3Dprovider%26timestamp%3D1546848704035
最后还有一个地方需要注意下:这里调用 zkClient.create() 方法时,如果 dynamic 为空,默认会创建 zookeeper 临时节点。临时节点的好处在于如果客户端和 zookeeper 集群断开连接,对应的临时节点则会自动被删除。这样一来,是不是对我们的调用方好处多多呢? End
碍于篇幅限制,今天就先介绍这么多。回顾一下,我们在 RegistryProtocol.export() 方法里面创建了一个 DubboExporter 对象、开启了 Netty 服务端,同时还往注册中心 zookeeper 上创建了一个和服务有关的临时节点!关于 RegistryProtocol.export() 方法剩余的内容,我们以后有机会再说吧!