因此application-grid-wrapper会起服务,接受来自kube-proxy的请求,如下:
func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error { ... klog.Infof("Start to run interceptor server") /* filter */ server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)} if insecure { return server.ListenAndServe() } ... server.TLSConfig = tlsConfig return server.ListenAndServeTLS("", "") } func (s *interceptorServer) buildFilterChains(debug bool) http.Handler { handler := http.Handler(http.NewServeMux()) handler = s.interceptEndpointsRequest(handler) handler = s.interceptServiceRequest(handler) handler = s.interceptEventRequest(handler) handler = s.interceptNodeRequest(handler) handler = s.logger(handler) if debug { handler = s.debugger(handler) } return handler }这里会首先创建 interceptorServer,然后注册处理函数,由外到内依次如下:
debug:接受 debug 请求,返回 wrapper pprof 运行信息
logger:打印请求日志
node:接受 kube-proxy node GET(/api/v1/nodes/{node}) 请求,并返回node信息
event:接受 kube-proxy events POST (/events)请求,并将请求转发给 lite-apiserver
func (s *interceptorServer) interceptEventRequest(handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost || !strings.HasSuffix(r.URL.Path, "/events") { handler.ServeHTTP(w, r) return } targetURL, _ := url.Parse(s.restConfig.Host) reverseProxy := httputil.NewSingleHostReverseProxy(targetURL) reverseProxy.Transport, _ = rest.TransportFor(s.restConfig) reverseProxy.ServeHTTP(w, r) }) }
service:接受 kube-proxy service List&Watch(/api/v1/services) 请求,并根据 storageCache 内容返回(GetServices)
endpoint:接受 kube-proxy endpoint List&Watch(/api/v1/endpoints) 请求,并根据 storageCache内容返回 (GetEndpoints)
下面先重点分析 cache 部分的逻辑,然后再回过头来分析具体的 http handler List&Watch 处理逻辑
wrapper 为了实现拓扑感知,自己维护了一个 cache,包括:node,service,endpoint。可以看到在 setupInformers 中注册了这三类资源的处理函数:
type storageCache struct { // hostName is the nodeName of node which application-grid-wrapper deploys on hostName string wrapperInCluster bool // mu lock protect the following map structure mu sync.RWMutex servicesMap map[types.NamespacedName]*serviceContainer endpointsMap map[types.NamespacedName]*endpointsContainer nodesMap map[types.NamespacedName]*nodeContainer // service watch channel serviceChan chan<- watch.Event // endpoints watch channel endpointsChan chan<- watch.Event } ... func NewStorageCache(hostName string, wrapperInCluster bool, serviceNotifier, endpointsNotifier chan watch.Event) *storageCache { msc := &storageCache{ hostName: hostName, wrapperInCluster: wrapperInCluster, servicesMap: make(map[types.NamespacedName]*serviceContainer), endpointsMap: make(map[types.NamespacedName]*endpointsContainer), nodesMap: make(map[types.NamespacedName]*nodeContainer), serviceChan: serviceNotifier, endpointsChan: endpointsNotifier, } return msc } ... func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error { ... if err := s.setupInformers(ctx.Done()); err != nil { return err } klog.Infof("Start to run interceptor server") /* filter */ server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)} ... return server.ListenAndServeTLS("", "") } func (s *interceptorServer) setupInformers(stop <-chan struct{}) error { klog.Infof("Start to run service and endpoints informers") noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil) if err != nil { klog.Errorf("can't parse proxy label, %v", err) return err } noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil) if err != nil { klog.Errorf("can't parse headless label, %v", err) return err } labelSelector := labels.NewSelector() labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints) resyncPeriod := time.Minute * 5 client := kubernetes.NewForConfigOrDie(s.restConfig) nodeInformerFactory := informers.NewSharedInformerFactory(client, resyncPeriod) informerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod, informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = labelSelector.String() })) nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer() serviceInformer := informerFactory.Core().V1().Services().Informer() endpointsInformer := informerFactory.Core().V1().Endpoints().Informer() /* */ nodeInformer.AddEventHandlerWithResyncPeriod(s.cache.NodeEventHandler(), resyncPeriod) serviceInformer.AddEventHandlerWithResyncPeriod(s.cache.ServiceEventHandler(), resyncPeriod) endpointsInformer.AddEventHandlerWithResyncPeriod(s.cache.EndpointsEventHandler(), resyncPeriod) go nodeInformer.Run(stop) go serviceInformer.Run(stop) go endpointsInformer.Run(stop) if !cache.WaitForNamedCacheSync("node", stop, nodeInformer.HasSynced, serviceInformer.HasSynced, endpointsInformer.HasSynced) { return fmt.Errorf("can't sync informers") } return nil } func (sc *storageCache) NodeEventHandler() cache.ResourceEventHandler { return &nodeHandler{cache: sc} } func (sc *storageCache) ServiceEventHandler() cache.ResourceEventHandler { return &serviceHandler{cache: sc} } func (sc *storageCache) EndpointsEventHandler() cache.ResourceEventHandler { return &endpointsHandler{cache: sc} }这里依次分析 NodeEventHandler,ServiceEventHandler 以及 EndpointsEventHandler,如下:
1、NodeEventHandler