转载请声明出处哦~,本篇文章发布于luozhiyun的博客:https://www.luozhiyun.com
由于这部分的代码是在client-go 中,所以使用的源码版本是client-go 1.19
这次讲解我用了很一些图,尽可能的把这个模块给描述清楚,如果感觉对你有所帮助不妨发一封邮件激励一下我~
Informer机制 机制设计Informer主要有两个作用:
通过一种叫作 ListAndWatch 的方法,把 APIServer 中的 API 对象缓存在了本地,并负责更新和维护这个缓存。ListAndWatch通过 APIServer 的 LIST API“获取”所有最新版本的 API 对象;然后,再通过 WATCH API 来“监听”所有这些 API 对象的变化;
注册相应的事件,之后如果监听到的事件变化就会调用事件对应的EventHandler,实现回调。
Informer运行原理如下:
根据流程图来解释一下Informer中几个组件的作用:
Reflector:用于监控指定的k8s资源,当资源发生变化时,触发相应的变更事件,如Added事件、Updated事件、Deleted事件,并将器资源对象放到本地DeltaFIFO Queue中;
DeltaFIFO:DeltaFIFO是一个先进先出的队列,可以保存资源对象的操作类型;
Indexer:用来存储资源对象并自带索引功能的本地存储,Reflector从DeltaFIFO中将消费出来的资源对象存储至Indexer;
Reflector 包会和 apiServer 建立长连接,并使用 ListAndWatch 方法获取并监听某一个资源的变化。List 方法将会获取某个资源的所有实例,Watch 方法则监听资源对象的创建、更新以及删除事件,然后将事件放入到DeltaFIFO Queue中;
然后Informer会不断的从 Delta FIFO Queue 中 pop 增量事件,并根据事件的类型来决定新增、更新或者是删除本地缓存;接着Informer 根据事件类型来触发事先注册好的 Event Handler触发回调函数,然后然后将该事件丢到 Work Queue 这个工作队列中。
实例将到了go-client部分的代码,我们可以直接通过实例来进行上手跑动,Informers Example代码示例如下:
package main import ( "flag" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" "log" "path/filepath" "time" ) func main() { var kubeconfig *string //如果是windows,那么会读取C:\Users\xxx\.kube\config 下面的配置文件 //如果是linux,那么会读取~/.kube/config下面的配置文件 if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse() config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err) } clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err) } stopCh := make(chan struct{}) defer close(stopCh) //表示每分钟进行一次resync,resync会周期性地执行List操作 sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute) informer := sharedInformers.Core().V1().Pods().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { mObj := obj.(v1.Object) log.Printf("New Pod Added to Store: %s", mObj.GetName()) }, UpdateFunc: func(oldObj, newObj interface{}) { oObj := oldObj.(v1.Object) nObj := newObj.(v1.Object) log.Printf("%s Pod Updated to %s", oObj.GetName(),nObj.GetName()) }, DeleteFunc: func(obj interface{}) { mObj := obj.(v1.Object) log.Printf("Pod Deleted from Store: %s", mObj.GetName()) }, }) informer.Run(stopCh) }要运行这段代码,需要我们将k8s服务器上的~/.kube代码拷贝到本地,我是win10的机器所以拷贝到C:\Users\xxx\.kube中。
informers.NewSharedInformerFactory会传入两个参数,第1个参数clientset是用于与k8s apiserver交互的客户端,第2个参数是代表每分钟会执行一次resync,resync会周期性执行List将所有资源存放再Informer Store中,如果该参数是0,则禁用resync功能。
通过informer.AddEventHandler函数可以为pod资源添加资源事件回调方法,支持3种资源事件回调方法:
AddFunc
UpdateFunc
DeleteFunc
通过名称我们就可以知道是新增、更新、删除时会回调这些方法。
在我们初次执行run方法的时候,可以会将监控的k8s上pod存放到本地,并回调AddFunc方法,如下日志:
2020/10/17 15:13:10 New Pod Added to Store: dns-test 2020/10/17 15:13:10 New Pod Added to Store: web-1 2020/10/17 15:13:10 New Pod Added to Store: fluentd-elasticsearch-nwqph 2020/10/17 15:13:10 New Pod Added to Store: kube-flannel-ds-amd64-bjmt2 2020/10/17 15:13:10 New Pod Added to Store: kubernetes-dashboard-65665f84db-jrw6k 2020/10/17 15:13:10 New Pod Added to Store: mongodb 2020/10/17 15:13:10 New Pod Added to Store: web-0 .... 源码解析 初始化 shared Informer初始化shared Informer初始化的时候会调用到informers.NewSharedInformerFactory进行初始化。