自己实现一个Controller——精简型 (2)

controller的各个属性中,除了broadcaster和recorder是自身构造外,其余都是通过参数传入。由于controller中需要用到事件记录,提供这一功能的是recorder,然而触发了事件需要将其散播出去给订阅者的需要一个broadcaster,这里涉及到k8s的事件机制,并不打算在细述。在构造函数中已经把事件广播绑定到glog.Infof,也就是说recorder触发了事件,会在日志中输出事件的信息。后面在Run controller的时候还会用到这个广播器。

整个Controller的启动方法如下

func (p *PodController)Run(stopCh <-chan struct{}) { glog.Info("Starting pod controller\n") defer glog.Info("Shutting down pod controller\n") if !controller.WaitForCacheSync("pod", stopCh, p.podListerSynced) { return } if p.broadcaster != nil { p.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(p.kubeClient.CoreV1().RESTClient()).Events("")}) } go wait.NonSlidingUntil(func() { if err := p.reconcilePods(); err != nil { glog.Errorf("Couldn't reconcile pod: %v", err) } }, metav1.Duration{Duration: 10 * time.Second}.Duration, stopCh) <-stopCh }

WaitForCacheSync用于同步指定资源的缓存,这个缓存后续会使用到,万一同步失败的话controller将不能运行

StartRecordingToSink用于把事件广播到apiserver中,如果此处不执行,即便是recorder触发了事件,apiserver没收到这个事件,最终事件信息没保存到对应pod中,我们通过kubectl describe po时就会看不到相应的event记录

通过启动一个协程去定期执行reconcilePods()方法,NonSlidingUntil函数被调用后马上执行传进去的func,后续每隔10秒就重复执行一次,直到收到stopCh通道传来的终止信号才停止。

最后通过等待接收stopCh传来的信号而阻塞当前协程,从而阻止了完成本函数的调用

reconcilePods方法的大致逻辑如前所述,经过多次调用从lister中获取所有pod,遍历每个pod把pod的命名空间和pod的名称打印出来,然后通过labelselector找出集群中的master节点将pod的数量打到名为hopegi/pod-count的label上,最后给每个pod的event事件添加一条pod count is n(这个n是pod的总数)这样的记录。

func (p *PodController)reconcilePods()error { glog.Infof("reconcilePods ") pods,err:= p.podLister.List(labels.Everything()) if err!=nil{ return fmt.Errorf("error listing pods: %v", err) } return p.reconcile(pods) } func (p *PodController)reconcile(pods []*v1.Pod)error { glog.Infof("reconcile pods") for _,pod :=range pods{ fmt.Printf("pod name is %s.%s \n",(*pod).Namespace,(*pod).Name) } nodes,err:= p.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{LabelSelector:"node-role.kubernetes.io/master"}) if err!=nil{ glog.Infof("get master error %v\n",err) return err } for _,n:=range nodes.Items{ n.Labels["hopegi/pod-count"]=fmt.Sprintf("%d",len(pods)) _,err= p.kubeClient.CoreV1().Nodes().Update(&n) if err!=nil{ glog.Infof("label node error:%v ",err) } } if p.recorder!=nil { msg:=fmt.Sprintf("pod count is %d",len(pods)) for _, pod := range pods { p.recorder.Eventf(&v1.ObjectReference{ Kind:"Pod", Name:pod.Name, UID:pod.UID, Namespace:pod.Namespace, },v1.EventTypeNormal,"SuccessCalculatePod",msg) } } return nil }

获取集群里所有的pod用的是lister而不是通过kubeclient去获取差别在于,作为某个K8S资源的informer(本例中是pod),它内部都有一个对应资源的缓存,这个缓存在listAndWatch机制的作用下与集群中存储的pod数据保持一致,这个listAndWatch将在下一篇中介绍;后者是每访问一次都会往apiserver中发一次请求,在众多controller频繁地跟apiserver通讯,apiserver会不堪重负,且消耗大量网络资源,获取效率也低下。

小结

本篇简单的实现了一个controller,虽然它并没有如开篇所说的那样对资源的实际状态与期望状态的差异进行调谐,通过最简单的方式周期性地检查pod的状态,引入了informer,获取了它listAndWatch的结果,后续将介绍这个informer的机制,它如何执行listAndWatch,一个较为常见的标准的Controller是如何实现的。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzzyzf.html