As covered in the previous article, when the Kubelet starts it calls kubelet.Run, and the core of that method is the call to kubelet.syncLoop. This is a loop that performs a number of checks and synchronization tasks; among them, it listens for Pod add/update/delete events. When the Scheduler assigns a Pod to a Node, an event arrives in kubelet.syncLoop, and after a series of steps the Pod ends up running on that node.
kubelet.syncLoop

The call chain for pod creation looks like this:

```
kubelet.syncLoop                                 /pkg/kubelet/kubelet.go
|--kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
   |--u, open := <-configCh
   |--handler.HandlePodAdditions(u.Pods)         // i.e. Kubelet.HandlePodAdditions
      |--sort.Sort(sliceutils.PodsByCreationTime(pods))
      |--kl.handleMirrorPod(pod, start)
      |--kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
         |--kl.podWorkers.UpdatePod              // i.e. podWorkers.UpdatePod, /pkg/kubelet/pod_workers.go
            |--p.managePodLoop
               |--p.syncPodFn
```

Even when there is no pod configuration to update, syncLoop still periodically synchronizes and cleans up pods. It calls syncLoopIteration repeatedly inside a for loop; if a serious error occurs during an iteration, kubelet records it in runtimeState and sleeps with an exponential backoff (capped at 5 seconds) before continuing the loop.
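To make the backoff behavior concrete, here is a minimal, runnable sketch of that schedule in isolation. The base/factor/max constants mirror the ones defined alongside syncLoop in pkg/kubelet/kubelet.go (100ms, 2, 5s); treat the exact values as an assumption, since they may differ across versions:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// assumed to match the constants in pkg/kubelet/kubelet.go
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	for i := 0; i < 8; i++ {
		fmt.Printf("retry %d: sleep %v\n", i+1, duration)
		// same growth formula as syncLoop: double, but never exceed max
		duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
	}
}
```

This prints 100ms, 200ms, 400ms, ... and then stays pinned at 5s, which is why the prose above describes the wait as "up to 5 seconds".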
```go
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	// syncTicker: check every second whether any pod workers need to be synced
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	// check every two seconds whether any pods need to be cleaned up
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	// pod lifecycle events
	plegCh := kl.pleg.Watch()
	...
	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.Errorf("skipping pod synchronization - %v", err)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base
		...
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		...
	}
	...
}
```

syncLoopIteration

syncLoopIteration selects over several channels and hands whatever message arrives to the handler. For pod creation the relevant channel is configCh, which delivers pod changes (add, update, delete) from three sources: file, http, and apiserver. The other channels include syncCh, which triggers a pod sync every second, and housekeepingCh, which checks every two seconds whether any pods need to be cleaned up.
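For reference, this is a sketch of the message type carried on configCh, abridged from pkg/kubelet/types/pod_update.go. The struct fields are from the source; the exact set and iota order of the operations vary across Kubernetes versions, so treat the const block as illustrative:

```go
package kubetypes

import v1 "k8s.io/api/core/v1"

type PodOperation int

const (
	ADD    PodOperation = iota // pods were added from a source
	UPDATE                     // pods in a source were updated
	REMOVE                     // pods were removed from a source
	DELETE                     // pods were gracefully deleted
	RECONCILE                  // pod statuses need to be reconciled
)

// PodUpdate is what each source (file, http, apiserver) sends down configCh.
type PodUpdate struct {
	Pods   []*v1.Pod    // pods affected by this update
	Op     PodOperation // what happened to them
	Source string       // which source produced the update
}
```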
```go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh: // updates from the three sources
		...
		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		...
		}
	case <-syncCh: // one-second timer: periodic pod sync
		...
	case update := <-kl.livenessManager.Updates(): // liveness probe results
		...
	case <-housekeepingCh: // two-second timer: pod cleanup
		...
	}
	...
}
```

HandlePodAdditions

HandlePodAdditions handles pod creation events:

```go
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	// sort pods by creation time, so the earliest-created pod is handled first
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
		existingPods := kl.podManager.GetPods()
		// register the pod with podManager; statusManager, volumeManager and
		// runtimeManager all depend on podManager
		kl.podManager.AddPod(pod)
		// static (mirror) pods: handleMirrorPod also calls kl.dispatchWork
		// internally, but skips the admission checks below
		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		if !kl.podIsTerminated(pod) {
			// Only go through the admission process if the pod is not
			// terminated.
			// We failed pods that we rejected, so activePods include all admitted
			// pods that are alive.
			activePods := kl.filterOutTerminatedPods(existingPods)
			// verify the pod can run on this node; if not, reject it outright
			// Check if we can admit the pod; if not, reject it.
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		...
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
		...
	}
}
```

UpdatePod

UpdatePod runs managePodLoop in a goroutine. The podUpdates map records whether a goroutine has already been created for this pod, and the isWorking map records whether that goroutine is currently processing an update; if it is idle, the options are sent into the channel so that managePodLoop can proceed.
```go
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	pod := options.Pod
	uid := pod.UID
	var podUpdates chan UpdatePodOptions
	var exists bool
	...
	// first update for this pod: create its channel and start a dedicated
	// goroutine running managePodLoop
	if podUpdates, exists = p.podUpdates[uid]; !exists {
		podUpdates = make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = podUpdates
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(podUpdates)
		}()
	}
	// if the worker is idle, hand it the update directly
	if !p.isWorking[pod.UID] {
		p.isWorking[pod.UID] = true
		podUpdates <- *options
	} else {
		...
	}
	...
}
```

managePodLoop
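The walkthrough continues with managePodLoop. For context, this is an abridged sketch of it as it appeared in /pkg/kubelet/pod_workers.go in the same era of the code base as the excerpts above (details such as killPodOptions and the OnCompleteFunc callback are omitted, and the shape varies by version): it drains the per-pod channel created by UpdatePod and calls p.syncPodFn for each update, which is where the pod actually gets brought up.

```go
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	// each iteration handles one update delivered by UpdatePod
	for update := range podUpdates {
		err := func() error {
			// wait for the runtime cache to hold a pod status newer than
			// the last sync, so we never act on stale state
			status, err := p.podCache.GetNewerThan(update.Pod.UID, lastSyncTime)
			if err != nil {
				return err
			}
			// syncPodFn is Kubelet.syncPod, which does the real work of
			// getting the pod running
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:  update.MirrorPod,
				pod:        update.Pod,
				podStatus:  status,
				updateType: update.UpdateType,
			})
			lastSyncTime = time.Now()
			return err
		}()
		...
		// mark the worker idle again, or requeue the last undelivered update
		p.wrapUp(update.Pod.UID, err)
	}
}
```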