diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go index e647d89ef..6af38c037 100644 --- a/node/lifecycle_test.go +++ b/node/lifecycle_test.go @@ -106,106 +106,121 @@ func TestPodLifecycle(t *testing.T) { return watchErr } - // isPodDeletedGracefullyFunc is a condition func that waits until the pod is in a terminal state, which is the VK's - // action when the pod is deleted from the provider + // isPodDeletedGracefullyFunc is a condition func that waits until the pod is in a terminal state, which is the VK's + // action when the pod is deleted from the provider. isPodDeletedGracefullyFunc := func(ctx context.Context, watcher watch.Interface) error { _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) { log.G(ctx).WithField("event", ev).Info("got event") pod := ev.Object.(*corev1.Pod) - return (pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded) && pod.Status.Reason == mockProviderPodDeletedReason, nil + return (pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded) && (pod.Status.Reason == mockProviderPodDeletedReason || pod.Status.Reason == statusTerminatedReason), nil }) return watchErr } - // createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting - // it gracefully - t.Run("createStartDeleteScenario", func(t *testing.T) { - t.Run("mockProvider", func(t *testing.T) { - assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) { - testCreateStartDeleteScenario(ctx, t, s, isPodDeletedGracefullyFunc, true) - })) + envs := map[string]func(*testing.T) testingProvider{ + "Async": func(t *testing.T) testingProvider { + return newMockProvider() + }, + "Sync": func(t *testing.T) testingProvider { + if testing.Short() { + t.Skip() + } + return newSyncMockProvider() + }, + } + + for env, h := range envs { + t.Run(env, func(t *testing.T) { + + // createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting + // it gracefully. + t.Run("createStartDeleteScenario", func(t *testing.T) { + assert.NilError(t, wireUpSystem(ctx, h(t), func(ctx context.Context, s *system) { + testCreateStartDeleteScenario(ctx, t, s, isPodDeletedGracefullyFunc, true) + })) + }) + + // createStartDeleteScenarioWithDeletionErrorNotFound tests the flow if the pod was not found in the provider + // for some reason. + t.Run("createStartDeleteScenarioWithDeletionErrorNotFound", func(t *testing.T) { + mp := h(t) + mp.setErrorOnDelete(errdefs.NotFound("not found")) + assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { + testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc, false) + })) + }) + + // createStartDeleteScenarioWithDeletionRandomError tests the flow if the pod was unable to be deleted in the + // provider. + t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) { + mp := h(t) + deletionFunc := func(ctx context.Context, watcher watch.Interface) error { + return mp.getAttemptedDeletes().until(ctx, func(v int) bool { return v >= 2 }) + } + mp.setErrorOnDelete(errors.New("random error")) + assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { + testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false) + pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{}) + assert.NilError(t, err) + assert.Assert(t, is.Len(pods.Items, 1)) + assert.Assert(t, pods.Items[0].DeletionTimestamp != nil) + })) + }) + + // danglingPodScenario tests if a pod is created in the provider prior to the pod controller starting, + // and ensures the pod controller deletes the pod prior to continuing. + t.Run("danglingPodScenario", func(t *testing.T) { + t.Run("mockProvider", func(t *testing.T) { + mp := h(t) + assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { + testDanglingPodScenario(ctx, t, s, mp) + })) + }) + }) + + // testDanglingPodScenarioWithDeletionTimestamp tests if a pod is created in the provider and on api server it had + // deletiontimestamp set. It ensures deletion occurs. + t.Run("testDanglingPodScenarioWithDeletionTimestamp", func(t *testing.T) { + t.Run("mockProvider", func(t *testing.T) { + mp := h(t) + assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { + testDanglingPodScenarioWithDeletionTimestamp(ctx, t, s, mp) + })) + }) + }) + + // failedPodScenario ensures that the VK ignores failed pods that were failed prior to the pod controller starting up. + t.Run("failedPodScenario", func(t *testing.T) { + t.Run("mockProvider", func(t *testing.T) { + mp := h(t) + assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { + testFailedPodScenario(ctx, t, s) + })) + }) + }) + + // succeededPodScenario ensures that the VK ignores succeeded pods that were succeeded prior to the pod controller starting up. + t.Run("succeededPodScenario", func(t *testing.T) { + t.Run("mockProvider", func(t *testing.T) { + mp := h(t) + assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { + testSucceededPodScenario(ctx, t, s) + })) + }) + }) + + // updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated + // to the provider. + t.Run("updatePodWhileRunningScenario", func(t *testing.T) { + t.Run("mockProvider", func(t *testing.T) { + mp := h(t) + assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { + testUpdatePodWhileRunningScenario(ctx, t, s, mp) + })) + }) + }) }) - }) - - // createStartDeleteScenarioWithDeletionErrorNotFound tests the flow if the pod was not found in the provider - // for some reason - t.Run("createStartDeleteScenarioWithDeletionErrorNotFound", func(t *testing.T) { - mp := newMockProvider() - mp.errorOnDelete = errdefs.NotFound("not found") - assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc, false) - })) - }) - - // createStartDeleteScenarioWithDeletionRandomError tests the flow if the pod was unable to be deleted in the - // provider - t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) { - mp := newMockProvider() - deletionFunc := func(ctx context.Context, watcher watch.Interface) error { - return mp.attemptedDeletes.until(ctx, func(v int) bool { return v >= 2 }) - } - mp.errorOnDelete = errors.New("random error") - assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false) - pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{}) - assert.NilError(t, err) - assert.Assert(t, is.Len(pods.Items, 1)) - assert.Assert(t, pods.Items[0].DeletionTimestamp != nil) - })) - }) - - // danglingPodScenario tests if a pod is created in the provider prior to the pod controller starting, - // and ensures the pod controller deletes the pod prior to continuing. - t.Run("danglingPodScenario", func(t *testing.T) { - t.Run("mockProvider", func(t *testing.T) { - mp := newMockProvider() - assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testDanglingPodScenario(ctx, t, s, mp) - })) - }) - }) - - // testDanglingPodScenarioWithDeletionTimestamp tests if a pod is created in the provider and on api server it had - // deletiontimestamp set. It ensures deletion occurs. - t.Run("testDanglingPodScenarioWithDeletionTimestamp", func(t *testing.T) { - t.Run("mockProvider", func(t *testing.T) { - mp := newMockProvider() - assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testDanglingPodScenarioWithDeletionTimestamp(ctx, t, s, mp) - })) - }) - }) - - // failedPodScenario ensures that the VK ignores failed pods that were failed prior to the PC starting up - t.Run("failedPodScenario", func(t *testing.T) { - t.Run("mockProvider", func(t *testing.T) { - mp := newMockProvider() - assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testFailedPodScenario(ctx, t, s) - })) - }) - }) - - // succeededPodScenario ensures that the VK ignores succeeded pods that were succeeded prior to the PC starting up. - t.Run("succeededPodScenario", func(t *testing.T) { - t.Run("mockProvider", func(t *testing.T) { - mp := newMockProvider() - assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testSucceededPodScenario(ctx, t, s) - })) - }) - }) - - // updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated - // to the provider - t.Run("updatePodWhileRunningScenario", func(t *testing.T) { - t.Run("mockProvider", func(t *testing.T) { - mp := newMockProvider() - assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testUpdatePodWhileRunningScenario(ctx, t, s, mp) - })) - }) - }) + } } type testFunction func(ctx context.Context, s *system) @@ -331,7 +346,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system, assert.DeepEqual(t, p1, p2) } -func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) { +func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m testingProvider) { t.Parallel() pod := newPod() @@ -340,11 +355,11 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo // Start the pod controller assert.NilError(t, s.start(ctx)) - assert.Assert(t, is.Equal(m.deletes.read(), 1)) + assert.Assert(t, is.Equal(m.getDeletes().read(), 1)) } -func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testing.T, s *system, m *mockProvider) { +func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testing.T, s *system, m testingProvider) { t.Parallel() pod := newPod() @@ -506,7 +521,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, } } -func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) { +func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m testingProvider) { t.Parallel() ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -563,7 +578,7 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys log.G(ctx).WithField("pod", p).Info("Updating pod") _, err = s.client.CoreV1().Pods(p.Namespace).Update(p) assert.NilError(t, err) - assert.NilError(t, m.updates.until(ctx, func(v int) bool { return v > 0 })) + assert.NilError(t, m.getUpdates().until(ctx, func(v int) bool { return v > 0 })) } func BenchmarkCreatePods(b *testing.B) { diff --git a/node/mock_test.go b/node/mock_test.go index bf60f65da..297ad487a 100644 --- a/node/mock_test.go +++ b/node/mock_test.go @@ -79,7 +79,15 @@ type mockProvider struct { } // newMockProvider creates a new mockProvider. -func newMockProvider() *mockProvider { +func newMockProvider() *mockProviderAsync { + provider := newSyncMockProvider() + // By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface, + // it will be set, and then we'll call a real underlying implementation. + // This makes it easier in the sense we don't need to wrap each method. + return &mockProviderAsync{provider} +} + +func newSyncMockProvider() *mockProvider { provider := mockProvider{ startTime: time.Now(), creates: newWaitableInt(), @@ -87,10 +95,6 @@ func newMockProvider() *mockProvider { deletes: newWaitableInt(), attemptedDeletes: newWaitableInt(), } - // By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface, - // it will be set, and then we'll call a real underlying implementation. - // This makes it easier in the sense we don't need to wrap each method. - return &provider } @@ -258,10 +262,24 @@ func (p *mockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { return pods, nil } -// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done -// within the provider. -func (p *mockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) { - p.realNotifier = notifier +func (p *mockProvider) setErrorOnDelete(err error) { + p.errorOnDelete = err +} + +func (p *mockProvider) getAttemptedDeletes() *waitableInt { + return p.attemptedDeletes +} + +func (p *mockProvider) getCreates() *waitableInt { + return p.creates +} + +func (p *mockProvider) getDeletes() *waitableInt { + return p.deletes +} + +func (p *mockProvider) getUpdates() *waitableInt { + return p.updates } func buildKeyFromNames(namespace string, name string) (string, error) { @@ -280,3 +298,22 @@ func buildKey(pod *v1.Pod) (string, error) { return buildKeyFromNames(pod.ObjectMeta.Namespace, pod.ObjectMeta.Name) } + +type mockProviderAsync struct { + *mockProvider +} + +// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done +// within the provider. +func (p *mockProviderAsync) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) { + p.realNotifier = notifier +} + +type testingProvider interface { + PodLifecycleHandler + setErrorOnDelete(error) + getAttemptedDeletes() *waitableInt + getDeletes() *waitableInt + getCreates() *waitableInt + getUpdates() *waitableInt +} diff --git a/node/pod.go b/node/pod.go index ee3479abe..ef525003c 100644 --- a/node/pod.go +++ b/node/pod.go @@ -238,7 +238,7 @@ func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retE return pc.updatePodStatus(ctx, pod, key) } -func (pc *PodController) deletePodHandler(ctx context.Context, key string) error { +func (pc *PodController) deletePodHandler(ctx context.Context, key string) (retErr error) { ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem") defer span.End() @@ -255,6 +255,14 @@ func (pc *PodController) deletePodHandler(ctx context.Context, key string) error return nil } + defer func() { + if retErr == nil { + if w, ok := pc.provider.(syncWrapper); ok { + w._deletePodKey(ctx, key) + } + } + }() + // If the pod has been deleted from API server, we don't need to do anything. k8sPod, err := pc.podsLister.Pods(namespace).Get(name) if errors.IsNotFound(err) { diff --git a/node/pod_test.go b/node/pod_test.go index f281e95fe..48041a783 100644 --- a/node/pod_test.go +++ b/node/pod_test.go @@ -30,7 +30,7 @@ import ( type TestController struct { *PodController - mock *mockProvider + mock *mockProviderAsync client *fake.Clientset } diff --git a/node/podcontroller.go b/node/podcontroller.go index ec616b90a..094b29dab 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -72,7 +72,10 @@ type PodLifecycleHandler interface { // concurrently outside of the calling goroutine. Therefore it is recommended // to return a version after DeepCopy. GetPods(context.Context) ([]*corev1.Pod, error) +} +// PodNotifier is used as an extension to PodLifecycleHandler to support async updates of pod statuses. +type PodNotifier interface { // NotifyPods instructs the notifier to call the passed in function when // the pod status changes. It should be called when a pod's status changes. // @@ -157,6 +160,7 @@ type PodControllerConfig struct { ServiceInformer corev1informers.ServiceInformer } +// NewPodController creates a new pod controller with the provided config. func NewPodController(cfg PodControllerConfig) (*PodController, error) { if cfg.PodClient == nil { return nil, errdefs.InvalidInput("missing core client") @@ -201,6 +205,11 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) { return pc, nil } +type asyncProvider interface { + PodLifecycleHandler + PodNotifier +} + // Run will set up the event handlers for types we are interested in, as well // as syncing informer caches and starting workers. It will block until the // context is cancelled, at which point it will shutdown the work queue and @@ -220,10 +229,24 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er pc.mu.Unlock() }() + var provider asyncProvider + runProvider := func(context.Context) {} + + if p, ok := pc.provider.(asyncProvider); ok { + provider = p + } else { + wrapped := &syncProviderWrapper{PodLifecycleHandler: pc.provider, l: pc.podsLister} + runProvider = wrapped.run + provider = wrapped + log.G(ctx).Debug("Wrapped non-async provider with async") + } + pc.provider = provider + podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider") - pc.provider.NotifyPods(ctx, func(pod *corev1.Pod) { + provider.NotifyPods(ctx, func(pod *corev1.Pod) { pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy()) }) + go runProvider(ctx) defer podStatusQueue.ShutDown() diff --git a/node/sync.go b/node/sync.go new file mode 100644 index 000000000..eca751a25 --- /dev/null +++ b/node/sync.go @@ -0,0 +1,209 @@ +package node + +import ( + "context" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/trace" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" +) + +const ( + podStatusReasonNotFound = "NotFound" + podStatusMessageNotFound = "The pod status was not found and may have been deleted from the provider" + containerStatusReasonNotFound = "NotFound" + containerStatusMessageNotFound = "Container was not found and was likely deleted" + containerStatusExitCodeNotFound = -137 + statusTerminatedReason = "Terminated" + containerStatusTerminatedMessage = "Container was terminated. The exit code may not reflect the real exit code" +) + +// syncProviderWrapper wraps a PodLifecycleHandler to give it async-like pod status notification behavior. +type syncProviderWrapper struct { + PodLifecycleHandler + notify func(*corev1.Pod) + l corev1listers.PodLister + + // deletedPods makes sure we don't set the "NotFound" status + // for pods which have been requested to be deleted. + // This is needed for our loop which just grabs pod statuses every 5 seconds. + deletedPods sync.Map +} + +// This is used to clean up keys for deleted pods after they have been fully deleted in the API server. +type syncWrapper interface { + _deletePodKey(context.Context, string) +} + +func (p *syncProviderWrapper) NotifyPods(ctx context.Context, f func(*corev1.Pod)) { + p.notify = f +} + +func (p *syncProviderWrapper) _deletePodKey(ctx context.Context, key string) { + log.G(ctx).WithField("key", key).Debug("Cleaning up pod from deletion cache") + p.deletedPods.Delete(key) +} + +func (p *syncProviderWrapper) DeletePod(ctx context.Context, pod *corev1.Pod) error { + log.G(ctx).Debug("syncProviderWrappper.DeletePod") + key, err := cache.MetaNamespaceKeyFunc(pod) + if err != nil { + return err + } + + p.deletedPods.Store(key, pod) + if err := p.PodLifecycleHandler.DeletePod(ctx, pod.DeepCopy()); err != nil { + log.G(ctx).WithField("key", key).WithError(err).Debug("Removed key from deleted pods cache") + // We aren't going to actually delete the pod from the provider since there is an error so delete it from our cache, + // otherwise we could end up leaking pods in our deletion cache. + // Delete will be retried by the pod controller. + p.deletedPods.Delete(key) + return err + } + + if shouldSkipPodStatusUpdate(pod) { + log.G(ctx).Debug("skipping pod status update for terminated pod") + return nil + } + + updated := pod.DeepCopy() + updated.Status.Phase = corev1.PodSucceeded + now := metav1.NewTime(time.Now()) + for i, cs := range updated.Status.ContainerStatuses { + updated.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{ + Reason: statusTerminatedReason, + Message: containerStatusTerminatedMessage, + FinishedAt: now, + } + if cs.State.Running != nil { + updated.Status.ContainerStatuses[i].State.Terminated.StartedAt = cs.State.Running.StartedAt + } + } + updated.Status.Reason = statusTerminatedReason + + p.notify(updated) + log.G(ctx).Debug("Notified pod terminal pod status") + + return nil +} + +func (p *syncProviderWrapper) run(ctx context.Context) { + interval := 5 * time.Second + timer := time.NewTimer(interval) + + defer timer.Stop() + + if !timer.Stop() { + <-timer.C + } + + for { + log.G(ctx).Debug("Pod status update loop start") + timer.Reset(interval) + select { + case <-ctx.Done(): + log.G(ctx).WithError(ctx.Err()).Debug("sync wrapper loop exiting") + return + case <-timer.C: + } + p.syncPodStatuses(ctx) + } +} + +func (p *syncProviderWrapper) syncPodStatuses(ctx context.Context) { + ctx, span := trace.StartSpan(ctx, "syncProviderWrapper.syncPodStatuses") + defer span.End() + + // Update all the pods with the provider status. + pods, err := p.l.List(labels.Everything()) + if err != nil { + err = errors.Wrap(err, "error getting pod list from kubernetes") + span.SetStatus(err) + log.G(ctx).WithError(err).Error("Error updating pod statuses") + return + } + ctx = span.WithField(ctx, "nPods", int64(len(pods))) + + for _, pod := range pods { + if shouldSkipPodStatusUpdate(pod) { + log.G(ctx).Debug("Skipping pod status update") + continue + } + + if err := p.updatePodStatus(ctx, pod); err != nil { + log.G(ctx).WithFields(map[string]interface{}{ + "name": pod.Name, + "namespace": pod.Namespace, + }).WithError(err).Error("Could not fetch pod status") + } + } +} + +func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubernetes *corev1.Pod) error { + ctx, span := trace.StartSpan(ctx, "syncProviderWrapper.updatePodStatus") + defer span.End() + ctx = addPodAttributes(ctx, span, podFromKubernetes) + + podStatus, err := p.PodLifecycleHandler.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name) + if err != nil { + if !errdefs.IsNotFound(err) { + span.SetStatus(err) + return err + } + } + if podStatus != nil { + pod := podFromKubernetes.DeepCopy() + podStatus.DeepCopyInto(&pod.Status) + p.notify(pod) + return nil + } + + key, err := cache.MetaNamespaceKeyFunc(podFromKubernetes) + if err != nil { + return err + } + + if _, exists := p.deletedPods.Load(key); exists { + log.G(ctx).Debug("pod is in known deleted state, ignoring") + return nil + } + + // Only change the status when the pod was already up. + // Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation. + if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute { + // Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created. + podStatus = podFromKubernetes.Status.DeepCopy() + podStatus.Phase = corev1.PodFailed + podStatus.Reason = podStatusReasonNotFound + podStatus.Message = podStatusMessageNotFound + now := metav1.NewTime(time.Now()) + for i, c := range podStatus.ContainerStatuses { + if c.State.Running == nil { + continue + } + podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{ + ExitCode: containerStatusExitCodeNotFound, + Reason: containerStatusReasonNotFound, + Message: containerStatusMessageNotFound, + FinishedAt: now, + StartedAt: c.State.Running.StartedAt, + ContainerID: c.ContainerID, + } + podStatus.ContainerStatuses[i].State.Running = nil + } + log.G(ctx).Debug("Setting pod not found on pod status") + } + + pod := podFromKubernetes.DeepCopy() + podStatus.DeepCopyInto(&pod.Status) + p.notify(pod) + return nil +}