13.深入k8s:Pod 水平自动扩缩HPA及其源码分析 (2)

文件位置:cmd/kube-controller-manager/app/controllermanager.go

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc { ... controllers["horizontalpodautoscaling"] = startHPAController ... }

HPA Controller和其他的Controller一样,都在NewControllerInitializers方法中进行注册,然后通过startHPAController来启动。

startHPAController

文件位置:cmd/kube-controller-manager/app/autoscaling.go

func startHPAController(ctx ControllerContext) (http.Handler, bool, error) { ... return startHPAControllerWithLegacyClient(ctx) } func startHPAControllerWithLegacyClient(ctx ControllerContext) (http.Handler, bool, error) { hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") metricsClient := metrics.NewHeapsterMetricsClient( hpaClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort, ) return startHPAControllerWithMetricsClient(ctx, metricsClient) } func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (http.Handler, bool, error) { hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery()) scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) if err != nil { return nil, false, err } // 初始化 go podautoscaler.NewHorizontalController( hpaClient.CoreV1(), scaleClient, hpaClient.AutoscalingV1(), ctx.RESTMapper, metricsClient, ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), ctx.InformerFactory.Core().V1().Pods(), ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration, ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance, ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration, ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration, ).Run(ctx.Stop) return nil, true, nil }

最后会调用到startHPAControllerWithMetricsClient方法,启动一个线程来调用NewHorizontalController方法初始化一个HPA Controller,然后执行Run方法。

Run

文件位置:pkg/controller/podautoscaler/horizontal.go

func (a *HorizontalController) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer a.queue.ShutDown() klog.Infof("Starting HPA controller") defer klog.Infof("Shutting down HPA controller") if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) { return } // 启动异步线程,每秒执行一次 go wait.Until(a.worker, time.Second, stopCh) <-stopCh }

这里会调用worker执行具体的扩缩容的逻辑。

核心代码分析

worker里面一路执行下来会走到reconcileAutoscaler方法里面,这里是HPA的核心。下面我们专注看看这部分。

reconcileAutoscaler:计算副本数 func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error { ... //副本数为0,不启动自动扩缩容 if scale.Spec.Replicas == 0 && minReplicas != 0 { // Autoscaling is disabled for this resource desiredReplicas = 0 rescale = false setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero") // 如果当前副本数大于最大期望副本数,那么设置期望副本数为最大副本数 } else if currentReplicas > hpa.Spec.MaxReplicas { rescaleReason = "Current number of replicas above Spec.MaxReplicas" desiredReplicas = hpa.Spec.MaxReplicas // 同上 } else if currentReplicas < minReplicas { rescaleReason = "Current number of replicas below Spec.MinReplicas" desiredReplicas = minReplicas } else { var metricTimestamp time.Time //计算需要扩缩容的数量 metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics) if err != nil { ... } klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference) rescaleMetric := "" if metricDesiredReplicas > desiredReplicas { desiredReplicas = metricDesiredReplicas rescaleMetric = metricName } if desiredReplicas > currentReplicas { rescaleReason = fmt.Sprintf("%s above target", rescaleMetric) } if desiredReplicas < currentReplicas { rescaleReason = "All metrics below target" } //从1.18开始支持behavior字段 //可以在扩缩容的时候指定一个稳定窗口,以防止缩放目标中的副本数量出现波动 //doc:https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#support-for-configurable-scaling-behavior if hpa.Spec.Behavior == nil { desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas) } else { desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas) } rescale = desiredReplicas != currentReplicas } ... }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zypgss.html