判断 endpoint 是否为 default kubernetes service,如果是,则将该 endpoint 转化为 wrapper 所在边缘节点的 lite-apiserver 地址(127.0.0.1)和端口(51003)
apiVersion: v1 kind: Endpoints metadata: annotations: superedge.io/local-endpoint: 127.0.0.1 superedge.io/local-port: "51003" name: kubernetes namespace: default subsets: - addresses: - ip: 172.31.0.60 ports: - name: https port: xxx protocol: TCP func genLocalEndpoints(eps *v1.Endpoints) *v1.Endpoints { if eps.Namespace != metav1.NamespaceDefault || eps.Name != MasterEndpointName { return eps } klog.V(4).Infof("begin to gen local ep %v", eps) ipAddress, e := eps.Annotations[EdgeLocalEndpoint] if !e { return eps } portStr, e := eps.Annotations[EdgeLocalPort] if !e { return eps } klog.V(4).Infof("get local endpoint %s:%s", ipAddress, portStr) port, err := strconv.ParseInt(portStr, 10, 32) if err != nil { klog.Errorf("parse int %s err %v", portStr, err) return eps } ip := net.ParseIP(ipAddress) if ip == nil { klog.Warningf("parse ip %s nil", ipAddress) return eps } nep := eps.DeepCopy() nep.Subsets = []v1.EndpointSubset{ { Addresses: []v1.EndpointAddress{ { IP: ipAddress, }, }, Ports: []v1.EndpointPort{ { Protocol: v1.ProtocolTCP, Port: int32(port), Name: "https", }, }, }, } klog.V(4).Infof("gen new endpoint complete %v", nep) return nep }这样做的目的是使边缘节点上的服务采用集群内 (InCluster) 方式访问的 apiserver 为本地的 lite-apiserver,而不是云端的 apiserver
从 storageCache.servicesMap cache 中根据 endpoint 名称 (namespace/name) 取出对应 service,如果该 service 没有 topologyKeys 则无需做拓扑转化 (非service group)
func getTopologyKeys(objectMeta *metav1.ObjectMeta) []string { if !hasTopologyKey(objectMeta) { return nil } var keys []string keyData := objectMeta.Annotations[TopologyAnnotationsKey] if err := json.Unmarshal([]byte(keyData), &keys); err != nil { klog.Errorf("can't parse topology keys %s, %v", keyData, err) return nil } return keys }调用 filterConcernedAddresses 过滤 endpoint.Subsets Addresses 以及 NotReadyAddresses,只保留同一个 service topologyKeys 中的 endpoint
// filterConcernedAddresses aims to filter out endpoints addresses within the same node unit func filterConcernedAddresses(topologyKeys []string, hostName string, nodes map[types.NamespacedName]*nodeContainer, addresses []v1.EndpointAddress) []v1.EndpointAddress { hostNode, found := nodes[types.NamespacedName{Name: hostName}] if !found { return nil } filteredEndpointAddresses := make([]v1.EndpointAddress, 0) for i := range addresses { addr := addresses[i] if nodeName := addr.NodeName; nodeName != nil { epsNode, found := nodes[types.NamespacedName{Name: *nodeName}] if !found { continue } if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) { filteredEndpointAddresses = append(filteredEndpointAddresses, addr) } } } return filteredEndpointAddresses } func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool { if n1 == nil || n2 == nil { return false } for _, key := range keys { val1, v1found := n1[key] val2, v2found := n2[key] if v1found && v2found && val1 == val2 { return true } } return false }注意:如果 wrapper 所在边缘节点没有 service topologyKeys 标签,则也无法访问该 service
回到 rebuildEndpointsMap,在调用 pruneEndpoints 刷新了同一个拓扑域内的 endpoint 后,会将修改后的 endpoints 赋值给 storageCache.endpointsMap [endpoint]. modified (该字段记录了拓扑感知后修改的endpoints)。
func (nh *nodeHandler) add(node *v1.Node) { sc := nh.cache sc.mu.Lock() nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name} klog.Infof("Adding node %v", nodeKey) sc.nodesMap[nodeKey] = &nodeContainer{ node: node, labels: node.Labels, } // update endpoints changedEps := sc.rebuildEndpointsMap() sc.mu.Unlock() for _, eps := range changedEps { sc.endpointsChan <- eps } } // rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events func (sc *storageCache) rebuildEndpointsMap() []watch.Event { evts := make([]watch.Event, 0) for name, endpointsContainer := range sc.endpointsMap { newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster) if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) { continue } sc.endpointsMap[name].modified = newEps evts = append(evts, watch.Event{ Type: watch.Modified, Object: newEps, }) } return evts }另外,如果 endpoints (拓扑感知后修改的 endpoints) 发生改变,会构建 watch event,传递给 endpoints handler (interceptEndpointsRequest) 处理
2、ServiceEventHandler
storageCache.servicesMap 结构体 key 为 service 名称 (namespace/name),value 为 serviceContainer,包含如下数据:
svc:service对象
keys:service topologyKeys