执行以下命令,会先下载依赖包,再下载代码生成工具,再执行代码生成工作:
cd $GOPATH/src \ && go get -u -v k8s.io/apimachinery/pkg/apis/meta/v1 \ && go get -u -v k8s.io/code-generator/... \ && cd $GOPATH/src/k8s.io/code-generator \ && ./generate-groups.sh all \ k8s_customize_controller/pkg/client \ k8s_customize_controller/pkg/apis \ bolingcavalry:v1 #如果code-generator安装失败(网络原因),可以手动下载代码安装,在执行上面命令 git clone https://github.com/kubernetes/code-generator ./generate-groups.sh all "$ROOT_PACKAGE/pkg/client" "$ROOT_PACKAGE/pkg/apis" "$CUSTOM_RESOURCE_NAME:$CUSTOME_RESOURCE_VERSION"
如果代码没问题,会看到以下输出
Generating deepcopy funcs Generating clientset for bolingcavalry:v1 at k8s_customize_controller/pkg/client/clientset Generating listers for bolingcavalry:v1 at k8s_customize_controller/pkg/client/listers Generating informers for bolingcavalry:v1 at k8s_customize_controller/pkg/client/informers
此时再去$GOPATH/src/k8s_customize_controller目录下执行tree命令,可见已生成了很多内容
[root@master k8s_customize_controller]# tree . └── pkg ├── apis │ └── bolingcavalry │ ├── register.go │ └── v1 │ ├── doc.go │ ├── register.go │ ├── types.go │ └── zz_generated.deepcopy.go └── client ├── clientset │ └── versioned │ ├── clientset.go │ ├── doc.go │ ├── fake │ │ ├── clientset_generated.go │ │ ├── doc.go │ │ └── register.go │ ├── scheme │ │ ├── doc.go │ │ └── register.go │ └── typed │ └── bolingcavalry │ └── v1 │ ├── bolingcavalry_client.go │ ├── doc.go │ ├── fake │ │ ├── doc.go │ │ ├── fake_bolingcavalry_client.go │ │ └── fake_student.go │ ├── generated_expansion.go │ └── student.go ├── informers │ └── externalversions │ ├── bolingcavalry │ │ ├── interface.go │ │ └── v1 │ │ ├── interface.go │ │ └── student.go │ ├── factory.go │ ├── generic.go │ └── internalinterfaces │ └── factory_interfaces.go └── listers └── bolingcavalry └── v1 ├── expansion_generated.go └── student.go 21 directories, 27 files如上所示,zz_generated.deepcopy.go就是DeepCopy代码文件,client目录下的内容都是客户端相关代码,在开发controller时会用到;
client目录下的clientset、informers、listers的身份和作用可以和前面的图结合来理解
现在已经能监听到Student对象的增删改等事件,接下来就是根据这些事件来做不同的事情,满足个性化的业务需求
编写的第一个go文件就是controller.go,在k8s_customize_controller目录下创建controller.go
package main import ( "fmt" "time" "github.com/golang/glog" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" bolingcavalryv1 "github.com/zq2599/k8s-controller-custom-resource/pkg/apis/bolingcavalry/v1" clientset "github.com/zq2599/k8s-controller-custom-resource/pkg/client/clientset/versioned" studentscheme "github.com/zq2599/k8s-controller-custom-resource/pkg/client/clientset/versioned/scheme" informers "github.com/zq2599/k8s-controller-custom-resource/pkg/client/informers/externalversions/bolingcavalry/v1" listers "github.com/zq2599/k8s-controller-custom-resource/pkg/client/listers/bolingcavalry/v1" ) const controllerAgentName = "student-controller" const ( SuccessSynced = "Synced" MessageResourceSynced = "Student synced successfully" ) // Controller is the controller implementation for Student resources type Controller struct { // kubeclientset is a standard kubernetes clientset kubeclientset kubernetes.Interface // studentclientset is a clientset for our own API group studentclientset clientset.Interface studentsLister listers.StudentLister studentsSynced cache.InformerSynced workqueue workqueue.RateLimitingInterface recorder record.EventRecorder } // NewController returns a new student controller func NewController( kubeclientset kubernetes.Interface, studentclientset clientset.Interface, studentInformer informers.StudentInformer) *Controller { utilruntime.Must(studentscheme.AddToScheme(scheme.Scheme)) glog.V(4).Info("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) controller := &Controller{ kubeclientset: kubeclientset, studentclientset: studentclientset, studentsLister: studentInformer.Lister(), studentsSynced: studentInformer.Informer().HasSynced, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Students"), recorder: recorder, } glog.Info("Setting up event handlers") // Set up an event handler for when Student resources change studentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueStudent, UpdateFunc: func(old, new interface{}) { oldStudent := old.(*bolingcavalryv1.Student) newStudent := new.(*bolingcavalryv1.Student) if oldStudent.ResourceVersion == newStudent.ResourceVersion { //版本一致,就表示没有实际更新的操作,立即返回 return } controller.enqueueStudent(new) }, DeleteFunc: controller.enqueueStudentForDelete, }) return controller } //在此处开始controller的业务 func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer c.workqueue.ShutDown() glog.Info("开始controller业务,开始一次缓存数据同步") if ok := cache.WaitForCacheSync(stopCh, c.studentsSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } glog.Info("worker启动") for i := 0; i < threadiness; i++ { go wait.Until(c.runWorker, time.Second, stopCh) } glog.Info("worker已经启动") <-stopCh glog.Info("worker已经结束") return nil } func (c *Controller) runWorker() { for c.processNextWorkItem() { } } // 取数据处理 func (c *Controller) processNextWorkItem() bool { obj, shutdown := c.workqueue.Get() if shutdown { return false } // We wrap this block in a func so we can defer c.workqueue.Done. err := func(obj interface{}) error { defer c.workqueue.Done(obj) var key string var ok bool if key, ok = obj.(string); !ok { c.workqueue.Forget(obj) runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } // 在syncHandler中处理业务 if err := c.syncHandler(key); err != nil { return fmt.Errorf("error syncing '%s': %s", key, err.Error()) } c.workqueue.Forget(obj) glog.Infof("Successfully synced '%s'", key) return nil }(obj) if err != nil { runtime.HandleError(err) return true } return true } // 处理 func (c *Controller) syncHandler(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) return nil } // 从缓存中取对象 student, err := c.studentsLister.Students(namespace).Get(name) if err != nil { // 如果Student对象被删除了,就会走到这里,所以应该在这里加入执行 if errors.IsNotFound(err) { glog.Infof("Student对象被删除,请在这里执行实际的删除业务: %s/%s ...", namespace, name) return nil } runtime.HandleError(fmt.Errorf("failed to list student by: %s/%s", namespace, name)) return err } glog.Infof("这里是student对象的期望状态: %#v ...", student) glog.Infof("实际状态是从业务层面得到的,此处应该去的实际状态,与期望状态做对比,并根据差异做出响应(新增或者删除)") c.recorder.Event(student, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced) return nil } // 数据先放入缓存,再入队列 func (c *Controller) enqueueStudent(obj interface{}) { var key string var err error // 将对象放入缓存 if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { runtime.HandleError(err) return } // 将key放入队列 c.workqueue.AddRateLimited(key) } // 删除操作 func (c *Controller) enqueueStudentForDelete(obj interface{}) { var key string var err error // 从缓存中删除指定对象 key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { runtime.HandleError(err) return } //再将key放入队列 c.workqueue.AddRateLimited(key) }