自己实现一个Controller——精简型

controller-manager作为K8S master的其中一个组件,负责众多controller的启动和终止,这些controller负责监控着k8s中各种资源,执行调谐,使他们的实际状态能不断趋近与期望状态。这些controller包括servercontroller,nodecontroller,deploymentcontroller等。对于自定义资源(CRD)也需要为之配备controller,CRD的controller也需要有controller-manager启动之,停止它。整个过程篇幅较长,故鄙人将其拆分成多篇,通过本系列,首篇将介绍如何定义一个controller-manager去启动它所管辖的controller,并实现一个最简单的controller。第二篇再实现一个较为标准的controller,并介绍informer的结构;最后一篇将介绍不借助脚手架如何实现一个CRD的controller。

介绍一下整个项目的结构

controller-demo |---api //用于放定义CRD各个属性的struct |---v1 |---client |---versiond |----scheme //用于存放CRD的scheme |----typed //用于存放CRD对应的client |---controller //用于存放各个controller |---informers //用于存放informer,包含各个apiGroup各个version及一个factory |---ecsbind/v1 //其中一个apiGroup,其中一个version的informer,当然也是唯一一个 |---internalinterfaces //informer的interface接口 |---listers //用于存放lister,包含各个apiGroup各个version |---ecsbind/v1 //其中一个apiGroup,其中一个version的informer,同样也是唯一一个 controller-manager

controller有两个函数,一个负责供main函数调用启动controller-manager,作为controller-manager的入口;另一个是用于启动他所管理的所有controller。

供main函数调用的Run函数定义如下

func Run(stopCh <-chan struct{}) error { run :=func(stopCh <-chan struct{}){ err := StartController(stopCh) if err != nil { glog.Fatalf("error running service controllers: %v", err) } select {} } ///忽略leader选举的相关逻辑 ...... run(stopCh) panic("unreachable") }

上述函数传入一个通道,用于传递给各个controller一个终止的信号,函数里定义了一个run的函数,用于调用StartController,之所以需要定义一个run函数,是因为一般这类的组件虽然为了高可用会运行多个副本,但是仅有一个副本是真正运行,其他的副本是作为待命状态运行,而这个真正运行的副本称为leader,从普通副本中通过资源争夺称为leader的过程称为leader选举,仅有leader挂掉了,剩余的副本再进行一次leader选举成为新leader。当然也可进行leader选举模式运行。因此Run函数中应该包含是否进行leader选举,若是则执行leader选举的逻辑,当选成leader才执行run函数;如果不进行leader选举则直接执行run。不过这段逻辑被省略了。

controller-manager的另一个函数是真正启动各个controller。StartController同样接收了从Run函数传过来的通道,这个通道最终转给各个controller,传递停止的信号。在函数中会构造各个controller,通过开辟一个协程调用controller的Run方法将controller启动,代码如下所示

unc StartController(stopCh <-chan struct{}) error { cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") if err != nil { glog.Fatalf("error building kubernetes config:%s", err.Error()) } kubeClient, err := kubernetes.NewForConfig(cfg) factory := informers.NewSharedInformerFactory(kubeClient, 0) podInformer:=factory.Core().V1().Pods() pc:=controller.NewPodController(kubeClient,podInformer,"k8s-cluster") go pc.Run(stopCh) factory.Start(stopCh) return nil }

controller.NewPodController是构造了一个PodController,构造PodController时所需要的kubeClient,informer需要预先构造。对于k8s原有的资源,其informer都可以通过SharedInformerFactory获得,通过协程执行 pc.Run(stopCh)后,也需要执行factory.Start(stopCh),factroy.Start需要等各个controller Run了之后方可执行,否则对应Controlle则会没有运行效果。

一个精简的Controller

这个controller的作用是统计集群中所有pod的数量,然后将pod的总数写到master的某个label上,且被统计过的pod都会在它的event中产生一条新记录来表明此pod被统计过。

罗列一下这个podcontroller结构的字段

type PodController struct { kubeClient kubernetes.Interface //用于给master打label clusterName string podLister corelisters.PodLister //用于获取被监控的pod资源 podListerSynced cache.InformerSynced //用于同步cache broadcaster record.EventBroadcaster //用于广播事件 recorder record.EventRecorder //用于记录pod的event }

Controller的构造函数如下

func NewPodController(kubeClient kubernetes.Interface,podInformer coreinformers.PodInformer,clusterName string)*PodController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "pod_controller"}) rc:=&PodController{ kubeClient:kubeClient, clusterName:clusterName, podLister:podInformer.Lister(), podListerSynced:podInformer.Informer().HasSynced, broadcaster:eventBroadcaster, recorder:recorder, } return rc }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzzyzf.html