diff --git a/node/pod.go b/node/pod.go
index 805504546..e323b468e 100644
--- a/node/pod.go
+++ b/node/pod.go
@@ -17,6 +17,7 @@ package node
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/google/go-cmp/cmp"
@@ -367,7 +368,8 @@ func (pc *PodController) deletePodsFromKubernetesHandler(ctx context.Context, ke
 	ctx, span := trace.StartSpan(ctx, "deletePodsFromKubernetesHandler")
 	defer span.End()
 
-	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	uid, metaKey := getUIDAndMetaNamespaceKey(key)
+	namespace, name, err := cache.SplitMetaNamespaceKey(metaKey)
 	ctx = span.WithFields(ctx, log.Fields{
 		"namespace": namespace,
 		"name":      name,
@@ -397,15 +399,25 @@ func (pc *PodController) deletePodsFromKubernetesHandler(ctx context.Context, ke
 		span.SetStatus(err)
 		return err
 	}
-
+	if string(k8sPod.UID) != uid {
+		log.G(ctx).WithField("k8sPodUID", k8sPod.UID).WithField("uid", uid).Warn("Not deleting pod because remote pod has different UID")
+		return nil
+	}
 	if running(&k8sPod.Status) {
 		log.G(ctx).Error("Force deleting pod in running state")
 	}
 
 	// We don't check with the provider before doing this delete. At this point, even if an outstanding pod status update
 	// was in progress,
-	err = pc.client.Pods(namespace).Delete(ctx, name, *metav1.NewDeleteOptions(0))
+	deleteOptions := metav1.NewDeleteOptions(0)
+	deleteOptions.Preconditions = metav1.NewUIDPreconditions(uid)
+	err = pc.client.Pods(namespace).Delete(ctx, name, *deleteOptions)
 	if errors.IsNotFound(err) {
+		log.G(ctx).Warnf("Not deleting pod because %v", err)
+		return nil
+	}
+	if errors.IsConflict(err) {
+		log.G(ctx).Warnf("There was a conflict, maybe trying to delete a Pod that has been recreated: %v", err)
 		return nil
 	}
 	if err != nil {
@@ -414,3 +426,15 @@ func (pc *PodController) deletePodsFromKubernetesHandler(ctx context.Context, ke
 	}
 	return nil
 }
+
+// getUIDAndMetaNamespaceKey splits a deletion-queue key of the form
+// "<namespace>/<name>/<uid>" at its last "/" and returns the trailing UID and
+// the leading "<namespace>/<name>" meta-namespace key. A key that contains no
+// "/" yields an empty UID and the key itself, rather than a slice-bounds panic.
+func getUIDAndMetaNamespaceKey(key string) (string, string) {
+	idx := strings.LastIndex(key, "/")
+	if idx < 0 {
+		return "", key
+	}
+	return key[idx+1:], key[:idx]
+}
diff --git a/node/pod_test.go b/node/pod_test.go
index 7bf73b194..39adc61b7 100644
--- a/node/pod_test.go
+++ b/node/pod_test.go
@@ -20,16 +20,20 @@ import (
 	"testing"
 	"time"
 
-	testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
 	"golang.org/x/time/rate"
 	"gotest.tools/assert"
 	is "gotest.tools/assert/cmp"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	kubeinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
+	core "k8s.io/client-go/testing"
 	"k8s.io/client-go/util/workqueue"
+
+	testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
 )
 
 type TestController struct {
@@ -329,6 +333,123 @@ func TestPodStatusDelete(t *testing.T) {
 	t.Logf("pod updated, container status: %+v, pod delete Time: %v", newPod.Status.ContainerStatuses[0].State.Terminated, newPod.DeletionTimestamp)
 }
 
+func TestReCreatePodRace(t *testing.T) {
+	ctx := context.Background()
+	c := newTestController()
+	pod := &corev1.Pod{}
+	pod.ObjectMeta.Namespace = "default"
+	pod.ObjectMeta.Name = "nginx"
+	pod.Spec = newPodSpec()
+	pod.UID = "aaaaa"
+	podCopy := pod.DeepCopy()
+	podCopy.UID = "bbbbb"
+
+	// test conflict
+	fk8s := &fake.Clientset{}
+	c.client = fk8s
+	c.PodController.client = fk8s.CoreV1()
+	key := fmt.Sprintf("%s/%s/%s", pod.Namespace, pod.Name, pod.UID)
+	c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
+	c.deletePodsFromKubernetes.Enqueue(ctx, key)
+	if err := c.podsInformer.Informer().GetStore().Add(pod); err != nil {
+		t.Fatal(err)
+	}
+	c.client.AddReactor("delete", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		name := action.(core.DeleteAction).GetName()
+		t.Logf("deleted pod %s", name)
+		return true, nil, errors.NewConflict(schema.GroupResource{Group: "", Resource: "pods"}, "nginx", fmt.Errorf("test conflict"))
+	})
+	c.client.AddReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		name := action.(core.GetAction).GetName()
+		t.Logf("get pod %s", name)
+		return true, podCopy, nil
+	})
+
+	err := c.deletePodsFromKubernetesHandler(ctx, key)
+	if err != nil {
+		t.Error("Failed")
+	}
+	p, err := c.client.CoreV1().Pods(podCopy.Namespace).Get(ctx, podCopy.Name, v1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Pod not exist, %v", err)
+	}
+	if p.UID != podCopy.UID {
+		t.Errorf("Desired uid: %v, get: %v", podCopy.UID, p.UID)
+	}
+	t.Log("pod conflict test success")
+
+	// test not found
+	c = newTestController()
+	fk8s = &fake.Clientset{}
+	c.client = fk8s
+	c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
+	c.deletePodsFromKubernetes.Enqueue(ctx, key)
+	if err = c.podsInformer.Informer().GetStore().Add(pod); err != nil {
+		t.Fatal(err)
+	}
+	c.client.AddReactor("delete", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		name := action.(core.DeleteAction).GetName()
+		t.Logf("deleted pod %s", name)
+		return true, nil, errors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "nginx")
+	})
+
+	c.client.AddReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		name := action.(core.GetAction).GetName()
+		t.Logf("get pod %s", name)
+		return true, nil, errors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "nginx")
+	})
+
+	err = c.deletePodsFromKubernetesHandler(ctx, key)
+	if err != nil {
+		t.Error("Failed")
+	}
+	_, err = c.client.CoreV1().Pods(podCopy.Namespace).Get(ctx, podCopy.Name, v1.GetOptions{})
+	if err == nil {
+		t.Log("delete success")
+		return
+	}
+	if !errors.IsNotFound(err) {
+		t.Fatal("Desired pod not exist")
+	}
+	t.Log("pod not found test success")
+
+	// test uid not equal before query
+	c = newTestController()
+	fk8s = &fake.Clientset{}
+	c.client = fk8s
+	c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
+	c.deletePodsFromKubernetes.Enqueue(ctx, key)
+	// add new pod
+	if err = c.podsInformer.Informer().GetStore().Add(podCopy); err != nil {
+		t.Fatal(err)
+	}
+	c.client.AddReactor("delete", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		name := action.(core.DeleteAction).GetName()
+		t.Logf("deleted pod %s", name)
+		return true, nil, errors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "nginx")
+	})
+
+	c.client.AddReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		name := action.(core.GetAction).GetName()
+		t.Logf("get pod %s", name)
+		return true, nil, errors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "nginx")
+	})
+
+	err = c.deletePodsFromKubernetesHandler(ctx, key)
+	if err != nil {
+		t.Error("Failed")
+	}
+	_, err = c.client.CoreV1().Pods(podCopy.Namespace).Get(ctx, podCopy.Name, v1.GetOptions{})
+	if err == nil {
+		t.Log("delete success")
+		return
+	}
+	if !errors.IsNotFound(err) {
+		t.Fatal("Desired pod not exist")
+	}
+	t.Log("pod uid conflict test success")
+}
+
 func newPodSpec() corev1.PodSpec {
 	return corev1.PodSpec{
 		Containers: []corev1.Container{
diff --git a/node/podcontroller.go b/node/podcontroller.go
index 9654f9f9d..4add45566 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -364,10 +364,20 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 			if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
 				log.G(ctx).Error(err)
 			} else {
 				ctx = span.WithField(ctx, "key", key)
 				pc.knownPods.Delete(key)
 				pc.syncPodsFromKubernetes.Enqueue(ctx, key)
+				// The informer may deliver a cache.DeletedFinalStateUnknown tombstone
+				// instead of a *corev1.Pod, so unwrap it before reading the UID.
+				if tombstone, isTombstone := pod.(cache.DeletedFinalStateUnknown); isTombstone {
+					pod = tombstone.Obj
+				}
+				k8sPod, ok := pod.(*corev1.Pod)
+				if !ok {
+					return
+				}
 				// If this pod was in the deletion queue, forget about it
+				key = fmt.Sprintf("%v/%v", key, k8sPod.UID)
 				pc.deletePodsFromKubernetes.Forget(ctx, key)
 			}
 		},
@@ -501,6 +511,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod,
 	if pod.DeletionTimestamp != nil && !running(&pod.Status) {
 		log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running")
+		key = fmt.Sprintf("%v/%v", key, pod.UID)
 		pc.deletePodsFromKubernetes.EnqueueWithoutRateLimit(ctx, key)
 		return nil
 	}
 	obj, ok := pc.knownPods.Load(key)
@@ -536,6 +547,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod,
 		return err
 	}
 
+	key = fmt.Sprintf("%v/%v", key, pod.UID)
 	pc.deletePodsFromKubernetes.EnqueueWithoutRateLimitWithDelay(ctx, key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
 	return nil
 }