[toc]
Pod创建流程代码版本kubelet篇
1. 前言
在k8s的面试中Pod的创建流程是一个常问的问题,而kubelet则无疑重中之重,之前也写过一篇Pod的运行,不过没有涉及到具体的代码,本文尝试用代码的方式,来复数整个核心的流程,同时为了方便记忆,又将整个过程分为:准备、配置、清理、构建运行四个阶段,让我们一起来看下吧, 文末有大图总结
2. 准备阶段
当获取到Pod添加的事件的时候,首先会进行一些基础的工作,我吧这个过程称为准备阶段,准备阶段主要做的事情有如下:1)加入PodManager 2)准入控制检查 3)分发事件 4)根据Pod添加对应的探针, 让我们一起来看下关键实现
2.1 加入PodManager
PodManager中的功能除了存储Pod的信息,还会进行对应Pod的configMap和secret的管理,当心加入Pod的时候,会检查对应的Pod是否有对应的configMap和secret配置,如果有则就会创建对应的监听器,监听资源的变化,进行本地缓存
除此之外,如果对应的Pod的BootstrapCheckpointAnnotationKey有设定,则还会创建对应的checkpoint,即将pod的配置数据写入到本地磁盘
kl.podManager.AddPod(pod)
2.2 准入控制检查
准入控制检查主要是在运行Pod之前在kubelet上进行Pod运行条件的检查,检查当前节点在scheduler决策完成后到感知到Pod运行这段时间资源是否依旧满足,并且检查Pod的一些特殊资源比如比如sysctl、security等检查,这里我感觉比较重要的两个分别是eviction和predicate, 如果不满足准入检查,则会直接拒绝
2.2.1 eviction准入检查
如果当前节点只存在内存压力,则会根据对应的Pod的QOS等级来判断,如果说不是BestEffort或者容忍内存压力的污点,则会允许,否则则会拒绝运行
nodeOnlyHasMemoryPressureCondition := hasNodeCondition(m.nodeConditions, v1.NodeMemoryPressure) && len(m.nodeConditions) == 1
if nodeOnlyHasMemoryPressureCondition {
// 如果不是PodQOSBestEffort, 则都会尝试运行
notBestEffort := v1.PodQOSBestEffort != v1qos.GetPodQOS(attrs.Pod)
if notBestEffort {
return lifecycle.PodAdmitResult{Admit: true}
}
// 如果对应的Pod容忍内存压力的污点,则就可以继续进行其他准入控制器的检查
if v1helper.TolerationsTolerateTaint(attrs.Pod.Spec.Tolerations, &v1.Taint{
Key: v1.TaintNodeMemoryPressure,
Effect: v1.TaintEffectNoSchedule,
}) {
return lifecycle.PodAdmitResult{Admit: true}
}
}
2.2.2 predicate准入检查
predicate准入控制器中的逻辑主要是分为两个部分:1)检查对应的资源是否满足分配请求,同时会记录缺少的资源2)如果是Critical类型的Pod则会按照QOS等级来进行资源的抢占,满足这些高优先的Pod这里的Critical类型的Pod主要包含如下三类:静态Pod、镜像Pod、高优先Pod(优先级高于2000000000)
func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult {
node, err := w.getNodeAnyWayFunc()
// 踢出扩展资源,只进行内存和CPU资源的检查
podWithoutMissingExtendedResources := removeMissingExtendedResources(admitPod, nodeInfo)
// 进行预选算法筛选, 筛选出那些资源不足的资源
fit, reasons, err := predicates.GeneralPredicates(podWithoutMissingExtendedResources, nil, nodeInfo)
if !fit {
// 如果预选失败,则尝试进行抢占
fit, reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons)
}
}
2.3 探针管理
k8s里面的探针主要分为三类:startup、readiness、liveness,在Pod通过准入控制检查后,会根据Pod的探针配置创建对应的探针,但是这里的探针并不会真正的进行探测,因为当前还无法感知到对应的pod的状态
kl.probeManager.AddPod(pod)
2.4 分发事件
在kubelet中会为每个Pod都创建一个对应的goroutine和事件管道,后续新的事件也都通过管道发送给对应的goroutine
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
// 获取pod信息
pod := options.Pod
uid := pod.UID
var podUpdates chan UpdatePodOptions
var exists bool
p.podLock.Lock()
defer p.podLock.Unlock()
// kubelet会为每个pod创建一个goroutine, 并且通过管道来进行通信
if podUpdates, exists = p.podUpdates[uid]; !exists {
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates
// 为当前pod启动一个goroutine
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
// 更新Pod的事件发送到管道
podUpdates <- *options
}
}
至此一个Pod的启动的准备阶段就基 本完成了,检查运行环境、拉取对应的cofnigMap和secret资源、创建探针、启动负责Pod状态维护的线程,至此准备阶段完成
3.配置阶段
在kubelet最终的状态同步都是由syncPod来完成,该函数会根据传递进来的目标状态和Pod的当前状态来进行决策,从而满足目标状态,因为内部逻辑的复杂,会分为:配置阶段、清理阶段、构建运行阶段,这里先看下配置阶段
配置阶段主要是获取当前的Pod状态、应用CGOUP配置、Pod数据目录构建、等待VOlume挂载、获取镜像拉取的secret等
3.1 计算Pod的状态
Pod的状态数据主要包含当前阶段、Conditions(容器Condition、初始化容器Condition、PodReadyCondition),而这些状态则需要根据当前的PodStatus里面的状态计算,还有probeManager里面探测的数据两部分共同完成
func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
allStatus := append(append([]v1.ContainerStatus{}, s.ContainerStatuses...), s.InitContainerStatuses...)
// 根据Pod的容器状态,设定当前的的阶段
s.Phase = getPhase(spec, allStatus)
kl.probeManager.UpdatePodStatus(pod.UID, s)
s.Conditions = append(s.Conditions, status.GeneratePodInitializedCondition(spec, s.InitContainerStatuses, s.Phase))
s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.Conditions, s.ContainerStatuses, s.Phase))
s.Conditions = append(s.Conditions, status.GenerateContainersReadyCondition(spec, s.ContainerStatuses, s.Phase))
return *s
}
3.2 运行环境准入检查
该运行环境是指的一些软件状态的,这里主要涉及到Appmor、特权模式、proc挂载,实现机制就是检测对应的Pod是否需要对应的操作,并且SecurityContext中是否允许对应的操作,从而确定Pod是否能够进行运行
func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult {
// 准入控制插件
for _, handler := range kl.softAdmitHandlers {
if result := handler.Admit(attrs); !result.Admit {
return result
}
}
return lifecycle.PodAdmitResult{Admit: true}
}
3.3 更新状态
更新状态主要是为了probeManager来进行状态检查的,如果probeManager无法获取到对应的状态,就不会执行对应的健康探针的检查,这里的状态就是根据之前的各种计算在kubelet上对应Pod的当前 状态
kl.statusManager.SetPodStatus(pod, apiPodStatus)
3.4 网络运行时检查
if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
}
3.5 CGroup配置
Cgroup的配置主要是按照QOS等级来进行cgroup目录的构建,并且更新当前Pod的配置
pcm := kl.containerManager.NewPodContainerManager()
// cgroup应用cgroup
if !kl.podIsTerminated(pod) {
podKilled := false
if !pcm.Exists(pod) && !firstSync {
// 如果对于的cgroup不存在,并且也不是第一次运行,就先将之前的pod沙雕
if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
podKilled = true
}
}
if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
if !pcm.Exists(pod) {
// 更新qoscgroup设置
if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
}
// 更新podde的cgroup配置
if err := pcm.EnsureExists(pod); err != nil {
}
}
}
}
3.6 镜像Pod的检查
因为要通过镜像Pod来向apiserver传递静态Pod的状态,所以该阶段主要是为静态Pod创建对应的镜像Pod
if kubetypes.IsStaticPod(pod) {
// 静态pod
podFullName := kubecontainer.GetPodFullName(pod)
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
}
}
if mirrorPod == nil || deleted {
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
}
}
}
}
3.7 创建Pod的数据目录
Pod的数据目录主要是包含三个部分:Pod目录、Volume目录、Plugin目录三个目录
if err := kl.makePodDataDirs(pod); err != nil {
return err
}
3.8 等待volume的挂载
if !kl.podIsTerminated(pod) {
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
}
}
3.9 获取镜像拉取的secrets
pullSecrets := kl.getPullSecretsForPod(pod)
3.10 调用容器的运行时进行同步
着可能是最复杂的一部分了,接下来就进入到下一个阶段:清理阶段
result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
4. 清理阶段
在Pod运行前可能已经有部分容器已经在运行,则此时就需要根据当前的状态,来进行一些容器的清理工作,为接下来的构建运行阶段提供一个相对干净的环境
4.1 计算Pod状态变更
在k8s中Pod的状态主要包含sandbox容器状态、初始化容器状态、临时容器状态、业务容器状态等几部分,我们依次来看下关键的实现
podContainerChanges := m.computePodActions(pod, podStatus)
沙箱状态计算:当且仅有一个Ready的沙箱并且沙箱的IP不为空的情况,沙箱的状态才不需要更改,其他情况下,都需要重新进行沙箱的构建,并且需要kill掉Pod关联的所有容器
func (m *kubeGenericRuntimeManager) podSandboxChanged(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, uint32, string) {
if len(podStatus.SandboxStatuses) == 0 {
return true, 0, ""
}
readySandboxCount := 0
for _, s := range podStatus.SandboxStatuses {
if s.State == runtimeapi.PodSandboxState_SANDBOX_READY {
readySandboxCount++
}
}
sandboxStatus := podStatus.SandboxStatuses[0]
if readySandboxCount > 1 {
return true, sandboxStatus.Metadata.Attempt + 1, sandboxStatus.Id
}
if sandboxStatus.State != runtimeapi.PodSandboxState_SANDBOX_READY {
return true, sandboxStatus.Metadata.Attempt + 1, sandboxStatus.Id
}
if sandboxStatus.GetLinux().GetNamespaces().GetOptions().GetNetwork() != networkNamespaceForPod(pod) {
return true, sandboxStatus.Metadata.Attempt + 1, ""
}
if !kubecontainer.IsHostNetworkPod(pod) && sandboxStatus.Network.Ip == "" {
return true, sandboxStatus.Metadata.Attempt + 1, sandboxStatus.Id
}
return false, sandboxStatus.Metadata.Attempt, sandboxStatus.Id
}
计算Pod的容器状态计算逻辑相对长一些,这里我就不贴代码了,其如要流程分为两个部分:
1.需要创建sandbox:
在该状态下,如果存在初始化容器,则会先进行初始化容器的初始化,即当前步骤只创建第一个初始化容器,如果没有初始化容器,则就将所有的业务容器加入到启动的列表里面
2.不需要创建sandbox:
该状态下会检查遍历所有的临时容器,初始化容器(如果存在失败的初始化容器,则就先启动初始化容器,不会进行业务容器的启动),业务容器,最终会构建一个需要kill掉的容器列表,还有两个启动的容器列表
4.2 killPod全部清理
需要进行KillPod的状态有两种:
sanbbox状态变更
即当sandbox状态不满足要求,则此时需要将Pod的所有容器都杀掉,然后进行重建
无需进行保留的容器
如果Pod对应的容器的hash值变更、状态为失败,则就需要重建
if podContainerChanges.KillPod {
// 杀死当前所有的pod
killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
if podContainerChanges.CreateSandbox {
// 终止初始化运行
m.purgeInitContainers(pod, podStatus)
}
}
4.3 部分清理
如果容器当前的状态是正常的,并且hash没有发生变化,则就不需要进行变更,此时就只需要将当前状态不正常的容器进行清理重建即可
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
return
}
}
清理初始化容器
在正式启动容器之前,除了上面两部分,还会进行初始化容器的清理工作
m.pruneInitContainersBeforeStart(pod, podStatus)
5.构建运行阶段
构建运行阶段,主要分为两个大的部分:创建并运行sandbox容器、运行用户容器
5.1 运行sandbox
检查需要创建sandbox,则会首先创建sandbox容器,并获取状态,然后填充当前的Pod的IP信息
// Step 4: Create a sandbox for the pod if necessary.
// 创建沙箱环境
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
if !kubecontainer.IsHostNetworkPod(pod) {
podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, podSandboxStatus)
}
}