diff --git a/node/pod.go b/node/pod.go
index 49d17a0b1..5ff6ac867 100644
--- a/node/pod.go
+++ b/node/pod.go
@@ -214,11 +214,15 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
 	}
 	kPod := obj.(*knownPod)
 	kPod.Lock()
+	podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
-	kPod.Unlock()
 	if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) && podFromProvider.DeletionTimestamp == nil {
+		kPod.lastPodStatusUpdateSkipped = true
+		kPod.Unlock()
 		return nil
 	}
+	kPod.lastPodStatusUpdateSkipped = false
+	kPod.Unlock()
 
 	// Pod deleted by provider due some reasons. e.g. a K8s provider, pod created by deployment would be evicted when node is not ready.
 	// If we do not delete pod in K8s, deployment would not create a new one.
 	if podFromProvider.DeletionTimestamp != nil && podFromKubernetes.DeletionTimestamp == nil {
diff --git a/node/podcontroller.go b/node/podcontroller.go
index 490b4fbf6..3bf0046bf 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -146,6 +146,7 @@ type knownPod struct {
 	sync.Mutex
 	lastPodStatusReceivedFromProvider *corev1.Pod
 	lastPodUsed                       *corev1.Pod
+	lastPodStatusUpdateSkipped        bool
 }
 
 // PodControllerConfig is used to configure a new PodController.
@@ -303,9 +304,30 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 			// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
 			if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
 				log.G(ctx).Error(err)
-			} else if podShouldEnqueue(oldPod, newPod) {
-				pc.k8sQ.AddRateLimited(key)
+			} else {
+				obj, ok := pc.knownPods.Load(key)
+				if !ok {
+					// Pods are only ever *added* to knownPods in the above AddFunc, and removed
+					// in the below *DeleteFunc*
+					panic("Pod not found in known pods. This should never happen.")
+				}
+				kPod := obj.(*knownPod)
+				kPod.Lock()
+				if kPod.lastPodStatusUpdateSkipped && !cmp.Equal(newPod.Status, kPod.lastPodStatusReceivedFromProvider.Status) {
+					// The last pod from the provider -> kube api server was skipped, but we see they no longer match.
+					// This means that the pod in API server was changed by someone else [this can be okay], but we skipped
+					// a status update on our side because we compared the status received from the provider to the status
+					// received from the k8s api server based on outdated information.
+					pc.podStatusQ.AddRateLimited(key)
+					// Reset this to avoid re-adding it continuously
+					kPod.lastPodStatusUpdateSkipped = false
+				}
+				kPod.Unlock()
+
+				if podShouldEnqueue(oldPod, newPod) {
+					pc.k8sQ.AddRateLimited(key)
+				}
 			}
 		},
 		DeleteFunc: func(pod interface{}) {
@@ -472,6 +494,7 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
 		return err
 	}
 
+	// At this point we know the Pod resource has either been created or updated (which includes being marked for deletion).
 	return pc.syncPodInProvider(ctx, pod, key)
 }
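
For illustration only, and not part of the patch itself: a minimal, self-contained sketch of the bookkeeping the diff introduces, using simplified stand-ins for the real types (a plain string instead of corev1.PodStatus, a channel instead of the rate-limited workqueue, and hypothetical names such as trackedPod, reconcileStatus, and onInformerUpdate). It shows why remembering that a status write was skipped lets the controller recover when the pod status in the API server later diverges from the provider's last reported status.

```go
package main

import (
	"fmt"
	"sync"
)

// trackedPod is a stand-in for knownPod: the mutex guards both fields.
type trackedPod struct {
	sync.Mutex
	lastStatusFromProvider string // stands in for lastPodStatusReceivedFromProvider.Status
	statusUpdateSkipped    bool   // stands in for lastPodStatusUpdateSkipped
}

// reconcileStatus mirrors updatePodStatus: when the status already in the API server
// matches what the provider reported, skip the write but remember that it was skipped.
func reconcileStatus(p *trackedPod, statusInAPIServer, key string, queue chan<- string) {
	p.Lock()
	defer p.Unlock()
	if statusInAPIServer == p.lastStatusFromProvider {
		p.statusUpdateSkipped = true
		return
	}
	p.statusUpdateSkipped = false
	queue <- key // the real controller would patch the pod status here
}

// onInformerUpdate mirrors the new UpdateFunc branch: if a write was skipped earlier and
// the API server's status has since diverged from the provider's, re-enqueue the key so
// the provider's status is written after all.
func onInformerUpdate(p *trackedPod, newStatusInAPIServer, key string, queue chan<- string) {
	p.Lock()
	defer p.Unlock()
	if p.statusUpdateSkipped && newStatusInAPIServer != p.lastStatusFromProvider {
		queue <- key
		p.statusUpdateSkipped = false // avoid re-adding the key on every subsequent update
	}
}

func main() {
	queue := make(chan string, 1)
	p := &trackedPod{lastStatusFromProvider: "Running"}

	// The statuses match, so the write is skipped and the skip is recorded.
	reconcileStatus(p, "Running", "default/nginx", queue)

	// Something else changes the status in the API server; the informer fires, the
	// divergence is noticed, and the key is re-enqueued instead of the update being lost.
	onInformerUpdate(p, "Pending", "default/nginx", queue)

	fmt.Println("re-enqueued:", <-queue) // prints: re-enqueued: default/nginx
}
```

Resetting the flag after re-enqueueing matches the patch's comment about avoiding the key being re-added on every subsequent informer update.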