ServiceGrid 对应产生的 service,命名为:{ServiceGrid}-svc
func (sgc *ServiceGridController) syncServiceGrid(key string) error { startTime := time.Now() klog.V(4).Infof("Started syncing service grid %q (%v)", key, startTime) defer func() { klog.V(4).Infof("Finished syncing service grid %q (%v)", key, time.Since(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } sg, err := sgc.svcGridLister.ServiceGrids(namespace).Get(name) if errors.IsNotFound(err) { klog.V(2).Infof("service grid %v has been deleted", key) return nil } if err != nil { return err } if sg.Spec.GridUniqKey == "" { sgc.eventRecorder.Eventf(sg, corev1.EventTypeWarning, "Empty", "This service grid has an empty grid key") return nil } // get service workload list of this grid svcList, err := sgc.getServiceForGrid(sg) if err != nil { return err } if sg.DeletionTimestamp != nil { return nil } // sync service grid relevant services workload return sgc.reconcile(sg, svcList) } func (sgc *ServiceGridController) getServiceForGrid(sg *crdv1.ServiceGrid) ([]*corev1.Service, error) { svcList, err := sgc.svcLister.Services(sg.Namespace).List(labels.Everything()) if err != nil { return nil, err } labelSelector, err := common.GetDefaultSelector(sg.Name) if err != nil { return nil, err } canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { fresh, err := sgc.crdClient.SuperedgeV1().ServiceGrids(sg.Namespace).Get(context.TODO(), sg.Name, metav1.GetOptions{}) if err != nil { return nil, err } if fresh.UID != sg.UID { return nil, fmt.Errorf("orignal service grid %v/%v is gone: got uid %v, wanted %v", sg.Namespace, sg.Name, fresh.UID, sg.UID) } return fresh, nil }) cm := controller.NewServiceControllerRefManager(sgc.svcClient, sg, labelSelector, util.ControllerKind, canAdoptFunc) return cm.ClaimService(svcList) } func (sgc *ServiceGridController) reconcile(g *crdv1.ServiceGrid, svcList []*corev1.Service) error { var ( adds []*corev1.Service updates []*corev1.Service deletes []*corev1.Service ) sgTargetSvcName := util.GetServiceName(g) isExistingSvc := false for _, svc := range svcList { if svc.Name == sgTargetSvcName { isExistingSvc = true template := util.KeepConsistence(g, svc) if !apiequality.Semantic.DeepEqual(template, svc) { updates = append(updates, template) } } else { deletes = append(deletes, svc) } } if !isExistingSvc { adds = append(adds, util.CreateService(g)) } return sgc.syncService(adds, updates, deletes) } func CreateService(sg *crdv1.ServiceGrid) *corev1.Service { svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: GetServiceName(sg), Namespace: sg.Namespace, // Append existed ServiceGrid labels to service to be created Labels: func() map[string]string { if sg.Labels != nil { newLabels := sg.Labels newLabels[common.GridSelectorName] = sg.Name newLabels[common.GridSelectorUniqKeyName] = sg.Spec.GridUniqKey return newLabels } else { return map[string]string{ common.GridSelectorName: sg.Name, common.GridSelectorUniqKeyName: sg.Spec.GridUniqKey, } } }(), Annotations: make(map[string]string), }, Spec: sg.Spec.Template, } keys := make([]string, 1) keys[0] = sg.Spec.GridUniqKey keyData, _ := json.Marshal(keys) svc.Annotations[common.TopologyAnnotationsKey] = string(keyData) return svc }由于逻辑与DeploymentGrid类似,这里不展开细节,重点关注 application-grid-wrapper 部分
application-grid-wrapper 分析在 ServiceGrid Controller 创建完 service 之后,application-grid-wrapper 的作用就开始启动了:
apiVersion: v1 kind: Service metadata: annotations: topologyKeys: '["zone1"]' creationTimestamp: "2021-03-03T07:33:30Z" labels: superedge.io/grid-selector: servicegrid-demo name: servicegrid-demo-svc namespace: default ownerReferences: - apiVersion: superedge.io/v1 blockOwnerDeletion: true controller: true kind: ServiceGrid name: servicegrid-demo uid: 78c74d3c-72ac-4e68-8c79-f1396af5a581 resourceVersion: "127987090" selfLink: /api/v1/namespaces/default/services/servicegrid-demo-svc uid: 8130ba7b-c27e-4c3a-8ceb-4f6dd0178dfc spec: clusterIP: 192.168.161.1 ports: - port: 80 protocol: TCP targetPort: 8080 selector: appGrid: echo sessionAffinity: None type: ClusterIP status: loadBalancer: {}为了实现 Kubernetes 零侵入,需要在 kube-proxy与apiserver 通信之间添加一层 wrapper,架构如下:
调用链路如下:
kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver