Merge pull request #902 from sargun/fix-899

Fix issue #899: Pod status out of sync
Brian Goff
2020-12-07 17:14:52 -08:00
committed by GitHub
2 changed files with 30 additions and 3 deletions


@@ -214,11 +214,15 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
 	}
 	kPod := obj.(*knownPod)
 	kPod.Lock()
 	podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
-	kPod.Unlock()
 	if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) && podFromProvider.DeletionTimestamp == nil {
+		kPod.lastPodStatusUpdateSkipped = true
+		kPod.Unlock()
 		return nil
 	}
+	kPod.lastPodStatusUpdateSkipped = false
+	kPod.Unlock()
 	// Pod deleted by the provider for some reason, e.g. with a K8s provider, a pod created by a deployment would be evicted when the node is not ready.
 	// If we do not delete the pod in K8s, the deployment would not create a new one.
 	if podFromProvider.DeletionTimestamp != nil && podFromKubernetes.DeletionTimestamp == nil {
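
The hunk above is the first half of the fix for #899: updatePodStatus now records, in the new lastPodStatusUpdateSkipped flag, every time it decides not to write a status back to the API server because the status reported by the provider already matches the status read from Kubernetes. Below is a minimal sketch of that compare, skip, and record pattern; it is not part of this commit, assumes a knownPod trimmed down to the fields this hunk touches, and assumes the provider has already reported at least one status for the pod.

package sketch

import (
	"sync"

	"github.com/google/go-cmp/cmp"
	corev1 "k8s.io/api/core/v1"
)

// knownPod is trimmed down to the two fields used below; the real struct in this commit
// also tracks lastPodUsed.
type knownPod struct {
	sync.Mutex
	lastPodStatusReceivedFromProvider *corev1.Pod
	lastPodStatusUpdateSkipped        bool
}

// shouldSkipStatusUpdate mirrors the hunk above: if the provider-reported status is already
// identical to the status read from the Kubernetes API server, skip the write and remember
// that a skip happened so a later divergence can be detected.
func shouldSkipStatusUpdate(kPod *knownPod, podFromKubernetes *corev1.Pod) bool {
	kPod.Lock()
	defer kPod.Unlock()

	podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
	if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) && podFromProvider.DeletionTimestamp == nil {
		kPod.lastPodStatusUpdateSkipped = true
		return true
	}
	kPod.lastPodStatusUpdateSkipped = false
	return false
}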


@@ -146,6 +146,7 @@ type knownPod struct {
 	sync.Mutex
 	lastPodStatusReceivedFromProvider *corev1.Pod
 	lastPodUsed *corev1.Pod
+	lastPodStatusUpdateSkipped bool
 }
 // PodControllerConfig is used to configure a new PodController.
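
The new field joins the existing ones under the embedded sync.Mutex: both the status-update path above and the informer handler below take the lock before touching it. As a hedged illustration of that convention only, here are two hypothetical accessors that do not exist in the repo, building on the trimmed-down knownPod from the sketch above.

// markStatusUpdateSkipped records whether the last status write was skipped, under the lock.
func (kp *knownPod) markStatusUpdateSkipped(skipped bool) {
	kp.Lock()
	defer kp.Unlock()
	kp.lastPodStatusUpdateSkipped = skipped
}

// statusUpdateWasSkipped reads the flag under the lock.
func (kp *knownPod) statusUpdateWasSkipped() bool {
	kp.Lock()
	defer kp.Unlock()
	return kp.lastPodStatusUpdateSkipped
}
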
@@ -303,9 +304,30 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 			// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
 			if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
 				log.G(ctx).Error(err)
-			} else if podShouldEnqueue(oldPod, newPod) {
-				pc.k8sQ.AddRateLimited(key)
+			} else {
+				obj, ok := pc.knownPods.Load(key)
+				if !ok {
+					// Pods are only ever *added* to knownPods in the above AddFunc, and removed
+					// in the below *DeleteFunc*.
+					panic("Pod not found in known pods. This should never happen.")
+				}
+				kPod := obj.(*knownPod)
+				kPod.Lock()
+				if kPod.lastPodStatusUpdateSkipped && !cmp.Equal(newPod.Status, kPod.lastPodStatusReceivedFromProvider.Status) {
+					// The last pod status update from the provider to the kube API server was skipped, but the statuses no longer match.
+					// This means the pod in the API server was changed by someone else (which can be okay), while we skipped
+					// a status update on our side because we compared the status received from the provider to the status
+					// received from the k8s API server based on outdated information.
+					pc.podStatusQ.AddRateLimited(key)
+					// Reset this to avoid re-adding it continuously.
+					kPod.lastPodStatusUpdateSkipped = false
+				}
+				kPod.Unlock()
+				if podShouldEnqueue(oldPod, newPod) {
+					pc.k8sQ.AddRateLimited(key)
+				}
 			}
 		},
 		DeleteFunc: func(pod interface{}) {
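
This is the second half of the fix: on every informer update, if the previous status write was skipped and the pod status now stored in the API server no longer matches what the provider last reported (for example, because another client wrote to it), the pod is re-enqueued on the status queue so updatePodStatus runs again with fresh data, and the flag is cleared so the pod is not re-enqueued on every subsequent event. A minimal sketch of that check in isolation, reusing the trimmed-down knownPod from the earlier sketch, with the enqueue callback standing in for pc.podStatusQ.AddRateLimited:

// requeueStatusIfDrifted re-applies the handler's check: when the last status write was
// skipped but the API server copy has drifted from the provider's last reported status,
// ask for another status sync and clear the flag.
func requeueStatusIfDrifted(kPod *knownPod, newPod *corev1.Pod, key string, enqueueStatus func(key string)) {
	kPod.Lock()
	defer kPod.Unlock()

	if kPod.lastPodStatusUpdateSkipped && !cmp.Equal(newPod.Status, kPod.lastPodStatusReceivedFromProvider.Status) {
		enqueueStatus(key)
		// Clear the flag so the same pod is not re-enqueued on every informer update.
		kPod.lastPodStatusUpdateSkipped = false
	}
}

End to end, the sequence that previously went out of sync now converges: the provider reports a status and the controller writes it; an identical follow-up report is skipped and the skip is recorded; if another client then overwrites the status in the API server, the next informer update notices the mismatch, re-enqueues the pod, and updatePodStatus writes the provider's status back.
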
@@ -472,6 +494,7 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
 		return err
 	}
 	// At this point we know the Pod resource has either been created or updated (which includes being marked for deletion).
 	return pc.syncPodInProvider(ctx, pod, key)
 }