apiserver源码分析——处理请求 (4)

AdmissionWebhook是准入控制器,它作为k8s-apiserver对外暴露的一种扩展方式,主要针对增删改资源时对暴露两个hook点。一个是Mutate,可修改提交上来的资源;另一个是Validate,是对提交上来的资源进行验证。当然Mutate里面也可以包含验证操作。但是本篇不对这两种准入控制器的使用实例作介绍。

准入控制器的配置在buildGenericConfig函数中,通过调用s.Admission.ApplyTo方法进行配置。经过两层调用后到达AdmissionOptions.ApplyTo执行实际的创建逻辑,即: s.Admission.ApplyTo->a.GenericAdmission.ApplyTo。代码位于 /vendor/k8s.io/apiserver/pkg/server/options/admission.go

func (a *AdmissionOptions) ApplyTo( c *server.Config, informers informers.SharedInformerFactory, kubeAPIServerClientConfig *rest.Config, features featuregate.FeatureGate, pluginInitializers ...admission.PluginInitializer, ) error { if a == nil { return nil } // Admission depends on CoreAPI to set SharedInformerFactory and ClientConfig. if informers == nil { return fmt.Errorf("admission depends on a Kubernetes core API shared informer, it cannot be nil") } pluginNames := a.enabledPluginNames() //获取各个准入控制器的provider pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(pluginNames, a.ConfigFile, configScheme) if err != nil { return fmt.Errorf("failed to read plugin config: %v", err) } clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig) if err != nil { return err } genericInitializer := initializer.New(clientset, informers, c.Authorization.Authorizer, features) initializersChain := admission.PluginInitializers{} pluginInitializers = append(pluginInitializers, genericInitializer) initializersChain = append(initializersChain, pluginInitializers...) //将准入控制器集合串成一个admissionChain,再外面包一个Wrapper,类似于之前处理认证与授权一样的方式 admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators) if err != nil { return err } //又在外面套一个可统计指标的wrapper c.AdmissionControl = admissionmetrics.WithStepMetrics(admissionChain) return nil } //代码位于 /vendor/k8s.io/apiserver/pkg/admission/plugins.go func (ps *Plugins) NewFromPlugins(pluginNames []string, configProvider ConfigProvider, pluginInitializer PluginInitializer, decorator Decorator) (Interface, error) { handlers := []Interface{} mutationPlugins := []string{} validationPlugins := []string{} for _, pluginName := range pluginNames { pluginConfig, err := configProvider.ConfigFor(pluginName) if err != nil { return nil, err } plugin, err := ps.InitPlugin(pluginName, pluginConfig, pluginInitializer) if err != nil { return nil, err } if plugin != nil { if decorator != nil { handlers = append(handlers, decorator.Decorate(plugin, pluginName)) } else { handlers = append(handlers, plugin) } if _, ok := plugin.(MutationInterface); ok { mutationPlugins = append(mutationPlugins, pluginName) } if _, ok := plugin.(ValidationInterface); ok { validationPlugins = append(validationPlugins, pluginName) } } } if len(mutationPlugins) != 0 { klog.Infof("Loaded %d mutating admission controller(s) successfully in the following order: %s.", len(mutationPlugins), strings.Join(mutationPlugins, ",")) } if len(validationPlugins) != 0 { klog.Infof("Loaded %d validating admission controller(s) successfully in the following order: %s.", len(validationPlugins), strings.Join(validationPlugins, ",")) } return newReinvocationHandler(chainAdmissionHandler(handlers)), nil }

准入控制器除了自定义的,从上述代码中也可以观察到也有内置的,内置的准入控制器大概有30+种。

但是准入控制器的调用却不像认证与授权那样在调用DefaultBuildHandlerChain时加入到handler调用链中,它是每个增删改的实际处理函数中被调用,GenericConfig的AdmissionControl字段也是在初始化GenericServer的时候传递给后者的同名字段

registerResourceHandlers方法

延续上篇介绍apiserver启动流程时,调用installer.Install方法,创建了webservice,api中各个URL的路由注册,实现了对应地址的handler,这个handler是通过registerResourceHandlers,方法篇幅即长(约900行),包含了对一个资源的增删改查各种请求的处理,对其只能分段介绍。代码位于/vendor/k8s.io/apiserver/pkg/endpoints/installer.go

这个方法有三个入参

代表URL的path

资源存储相关的类storage

用于存放路由的go-rest对象webservice

先从path以及APIInstaller对象中获取group,version,kind,分辨这种资源是cluster scope还是namespace scope的

admit := a.group.Admit optionsExternalVersion := a.group.GroupVersion if a.group.OptionsExternalVersion != nil { optionsExternalVersion = *a.group.OptionsExternalVersion } resource, subresource, err := splitSubresource(path) if err != nil { return nil, err } group, version := a.group.GroupVersion.Group, a.group.GroupVersion.Version fqKindToRegister, err := GetResourceKind(a.group.GroupVersion, storage, a.group.Typer) if err != nil { return nil, err } versionedPtr, err := a.group.Creater.New(fqKindToRegister) if err != nil { return nil, err } defaultVersionedObject := indirectArbitraryPointer(versionedPtr) kind := fqKindToRegister.Kind isSubresource := len(subresource) > 0 // If there is a subresource, namespace scoping is defined by the parent resource namespaceScoped := true if isSubresource { parentStorage, ok := a.group.Storage[resource] if !ok { return nil, fmt.Errorf("missing parent storage: %q", resource) } scoper, ok := parentStorage.(rest.Scoper) if !ok { return nil, fmt.Errorf("%q must implement scoper", resource) } namespaceScoped = scoper.NamespaceScoped() } else { scoper, ok := storage.(rest.Scoper) if !ok { return nil, fmt.Errorf("%q must implement scoper", resource) } namespaceScoped = scoper.NamespaceScoped() }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwpsds.html