package node

import (
	"context"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
	"github.com/virtual-kubelet/virtual-kubelet/log"
	"github.com/virtual-kubelet/virtual-kubelet/trace"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	corev1listers "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
)

const (
	podStatusReasonNotFound          = "NotFound"
	podStatusMessageNotFound         = "The pod status was not found and may have been deleted from the provider"
	containerStatusReasonNotFound    = "NotFound"
	containerStatusMessageNotFound   = "Container was not found and was likely deleted"
	containerStatusExitCodeNotFound  = -137
	statusTerminatedReason           = "Terminated"
	containerStatusTerminatedMessage = "Container was terminated. The exit code may not reflect the real exit code"
)

// syncProviderWrapper wraps a PodLifecycleHandler to give it async-like pod
// status notification behavior.
type syncProviderWrapper struct {
	PodLifecycleHandler
	notify func(*corev1.Pod)
	l      corev1listers.PodLister

	// deletedPods makes sure we don't set the "NotFound" status for pods
	// which have been requested to be deleted.
	// This is needed for our loop which just grabs pod statuses every 5 seconds.
	deletedPods sync.Map
}

// syncWrapper is used to clean up keys for deleted pods after they have been
// fully deleted in the API server.
type syncWrapper interface {
	_deletePodKey(context.Context, string)
}

func (p *syncProviderWrapper) NotifyPods(ctx context.Context, f func(*corev1.Pod)) {
	p.notify = f
}

func (p *syncProviderWrapper) _deletePodKey(ctx context.Context, key string) {
	log.G(ctx).WithField("key", key).Debug("Cleaning up pod from deletion cache")
	p.deletedPods.Delete(key)
}
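// DeletePod records the pod in the deletion cache before delegating to the
// wrapped handler, then notifies a terminal (Succeeded) status for pods that
// were not already terminated.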
func (p *syncProviderWrapper) DeletePod(ctx context.Context, pod *corev1.Pod) error {
	log.G(ctx).Debug("syncProviderWrapper.DeletePod")
	key, err := cache.MetaNamespaceKeyFunc(pod)
	if err != nil {
		return err
	}
	p.deletedPods.Store(key, pod)

	if err := p.PodLifecycleHandler.DeletePod(ctx, pod.DeepCopy()); err != nil {
		log.G(ctx).WithField("key", key).WithError(err).Debug("Removed key from deleted pods cache")
		// The provider returned an error, so the pod was not actually deleted.
		// Remove it from our deletion cache, otherwise we could end up leaking
		// pods there. The delete will be retried by the pod controller.
		p.deletedPods.Delete(key)
		return err
	}

	if shouldSkipPodStatusUpdate(pod) {
		log.G(ctx).Debug("skipping pod status update for terminated pod")
		return nil
	}

	updated := pod.DeepCopy()
	updated.Status.Phase = corev1.PodSucceeded
	now := metav1.NewTime(time.Now())
	for i, cs := range updated.Status.ContainerStatuses {
		updated.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
			Reason:     statusTerminatedReason,
			Message:    containerStatusTerminatedMessage,
			FinishedAt: now,
		}
		if cs.State.Running != nil {
			updated.Status.ContainerStatuses[i].State.Terminated.StartedAt = cs.State.Running.StartedAt
		}
	}
	updated.Status.Reason = statusTerminatedReason

	p.notify(updated)
	log.G(ctx).Debug("Notified terminal pod status")
	return nil
}

func (p *syncProviderWrapper) run(ctx context.Context) {
	interval := 5 * time.Second
	timer := time.NewTimer(interval)
	defer timer.Stop()
	if !timer.Stop() {
		<-timer.C
	}

	for {
		log.G(ctx).Debug("Pod status update loop start")
		timer.Reset(interval)
		select {
		case <-ctx.Done():
			log.G(ctx).WithError(ctx.Err()).Debug("sync wrapper loop exiting")
			return
		case <-timer.C:
		}
		p.syncPodStatuses(ctx)
	}
}

func (p *syncProviderWrapper) syncPodStatuses(ctx context.Context) {
	ctx, span := trace.StartSpan(ctx, "syncProviderWrapper.syncPodStatuses")
	defer span.End()

	// Update all the pods with the provider status.
	pods, err := p.l.List(labels.Everything())
	if err != nil {
		err = errors.Wrap(err, "error getting pod list from kubernetes")
		span.SetStatus(err)
		log.G(ctx).WithError(err).Error("Error updating pod statuses")
		return
	}
	ctx = span.WithField(ctx, "nPods", int64(len(pods)))

	for _, pod := range pods {
		if shouldSkipPodStatusUpdate(pod) {
			log.G(ctx).Debug("Skipping pod status update")
			continue
		}
		if err := p.updatePodStatus(ctx, pod); err != nil {
			log.G(ctx).WithFields(map[string]interface{}{
				"name":      pod.Name,
				"namespace": pod.Namespace,
			}).WithError(err).Error("Could not fetch pod status")
		}
	}
}

func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubernetes *corev1.Pod) error {
	ctx, span := trace.StartSpan(ctx, "syncProviderWrapper.getPodStatus")
	defer span.End()
	ctx = addPodAttributes(ctx, span, podFromKubernetes)

	podStatus, err := p.PodLifecycleHandler.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
	if err != nil {
		if !errdefs.IsNotFound(err) {
			span.SetStatus(err)
			return err
		}
	}

	if podStatus != nil {
		pod := podFromKubernetes.DeepCopy()
		podStatus.DeepCopyInto(&pod.Status)
		p.notify(pod)
		return nil
	}

	key, err := cache.MetaNamespaceKeyFunc(podFromKubernetes)
	if err != nil {
		return err
	}
	if _, exists := p.deletedPods.Load(key); exists {
		log.G(ctx).Debug("pod is in known deleted state, ignoring")
		return nil
	}
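	// The provider reported this pod as not found and it is not in our
	// deletion cache, so the provider has most likely lost track of it (or
	// never successfully created it).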
	// Only change the status when the pod was already up, or when it is more
	// than a minute old. Only doing so when the pod was successfully running
	// makes sure we don't run into race conditions during pod creation.
	if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
		// Set the pod to failed; this makes sure that if the underlying
		// container implementation is gone, a new pod will be created.
		podStatus = podFromKubernetes.Status.DeepCopy()
		podStatus.Phase = corev1.PodFailed
		podStatus.Reason = podStatusReasonNotFound
		podStatus.Message = podStatusMessageNotFound
		now := metav1.NewTime(time.Now())
		for i, c := range podStatus.ContainerStatuses {
			if c.State.Running == nil {
				continue
			}
			podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
				ExitCode:    containerStatusExitCodeNotFound,
				Reason:      containerStatusReasonNotFound,
				Message:     containerStatusMessageNotFound,
				FinishedAt:  now,
				StartedAt:   c.State.Running.StartedAt,
				ContainerID: c.ContainerID,
			}
			podStatus.ContainerStatuses[i].State.Running = nil
		}
		log.G(ctx).Debug("Setting pod not found on pod status")
	}

	if podStatus == nil {
		// The pod is not running yet and is less than a minute old; leave its
		// status alone and check again on the next sync. This also avoids a
		// nil dereference in DeepCopyInto below.
		return nil
	}

	pod := podFromKubernetes.DeepCopy()
	podStatus.DeepCopyInto(&pod.Status)
	p.notify(pod)
	return nil
}
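// Usage note: this wrapper is intended for providers that implement
// PodLifecycleHandler but do not provide their own async pod status
// notifications. A minimal sketch of how it might be wired up (the variable
// names and the literal construction below are illustrative assumptions, not
// an exported API of this package):
//
//	wrapped := &syncProviderWrapper{PodLifecycleHandler: provider, l: podLister}
//	wrapped.NotifyPods(ctx, func(pod *corev1.Pod) {
//		// enqueue the pod so the pod controller reconciles its status
//	})
//	go wrapped.run(ctx)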