Kubernetes source code reading notes: Kubelet (part 3)
Published: 2019-06-08


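Recall from the first article in this series that RunKubelet is the method that runs the kubelet. Its core is a call to CreateAndInitKubelet, which creates and initializes the kubelet, followed (unless runOnce is set) by a call to startKubelet, which runs it: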

```go
// cmd/kubelet/app/server.go
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	...
	k, err := CreateAndInitKubelet(......)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}

	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %v", err)
		}
		klog.Info("Started kubelet as runonce")
	} else {
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
		klog.Info("Started kubelet")
	}
	...
	return nil
}
```

We analyzed CreateAndInitKubelet in the previous article; this article covers startKubelet.

1. startKubelet

```go
// cmd/kubelet/app/server.go
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// start the kubelet
	go wait.Until(func() {
		k.Run(podCfg.Updates())
	}, 0, wait.NeverStop)

	// start the kubelet server
	if enableServer {
		go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
	}
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
		go k.ListenAndServePodResources()
	}
}
```

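The remaining lines start the kubelet's HTTP servers, which we won't go into here. The key step is the goroutine that runs the Run method. Because that call is wrapped in wait.Until with a period of 0 and wait.NeverStop, Run is started again immediately if it ever returns.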

Let's step into the Run method:

```go
// pkg/kubelet/kubelet.go

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		klog.Warning("No api server defined - no node status update will be sent.")
	}

	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.Fatal(err)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()

		// start syncing lease
		if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
			go kl.nodeLeaseController.Run(wait.NeverStop)
		}
	}
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Start loop to sync iptables util rules
	if kl.makeIPTablesUtilChains {
		go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
	}

	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

	// Start component sync loops.
	kl.statusManager.Start()
	kl.probeManager.Start()

	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		go kl.runtimeClassManager.Run(wait.NeverStop)
	}

	// Start the pod lifecycle event generator.
	kl.pleg.Start()
	kl.syncLoop(updates, kl)
}
```

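Run is an important method. Broadly speaking, it does the following:

(1) It starts the kubelet's key components, such as the volume manager, the probe manager, and the RuntimeClass manager.

(2) It continuously reports the node's status to the API server (through go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)). This lets the API server know that the node has not been lost.

(3) It continuously monitors the state of the container runtime (through go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop), that is, every 5 seconds).

(4) It runs the kubelet's main loop, syncLoop, which is the most important step. As the doc comment on syncLoop explains, syncLoop receives the desired state of pods from three sources (the API server, files, and HTTP) and continuously drives the current state of pods toward that desired state. The heart of syncLoop is the call to syncLoopIteration inside its for loop.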
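The relationship between syncLoop and syncLoopIteration described in (4) can be sketched as a loop that keeps calling a single-iteration function until that function asks it to stop. This is a simplified, hypothetical reduction. configCh carries plain strings instead of kubetypes.PodUpdate. The only exit condition modeled is the one syncLoopIteration has, namely configCh being closed. The real syncLoop also creates the sync and housekeeping tickers and backs off while the container runtime reports errors; those parts are omitted here.

```go
package main

import "fmt"

// iterate is a hypothetical, reduced syncLoopIteration: it handles one
// update and returns false once configCh has been closed.
func iterate(configCh <-chan string, handled *[]string) bool {
	u, open := <-configCh
	if !open {
		return false // mirrors "Update channel is closed. Exiting the sync loop."
	}
	*handled = append(*handled, u)
	return true
}

// syncLoop blocks, calling iterate until it reports that the loop should end.
func syncLoop(configCh <-chan string) []string {
	var handled []string
	for iterate(configCh, &handled) {
	}
	return handled
}

func main() {
	configCh := make(chan string, 2)
	configCh <- "ADD web-1"
	configCh <- "UPDATE web-1"
	close(configCh)

	// Run would start its background goroutines first, then block here.
	fmt.Println(syncLoop(configCh)) // prints [ADD web-1 UPDATE web-1]
}
```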

2. syncLoopIteration

Let's step into syncLoopIteration:

```go
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.RESTORE:
			klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
			// These are pods restored from the checkpoint. Treat them as new
			// pods.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.Errorf("Kubelet does not support snapshot update")
		}

		if u.Op != kubetypes.RESTORE {
			// If the update type is RESTORE, it means that the update is from
			// the pod checkpoints and may be incomplete. Do not mark the
			// source as ready.

			// Mark the source ready after receiving at least one update from the
			// source. Once all the sources are marked ready, various cleanup
			// routines will start reclaiming resources. It is important that this
			// takes place only after kubelet calls the update handler to process
			// the update to ensure the internal pod cache is up-to-date.
			kl.sourcesReady.AddSource(u.Source)
		}
	case e := <-plegCh:
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
			}
		}

		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
		handler.HandlePodSyncs(podsToSync)
	case update := <-kl.livenessManager.Updates():
		if update.Result == proberesults.Failure {
			// The liveness manager detected a failure; sync the pod.

			// We should not use the pod from livenessManager, because it is never updated after
			// initialization.
			pod, ok := kl.podManager.GetPodByUID(update.PodUID)
			if !ok {
				// If the pod no longer exists, ignore the update.
				klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
				break
			}
			klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
			handler.HandlePodSyncs([]*v1.Pod{pod})
		}
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// If the sources aren't ready or volume manager has not yet synced the states,
			// skip housekeeping, as we may accidentally delete pods from unready sources.
			klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
		} else {
			klog.V(4).Infof("SyncLoop (housekeeping)")
			if err := handler.HandlePodCleanups(); err != nil {
				klog.Errorf("Failed cleaning pods: %v", err)
			}
		}
	}
	return true
}
```

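As you can see, the method uses select to wait on five channels through which pod state changes can arrive:

(1) configCh: the channel of configuration events, that is, the events aggregated from the file, HTTP, and API server sources described earlier.

(2) syncCh: a timer channel. On each tick, the pods waiting to be synced (as returned by getPodsToSync) are synced again.

(3) housekeepingCh: the housekeeping timer channel, used to clean up pods periodically. Housekeeping is skipped until all config sources are ready.

(4) plegCh: events from the PLEG (Pod Lifecycle Event Generator). When a pod's state changes (for example, because a container was killed or paused), the kubelet is notified through this channel.

(5) livenessManager.Updates(): liveness probe results. When a probe reports a failure, the kubelet is notified through this channel and syncs the affected pod.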

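In fact, almost every branch ends up calling one of the HandlePodXXX methods of the SyncHandler interface. The handler is the kubelet itself, because Run calls kl.syncLoop(updates, kl). Take HandlePodAdditions as an example: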

```go
// pkg/kubelet/kubelet.go

// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
		// Responsible for checking limits in resolv.conf
		if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
			kl.dnsConfigurer.CheckLimitsForResolvConf()
		}
		existingPods := kl.podManager.GetPods()
		// Always add the pod to the pod manager. Kubelet relies on the pod
		// manager as the source of truth for the desired state. If a pod does
		// not exist in the pod manager, it means that it has been deleted in
		// the apiserver and no action (other than cleanup) is required.
		kl.podManager.AddPod(pod)

		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}

		if !kl.podIsTerminated(pod) {
			// Only go through the admission process if the pod is not
			// terminated.

			// We failed pods that we rejected, so activePods include all admitted
			// pods that are alive.
			activePods := kl.filterOutTerminatedPods(existingPods)

			// Check if we can admit the pod; if not, reject it.
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
		kl.probeManager.AddPod(pod)
	}
}
```

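A few points are key:

(1) The pod is always added to the pod manager. The pod manager is one of the many managers initialized when the kubelet is created; it stores the desired state of pods.

(2) Mirror pods. Pods created through a source other than the API server (a file or HTTP) are called static pods, and the API server has no knowledge of them. The kubelet therefore creates a pod with the same name and namespace as the static pod so that the API server can see it. This pod, created automatically by the kubelet, is called a mirror pod. Static pods are used relatively rarely, so they don't need much attention here.

(3) dispatchWork is called to perform the actual sync. Most of the other SyncHandler methods, such as HandlePodUpdates and HandlePodSyncs, also end up calling dispatchWork.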

3. dispatchWork

```go
// pkg/kubelet/kubelet.go

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	if kl.podIsTerminated(pod) {
		if pod.DeletionTimestamp != nil {
			// If the pod is in a terminated state, there is no pod worker to
			// handle the work item. Check if the DeletionTimestamp has been
			// set, and force a status update to trigger a pod deletion request
			// to the apiserver.
			kl.statusManager.TerminatePod(pod)
		}
		return
	}
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		OnCompleteFunc: func(err error) {
			if err != nil {
				metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
			}
		},
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}
```

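As you can see, the method first checks whether the pod has already terminated. If it has, there is no pod worker left to handle it. If its DeletionTimestamp is set, the status manager is asked to force a status update so that a deletion request is sent to the API server. In either case the method then returns. Only pods that have not terminated reach the next step, a call to UpdatePod that syncs the pod asynchronously.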

UpdatePod is defined in pkg/kubelet/pod_workers.go:

```go
// pkg/kubelet/pod_workers.go

// Apply the new setting to the specified pod.
// If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
// Update requests are ignored if a kill pod request is pending.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	pod := options.Pod
	uid := pod.UID
	var podUpdates chan UpdatePodOptions
	var exists bool

	p.podLock.Lock()
	defer p.podLock.Unlock()
	if podUpdates, exists = p.podUpdates[uid]; !exists {
		// We need to have a buffer here, because checkForUpdates() method that
		// puts an update into channel is called from the same goroutine where
		// the channel is consumed. However, it is guaranteed that in such case
		// the channel is empty, so buffer of size 1 is enough.
		podUpdates = make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = podUpdates

		// Creating a new pod worker either means this is a new pod, or that the
		// kubelet just restarted. In either case the kubelet is willing to believe
		// the status of the pod for the first pod worker sync. See corresponding
		// comment in syncPod.
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(podUpdates)
		}()
	}
	if !p.isWorking[pod.UID] {
		p.isWorking[pod.UID] = true
		podUpdates <- *options
	} else {
		// if a request to kill a pod is pending, we do not let anything overwrite that request.
		update, found := p.lastUndeliveredWorkUpdate[pod.UID]
		if !found || update.UpdateType != kubetypes.SyncPodKill {
			p.lastUndeliveredWorkUpdate[pod.UID] = *options
		}
	}
}
```

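The method first checks whether a worker already exists for this pod, that is, whether podWorkers already holds an update channel keyed by the pod's UID. If not, it creates a channel with a buffer of size 1 and stores it in podWorkers (podWorkers is another component initialized when the kubelet is created). It then starts a goroutine running managePodLoop to consume updates from that channel. The rest of the method checks whether the worker is already busy. If the worker is idle, the update is sent on the channel right away. Otherwise the update is saved in lastUndeliveredWorkUpdate to be delivered once the current sync finishes, and a pending kill request is never overwritten.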

Each update is processed by managePodLoop, whose core is a call to podWorkers.syncPodFn. This function is handed to podWorkers when podWorkers is initialized:

```go
// pkg/kubelet/kubelet.go
func NewMainKubelet(...) {
	...
	klet.podWorkers = newPodWorkers(klet.syncPod, ...)
	...
}
```

So, in essence, the work is done by the syncPod method in pkg/kubelet/kubelet.go.
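The wiring in NewMainKubelet relies on Go method values. Evaluating klet.syncPod as an expression yields a function bound to klet, so podWorkers can call it without knowing anything about the Kubelet type. The minimal sketch below uses hypothetical names and a simplified signature (the real syncPodFn takes a structured options argument).

```go
package main

import "fmt"

// syncPodFnType is a simplified, hypothetical signature for the injected function.
type syncPodFnType func(podName string) error

// podWorkers only knows that it has some function to call per pod.
type podWorkers struct {
	syncPodFn syncPodFnType
}

func newPodWorkers(fn syncPodFnType) *podWorkers {
	return &podWorkers{syncPodFn: fn}
}

type Kubelet struct {
	nodeName   string
	podWorkers *podWorkers
	syncedPods []string
}

// syncPod is the method that actually does the work.
func (kl *Kubelet) syncPod(podName string) error {
	kl.syncedPods = append(kl.syncedPods, podName)
	return nil
}

func newMainKubelet(nodeName string) *Kubelet {
	klet := &Kubelet{nodeName: nodeName}
	// klet.syncPod is a method value bound to klet, so calling
	// podWorkers.syncPodFn later runs syncPod on this Kubelet.
	klet.podWorkers = newPodWorkers(klet.syncPod)
	return klet
}

func main() {
	kl := newMainKubelet("node-1")
	_ = kl.podWorkers.syncPodFn("web-1")
	fmt.Println(kl.syncedPods) // prints [web-1]
}
```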

We will analyze syncPod in the next article.

Reposted from: https://www.cnblogs.com/00986014w/p/10907712.html
