From cdc261a08d8902df194446b9abe337be2e569d5a Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Thu, 10 Oct 2019 11:40:04 -0700 Subject: [PATCH 1/3] Use go-cmp to compare pods to suppress duplicate updates Rather than copying the pods, this uses go-cmp and filters out the paths which should not be compared. --- node/podcontroller.go | 26 +++++++++++++++------ node/podcontroller_test.go | 48 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/node/podcontroller.go b/node/podcontroller.go index 05dc7b88b..cef696ab5 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -17,10 +17,10 @@ package node import ( "context" "fmt" - "reflect" "strconv" "sync" + "github.com/google/go-cmp/cmp" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/internal/manager" @@ -241,14 +241,11 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er }, UpdateFunc: func(oldObj, newObj interface{}) { // Create a copy of the old and new pod objects so we don't mutate the cache. - oldPod := oldObj.(*corev1.Pod).DeepCopy() - newPod := newObj.(*corev1.Pod).DeepCopy() - // We want to check if the two objects differ in anything other than their resource versions. - // Hence, we make them equal so that this change isn't picked up by reflect.DeepEqual. - newPod.ResourceVersion = oldPod.ResourceVersion + oldPod := oldObj.(*corev1.Pod) + newPod := newObj.(*corev1.Pod) // Skip adding this pod's key to the work queue if its .metadata (except .metadata.resourceVersion) and .spec fields haven't changed. // This guarantees that we don't attempt to sync the pod every time its .status field is updated. - if reflect.DeepEqual(oldPod.ObjectMeta, newPod.ObjectMeta) && reflect.DeepEqual(oldPod.Spec, newPod.Spec) { + if podsEffectivelyEqual(oldPod, newPod) { return } // At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod. 
@@ -503,3 +500,18 @@ func loggablePodName(pod *corev1.Pod) string { func loggablePodNameFromCoordinates(namespace, name string) string { return fmt.Sprintf("%s/%s", namespace, name) } + +// podsEffectivelyEqual compares two pods, and ignores the pod status, and the resource version +func podsEffectivelyEqual(p1, p2 *corev1.Pod) bool { + filterForResourceVersion := func(p cmp.Path) bool { + if p.String() == "ObjectMeta.ResourceVersion" { + return true + } + if p.String() == "Status" { + return true + } + return false + } + + return cmp.Equal(p1, p2, cmp.FilterPath(filterForResourceVersion, cmp.Ignore())) +} diff --git a/node/podcontroller_test.go b/node/podcontroller_test.go index e9ed7260d..c87d8c673 100644 --- a/node/podcontroller_test.go +++ b/node/podcontroller_test.go @@ -6,6 +6,8 @@ import ( "time" "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestPodControllerExitOnContextCancel(t *testing.T) { @@ -42,3 +44,49 @@ func TestPodControllerExitOnContextCancel(t *testing.T) { } assert.NilError(t, tc.Err()) } + +func TestCompareResourceVersion(t *testing.T) { + p1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + } + p2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "2", + }, + } + assert.Assert(t, podsEffectivelyEqual(p1, p2)) +} + +func TestCompareStatus(t *testing.T) { + p1 := &corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + p2 := &corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodFailed, + }, + } + assert.Assert(t, podsEffectivelyEqual(p1, p2)) +} + +func TestCompareLabels(t *testing.T) { + p1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar1", + }, + }, + } + p2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar2", + }, + }, + } + assert.Assert(t, !podsEffectivelyEqual(p1, p2)) +} From 871424368f52ac624489f5be99a70fa6e218d1f8 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Fri, 11 Oct 2019 15:45:33 -0700 Subject: [PATCH 2/3] Fix pod status updates for when pod is updated outside of VK Pods can be updated outside of VK. Right now, if this happens, pod status updates are dropped because the resourceversion from the provider will mismatch with what's on the server, breaking pod status updates. Since we're the only ones writing to the pod status, we can do a blind overwrite. --- node/pod.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/node/pod.go b/node/pod.go index bd2a901e9..0f20f6075 100644 --- a/node/pod.go +++ b/node/pod.go @@ -213,8 +213,12 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes } kPod := obj.(*knownPod) kPod.Lock() - podFromProvider := kPod.lastPodStatusReceivedFromProvider + podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy() kPod.Unlock() + // We need to do this because the other parts of the pod can be updated elsewhere. Since we're only updating + // the pod status, and we should be the sole writers of the pod status, we can blind overwrite it. Therefore + // we need to copy the pod and set ResourceVersion to 0. 
+ podFromProvider.ResourceVersion = "0" if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(podFromProvider); err != nil { span.SetStatus(err) From d22265e5f5c2680f042ed891d166fd2127d0c948 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Thu, 10 Oct 2019 13:24:58 -0700 Subject: [PATCH 3/3] Do not delete pods in a non-graceful manner This moves from forcefully deleting pods to deleting pods in a graceful manner from the API Server. It waits for the pod to get to a terminal status prior to deleting the pod from api server. --- node/lifecycle_test.go | 56 +++++++++++++++--- node/mock_test.go | 18 ++++-- node/pod.go | 97 +++++++++++++++---------------- node/pod_test.go | 58 +------------------ node/podcontroller.go | 126 +++++++++++++++++++++++++++++++++++------ 5 files changed, 219 insertions(+), 136 deletions(-) diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go index 9b0d0971d..41480e029 100644 --- a/node/lifecycle_test.go +++ b/node/lifecycle_test.go @@ -94,8 +94,7 @@ func TestPodLifecycle(t *testing.T) { ctx = log.WithLogger(ctx, log.L) - // isPodDeletedPermanentlyFunc is a condition func that waits until the pod is _deleted_, which is the VK's - // action when the pod is deleted from the provider + // isPodDeletedPermanentlyFunc is a condition func that waits until the pod is _deleted_ isPodDeletedPermanentlyFunc := func(ctx context.Context, watcher watch.Interface) error { _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) { log.G(ctx).WithField("event", ev).Info("got event") @@ -107,13 +106,23 @@ func TestPodLifecycle(t *testing.T) { return watchErr } + // isPodDeletedGracefullyFunc is a condition func that waits until the pod is in a terminal state, which is the VK's + // action when the pod is deleted from the provider + isPodDeletedGracefullyFunc := func(ctx context.Context, watcher watch.Interface) error { + _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) { + log.G(ctx).WithField("event", ev).Info("got event") + pod := ev.Object.(*corev1.Pod) + return (pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded) && pod.Status.Reason == mockProviderPodDeletedReason, nil + }) + return watchErr + } + // createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting // it gracefully t.Run("createStartDeleteScenario", func(t *testing.T) { - t.Run("mockProvider", func(t *testing.T) { assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) { - testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc) + testCreateStartDeleteScenario(ctx, t, s, isPodDeletedGracefullyFunc, true) })) }) }) @@ -124,7 +133,7 @@ func TestPodLifecycle(t *testing.T) { mp := newMockProvider() mp.errorOnDelete = errdefs.NotFound("not found") assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc) + testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc, false) })) }) @@ -137,7 +146,7 @@ func TestPodLifecycle(t *testing.T) { } mp.errorOnDelete = errors.New("random error") assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testCreateStartDeleteScenario(ctx, t, s, deletionFunc) + testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false) pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{}) assert.NilError(t, err) assert.Assert(t, 
is.Len(pods.Items, 1)) @@ -328,7 +337,7 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo } -func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error) { +func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error, waitForPermanentDeletion bool) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -396,6 +405,18 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, assert.NilError(t, err) } + // Setup a watch to look for the pod eventually going away completely + watcher2, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions) + assert.NilError(t, err) + defer watcher2.Stop() + waitDeleteCh := make(chan error) + go func() { + _, watchDeleteErr := watchutils.UntilWithoutRetry(ctx, watcher2, func(ev watch.Event) (bool, error) { + return ev.Type == watch.Deleted, nil + }) + waitDeleteCh <- watchDeleteErr + }() + // Setup a watch prior to pod deletion watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions) assert.NilError(t, err) @@ -410,7 +431,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(p.Name, metav1.GetOptions{}) assert.NilError(t, err) // 2. Set the pod's deletion timestamp, version, and so on - var deletionGracePeriod int64 = 30 + var deletionGracePeriod int64 = 10 currentPod.DeletionGracePeriodSeconds = &deletionGracePeriod deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod))) currentPod.DeletionTimestamp = &deletionTimestamp @@ -424,6 +445,15 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, case err = <-watchErrCh: assert.NilError(t, err) } + + if waitForPermanentDeletion { + select { + case <-ctx.Done(): + t.Fatalf("Context ended early: %s", ctx.Err().Error()) + case err = <-waitDeleteCh: + assert.NilError(t, err) + } + } } func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) { @@ -526,6 +556,7 @@ func randomizeName(pod *corev1.Pod) { } func newPod(podmodifiers ...podModifier) *corev1.Pod { + var terminationGracePeriodSeconds int64 = 30 pod := &corev1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", @@ -538,7 +569,14 @@ func newPod(podmodifiers ...podModifier) *corev1.Pod { ResourceVersion: "100", }, Spec: corev1.PodSpec{ - NodeName: testNodeName, + NodeName: testNodeName, + TerminationGracePeriodSeconds: &terminationGracePeriodSeconds, + Containers: []corev1.Container{ + { + Name: "my-container", + Image: "nginx", + }, + }, }, Status: corev1.PodStatus{ Phase: corev1.PodPending, diff --git a/node/mock_test.go b/node/mock_test.go index 8554a20cf..bf60f65da 100644 --- a/node/mock_test.go +++ b/node/mock_test.go @@ -12,6 +12,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + mockProviderPodDeletedReason = "MockProviderPodDeleted" +) + var ( _ PodLifecycleHandler = (*mockProvider)(nil) ) @@ -171,15 +175,19 @@ func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { log.G(ctx).Infof("receive DeletePod %q", pod.Name) p.attemptedDeletes.increment() + key, err := buildKey(pod) + if err != nil { + return err + } + + if errdefs.IsNotFound(p.errorOnDelete) { + p.pods.Delete(key) + } if p.errorOnDelete != nil { return p.errorOnDelete } 
p.deletes.increment() - key, err := buildKey(pod) - if err != nil { - return err - } if _, exists := p.pods.Load(key); !exists { return errdefs.NotFound("pod not found") @@ -188,7 +196,7 @@ func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { now := metav1.Now() pod.Status.Phase = v1.PodSucceeded - pod.Status.Reason = "MockProviderPodDeleted" + pod.Status.Reason = mockProviderPodDeletedReason for idx := range pod.Status.ContainerStatuses { pod.Status.ContainerStatuses[idx].Ready = false diff --git a/node/pod.go b/node/pod.go index 0f20f6075..ee3479abe 100644 --- a/node/pod.go +++ b/node/pod.go @@ -19,7 +19,6 @@ import ( "github.com/google/go-cmp/cmp" pkgerrors "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" corev1 "k8s.io/api/core/v1" @@ -133,61 +132,19 @@ func (pc *PodController) handleProviderError(ctx context.Context, span trace.Spa span.SetStatus(origErr) } -func (pc *PodController) deletePod(ctx context.Context, namespace, name string) error { +func (pc *PodController) deletePod(ctx context.Context, pod *corev1.Pod) error { ctx, span := trace.StartSpan(ctx, "deletePod") defer span.End() - - pod, err := pc.provider.GetPod(ctx, namespace, name) - if err != nil { - if errdefs.IsNotFound(err) { - // The provider is not aware of the pod, but we must still delete the Kubernetes API resource. - return pc.forceDeletePodResource(ctx, namespace, name) - } - return err - } - - // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't. - if pod == nil { - // The provider is not aware of the pod, but we must still delete the Kubernetes API resource. 
- return pc.forceDeletePodResource(ctx, namespace, name) - } - ctx = addPodAttributes(ctx, span, pod) - var delErr error - if delErr = pc.provider.DeletePod(ctx, pod.DeepCopy()); delErr != nil && !errdefs.IsNotFound(delErr) { - span.SetStatus(delErr) - return delErr + err := pc.provider.DeletePod(ctx, pod.DeepCopy()) + if err != nil { + span.SetStatus(err) + return err } log.G(ctx).Debug("Deleted pod from provider") - if err := pc.forceDeletePodResource(ctx, namespace, name); err != nil { - span.SetStatus(err) - return err - } - log.G(ctx).Info("Deleted pod from Kubernetes") - - return nil -} - -func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace, name string) error { - ctx, span := trace.StartSpan(ctx, "forceDeletePodResource") - defer span.End() - ctx = span.WithFields(ctx, log.Fields{ - "namespace": namespace, - "name": name, - }) - - var grace int64 - if err := pc.client.Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil { - if errors.IsNotFound(err) { - log.G(ctx).Debug("Pod does not exist in Kubernetes, nothing to delete") - return nil - } - span.SetStatus(err) - return pkgerrors.Wrap(err, "Failed to delete Kubernetes pod") - } return nil } @@ -280,3 +237,47 @@ func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retE return pc.updatePodStatus(ctx, pod, key) } + +func (pc *PodController) deletePodHandler(ctx context.Context, key string) error { + ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem") + defer span.End() + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + ctx = span.WithFields(ctx, log.Fields{ + "namespace": namespace, + "name": name, + }) + + if err != nil { + // Log the error as a warning, but do not requeue the key as it is invalid. + log.G(ctx).Warn(pkgerrors.Wrapf(err, "invalid resource key: %q", key)) + span.SetStatus(err) + return nil + } + + // If the pod has been deleted from API server, we don't need to do anything. + k8sPod, err := pc.podsLister.Pods(namespace).Get(name) + if errors.IsNotFound(err) { + return nil + } + if err != nil { + span.SetStatus(err) + return err + } + + if running(&k8sPod.Status) { + log.G(ctx).Error("Force deleting pod in running state") + } + + // We don't check with the provider before doing this delete. 
At this point, even if an outstanding pod status update + // was in progress, + err = pc.client.Pods(namespace).Delete(name, metav1.NewDeleteOptions(0)) + if errors.IsNotFound(err) { + return nil + } + if err != nil { + span.SetStatus(err) + return err + } + return nil +} diff --git a/node/pod_test.go b/node/pod_test.go index abf0ec515..f281e95fe 100644 --- a/node/pod_test.go +++ b/node/pod_test.go @@ -19,14 +19,10 @@ import ( "testing" "time" - pkgerrors "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/errdefs" testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util" "gotest.tools/assert" is "gotest.tools/assert/cmp" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/util/workqueue" @@ -51,6 +47,7 @@ func newTestController() *TestController { resourceManager: rm, recorder: testutil.FakeEventRecorder(5), k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + deletionQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), done: make(chan struct{}), ready: make(chan struct{}), podsInformer: iFactory.Core().V1().Pods(), @@ -172,59 +169,6 @@ func TestPodNoSpecChange(t *testing.T) { assert.Check(t, is.Equal(svr.mock.updates.read(), 0)) } -func TestPodDelete(t *testing.T) { - type testCase struct { - desc string - delErr error - } - - cases := []testCase{ - {desc: "no error on delete", delErr: nil}, - {desc: "not found error on delete", delErr: errdefs.NotFound("not found")}, - {desc: "unknown error on delete", delErr: pkgerrors.New("random error")}, - } - - for _, tc := range cases { - t.Run(tc.desc, func(t *testing.T) { - c := newTestController() - c.mock.errorOnDelete = tc.delErr - - pod := &corev1.Pod{} - pod.ObjectMeta.Namespace = "default" - pod.ObjectMeta.Name = "nginx" - pod.Spec = newPodSpec() - - pc := c.client.CoreV1().Pods("default") - - p, err := pc.Create(pod) - assert.NilError(t, err) - - ctx := context.Background() - err = c.createOrUpdatePod(ctx, p.DeepCopy()) // make sure it's actually created - assert.NilError(t, err) - assert.Check(t, is.Equal(c.mock.creates.read(), 1)) - - err = c.deletePod(ctx, pod.Namespace, pod.Name) - assert.Equal(t, pkgerrors.Cause(err), err) - - var expectDeletes int - if tc.delErr == nil { - expectDeletes = 1 - } - assert.Check(t, is.Equal(c.mock.deletes.read(), expectDeletes)) - - expectDeleted := tc.delErr == nil || errdefs.IsNotFound(tc.delErr) - - _, err = pc.Get(pod.Name, metav1.GetOptions{}) - if expectDeleted { - assert.Assert(t, errors.IsNotFound(err)) - } else { - assert.NilError(t, err) - } - }) - } -} - func newPodSpec() corev1.PodSpec { return corev1.PodSpec{ Containers: []corev1.Container{ diff --git a/node/podcontroller.go b/node/podcontroller.go index cef696ab5..ec616b90a 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -19,6 +19,7 @@ import ( "fmt" "strconv" "sync" + "time" "github.com/google/go-cmp/cmp" pkgerrors "github.com/pkg/errors" @@ -51,7 +52,7 @@ type PodLifecycleHandler interface { // DeletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is // expected to call the NotifyPods callback with a terminal pod status where all the containers are in a terminal - // state, as well as the pod. + // state, as well as the pod. DeletePod may be called multiple times for the same pod. 
DeletePod(ctx context.Context, pod *corev1.Pod) error // GetPod retrieves a pod by name from the provider (can be cached). @@ -101,6 +102,9 @@ type PodController struct { k8sQ workqueue.RateLimitingInterface + // deletionQ is a queue on which pods are reconciled, and we check if pods are in API server after grace period + deletionQ workqueue.RateLimitingInterface + // From the time of creation, to termination the knownPods map will contain the pods key // (derived from Kubernetes' cache library) -> a *knownPod struct. knownPods sync.Map @@ -127,6 +131,7 @@ type knownPod struct { // should be immutable to avoid having to hold the lock the entire time you're working with them sync.Mutex lastPodStatusReceivedFromProvider *corev1.Pod + lastPodUsed *corev1.Pod } // PodControllerConfig is used to configure a new PodController. @@ -190,6 +195,7 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) { done: make(chan struct{}), recorder: cfg.EventRecorder, k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"), + deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"), } return pc, nil @@ -207,7 +213,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er defer func() { pc.k8sQ.ShutDown() - + pc.deletionQ.ShutDown() pc.mu.Lock() pc.err = retErr close(pc.done) @@ -241,13 +247,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er }, UpdateFunc: func(oldObj, newObj interface{}) { // Create a copy of the old and new pod objects so we don't mutate the cache. - oldPod := oldObj.(*corev1.Pod) newPod := newObj.(*corev1.Pod) - // Skip adding this pod's key to the work queue if its .metadata (except .metadata.resourceVersion) and .spec fields haven't changed. - // This guarantees that we don't attempt to sync the pod every time its .status field is updated. - if podsEffectivelyEqual(oldPod, newPod) { - return - } + // At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod. if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil { log.G(ctx).Error(err) @@ -261,6 +262,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er } else { pc.knownPods.Delete(key) pc.k8sQ.AddRateLimited(key) + // If this pod was in the deletion queue, forget about it + pc.deletionQ.Forget(key) } }, }) @@ -292,6 +295,15 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er }() } + for id := 0; id < podSyncWorkers; id++ { + wg.Add(1) + workerID := strconv.Itoa(id) + go func() { + defer wg.Done() + pc.runDeletionReconcilationWorker(ctx, workerID, pc.deletionQ) + }() + } + close(pc.ready) log.G(ctx).Info("started workers") @@ -299,6 +311,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er log.G(ctx).Info("shutting down workers") pc.k8sQ.ShutDown() podStatusQueue.ShutDown() + pc.deletionQ.ShutDown() wg.Wait() return nil @@ -350,6 +363,7 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error { // Add the current key as an attribute to the current span. ctx = span.WithField(ctx, "key", key) + log.G(ctx).WithField("key", key).Debug("sync handled") // Convert the namespace/name string into a distinct namespace and name. 
namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -369,35 +383,81 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error { span.SetStatus(err) return err } - // At this point we know the Pod resource doesn't exist, which most probably means it was deleted. - // Hence, we must delete it from the provider if it still exists there. - if err := pc.deletePod(ctx, namespace, name); err != nil { - err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name)) + + pod, err = pc.provider.GetPod(ctx, namespace, name) + if err != nil && !errdefs.IsNotFound(err) { + err = pkgerrors.Wrapf(err, "failed to fetch pod with key %q from provider", key) span.SetStatus(err) return err } - return nil + if errdefs.IsNotFound(err) || pod == nil { + return nil + } + + err = pc.provider.DeletePod(ctx, pod) + if errdefs.IsNotFound(err) { + return nil + } + if err != nil { + err = pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name)) + span.SetStatus(err) + } + return err + } // At this point we know the Pod resource has either been created or updated (which includes being marked for deletion). - return pc.syncPodInProvider(ctx, pod) + return pc.syncPodInProvider(ctx, pod, key) } // syncPodInProvider tries and reconciles the state of a pod by comparing its Kubernetes representation and the provider's representation. -func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) error { +func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod, key string) (retErr error) { ctx, span := trace.StartSpan(ctx, "syncPodInProvider") defer span.End() // Add the pod's attributes to the current span. ctx = addPodAttributes(ctx, span, pod) + // If the pod('s containers) is no longer in a running state then we force-delete the pod from API server + // more context is here: https://github.com/virtual-kubelet/virtual-kubelet/pull/760 + if pod.DeletionTimestamp != nil && !running(&pod.Status) { + log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running") + pc.deletionQ.Add(key) + return nil + } + obj, ok := pc.knownPods.Load(key) + if !ok { + // That means the pod was deleted while we were working + return nil + } + kPod := obj.(*knownPod) + kPod.Lock() + if kPod.lastPodUsed != nil && podsEffectivelyEqual(kPod.lastPodUsed, pod) { + kPod.Unlock() + return nil + } + kPod.Unlock() + + defer func() { + if retErr == nil { + kPod.Lock() + defer kPod.Unlock() + kPod.lastPodUsed = pod + } + }() + // Check whether the pod has been marked for deletion. // If it does, guarantee it is deleted in the provider and Kubernetes. 
if pod.DeletionTimestamp != nil { - if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil { + log.G(ctx).Debug("Deleting pod in provider") + if err := pc.deletePod(ctx, pod); errdefs.IsNotFound(err) { + log.G(ctx).Debug("Pod not found in provider") + } else if err != nil { err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod)) span.SetStatus(err) return err } + + pc.deletionQ.AddAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds)) return nil } @@ -416,6 +476,25 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) return nil } +// runDeletionReconcilationWorker is a long-running function that will continually call the processDeletionReconcilationWorkItem +// function in order to read and process an item on the work queue that is generated by the pod informer. +func (pc *PodController) runDeletionReconcilationWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) { + for pc.processDeletionReconcilationWorkItem(ctx, workerID, q) { + } +} + +// processDeletionReconcilationWorkItem will read a single work item off the work queue and attempt to process it,by calling the deletionReconcilation. +func (pc *PodController) processDeletionReconcilationWorkItem(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool { + + // We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item. + ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem") + defer span.End() + + // Add the ID of the current worker as an attribute to the current span. + ctx = span.WithField(ctx, "workerId", workerID) + return handleQueueItem(ctx, q, pc.deletePodHandler) +} + // deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them. func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int) { ctx, span := trace.StartSpan(ctx, "deleteDanglingPods") @@ -471,7 +550,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int // Add the pod's attributes to the current span. ctx = addPodAttributes(ctx, span, pod) // Actually delete the pod. - if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil { + if err := pc.provider.DeletePod(ctx, pod.DeepCopy()); err != nil && !errdefs.IsNotFound(err) { span.SetStatus(err) log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod)) } else { @@ -515,3 +594,16 @@ func podsEffectivelyEqual(p1, p2 *corev1.Pod) bool { return cmp.Equal(p1, p2, cmp.FilterPath(filterForResourceVersion, cmp.Ignore())) } + +// borrowed from https://github.com/kubernetes/kubernetes/blob/f64c631cd7aea58d2552ae2038c1225067d30dde/pkg/kubelet/kubelet_pods.go#L944-L953 +// running returns true, unless if every status is terminated or waiting, or the status list +// is empty. +func running(podStatus *corev1.PodStatus) bool { + statuses := podStatus.ContainerStatuses + for _, status := range statuses { + if status.State.Terminated == nil && status.State.Waiting == nil { + return true + } + } + return false +}
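
A brief illustration of how running classifies pod statuses may help tie the pieces together. The snippet below is a sketch for this discussion only and is not part of the series (demoRunning is a made-up name): a status whose containers are all terminated or waiting, or an empty status list, counts as not running, which is exactly the condition under which syncPodInProvider enqueues the pod key on deletionQ for removal from the API server.

package node

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func demoRunning() {
	// Every container has terminated, so running reports false and the
	// controller is free to delete the pod object from the API server.
	terminal := &corev1.PodStatus{
		ContainerStatuses: []corev1.ContainerStatus{
			{State: corev1.ContainerState{Terminated: &corev1.ContainerStateTerminated{ExitCode: 0}}},
		},
	}
	// At least one container is still running, so the controller keeps
	// waiting for the provider to drive the pod to a terminal state.
	live := &corev1.PodStatus{
		ContainerStatuses: []corev1.ContainerStatus{
			{State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}},
		},
	}
	fmt.Println(running(terminal), running(live)) // prints: false true
}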
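
Because the graceful flow can invoke DeletePod more than once for the same pod (from syncPodInProvider, and potentially again from syncHandler or deleteDanglingPods), a provider should treat deletion as idempotent and leave the pod in a terminal state, much like the mockProvider above. The following is a minimal sketch under those assumptions; sketchProvider and its in-memory map are hypothetical and not part of this series.

package node

import (
	"context"
	"sync"

	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// sketchProvider is a hypothetical in-memory provider used only for illustration.
type sketchProvider struct {
	mu   sync.Mutex
	pods map[string]*corev1.Pod
}

// DeletePod tolerates repeated calls: the first call marks the pod terminal and
// forgets it, later calls return NotFound, which the controller treats as done.
func (p *sketchProvider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
	key := pod.Namespace + "/" + pod.Name

	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.pods[key]; !ok {
		return errdefs.NotFound("pod not found")
	}
	delete(p.pods, key)

	now := metav1.Now()
	pod.Status.Phase = corev1.PodSucceeded
	pod.Status.Reason = "SketchProviderPodDeleted" // analogous to mockProviderPodDeletedReason
	for i := range pod.Status.ContainerStatuses {
		pod.Status.ContainerStatuses[i].Ready = false
		pod.Status.ContainerStatuses[i].State = corev1.ContainerState{
			Terminated: &corev1.ContainerStateTerminated{
				FinishedAt: now,
				Reason:     "SketchProviderPodContainerDeleted",
			},
		}
	}
	return nil
}

A provider that reports status asynchronously would also push this terminal status through its notification callback so that updatePodStatus can persist it, as the DeletePod doc comment in the interface requires.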
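
Patch 2's change to updatePodStatus is easy to miss in the diff: the pod cached in knownPod is shared state, so it is deep-copied before ResourceVersion is overwritten and only the copy is handed to UpdateStatus. A minimal sketch of that pattern follows; writeStatusFromCachedPod is a made-up helper name, and PodInterface is the typed client-go interface already behind pc.client.

package node

import (
	corev1 "k8s.io/api/core/v1"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)

// writeStatusFromCachedPod is a sketch of the copy-before-mutate pattern used in
// updatePodStatus: never modify the pod held in knownPod directly, since other
// goroutines read it; mutate a deep copy and send that to the API server.
func writeStatusFromCachedPod(pods corev1client.PodInterface, cached *corev1.Pod) error {
	update := cached.DeepCopy()  // the shared copy stays untouched
	update.ResourceVersion = "0" // blind status overwrite, as in the hunk above
	_, err := pods.UpdateStatus(update)
	return err
}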
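
Finally, a note on patch 1: the hand-written path filter in podsEffectivelyEqual could likely also be expressed with go-cmp's cmpopts helpers. This sketch is illustrative only, is not part of the series, and assumes the cmpopts package that ships with go-cmp; podsEffectivelyEqualAlt is a made-up name.

package node

import (
	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// podsEffectivelyEqualAlt ignores the whole pod status and the metadata
// resource version by struct field rather than by cmp.Path string.
func podsEffectivelyEqualAlt(p1, p2 *corev1.Pod) bool {
	return cmp.Equal(p1, p2,
		cmpopts.IgnoreFields(corev1.Pod{}, "Status"),
		cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"),
	)
}

Either spelling should skip status-only and resource-version-only churn; the FilterPath form used in the patch keeps the ignored paths explicit as strings, which is arguably easier to audit.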