diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go index 9b0d0971d..41480e029 100644 --- a/node/lifecycle_test.go +++ b/node/lifecycle_test.go @@ -94,8 +94,7 @@ func TestPodLifecycle(t *testing.T) { ctx = log.WithLogger(ctx, log.L) - // isPodDeletedPermanentlyFunc is a condition func that waits until the pod is _deleted_, which is the VK's - // action when the pod is deleted from the provider + // isPodDeletedPermanentlyFunc is a condition func that waits until the pod is _deleted_ isPodDeletedPermanentlyFunc := func(ctx context.Context, watcher watch.Interface) error { _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) { log.G(ctx).WithField("event", ev).Info("got event") @@ -107,13 +106,23 @@ func TestPodLifecycle(t *testing.T) { return watchErr } + // isPodDeletedGracefullyFunc is a condition func that waits until the pod is in a terminal state, which is the VK's + // action when the pod is deleted from the provider + isPodDeletedGracefullyFunc := func(ctx context.Context, watcher watch.Interface) error { + _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) { + log.G(ctx).WithField("event", ev).Info("got event") + pod := ev.Object.(*corev1.Pod) + return (pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded) && pod.Status.Reason == mockProviderPodDeletedReason, nil + }) + return watchErr + } + // createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting // it gracefully t.Run("createStartDeleteScenario", func(t *testing.T) { - t.Run("mockProvider", func(t *testing.T) { assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) { - testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc) + testCreateStartDeleteScenario(ctx, t, s, isPodDeletedGracefullyFunc, true) })) }) }) @@ -124,7 +133,7 @@ func TestPodLifecycle(t *testing.T) { mp := newMockProvider() mp.errorOnDelete = errdefs.NotFound("not found") assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc) + testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc, false) })) }) @@ -137,7 +146,7 @@ func TestPodLifecycle(t *testing.T) { } mp.errorOnDelete = errors.New("random error") assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testCreateStartDeleteScenario(ctx, t, s, deletionFunc) + testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false) pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{}) assert.NilError(t, err) assert.Assert(t, is.Len(pods.Items, 1)) @@ -328,7 +337,7 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo } -func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error) { +func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error, waitForPermanentDeletion bool) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -396,6 +405,18 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, assert.NilError(t, err) } + // Setup a watch to look for the pod eventually going away completely + watcher2, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions) + assert.NilError(t, err) + defer watcher2.Stop() 
+ waitDeleteCh := make(chan error) + go func() { + _, watchDeleteErr := watchutils.UntilWithoutRetry(ctx, watcher2, func(ev watch.Event) (bool, error) { + return ev.Type == watch.Deleted, nil + }) + waitDeleteCh <- watchDeleteErr + }() + // Setup a watch prior to pod deletion watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions) assert.NilError(t, err) @@ -410,7 +431,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(p.Name, metav1.GetOptions{}) assert.NilError(t, err) // 2. Set the pod's deletion timestamp, version, and so on - var deletionGracePeriod int64 = 30 + var deletionGracePeriod int64 = 10 currentPod.DeletionGracePeriodSeconds = &deletionGracePeriod deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod))) currentPod.DeletionTimestamp = &deletionTimestamp @@ -424,6 +445,15 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, case err = <-watchErrCh: assert.NilError(t, err) } + + if waitForPermanentDeletion { + select { + case <-ctx.Done(): + t.Fatalf("Context ended early: %s", ctx.Err().Error()) + case err = <-waitDeleteCh: + assert.NilError(t, err) + } + } } func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) { @@ -526,6 +556,7 @@ func randomizeName(pod *corev1.Pod) { } func newPod(podmodifiers ...podModifier) *corev1.Pod { + var terminationGracePeriodSeconds int64 = 30 pod := &corev1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", @@ -538,7 +569,14 @@ func newPod(podmodifiers ...podModifier) *corev1.Pod { ResourceVersion: "100", }, Spec: corev1.PodSpec{ - NodeName: testNodeName, + NodeName: testNodeName, + TerminationGracePeriodSeconds: &terminationGracePeriodSeconds, + Containers: []corev1.Container{ + { + Name: "my-container", + Image: "nginx", + }, + }, }, Status: corev1.PodStatus{ Phase: corev1.PodPending, diff --git a/node/mock_test.go b/node/mock_test.go index 8554a20cf..bf60f65da 100644 --- a/node/mock_test.go +++ b/node/mock_test.go @@ -12,6 +12,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + mockProviderPodDeletedReason = "MockProviderPodDeleted" +) + var ( _ PodLifecycleHandler = (*mockProvider)(nil) ) @@ -171,15 +175,19 @@ func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { log.G(ctx).Infof("receive DeletePod %q", pod.Name) p.attemptedDeletes.increment() + key, err := buildKey(pod) + if err != nil { + return err + } + + if errdefs.IsNotFound(p.errorOnDelete) { + p.pods.Delete(key) + } if p.errorOnDelete != nil { return p.errorOnDelete } p.deletes.increment() - key, err := buildKey(pod) - if err != nil { - return err - } if _, exists := p.pods.Load(key); !exists { return errdefs.NotFound("pod not found") @@ -188,7 +196,7 @@ func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { now := metav1.Now() pod.Status.Phase = v1.PodSucceeded - pod.Status.Reason = "MockProviderPodDeleted" + pod.Status.Reason = mockProviderPodDeletedReason for idx := range pod.Status.ContainerStatuses { pod.Status.ContainerStatuses[idx].Ready = false diff --git a/node/pod.go b/node/pod.go index bd2a901e9..ee3479abe 100644 --- a/node/pod.go +++ b/node/pod.go @@ -19,7 +19,6 @@ import ( "github.com/google/go-cmp/cmp" pkgerrors "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" 
"github.com/virtual-kubelet/virtual-kubelet/trace" corev1 "k8s.io/api/core/v1" @@ -133,61 +132,19 @@ func (pc *PodController) handleProviderError(ctx context.Context, span trace.Spa span.SetStatus(origErr) } -func (pc *PodController) deletePod(ctx context.Context, namespace, name string) error { +func (pc *PodController) deletePod(ctx context.Context, pod *corev1.Pod) error { ctx, span := trace.StartSpan(ctx, "deletePod") defer span.End() - - pod, err := pc.provider.GetPod(ctx, namespace, name) - if err != nil { - if errdefs.IsNotFound(err) { - // The provider is not aware of the pod, but we must still delete the Kubernetes API resource. - return pc.forceDeletePodResource(ctx, namespace, name) - } - return err - } - - // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't. - if pod == nil { - // The provider is not aware of the pod, but we must still delete the Kubernetes API resource. - return pc.forceDeletePodResource(ctx, namespace, name) - } - ctx = addPodAttributes(ctx, span, pod) - var delErr error - if delErr = pc.provider.DeletePod(ctx, pod.DeepCopy()); delErr != nil && !errdefs.IsNotFound(delErr) { - span.SetStatus(delErr) - return delErr + err := pc.provider.DeletePod(ctx, pod.DeepCopy()) + if err != nil { + span.SetStatus(err) + return err } log.G(ctx).Debug("Deleted pod from provider") - if err := pc.forceDeletePodResource(ctx, namespace, name); err != nil { - span.SetStatus(err) - return err - } - log.G(ctx).Info("Deleted pod from Kubernetes") - - return nil -} - -func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace, name string) error { - ctx, span := trace.StartSpan(ctx, "forceDeletePodResource") - defer span.End() - ctx = span.WithFields(ctx, log.Fields{ - "namespace": namespace, - "name": name, - }) - - var grace int64 - if err := pc.client.Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil { - if errors.IsNotFound(err) { - log.G(ctx).Debug("Pod does not exist in Kubernetes, nothing to delete") - return nil - } - span.SetStatus(err) - return pkgerrors.Wrap(err, "Failed to delete Kubernetes pod") - } return nil } @@ -213,8 +170,12 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes } kPod := obj.(*knownPod) kPod.Lock() - podFromProvider := kPod.lastPodStatusReceivedFromProvider + podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy() kPod.Unlock() + // We need to do this because the other parts of the pod can be updated elsewhere. Since we're only updating + // the pod status, and we should be the sole writers of the pod status, we can blind overwrite it. Therefore + // we need to copy the pod and set ResourceVersion to 0. + podFromProvider.ResourceVersion = "0" if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(podFromProvider); err != nil { span.SetStatus(err) @@ -276,3 +237,47 @@ func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retE return pc.updatePodStatus(ctx, pod, key) } + +func (pc *PodController) deletePodHandler(ctx context.Context, key string) error { + ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem") + defer span.End() + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + ctx = span.WithFields(ctx, log.Fields{ + "namespace": namespace, + "name": name, + }) + + if err != nil { + // Log the error as a warning, but do not requeue the key as it is invalid. 
+		log.G(ctx).Warn(pkgerrors.Wrapf(err, "invalid resource key: %q", key))
+		span.SetStatus(err)
+		return nil
+	}
+
+	// If the pod has been deleted from API server, we don't need to do anything.
+	k8sPod, err := pc.podsLister.Pods(namespace).Get(name)
+	if errors.IsNotFound(err) {
+		return nil
+	}
+	if err != nil {
+		span.SetStatus(err)
+		return err
+	}
+
+	if running(&k8sPod.Status) {
+		log.G(ctx).Error("Force deleting pod in running state")
+	}
+
+	// We don't check with the provider before doing this delete. At this point, even if an outstanding pod status update
+	// was in progress, the pod has reached a terminal state or outlived its deletion grace period, so it is safe to force delete the API object.
+	err = pc.client.Pods(namespace).Delete(name, metav1.NewDeleteOptions(0))
+	if errors.IsNotFound(err) {
+		return nil
+	}
+	if err != nil {
+		span.SetStatus(err)
+		return err
+	}
+	return nil
+}
diff --git a/node/pod_test.go b/node/pod_test.go
index abf0ec515..f281e95fe 100644
--- a/node/pod_test.go
+++ b/node/pod_test.go
@@ -19,14 +19,10 @@ import (
 	"testing"
 	"time"
 
-	pkgerrors "github.com/pkg/errors"
-	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
 	testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
 	"gotest.tools/assert"
 	is "gotest.tools/assert/cmp"
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	kubeinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/util/workqueue"
@@ -51,6 +47,7 @@ func newTestController() *TestController {
 		resourceManager: rm,
 		recorder:        testutil.FakeEventRecorder(5),
 		k8sQ:            workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		deletionQ:       workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 		done:            make(chan struct{}),
 		ready:           make(chan struct{}),
 		podsInformer:    iFactory.Core().V1().Pods(),
@@ -172,59 +169,6 @@ func TestPodNoSpecChange(t *testing.T) {
 	assert.Check(t, is.Equal(svr.mock.updates.read(), 0))
 }
 
-func TestPodDelete(t *testing.T) {
-	type testCase struct {
-		desc   string
-		delErr error
-	}
-
-	cases := []testCase{
-		{desc: "no error on delete", delErr: nil},
-		{desc: "not found error on delete", delErr: errdefs.NotFound("not found")},
-		{desc: "unknown error on delete", delErr: pkgerrors.New("random error")},
-	}
-
-	for _, tc := range cases {
-		t.Run(tc.desc, func(t *testing.T) {
-			c := newTestController()
-			c.mock.errorOnDelete = tc.delErr
-
-			pod := &corev1.Pod{}
-			pod.ObjectMeta.Namespace = "default"
-			pod.ObjectMeta.Name = "nginx"
-			pod.Spec = newPodSpec()
-
-			pc := c.client.CoreV1().Pods("default")
-
-			p, err := pc.Create(pod)
-			assert.NilError(t, err)
-
-			ctx := context.Background()
-			err = c.createOrUpdatePod(ctx, p.DeepCopy()) // make sure it's actually created
-			assert.NilError(t, err)
-			assert.Check(t, is.Equal(c.mock.creates.read(), 1))
-
-			err = c.deletePod(ctx, pod.Namespace, pod.Name)
-			assert.Equal(t, pkgerrors.Cause(err), err)
-
-			var expectDeletes int
-			if tc.delErr == nil {
-				expectDeletes = 1
-			}
-			assert.Check(t, is.Equal(c.mock.deletes.read(), expectDeletes))
-
-			expectDeleted := tc.delErr == nil || errdefs.IsNotFound(tc.delErr)
-
-			_, err = pc.Get(pod.Name, metav1.GetOptions{})
-			if expectDeleted {
-				assert.Assert(t, errors.IsNotFound(err))
-			} else {
-				assert.NilError(t, err)
-			}
-		})
-	}
-}
-
 func newPodSpec() corev1.PodSpec {
 	return corev1.PodSpec{
 		Containers: []corev1.Container{
diff --git a/node/podcontroller.go b/node/podcontroller.go
index 05dc7b88b..ec616b90a 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -17,10 +17,11 @@ package node
 import (
 	"context"
 	"fmt"
- "reflect" "strconv" "sync" + "time" + "github.com/google/go-cmp/cmp" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/internal/manager" @@ -51,7 +52,7 @@ type PodLifecycleHandler interface { // DeletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is // expected to call the NotifyPods callback with a terminal pod status where all the containers are in a terminal - // state, as well as the pod. + // state, as well as the pod. DeletePod may be called multiple times for the same pod. DeletePod(ctx context.Context, pod *corev1.Pod) error // GetPod retrieves a pod by name from the provider (can be cached). @@ -101,6 +102,9 @@ type PodController struct { k8sQ workqueue.RateLimitingInterface + // deletionQ is a queue on which pods are reconciled, and we check if pods are in API server after grace period + deletionQ workqueue.RateLimitingInterface + // From the time of creation, to termination the knownPods map will contain the pods key // (derived from Kubernetes' cache library) -> a *knownPod struct. knownPods sync.Map @@ -127,6 +131,7 @@ type knownPod struct { // should be immutable to avoid having to hold the lock the entire time you're working with them sync.Mutex lastPodStatusReceivedFromProvider *corev1.Pod + lastPodUsed *corev1.Pod } // PodControllerConfig is used to configure a new PodController. @@ -190,6 +195,7 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) { done: make(chan struct{}), recorder: cfg.EventRecorder, k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"), + deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"), } return pc, nil @@ -207,7 +213,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er defer func() { pc.k8sQ.ShutDown() - + pc.deletionQ.ShutDown() pc.mu.Lock() pc.err = retErr close(pc.done) @@ -241,16 +247,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er }, UpdateFunc: func(oldObj, newObj interface{}) { // Create a copy of the old and new pod objects so we don't mutate the cache. - oldPod := oldObj.(*corev1.Pod).DeepCopy() - newPod := newObj.(*corev1.Pod).DeepCopy() - // We want to check if the two objects differ in anything other than their resource versions. - // Hence, we make them equal so that this change isn't picked up by reflect.DeepEqual. - newPod.ResourceVersion = oldPod.ResourceVersion - // Skip adding this pod's key to the work queue if its .metadata (except .metadata.resourceVersion) and .spec fields haven't changed. - // This guarantees that we don't attempt to sync the pod every time its .status field is updated. - if reflect.DeepEqual(oldPod.ObjectMeta, newPod.ObjectMeta) && reflect.DeepEqual(oldPod.Spec, newPod.Spec) { - return - } + newPod := newObj.(*corev1.Pod) + // At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod. 
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil { log.G(ctx).Error(err) @@ -264,6 +262,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er } else { pc.knownPods.Delete(key) pc.k8sQ.AddRateLimited(key) + // If this pod was in the deletion queue, forget about it + pc.deletionQ.Forget(key) } }, }) @@ -295,6 +295,15 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er }() } + for id := 0; id < podSyncWorkers; id++ { + wg.Add(1) + workerID := strconv.Itoa(id) + go func() { + defer wg.Done() + pc.runDeletionReconcilationWorker(ctx, workerID, pc.deletionQ) + }() + } + close(pc.ready) log.G(ctx).Info("started workers") @@ -302,6 +311,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er log.G(ctx).Info("shutting down workers") pc.k8sQ.ShutDown() podStatusQueue.ShutDown() + pc.deletionQ.ShutDown() wg.Wait() return nil @@ -353,6 +363,7 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error { // Add the current key as an attribute to the current span. ctx = span.WithField(ctx, "key", key) + log.G(ctx).WithField("key", key).Debug("sync handled") // Convert the namespace/name string into a distinct namespace and name. namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -372,35 +383,81 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error { span.SetStatus(err) return err } - // At this point we know the Pod resource doesn't exist, which most probably means it was deleted. - // Hence, we must delete it from the provider if it still exists there. - if err := pc.deletePod(ctx, namespace, name); err != nil { - err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name)) + + pod, err = pc.provider.GetPod(ctx, namespace, name) + if err != nil && !errdefs.IsNotFound(err) { + err = pkgerrors.Wrapf(err, "failed to fetch pod with key %q from provider", key) span.SetStatus(err) return err } - return nil + if errdefs.IsNotFound(err) || pod == nil { + return nil + } + + err = pc.provider.DeletePod(ctx, pod) + if errdefs.IsNotFound(err) { + return nil + } + if err != nil { + err = pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name)) + span.SetStatus(err) + } + return err + } // At this point we know the Pod resource has either been created or updated (which includes being marked for deletion). - return pc.syncPodInProvider(ctx, pod) + return pc.syncPodInProvider(ctx, pod, key) } // syncPodInProvider tries and reconciles the state of a pod by comparing its Kubernetes representation and the provider's representation. -func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) error { +func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod, key string) (retErr error) { ctx, span := trace.StartSpan(ctx, "syncPodInProvider") defer span.End() // Add the pod's attributes to the current span. 
 	ctx = addPodAttributes(ctx, span, pod)
+	// If the pod's containers are no longer in a running state then we force delete the pod from the API server.
+	// More context is here: https://github.com/virtual-kubelet/virtual-kubelet/pull/760
+	if pod.DeletionTimestamp != nil && !running(&pod.Status) {
+		log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running")
+		pc.deletionQ.Add(key)
+		return nil
+	}
+	obj, ok := pc.knownPods.Load(key)
+	if !ok {
+		// That means the pod was deleted while we were working
+		return nil
+	}
+	kPod := obj.(*knownPod)
+	kPod.Lock()
+	if kPod.lastPodUsed != nil && podsEffectivelyEqual(kPod.lastPodUsed, pod) {
+		kPod.Unlock()
+		return nil
+	}
+	kPod.Unlock()
+
+	defer func() {
+		if retErr == nil {
+			kPod.Lock()
+			defer kPod.Unlock()
+			kPod.lastPodUsed = pod
+		}
+	}()
+
 	// Check whether the pod has been marked for deletion.
 	// If it does, guarantee it is deleted in the provider and Kubernetes.
 	if pod.DeletionTimestamp != nil {
-		if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
+		log.G(ctx).Debug("Deleting pod in provider")
+		if err := pc.deletePod(ctx, pod); errdefs.IsNotFound(err) {
+			log.G(ctx).Debug("Pod not found in provider")
+		} else if err != nil {
 			err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod))
 			span.SetStatus(err)
 			return err
 		}
+
+		pc.deletionQ.AddAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
 		return nil
 	}
 
@@ -419,6 +476,25 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod)
 	return nil
 }
 
+// runDeletionReconcilationWorker is a long-running function that continually calls processDeletionReconcilationWorkItem
+// in order to read and process items on the pod deletion work queue.
+func (pc *PodController) runDeletionReconcilationWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
+	for pc.processDeletionReconcilationWorkItem(ctx, workerID, q) {
+	}
+}
+
+// processDeletionReconcilationWorkItem reads a single work item off the work queue and attempts to process it by calling deletePodHandler.
+func (pc *PodController) processDeletionReconcilationWorkItem(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
+
+	// We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
+	ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem")
+	defer span.End()
+
+	// Add the ID of the current worker as an attribute to the current span.
+	ctx = span.WithField(ctx, "workerId", workerID)
+	return handleQueueItem(ctx, q, pc.deletePodHandler)
+}
+
 // deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them.
 func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int) {
 	ctx, span := trace.StartSpan(ctx, "deleteDanglingPods")
@@ -474,7 +550,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int
 		// Add the pod's attributes to the current span.
 		ctx = addPodAttributes(ctx, span, pod)
 		// Actually delete the pod.
- if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil { + if err := pc.provider.DeletePod(ctx, pod.DeepCopy()); err != nil && !errdefs.IsNotFound(err) { span.SetStatus(err) log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod)) } else { @@ -503,3 +579,31 @@ func loggablePodName(pod *corev1.Pod) string { func loggablePodNameFromCoordinates(namespace, name string) string { return fmt.Sprintf("%s/%s", namespace, name) } + +// podsEffectivelyEqual compares two pods, and ignores the pod status, and the resource version +func podsEffectivelyEqual(p1, p2 *corev1.Pod) bool { + filterForResourceVersion := func(p cmp.Path) bool { + if p.String() == "ObjectMeta.ResourceVersion" { + return true + } + if p.String() == "Status" { + return true + } + return false + } + + return cmp.Equal(p1, p2, cmp.FilterPath(filterForResourceVersion, cmp.Ignore())) +} + +// borrowed from https://github.com/kubernetes/kubernetes/blob/f64c631cd7aea58d2552ae2038c1225067d30dde/pkg/kubelet/kubelet_pods.go#L944-L953 +// running returns true, unless if every status is terminated or waiting, or the status list +// is empty. +func running(podStatus *corev1.PodStatus) bool { + statuses := podStatus.ContainerStatuses + for _, status := range statuses { + if status.State.Terminated == nil && status.State.Waiting == nil { + return true + } + } + return false +} diff --git a/node/podcontroller_test.go b/node/podcontroller_test.go index e9ed7260d..c87d8c673 100644 --- a/node/podcontroller_test.go +++ b/node/podcontroller_test.go @@ -6,6 +6,8 @@ import ( "time" "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestPodControllerExitOnContextCancel(t *testing.T) { @@ -42,3 +44,49 @@ func TestPodControllerExitOnContextCancel(t *testing.T) { } assert.NilError(t, tc.Err()) } + +func TestCompareResourceVersion(t *testing.T) { + p1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + } + p2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "2", + }, + } + assert.Assert(t, podsEffectivelyEqual(p1, p2)) +} + +func TestCompareStatus(t *testing.T) { + p1 := &corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + p2 := &corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodFailed, + }, + } + assert.Assert(t, podsEffectivelyEqual(p1, p2)) +} + +func TestCompareLabels(t *testing.T) { + p1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar1", + }, + }, + } + p2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar2", + }, + }, + } + assert.Assert(t, !podsEffectivelyEqual(p1, p2)) +}
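
For provider authors, the practical consequence of this change is that DeletePod must be safe to call more than once for the same pod (the updated PodLifecycleHandler comment now says so explicitly), and the controller relies on the provider reporting a terminal pod status rather than on an immediate force delete of the Kubernetes API object. The sketch below is not part of the patch; it is a hypothetical in-memory provider's DeletePod, modeled loosely on the mockProvider in mock_test.go. The pods map, notify callback, and reason constant are illustrative assumptions, while errdefs, corev1, and metav1 are the same packages used elsewhere in this patch.

package example

import (
	"context"
	"sync"

	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// examplePodDeletedReason is an illustrative reason string, analogous to
// mockProviderPodDeletedReason in mock_test.go.
const examplePodDeletedReason = "ExampleProviderPodDeleted"

// exampleProvider is a hypothetical in-memory provider; only the pieces that
// DeletePod needs are shown here.
type exampleProvider struct {
	mu     sync.Mutex
	pods   map[string]*corev1.Pod // keyed by "namespace/name"
	notify func(*corev1.Pod)      // callback registered via NotifyPods
}

// DeletePod removes the pod from the provider's store and reports a terminal
// status back to the controller. It is intentionally idempotent: once the pod
// is gone from the store it returns errdefs.NotFound, which the controller
// treats the same as a successful delete.
func (p *exampleProvider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
	key := pod.Namespace + "/" + pod.Name

	p.mu.Lock()
	defer p.mu.Unlock()

	if _, ok := p.pods[key]; !ok {
		return errdefs.NotFound("pod not found")
	}
	delete(p.pods, key)

	// Move the pod and all of its containers into a terminal state so that
	// running() returns false and the controller can force delete the API
	// object, either immediately or once the deletion grace period expires.
	now := metav1.Now()
	pod.Status.Phase = corev1.PodSucceeded
	pod.Status.Reason = examplePodDeletedReason
	for i := range pod.Status.ContainerStatuses {
		pod.Status.ContainerStatuses[i].Ready = false
		pod.Status.ContainerStatuses[i].State = corev1.ContainerState{
			Terminated: &corev1.ContainerStateTerminated{
				Message:    "deleted by provider",
				FinishedAt: now,
			},
		}
	}

	if p.notify != nil {
		p.notify(pod)
	}
	return nil
}

Returning errdefs.NotFound on repeated calls lines up with what the controller now tolerates in syncHandler, syncPodInProvider, and deleteDanglingPods, all of which treat a not-found error from DeletePod as success.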