diff --git a/node/pod.go b/node/pod.go
index 2a72ee6a9..7e36a2c42 100644
--- a/node/pod.go
+++ b/node/pod.go
@@ -209,15 +209,15 @@ func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace,
 	return nil
 }
 
-// updatePodStatuses syncs the providers pod status with the kubernetes pod status.
-func (pc *PodController) updatePodStatuses(ctx context.Context, q workqueue.RateLimitingInterface) {
-	ctx, span := trace.StartSpan(ctx, "updatePodStatuses")
+// fetchPodStatusesFromProvider syncs the provider's pod status with the kubernetes pod status.
+func (pc *PodController) fetchPodStatusesFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
+	ctx, span := trace.StartSpan(ctx, "fetchPodStatusesFromProvider")
 	defer span.End()
 
 	// Update all the pods with the provider status.
 	pods, err := pc.podsLister.List(labels.Everything())
 	if err != nil {
-		err = pkgerrors.Wrap(err, "error getting pod list")
+		err = pkgerrors.Wrap(err, "error getting pod list from kubernetes")
 		span.SetStatus(err)
 		log.G(ctx).WithError(err).Error("Error updating pod statuses")
 		return
@@ -226,78 +226,114 @@ func (pc *PodController) updatePodStatuses(ctx context.Context, q workqueue.Rate
 
 	for _, pod := range pods {
 		if !shouldSkipPodStatusUpdate(pod) {
-			enqueuePodStatusUpdate(ctx, q, pod)
+			enrichedPod, err := pc.fetchPodStatusFromProvider(ctx, q, pod)
+			if err != nil {
+				log.G(ctx).WithFields(map[string]interface{}{
+					"name":      pod.Name,
+					"namespace": pod.Namespace,
+				}).WithError(err).Error("Could not fetch pod status")
+			} else if enrichedPod != nil {
+				pc.enqueuePodStatusUpdate(ctx, q, enrichedPod)
+			}
 		}
 	}
 }
 
+// fetchPodStatusFromProvider returns a pod (the pod we pass in) enriched with the pod status from the provider. If the pod is not found,
+// and it has been 1 minute since the pod was created, or the pod was previously running, it will be marked as failed.
+// If a valid pod status cannot be generated, for example, if a pod is not found in the provider, and it has been less than 1 minute
+// since pod creation, we will return nil for the pod.
+func (pc *PodController) fetchPodStatusFromProvider(ctx context.Context, q workqueue.RateLimitingInterface, podFromKubernetes *corev1.Pod) (*corev1.Pod, error) {
+	podStatus, err := pc.provider.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
+	if errdefs.IsNotFound(err) || (err == nil && podStatus == nil) {
+		// Only change the status when the pod was already up
+		// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
+		if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
+			// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
+			podStatus = podFromKubernetes.Status.DeepCopy()
+			podStatus.Phase = corev1.PodFailed
+			podStatus.Reason = "NotFound"
+			podStatus.Message = "The pod status was not found and may have been deleted from the provider"
+			now := metav1.NewTime(time.Now())
+			for i, c := range podStatus.ContainerStatuses {
+				if c.State.Running == nil {
+					continue
+				}
+				podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
+					ExitCode:    -137,
+					Reason:      "NotFound",
+					Message:     "Container was not found and was likely deleted",
+					FinishedAt:  now,
+					StartedAt:   c.State.Running.StartedAt,
+					ContainerID: c.ContainerID,
+				}
+				podStatus.ContainerStatuses[i].State.Running = nil
+			}
+		} else {
+			return nil, nil
+		}
+	} else if err != nil {
+		return nil, err
+	}
+
+	pod := podFromKubernetes.DeepCopy()
+	podStatus.DeepCopyInto(&pod.Status)
+	return pod, nil
+}
+
 func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool {
 	return pod.Status.Phase == corev1.PodSucceeded ||
 		pod.Status.Phase == corev1.PodFailed ||
 		pod.Status.Reason == podStatusReasonProviderFailed
 }
 
-func (pc *PodController) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {
-	if shouldSkipPodStatusUpdate(pod) {
+func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes *corev1.Pod, key string) error {
+	if shouldSkipPodStatusUpdate(podFromKubernetes) {
 		return nil
 	}
 
 	ctx, span := trace.StartSpan(ctx, "updatePodStatus")
 	defer span.End()
-	ctx = addPodAttributes(ctx, span, pod)
+	ctx = addPodAttributes(ctx, span, podFromKubernetes)
 
-	status, err := pc.provider.GetPodStatus(ctx, pod.Namespace, pod.Name)
-	if err != nil && !errdefs.IsNotFound(err) {
-		span.SetStatus(err)
-		return pkgerrors.Wrap(err, "error retrieving pod status")
+	obj, ok := pc.knownPods.Load(key)
+	if !ok {
+		// This means there was a race and the pod has been deleted from K8s
+		return nil
 	}
+	kPod := obj.(*knownPod)
+	kPod.Lock()
+	podFromProvider := kPod.lastPodStatusReceivedFromProvider
+	kPod.Unlock()
 
-	// Do not modify the pod that we got from the cache
-	pod = pod.DeepCopy()
-
-	// Update the pod's status
-	if status != nil {
-		pod.Status = *status
-	} else {
-		// Only change the status when the pod was already up
-		// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
-		if pod.Status.Phase == corev1.PodRunning || pod.ObjectMeta.CreationTimestamp.Add(time.Minute).Before(time.Now()) {
-			// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
-			pod.Status.Phase = corev1.PodFailed
-			pod.Status.Reason = "NotFound"
-			pod.Status.Message = "The pod status was not found and may have been deleted from the provider"
-			for i, c := range pod.Status.ContainerStatuses {
-				pod.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
-					ExitCode:    -137,
-					Reason:      "NotFound",
-					Message:     "Container was not found and was likely deleted",
-					FinishedAt:  metav1.NewTime(time.Now()),
-					StartedAt:   c.State.Running.StartedAt,
-					ContainerID: c.ContainerID,
-				}
-				pod.Status.ContainerStatuses[i].State.Running = nil
-			}
-		}
-	}
-
-	if _, err := pc.client.Pods(pod.Namespace).UpdateStatus(pod); err != nil {
+	if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(podFromProvider); err != nil {
 		span.SetStatus(err)
 		return pkgerrors.Wrap(err, "error while updating pod status in kubernetes")
 	}
 
 	log.G(ctx).WithFields(log.Fields{
-		"new phase":  string(pod.Status.Phase),
-		"new reason": pod.Status.Reason,
+		"new phase":  string(podFromProvider.Status.Phase),
+		"new reason": podFromProvider.Status.Reason,
+		"old phase":  string(podFromKubernetes.Status.Phase),
+		"old reason": podFromKubernetes.Status.Reason,
 	}).Debug("Updated pod status in kubernetes")
 
 	return nil
 }
 
-func enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
+// enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd
+// prior to enqueuePodStatusUpdate.
+func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
 	if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
 		log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key")
 	} else {
-		q.AddRateLimited(key)
+		if obj, ok := pc.knownPods.Load(key); ok {
+			kpod := obj.(*knownPod)
+			kpod.Lock()
+			kpod.lastPodStatusReceivedFromProvider = pod
+			kpod.Unlock()
+			q.AddRateLimited(key)
+		}
 	}
 }
 
@@ -328,5 +364,5 @@ func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retE
 		return pkgerrors.Wrap(err, "error looking up pod")
 	}
 
-	return pc.updatePodStatus(ctx, pod)
+	return pc.updatePodStatus(ctx, pod, key)
 }
diff --git a/node/podcontroller.go b/node/podcontroller.go
index 97675b6e8..0aa089c00 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -109,6 +109,17 @@ type PodController struct {
 	resourceManager *manager.ResourceManager
 
 	k8sQ workqueue.RateLimitingInterface
+
+	// From the time of creation, to termination the knownPods map will contain the pods key
+	// (derived from Kubernetes' cache library) -> a *knownPod struct.
+	knownPods sync.Map
+}
+
+type knownPod struct {
+	// You cannot read (or modify) the fields in this struct without taking the lock. The individual fields
+	// should be immutable to avoid having to hold the lock the entire time you're working with them
+	sync.Mutex
+	lastPodStatusReceivedFromProvider *corev1.Pod
 }
 
 // PodControllerConfig is used to configure a new PodController.
@@ -199,6 +210,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
 			if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
 				log.G(ctx).Error(err)
 			} else {
+				pc.knownPods.Store(key, &knownPod{})
 				pc.k8sQ.AddRateLimited(key)
 			}
 		},
@@ -225,6 +237,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
 			if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
 				log.G(ctx).Error(err)
 			} else {
+				pc.knownPods.Delete(key)
 				pc.k8sQ.AddRateLimited(key)
 			}
 		},
diff --git a/node/queue.go b/node/queue.go
index b9a8eebec..d9355d098 100644
--- a/node/queue.go
+++ b/node/queue.go
@@ -134,7 +134,7 @@ func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateL
 			t.Stop()
 
 			ctx, span := trace.StartSpan(ctx, "syncActualState")
-			pc.updatePodStatuses(ctx, q)
+			pc.fetchPodStatusesFromProvider(ctx, q)
 			span.End()
 
 			// restart the timer
@@ -146,7 +146,7 @@ func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateL
 
 func (pc *PodController) runSyncFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
 	if pn, ok := pc.provider.(PodNotifier); ok {
 		pn.NotifyPods(ctx, func(pod *corev1.Pod) {
-			enqueuePodStatusUpdate(ctx, q, pod)
+			pc.enqueuePodStatusUpdate(ctx, q, pod.DeepCopy())
 		})
 	} else {
 		go pc.providerSyncLoop(ctx, q)
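
The interesting part of this change is the hand-off between the provider-facing side (enqueuePodStatusUpdate) and the API-server-facing side (updatePodStatus): the most recent status received from the provider is parked in a per-pod knownPod entry guarded by its mutex, and only the workqueue key travels through the queue. Below is a minimal, self-contained sketch of that pattern; it uses plain strings instead of *corev1.Pod and invented names (knownPodSketch, statusStore, enqueueStatusUpdate, handleStatusUpdate), so it illustrates the shape of the code in this diff rather than reproducing it.

package main

import (
	"fmt"
	"sync"
)

// knownPodSketch mirrors the knownPod struct added in this diff: the latest
// status received from the provider sits behind a mutex so the notification
// path and the queue worker never race on it.
type knownPodSketch struct {
	sync.Mutex
	lastStatusFromProvider string // stand-in for *corev1.Pod in the real code
}

// statusStore plays the role of PodController.knownPods: a sync.Map keyed by
// "namespace/name".
var statusStore sync.Map

// enqueueStatusUpdate is the write side: it records the status only if the pod
// is still known (i.e. not yet deleted from the informer); the real controller
// then calls q.AddRateLimited(key).
func enqueueStatusUpdate(key, status string) {
	obj, ok := statusStore.Load(key)
	if !ok {
		return // pod already deleted; drop the update, as the diff does
	}
	kp := obj.(*knownPodSketch)
	kp.Lock()
	kp.lastStatusFromProvider = status
	kp.Unlock()
	// q.AddRateLimited(key) would happen here.
}

// handleStatusUpdate is the read side: the worker resolves the key back to the
// most recent status under the same lock before writing it to the API server.
func handleStatusUpdate(key string) (string, bool) {
	obj, ok := statusStore.Load(key)
	if !ok {
		return "", false // raced with deletion, nothing to do
	}
	kp := obj.(*knownPodSketch)
	kp.Lock()
	defer kp.Unlock()
	return kp.lastStatusFromProvider, true
}

func main() {
	key := "default/nginx"
	statusStore.Store(key, &knownPodSketch{}) // AddFunc in Run() does this
	enqueueStatusUpdate(key, "Running")       // provider notification arrives
	if status, ok := handleStatusUpdate(key); ok {
		fmt.Println("would UpdateStatus with:", status)
	}
	statusStore.Delete(key) // DeleteFunc in Run() does this
}

Dropping updates for keys that are no longer in the map mirrors the DeleteFunc wiring in Run(): once the pod is gone from the informer there is nothing left to patch, so a late provider notification is simply discarded.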
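
For push-based providers, node/queue.go now deep-copies the pod before handing it to enqueuePodStatusUpdate, so a provider may keep mutating its own object after notifying. A hypothetical provider built against that callback could look like the sketch below; mockProvider, markRunning and their behaviour are invented for illustration, and only the NotifyPods signature matches the PodNotifier interface referenced in the diff.

package main

import (
	"context"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// mockProvider is a hypothetical provider; only the NotifyPods signature is
// taken from the PodNotifier interface used in node/queue.go.
type mockProvider struct {
	notify func(*corev1.Pod)
}

// NotifyPods stores the controller's callback so the provider can push status
// changes asynchronously instead of being polled by providerSyncLoop.
func (p *mockProvider) NotifyPods(_ context.Context, cb func(*corev1.Pod)) {
	p.notify = cb
}

// markRunning simulates the provider observing a container start and pushing
// the new status to the controller. Because the controller side copies the pod
// (pc.enqueuePodStatusUpdate(ctx, q, pod.DeepCopy()) in the diff), the provider
// is free to keep reusing this object afterwards.
func (p *mockProvider) markRunning(pod *corev1.Pod) {
	pod.Status.Phase = corev1.PodRunning
	pod.Status.StartTime = &metav1.Time{Time: time.Now()}
	if p.notify != nil {
		p.notify(pod)
	}
}

func main() {
	p := &mockProvider{}
	// Stand-in for what runSyncFromProvider wires up; the real callback is
	// pc.enqueuePodStatusUpdate with a DeepCopy of the pod.
	p.NotifyPods(context.Background(), func(pod *corev1.Pod) {
		_ = pod.DeepCopy() // controller works on its own copy
	})
	p.markRunning(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"}})
}

Poll-based providers are unaffected: they are still picked up by providerSyncLoop, which now calls fetchPodStatusesFromProvider on the timer instead of updatePodStatuses.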