日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
詳解kubernetes狀態(tài)同步機(jī)制核心

在K8s中將Pod調(diào)度到某一臺(tái)Node節(jié)點(diǎn)之后,后續(xù)的狀態(tài)維護(hù)信息則是由對(duì)應(yīng)機(jī)器上的kubelet進(jìn)行維護(hù),如何實(shí)時(shí)反饋本地運(yùn)行狀態(tài),并通知apiserver則是設(shè)計(jì)的難點(diǎn), 本節(jié)主要是通過感知Pod狀態(tài)變化和探測(cè)狀態(tài)改變兩個(gè)流程來(lái)實(shí)際分析其核心數(shù)據(jù)結(jié)構(gòu)。

創(chuàng)新互聯(lián)是專業(yè)的錫林郭勒盟網(wǎng)站建設(shè)公司,錫林郭勒盟接單;提供成都做網(wǎng)站、成都網(wǎng)站建設(shè)、成都外貿(mào)網(wǎng)站建設(shè),網(wǎng)頁(yè)設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行錫林郭勒盟網(wǎng)站開發(fā)網(wǎng)頁(yè)制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來(lái)合作!

靜態(tài)Pod

靜態(tài)Pod主要是指的那些不是通過感知apiserver創(chuàng)建的pod, 因?yàn)閍piserver上并不包含,但是同時(shí)也需要維護(hù)和獲取這類Pod的狀態(tài), k8s中就設(shè)計(jì)了一個(gè)鏡像Pod的概念,其實(shí)就是為靜態(tài)Pod鏡像出來(lái)一個(gè)Pod該P(yáng)od的主要信息與靜態(tài)Pod一致,并且在apiserver中進(jìn)行創(chuàng)建,通過apiserver可以感知的這個(gè)鏡像Pod來(lái)反映真實(shí)的靜態(tài)Pod的狀態(tài),

狀態(tài)數(shù)據(jù)源

statusManager是進(jìn)行狀態(tài)同步的關(guān)鍵組件其需要綜合當(dāng)前Pod運(yùn)行中的數(shù)據(jù)和apiserver存儲(chǔ)的數(shù)據(jù),從而決定最終的狀態(tài)轉(zhuǎn)換, 這里先關(guān)注圖上畫出來(lái)的,更多的狀態(tài)等后續(xù)會(huì)一一介紹

版本一致性

type versionedPodStatus struct {
status v1.PodStatus
// 單調(diào)遞增的版本號(hào)(每個(gè)pod)
version uint64
// Pod name & namespace, for sending updates to API server.
podName      string
podNamespace string
}

在Kubelet中為保證與apiserver端信息的同步,在本地保存了一個(gè)Pod狀態(tài)版本信息,其里面除了保存當(dāng)前Pod的狀態(tài)數(shù)據(jù)還有一個(gè)版本版本號(hào),通過單調(diào)遞增的版本號(hào)的對(duì)比來(lái)確定是否進(jìn)行狀態(tài)的同步

核心源碼實(shí)現(xiàn)

statusManager的流程其實(shí)還是蠻復(fù)雜的,今天我們就只講一個(gè)場(chǎng)景,即kubelet通過apiserver感知到一個(gè)Pod更新,然后順著該功能的數(shù)據(jù)流來(lái)進(jìn)行梳理statusMangaer里面的數(shù)據(jù)流轉(zhuǎn)

核心數(shù)據(jù)結(jié)構(gòu)

manager中的核心狀態(tài)相關(guān)的數(shù)據(jù)結(jié)構(gòu)可以主要分為兩大類:映射數(shù)據(jù)維護(hù)(podManager、podStatuses、apiStatusVersions)數(shù)據(jù)通信管道(podStatusChannel), 剩余的則是對(duì)與apiserver通信的kublet和進(jìn)行pod刪除檢查的 podDeletionSafety

type manager struct {
kubeClient clientset.Interface
       // 管理緩存Pod,包含鏡像pod和靜態(tài)pod的映射
podManager kubepod.Manager
// 從pod UID映射到相應(yīng)pod的版本狀態(tài)信息 。
podStatuses      map[types.UID]versionedPodStatus
podStatusesLock  sync.RWMutex
podStatusChannel chan podStatusSyncRequest
// 存儲(chǔ)鏡像pod的版本
apiStatusVersions map[kubetypes.MirrorPodUID]uint64
podDeletionSafety PodDeletionSafetyProvider
}

設(shè)置Pod狀態(tài)

設(shè)置Pod狀態(tài)主要是位于kubelet中的syncPod中,在接收到pod事件變更之后,會(huì)與apiserver進(jìn)行 Pod最新數(shù)據(jù)的同步從而獲取當(dāng)前pod在apiserver端的最新狀態(tài)

func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()

for _, c := range pod.Status.Conditions {
 if !kubetypes.PodConditionByKubelet(c.Type) {
  klog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+
   "But it is not owned by kubelet.", string(c.Type), format.Pod(pod))
 }
}
// Make sure we're caching a deep copy. status = *status.DeepCopy() // 如果Pod被刪除了則需要強(qiáng)制與apiserver進(jìn)行信息的同步 m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil) } 

更新內(nèi)部緩存狀態(tài)產(chǎn)生同步事件

獲取緩存狀態(tài)

var oldStatus v1.PodStatus
// 檢測(cè)之前的本地緩存數(shù)據(jù)
cachedStatus, isCached := m.podStatuses[pod.UID]
if isCached {
 oldStatus = cachedStatus.status
} else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
 oldStatus = mirrorPod.Status
} else {
 oldStatus = pod.Status
}

檢測(cè)容器狀態(tài)

檢測(cè)容器狀態(tài)主要是針對(duì)容器終止?fàn)顟B(tài)轉(zhuǎn)發(fā)的合法性進(jìn)行檢測(cè),其實(shí)就是根據(jù)設(shè)定的Pod的RestartPolicy來(lái)檢測(cè)針對(duì)一個(gè)終止的容器是否可以進(jìn)行重啟

if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
 klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
 return false
}
if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
 klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
 return false
}

更新PodCondition最后轉(zhuǎn)換時(shí)間

通過最新的status里面的condition設(shè)定對(duì)應(yīng)PodCondition的LastTransitionTime更新時(shí)間未當(dāng)前時(shí)間

// Set ContainersReadyCondition.LastTransitionTime.
updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady)

// Set ReadyCondition.LastTransitionTime.
updateLastTransitionTime(&status, &oldStatus, v1.PodReady)

// Set InitializedCondition.LastTransitionTime.
updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized)

// Set PodScheduledCondition.LastTransitionTime.
updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)

校對(duì)時(shí)間截?cái)噙^長(zhǎng)信息

首先會(huì)根據(jù)當(dāng)前容器的個(gè)數(shù),從而決定每個(gè)容器最大的字節(jié)數(shù)大小,然后對(duì)容器里面的終止?fàn)顟B(tài)里面的Message信息,進(jìn)行截?cái)啵瑫r(shí)進(jìn)行時(shí)間的校對(duì)

 normalizeStatus(pod, &status)

狀態(tài)更新條件檢測(cè)

如果之前已經(jīng)緩存了對(duì)應(yīng)的數(shù)據(jù),并且緩存的數(shù)據(jù)與當(dāng)前的狀態(tài)未發(fā)生改變,也不需要強(qiáng)制更新,就直接返回

if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
 // 如果不強(qiáng)制更新 ,默認(rèn)是true此處不會(huì)成立
 klog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
 return false // No new status.
}

生成同步事件更新緩存

生成最新的狀態(tài)緩存數(shù)據(jù),并且遞增本地的版本信息

// 構(gòu)建新的狀態(tài)
newStatus := versionedPodStatus{
 status:       status,
 version:      cachedStatus.version + 1, // 更新器緩存
 podName:      pod.Name,
 podNamespace: pod.Namespace,
}
// 更新新的緩存狀態(tài)
m.podStatuses[pod.UID] = newStatus

select {
case m.podStatusChannel "Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
  pod.UID, newStatus.version, newStatus.status)
 
 return true
default:
 // Let the periodic syncBatch handle the update if the channel is full.
 // We can't block, since we hold the mutex lock.  klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",   format.Pod(pod), status)  return false } 

探測(cè)狀態(tài)更新

探測(cè)狀態(tài)其實(shí)就是Pod內(nèi)容器的運(yùn)行狀態(tài),比如如果設(shè)置了Readiness探測(cè),當(dāng)某個(gè)容器探測(cè)失敗的時(shí)候,就會(huì)通知對(duì)應(yīng)的service從后端的enpoint中移除該P(yáng)od, 讓我們一起看看Kubelet是如何將運(yùn)行狀態(tài)通知到apiserver端的

獲取當(dāng)前狀態(tài)

func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()

// 獲取本地的容器
pod, ok := m.podManager.GetPodByUID(podUID)
if !ok {
 klog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID))
 return
}

// 獲取當(dāng)前的狀態(tài)
oldStatus, found := m.podStatuses[pod.UID]
if !found {
 klog.Warningf("Container readiness changed before pod has synced: %q - %q",
  format.Pod(pod), containerID.String())
 return
}

// 獲取當(dāng)前的容器狀態(tài)
containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
if !ok {
 klog.Warningf("Container readiness changed for unknown container: %q - %q",
  format.Pod(pod), containerID.String())
 return
}

檢測(cè)狀態(tài)是否發(fā)生改變

// 檢測(cè)前后的就緒狀態(tài)是否發(fā)生改變
if containerStatus.Ready == ready {
 klog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
  format.Pod(pod), containerID.String())
 return
}

修改容器的就緒狀態(tài)

獲取容器的狀態(tài),修改就緒為當(dāng)前的狀態(tài)

status := *oldStatus.status.DeepCopy()
containerStatus, _, _ = findContainerStatus(&status, containerID.String())
containerStatus.Ready = ready

根據(jù)最新的容器狀態(tài)修改

會(huì)根據(jù)當(dāng)前運(yùn)行時(shí)的容器探測(cè)的狀態(tài),來(lái)修改對(duì)應(yīng)PodCondition里面的狀態(tài),最后調(diào)用內(nèi)部的更新邏輯

updateConditionFunc := func(conditionType v1.PodConditionType, condition v1.PodCondition) {
 conditionIndex := -1
 // 獲取Pod對(duì)應(yīng)的PodCondition狀態(tài)
 for i, condition := range status.Conditions {
  if condition.Type == conditionType {
   conditionIndex = i
   break
  }
 }
       // 修改或追加Pod對(duì)應(yīng)的PodCondition狀態(tài)
 if conditionIndex != -1 {
  status.Conditions[conditionIndex] = condition
 } else {
  klog.Warningf("PodStatus missing %s type condition: %+v", conditionType, status)
  status.Conditions = append(status.Conditions, condition)
 }
}
// 計(jì)算Ready狀態(tài)
updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase))
// 計(jì)算容器Ready狀態(tài)
updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase))
m.updateStatusInternal(pod, status, false)

啟動(dòng)后臺(tái)同步任務(wù)

statusManager會(huì)啟動(dòng)一個(gè)后臺(tái)的線程來(lái)進(jìn)行更新管道里面同步請(qǐng)求的消費(fèi)

func (m *manager) Start() {
// 省略非核心代碼
go wait.Forever(func() {
 select {
 case syncRequest := "Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
   syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
  m.syncPod(syncRequest.podUID, syncRequest.status)
 case 

同步Pod狀態(tài)

同步條件檢測(cè)

同步條件檢測(cè)主要是檢測(cè)鏡像Pod的版本是否發(fā)送變化、Pod當(dāng)前是否被刪除,如果pod沒有被刪除則返回false,即對(duì)一個(gè)沒有刪除的Pod我們還是需要繼續(xù)更新其狀態(tài)的

if !m.needsUpdate(uid, status) {
 klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
 return
}

通過apiserver獲取最新Pod數(shù)據(jù)

如果沒有獲取到Pod信息,則直接進(jìn)行退出即可

pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(status.podName, metav1.GetOptions{})
if errors.IsNotFound(err) {
 klog.V(3).Infof("Pod %q does not exist on the server", format.PodDesc(status.podName, status.podNamespace, uid))
 // 如果Pod已經(jīng)被刪除了,就直接退出就行
 return
}
if err != nil {
 klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)
 return
}

調(diào)用Patch接口進(jìn)行更新

這里面會(huì)通過將最小的狀態(tài)與之前的狀態(tài)來(lái)進(jìn)行merge合并,然后調(diào)用kubeClient進(jìn)行apiserver端狀態(tài)的修改

oldStatus := pod.Status.DeepCopy()
// 更新服務(wù)端的狀態(tài)
newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
if err != nil {
 klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
 return
}

更新本地的Apiserver端的版本信息

// 當(dāng)前是最新的狀態(tài)
pod = newPod

klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version

檢測(cè)刪除Pod

這里主要是最后階段,即Pod對(duì)應(yīng)的資源都已經(jīng)釋放了,則才最終刪除apiserver端的Pod

// 如果pod的DeletionTimestamp被設(shè)置,則對(duì)應(yīng)的Pod需要被刪除
if m.canBeDeleted(pod, status.status) {
 deleteOptions := metav1.NewDeleteOptions(0)
 
 deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
 //  調(diào)用apiserver對(duì)Pod進(jìn)行刪除
 err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
 if err != nil {
  klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
  return
 }
 klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
 m.deletePodStatus(uid)
}

文章題目:詳解kubernetes狀態(tài)同步機(jī)制核心
轉(zhuǎn)載來(lái)于:http://m.5511xx.com/article/cdieeoo.html