一文读懂SuperEdge拓扑算法 (6)

对于 service 资源的改动,这里用 Update event 说明:

func (sh *serviceHandler) update(service *v1.Service) { sc := sh.cache sc.mu.Lock() serviceKey := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} klog.Infof("Updating service %v", serviceKey) newTopologyKeys := getTopologyKeys(&service.ObjectMeta) serviceContainer, found := sc.servicesMap[serviceKey] if !found { sc.mu.Unlock() klog.Errorf("update non-existed service, %v", serviceKey) return } sc.serviceChan <- watch.Event{ Type: watch.Modified, Object: service, } serviceContainer.svc = service // return directly when topologyKeys of service stay unchanged if reflect.DeepEqual(serviceContainer.keys, newTopologyKeys) { sc.mu.Unlock() return } serviceContainer.keys = newTopologyKeys // update endpoints changedEps := sc.rebuildEndpointsMap() sc.mu.Unlock() for _, eps := range changedEps { sc.endpointsChan <- eps } }

逻辑如下:

获取 service topologyKeys

构建 service event.Modified event

比较 service topologyKeys 与已经存在的是否有差异

如果有差异则更新 topologyKeys,且调用 rebuildEndpointsMap刷新该 service 对应的 endpoints,如果 endpoints 发生变化,则构建 endpoints watch event,传递给 endpoints handler (interceptEndpointsRequest) 处理

3、EndpointsEventHandler

storageCache.endpointsMap 结构体 key 为 endpoints 名称(namespace/name),value 为 endpointsContainer,包含如下数据:

endpoints:拓扑修改前的 endpoints

modified:拓扑修改后的 endpoints

对于 endpoints 资源的改动,这里用 Update event 说明:

func (eh *endpointsHandler) update(endpoints *v1.Endpoints) { sc := eh.cache sc.mu.Lock() endpointsKey := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} klog.Infof("Updating endpoints %v", endpointsKey) endpointsContainer, found := sc.endpointsMap[endpointsKey] if !found { sc.mu.Unlock() klog.Errorf("Updating non-existed endpoints %v", endpointsKey) return } endpointsContainer.endpoints = endpoints newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.wrapperInCluster) changed := !apiequality.Semantic.DeepEqual(endpointsContainer.modified, newEps) if changed { endpointsContainer.modified = newEps } sc.mu.Unlock() if changed { sc.endpointsChan <- watch.Event{ Type: watch.Modified, Object: newEps, } } }

逻辑如下:

更新 endpointsContainer.endpoint 为新的 endpoints 对象

调用 pruneEndpoints 获取拓扑刷新后的 endpoints

比较 endpointsContainer.modified 与新刷新后的 endpoints

如果有差异则更新 endpointsContainer.modified,则构建 endpoints watch event,传递给 endpoints handler (interceptEndpointsRequest) 处理

在分析完NodeEventHandler,ServiceEventHandler以及EndpointsEventHandler之后,我们回到具体的http handler List&Watch处理逻辑上,这里以endpoints为例:

func (s *interceptorServer) interceptEndpointsRequest(handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet || !strings.HasPrefix(r.URL.Path, "/api/v1/endpoints") { handler.ServeHTTP(w, r) return } queries := r.URL.Query() acceptType := r.Header.Get("Accept") info, found := s.parseAccept(acceptType, s.mediaSerializer) if !found { klog.Errorf("can't find %s serializer", acceptType) w.WriteHeader(http.StatusBadRequest) return } encoder := scheme.Codecs.EncoderForVersion(info.Serializer, v1.SchemeGroupVersion) // list request if queries.Get("watch") == "" { w.Header().Set("Content-Type", info.MediaType) allEndpoints := s.cache.GetEndpoints() epsItems := make([]v1.Endpoints, 0, len(allEndpoints)) for _, eps := range allEndpoints { epsItems = append(epsItems, *eps) } epsList := &v1.EndpointsList{ Items: epsItems, } err := encoder.Encode(epsList, w) if err != nil { klog.Errorf("can't marshal endpoints list, %v", err) w.WriteHeader(http.StatusInternalServerError) return } return } // watch request timeoutSecondsStr := r.URL.Query().Get("timeoutSeconds") timeout := time.Minute if timeoutSecondsStr != "" { timeout, _ = time.ParseDuration(fmt.Sprintf("%ss", timeoutSecondsStr)) } timer := time.NewTimer(timeout) defer timer.Stop() flusher, ok := w.(http.Flusher) if !ok { klog.Errorf("unable to start watch - can't get http.Flusher: %#v", w) w.WriteHeader(http.StatusMethodNotAllowed) return } e := restclientwatch.NewEncoder( streaming.NewEncoder(info.StreamSerializer.Framer.NewFrameWriter(w), scheme.Codecs.EncoderForVersion(info.StreamSerializer, v1.SchemeGroupVersion)), encoder) if info.MediaType == runtime.ContentTypeProtobuf { w.Header().Set("Content-Type", runtime.ContentTypeProtobuf+";stream=watch") } else { w.Header().Set("Content-Type", runtime.ContentTypeJSON) } w.Header().Set("Transfer-Encoding", "chunked") w.WriteHeader(http.StatusOK) flusher.Flush() for { select { case <-r.Context().Done(): return case <-timer.C: return case evt := <-s.endpointsWatchCh: klog.V(4).Infof("Send endpoint watch event: %+#v", evt) err := e.Encode(&evt) if err != nil { klog.Errorf("can't encode watch event, %v", err) return } if len(s.endpointsWatchCh) == 0 { flusher.Flush() } } } }) }

逻辑如下:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpyxgf.html