客户端与服务端的事件watcher源码阅读

watcher存在的必要性

举个特容易懂的例子: 假如我的项目是基于dubbo+zookeeper搭建的分布式项目, 我有三个功能相同的服务提供者,用zookeeper当成注册中心,我的三个项目得注册进zookeeper才能对外暴露服务,但是问题来了,写的java代码怎么才能注册进zookeeper呢?当然加入依赖,写好配置文件再启动就成了,这时,这三个服务体提供者就是zookeeper的客户端了,zookeeper的客户端不止一个,我选择了哪个依赖,就是哪个客户端,光有服务提供者不成啊,对外提供服务,我得需要服务消费者啊,于是用同样的方式,把消费者也注册进zookeeper,zookeeper中就存在了4个node,也就是4个客户端,服务消费者订阅zookeeper,向它拉取服务提供者的address,然后把地址缓存在本地, 进而可以远程调用服务消费者,那么问题又来了,万一哪一台服务提供者挂了,怎么办呢?zookeeper是不是得通知消费者呢? 万一哪一天服务提供者的address变了,是不是也得通知消费者? 这就是watcher存在的意义,它解决了这件事

实验场景:

假设我们已经成功启动了zookeeper的服务端和客户端,并且预先添加了watcher,然后使用控制台动态的修改下node的data,我们会发现watcher回调的现象

添加的钩子函数代码如下:

public class ZookepperClientTest { public static void main(String[] args) throws Exception { ZooKeeper client = new ZooKeeper("localhost", 5000, new Watcher() { @Override public void process(WatchedEvent event) { System.err.println("连接,触发"); } }); Stat stat = new Stat(); // todo 下面添加的事件监听器可是实现事件的消费订阅 String content = new String(client.getData("/node1", new Watcher() { @Override public void process(WatchedEvent event) { // todo 任何连接上这个节点的客户端修改了这个节点的 data数据,都会引起process函数的回调 // todo 特点1: watch只能使用1次 if (event.getType().equals(Event.EventType.NodeDataChanged)){ System.err.println("当前节点数据发生了改变"); } } }, stat));

看如上的代码, 添加了一个自己的watcher也就是client.getData("/node1", new Watcher() {} 这是个回调的钩子函数,执行时不会运行,当满足的某个条件时才会执行, 比如: node1被删除了, node1的data被修改了

getData做了哪些事情?

源码如下: getdata,顾名思义,返回服务端的node的data+stat, 当然是当服务端的node发生了变化后调用的

主要主流如下几个工作

创建WatchRegistration wcb= new DataWatchRegistration(watcher, clientPath);

其实就是一个简单的内部类,将path 和 watch 封装进了一个对象

创建一个request,并且初始化这个request.head=getData=4

调用ClientCnxn.submitRequest(...) , 将现存的这些信息进一步封装

request.setWatch(watcher != null);说明他并没有将watcher封装进去,而是仅仅做了个有没有watcher的标记

public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { // todo 校验path final String clientPath = path; PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { // todo DataWatchRegistration 继承了 WatchRegistration // todo DataWatchRegistration 其实就是一个简单的内部类,将path 和 watch 封装进了一个对象 wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); // todo 创建一个请求头 RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); // todo 创建了一个GetDataRequest GetDataRequest request = new GetDataRequest(); // todo 给这个请求初始化,path 是传递进来的path,但是 watcher不是!!! 如果我们给定了watcher , 这里面的条件就是 true request.setPath(serverPath); request.setWatch(watcher != null); // todo 可看看看服务端接收到请求是怎么办的 GetDataResponse response = new GetDataResponse(); // todo 同样由 clientCnxn 上下文进行提交请求, 这个操作应该同样是阻塞的 // todo EventThread 和 SendThread 同时使用一份 clientCnxn的 submitRequest() ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (stat != null) { DataTree.copyStat(response.getStat(), stat); } return response.getData(); }

ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 的源码我卸载下面, 这里来到这个方法中,一眼能看到,它依然是阻塞的式的,并且requet被进一步封装进packet

更重要的是 queuePacket()方法的最后一个参数,存在我们刚刚创建的path+watcher的封装类

public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); // todo 来到这个 queuePacket() 方法在下面, 这个方法就是将 用户输入-> string ->>> request ->>> packet 的过程 Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration); // todo 使用同步代码块,在下面的进行 同步阻塞等待, 直到有了Response响应才会跳出这个循环, 这个finished状态就是在客户端接受到服务端的 // todo 的响应后, 将服务端的响应解析出来,然后放置到 pendingqueue里时,设置上去的 synchronized (packet) { while (!packet.finished) { // todo 这个等待是需要唤醒的 packet.wait(); } } // todo 直到上面的代码块被唤醒,才会这个方法才会返回 return r; }

同样,在queuePacket()方法中将packet提交到outgoingQueue中,最终被seadThread消费发送到服务端

服务端如何处理watchRegistration不为空的packet

后续我准备用一整篇博客详解单机模式下服务端处理请求的流程,所以这篇博客只说结论

在服务端,用户的请求最终会按顺序流向三个Processor,分别是

PrepRequestProcessor

负责进行一些状态的修改

SyncRequestProcessor

将事务日志同步到磁盘

FinalRequestProcessor

相应用户的请求

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzdwsy.html