NodeEventHandler 负责监听 node 资源相关 event,并将 node 以及 node Labels 添加到 storageCache.nodesMap 中 (key 为 nodeName,value 为 node 以及 node labels)
func (nh *nodeHandler) add(node *v1.Node) { sc := nh.cache sc.mu.Lock() nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name} klog.Infof("Adding node %v", nodeKey) sc.nodesMap[nodeKey] = &nodeContainer{ node: node, labels: node.Labels, } // update endpoints changedEps := sc.rebuildEndpointsMap() sc.mu.Unlock() for _, eps := range changedEps { sc.endpointsChan <- eps } } func (nh *nodeHandler) update(node *v1.Node) { sc := nh.cache sc.mu.Lock() nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name} klog.Infof("Updating node %v", nodeKey) nodeContainer, found := sc.nodesMap[nodeKey] if !found { sc.mu.Unlock() klog.Errorf("Updating non-existed node %v", nodeKey) return } nodeContainer.node = node // return directly when labels of node stay unchanged if reflect.DeepEqual(node.Labels, nodeContainer.labels) { sc.mu.Unlock() return } nodeContainer.labels = node.Labels // update endpoints changedEps := sc.rebuildEndpointsMap() sc.mu.Unlock() for _, eps := range changedEps { sc.endpointsChan <- eps } } ...同时由于 node 的改变会影响 endpoint,因此会调用 rebuildEndpointsMap 刷新 storageCache.endpointsMap
// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events func (sc *storageCache) rebuildEndpointsMap() []watch.Event { evts := make([]watch.Event, 0) for name, endpointsContainer := range sc.endpointsMap { newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster) if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) { continue } sc.endpointsMap[name].modified = newEps evts = append(evts, watch.Event{ Type: watch.Modified, Object: newEps, }) } return evts }rebuildEndpointsMap 是 cache 的核心函数,同时也是拓扑感知的算法实现:
// pruneEndpoints filters endpoints using serviceTopology rules combined by services topologyKeys and node labels func pruneEndpoints(hostName string, nodes map[types.NamespacedName]*nodeContainer, services map[types.NamespacedName]*serviceContainer, eps *v1.Endpoints, wrapperInCluster bool) *v1.Endpoints { epsKey := types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name} if wrapperInCluster { eps = genLocalEndpoints(eps) } // dangling endpoints svc, ok := services[epsKey] if !ok { klog.V(4).Infof("Dangling endpoints %s, %+#v", eps.Name, eps.Subsets) return eps } // normal service if len(svc.keys) == 0 { klog.V(4).Infof("Normal endpoints %s, %+#v", eps.Name, eps.Subsets) return eps } // topology endpoints newEps := eps.DeepCopy() for si := range newEps.Subsets { subnet := &newEps.Subsets[si] subnet.Addresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.Addresses) subnet.NotReadyAddresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.NotReadyAddresses) } klog.V(4).Infof("Topology endpoints %s: subnets from %+#v to %+#v", eps.Name, eps.Subsets, newEps.Subsets) return newEps } // filterConcernedAddresses aims to filter out endpoints addresses within the same node unit func filterConcernedAddresses(topologyKeys []string, hostName string, nodes map[types.NamespacedName]*nodeContainer, addresses []v1.EndpointAddress) []v1.EndpointAddress { hostNode, found := nodes[types.NamespacedName{Name: hostName}] if !found { return nil } filteredEndpointAddresses := make([]v1.EndpointAddress, 0) for i := range addresses { addr := addresses[i] if nodeName := addr.NodeName; nodeName != nil { epsNode, found := nodes[types.NamespacedName{Name: *nodeName}] if !found { continue } if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) { filteredEndpointAddresses = append(filteredEndpointAddresses, addr) } } } return filteredEndpointAddresses } func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool { if n1 == nil || n2 
== nil { return false } for _, key := range keys { val1, v1found := n1[key] val2, v2found := n2[key] if v1found && v2found && val1 == val2 { return true } } return false }算法逻辑如下: