From 966a960eef94d1188785881db0d3669d788a7398 Mon Sep 17 00:00:00 2001
From: wadecai
Date: Sat, 29 Aug 2020 21:06:01 +0800
Subject: [PATCH] Allow deleting a pod in K8s if it is being deleted in the provider

For example: if the provider is itself a K8s provider, a pod created by a
Deployment is evicted when its node is not ready. If we do not delete the
pod in K8s as well, the Deployment will not create a replacement.

Also add tests for updatePodStatus.
---
 node/pod.go      | 24 +++++++++++++++--
 node/pod_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 91 insertions(+), 2 deletions(-)

diff --git a/node/pod.go b/node/pod.go
index 7d164e00d..49d17a0b1 100644
--- a/node/pod.go
+++ b/node/pod.go
@@ -216,14 +216,34 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
 	kPod.Lock()
 	podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
 	kPod.Unlock()
-	if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) {
+	if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) && podFromProvider.DeletionTimestamp == nil {
 		return nil
 	}
+	// The pod was deleted by the provider for some reason, e.g. with a K8s provider a pod created by a Deployment is evicted when its node is not ready.
+	// If we do not delete the pod in K8s as well, the Deployment will not create a replacement.
+	if podFromProvider.DeletionTimestamp != nil && podFromKubernetes.DeletionTimestamp == nil {
+		deleteOptions := metav1.DeleteOptions{
+			GracePeriodSeconds: podFromProvider.DeletionGracePeriodSeconds,
+		}
+		current := metav1.NewTime(time.Now())
+		if podFromProvider.DeletionTimestamp.Before(&current) {
+			deleteOptions.GracePeriodSeconds = new(int64)
+		}
+		// Check the status here to avoid incorrectly deleting a re-created pod: when a pod is deleted from K8s and re-created (by a StatefulSet and so on),
+		// the pod in the provider may not be deleted immediately, so its DeletionTimestamp is still set. Without this check the re-created pod would be deleted too.
+		if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) {
+			if err := pc.client.Pods(podFromKubernetes.Namespace).Delete(ctx, podFromKubernetes.Name, deleteOptions); err != nil && !errors.IsNotFound(err) {
+				span.SetStatus(err)
+				return pkgerrors.Wrap(err, "error while deleting pod in kubernetes")
+			}
+		}
+	}
+
 	// We need to do this because the other parts of the pod can be updated elsewhere. Since we're only updating
 	// the pod status, and we should be the sole writers of the pod status, we can blind overwrite it. Therefore
 	// we need to copy the pod and set ResourceVersion to 0.
 	podFromProvider.ResourceVersion = "0"
-	if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(ctx, podFromProvider, metav1.UpdateOptions{}); err != nil {
+	if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(ctx, podFromProvider, metav1.UpdateOptions{}); err != nil && !errors.IsNotFound(err) {
 		span.SetStatus(err)
 		return pkgerrors.Wrap(err, "error while updating pod status in kubernetes")
 	}
diff --git a/node/pod_test.go b/node/pod_test.go
index 5eadd7854..7ed8983e9 100644
--- a/node/pod_test.go
+++ b/node/pod_test.go
@@ -16,6 +16,8 @@ package node
 
 import (
 	"context"
+	"fmt"
+	"sync"
 	"testing"
 	"time"
 
@@ -23,6 +25,7 @@ import (
 	"gotest.tools/assert"
 	is "gotest.tools/assert/cmp"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	kubeinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
@@ -52,6 +55,7 @@ func newTestController() *TestController {
 		podStatusQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 		done:       make(chan struct{}),
 		ready:      make(chan struct{}),
+		knownPods:  sync.Map{},
 		podsInformer: iFactory.Core().V1().Pods(),
 		podsLister:   iFactory.Core().V1().Pods().Lister(),
 	},
@@ -242,6 +246,71 @@ func TestPodNoSpecChange(t *testing.T) {
 	assert.Check(t, is.Equal(svr.mock.updates.read(), 0))
 }
 
+func TestPodStatusDelete(t *testing.T) {
+	ctx := context.Background()
+	c := newTestController()
+	pod := &corev1.Pod{}
+	pod.ObjectMeta.Namespace = "default"
+	pod.ObjectMeta.Name = "nginx"
+	pod.Spec = newPodSpec()
+	fk8s := fake.NewSimpleClientset(pod)
+	c.client = fk8s
+	c.PodController.client = fk8s.CoreV1()
+	podCopy := pod.DeepCopy()
+	deleteTime := v1.Time{Time: time.Now().Add(30 * time.Second)}
+	podCopy.DeletionTimestamp = &deleteTime
+	key := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
+	c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
+
+	// Pod deleted in the provider: the pod in K8s should be deleted as well.
+	err := c.updatePodStatus(ctx, pod, key)
+	if err != nil {
+		t.Fatal("pod update failed")
+	}
+	newPod, err := c.client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, v1.GetOptions{})
+	if err != nil && !errors.IsNotFound(err) {
+		t.Fatalf("Get pod %v failed", key)
+	}
+	if newPod != nil && newPod.DeletionTimestamp == nil {
+		t.Fatalf("Pod %v delete failed", key)
+	}
+	t.Logf("pod delete success")
+
+	// Pod deleted in the provider with a changed status: the status update should still reach K8s.
+	pod.DeletionTimestamp = &deleteTime
+	if _, err = c.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, v1.CreateOptions{}); err != nil {
+		t.Fatal("Prepare pod in k8s failed")
+	}
+	podCopy.Status.ContainerStatuses = []corev1.ContainerStatus{
+		{
+			State: corev1.ContainerState{
+				Waiting: nil,
+				Running: nil,
+				Terminated: &corev1.ContainerStateTerminated{
+					ExitCode: 1,
+					Message:  "Exit",
+				},
+			},
+		},
+	}
+	c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
+	err = c.updatePodStatus(ctx, pod, key)
+	if err != nil {
+		t.Fatalf("pod update failed: %v", err)
+	}
+	newPod, err = c.client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, v1.GetOptions{})
+	if err != nil && !errors.IsNotFound(err) {
+		t.Fatalf("Get pod %v failed", key)
+	}
+	if newPod.DeletionTimestamp == nil {
+		t.Fatalf("Pod %v delete failed", key)
+	}
+	if newPod.Status.ContainerStatuses[0].State.Terminated == nil {
+		t.Fatalf("Pod status %v update failed", key)
+	}
+	t.Logf("pod updated, container status: %+v, pod deletion time: %v", newPod.Status.ContainerStatuses[0].State.Terminated, newPod.DeletionTimestamp)
+}
+
 func newPodSpec() corev1.PodSpec {
 	return corev1.PodSpec{
 		Containers: []corev1.Container{
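
Note (not part of the patch): the grace-period decision added to updatePodStatus can be read in isolation as the small sketch below. It only illustrates the branch above; deleteOptionsFor is a hypothetical helper written for this note, not a function introduced by the PR, and the exact behavior of the controller remains as shown in the diff.

    package main

    import (
    	"fmt"
    	"time"

    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // deleteOptionsFor mirrors the decision in the new branch: reuse the provider
    // pod's DeletionGracePeriodSeconds, but force immediate deletion (grace
    // period 0) once the provider's deletion deadline has already passed.
    func deleteOptionsFor(deletionTimestamp *metav1.Time, gracePeriod *int64) metav1.DeleteOptions {
    	opts := metav1.DeleteOptions{GracePeriodSeconds: gracePeriod}
    	now := metav1.NewTime(time.Now())
    	if deletionTimestamp != nil && deletionTimestamp.Before(&now) {
    		// new(int64) points at 0, i.e. delete immediately.
    		opts.GracePeriodSeconds = new(int64)
    	}
    	return opts
    }

    func main() {
    	grace := int64(30)
    	past := metav1.NewTime(time.Now().Add(-time.Minute))
    	future := metav1.NewTime(time.Now().Add(time.Minute))

    	fmt.Println(*deleteOptionsFor(&past, &grace).GracePeriodSeconds)   // 0: deadline already passed
    	fmt.Println(*deleteOptionsFor(&future, &grace).GracePeriodSeconds) // 30: still within the grace period
    }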