Merge pull request #908 from cwdsuzhou/race_delete
Fix race between k8s and provider when deleting pod
This commit is contained in:
25
node/pod.go
25
node/pod.go
@@ -17,6 +17,7 @@ package node
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
@@ -367,7 +368,8 @@ func (pc *PodController) deletePodsFromKubernetesHandler(ctx context.Context, ke
|
|||||||
ctx, span := trace.StartSpan(ctx, "deletePodsFromKubernetesHandler")
|
ctx, span := trace.StartSpan(ctx, "deletePodsFromKubernetesHandler")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
uid, metaKey := getUIDAndMetaNamespaceKey(key)
|
||||||
|
namespace, name, err := cache.SplitMetaNamespaceKey(metaKey)
|
||||||
ctx = span.WithFields(ctx, log.Fields{
|
ctx = span.WithFields(ctx, log.Fields{
|
||||||
"namespace": namespace,
|
"namespace": namespace,
|
||||||
"name": name,
|
"name": name,
|
||||||
@@ -397,15 +399,25 @@ func (pc *PodController) deletePodsFromKubernetesHandler(ctx context.Context, ke
|
|||||||
span.SetStatus(err)
|
span.SetStatus(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if string(k8sPod.UID) != uid {
|
||||||
|
log.G(ctx).WithField("k8sPodUID", k8sPod.UID).WithField("uid", uid).Warn("Not deleting pod because remote pod has different UID")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
if running(&k8sPod.Status) {
|
if running(&k8sPod.Status) {
|
||||||
log.G(ctx).Error("Force deleting pod in running state")
|
log.G(ctx).Error("Force deleting pod in running state")
|
||||||
}
|
}
|
||||||
|
|
||||||
// We don't check with the provider before doing this delete. At this point, even if an outstanding pod status update
|
// We don't check with the provider before doing this delete. At this point, even if an outstanding pod status update
|
||||||
// was in progress,
|
// was in progress,
|
||||||
err = pc.client.Pods(namespace).Delete(ctx, name, *metav1.NewDeleteOptions(0))
|
deleteOptions := metav1.NewDeleteOptions(0)
|
||||||
|
deleteOptions.Preconditions = metav1.NewUIDPreconditions(uid)
|
||||||
|
err = pc.client.Pods(namespace).Delete(ctx, name, *deleteOptions)
|
||||||
if errors.IsNotFound(err) {
|
if errors.IsNotFound(err) {
|
||||||
|
log.G(ctx).Warnf("Not deleting pod because %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if errors.IsConflict(err) {
|
||||||
|
log.G(ctx).Warnf("There was a conflict, maybe trying to delete a Pod that has been recreated: %v", err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -414,3 +426,10 @@ func (pc *PodController) deletePodsFromKubernetesHandler(ctx context.Context, ke
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getUIDAndMetaNamespaceKey(key string) (string, string) {
|
||||||
|
idx := strings.LastIndex(key, "/")
|
||||||
|
uid := key[idx+1:]
|
||||||
|
metaKey := key[:idx]
|
||||||
|
return uid, metaKey
|
||||||
|
}
|
||||||
|
|||||||
123
node/pod_test.go
123
node/pod_test.go
@@ -20,16 +20,20 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
|
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
"gotest.tools/assert"
|
"gotest.tools/assert"
|
||||||
is "gotest.tools/assert/cmp"
|
is "gotest.tools/assert/cmp"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
kubeinformers "k8s.io/client-go/informers"
|
kubeinformers "k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
|
core "k8s.io/client-go/testing"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
|
|
||||||
|
testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TestController struct {
|
type TestController struct {
|
||||||
@@ -329,6 +333,123 @@ func TestPodStatusDelete(t *testing.T) {
|
|||||||
t.Logf("pod updated, container status: %+v, pod delete Time: %v", newPod.Status.ContainerStatuses[0].State.Terminated, newPod.DeletionTimestamp)
|
t.Logf("pod updated, container status: %+v, pod delete Time: %v", newPod.Status.ContainerStatuses[0].State.Terminated, newPod.DeletionTimestamp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReCreatePodRace(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
c := newTestController()
|
||||||
|
pod := &corev1.Pod{}
|
||||||
|
pod.ObjectMeta.Namespace = "default"
|
||||||
|
pod.ObjectMeta.Name = "nginx"
|
||||||
|
pod.Spec = newPodSpec()
|
||||||
|
pod.UID = "aaaaa"
|
||||||
|
podCopy := pod.DeepCopy()
|
||||||
|
podCopy.UID = "bbbbb"
|
||||||
|
|
||||||
|
// test conflict
|
||||||
|
fk8s := &fake.Clientset{}
|
||||||
|
c.client = fk8s
|
||||||
|
c.PodController.client = fk8s.CoreV1()
|
||||||
|
key := fmt.Sprintf("%s/%s/%s", pod.Namespace, pod.Name, pod.UID)
|
||||||
|
c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
|
||||||
|
c.deletePodsFromKubernetes.Enqueue(ctx, key)
|
||||||
|
if err := c.podsInformer.Informer().GetStore().Add(pod); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
c.client.AddReactor("delete", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
name := action.(core.DeleteAction).GetName()
|
||||||
|
t.Logf("deleted pod %s", name)
|
||||||
|
return true, nil, errors.NewConflict(schema.GroupResource{Group: "", Resource: "pods"}, "nginx", fmt.Errorf("test conflict"))
|
||||||
|
})
|
||||||
|
c.client.AddReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
name := action.(core.DeleteAction).GetName()
|
||||||
|
t.Logf("get pod %s", name)
|
||||||
|
return true, podCopy, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
err := c.deletePodsFromKubernetesHandler(ctx, key)
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Failed")
|
||||||
|
}
|
||||||
|
p, err := c.client.CoreV1().Pods(podCopy.Namespace).Get(ctx, podCopy.Name, v1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Pod not exist, %v", err)
|
||||||
|
}
|
||||||
|
if p.UID != podCopy.UID {
|
||||||
|
t.Errorf("Desired uid: %v, get: %v", podCopy.UID, p.UID)
|
||||||
|
}
|
||||||
|
t.Log("pod conflict test success")
|
||||||
|
|
||||||
|
// test not found
|
||||||
|
c = newTestController()
|
||||||
|
fk8s = &fake.Clientset{}
|
||||||
|
c.client = fk8s
|
||||||
|
c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
|
||||||
|
c.deletePodsFromKubernetes.Enqueue(ctx, key)
|
||||||
|
if err = c.podsInformer.Informer().GetStore().Add(pod); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
c.client.AddReactor("delete", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
name := action.(core.DeleteAction).GetName()
|
||||||
|
t.Logf("deleted pod %s", name)
|
||||||
|
return true, nil, errors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "nginx")
|
||||||
|
})
|
||||||
|
|
||||||
|
c.client.AddReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
name := action.(core.DeleteAction).GetName()
|
||||||
|
t.Logf("get pod %s", name)
|
||||||
|
return true, nil, errors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "nginx")
|
||||||
|
})
|
||||||
|
|
||||||
|
err = c.deletePodsFromKubernetesHandler(ctx, key)
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Failed")
|
||||||
|
}
|
||||||
|
_, err = c.client.CoreV1().Pods(podCopy.Namespace).Get(ctx, podCopy.Name, v1.GetOptions{})
|
||||||
|
if err == nil {
|
||||||
|
t.Log("delete success")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !errors.IsNotFound(err) {
|
||||||
|
t.Fatal("Desired pod not exist")
|
||||||
|
}
|
||||||
|
t.Log("pod not found test success")
|
||||||
|
|
||||||
|
// test uid not equal before query
|
||||||
|
c = newTestController()
|
||||||
|
fk8s = &fake.Clientset{}
|
||||||
|
c.client = fk8s
|
||||||
|
c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
|
||||||
|
c.deletePodsFromKubernetes.Enqueue(ctx, key)
|
||||||
|
// add new pod
|
||||||
|
if err = c.podsInformer.Informer().GetStore().Add(podCopy); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
c.client.AddReactor("delete", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
name := action.(core.DeleteAction).GetName()
|
||||||
|
t.Logf("deleted pod %s", name)
|
||||||
|
return true, nil, errors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "nginx")
|
||||||
|
})
|
||||||
|
|
||||||
|
c.client.AddReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
name := action.(core.DeleteAction).GetName()
|
||||||
|
t.Logf("get pod %s", name)
|
||||||
|
return true, nil, errors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "nginx")
|
||||||
|
})
|
||||||
|
|
||||||
|
err = c.deletePodsFromKubernetesHandler(ctx, key)
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Failed")
|
||||||
|
}
|
||||||
|
_, err = c.client.CoreV1().Pods(podCopy.Namespace).Get(ctx, podCopy.Name, v1.GetOptions{})
|
||||||
|
if err == nil {
|
||||||
|
t.Log("delete success")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !errors.IsNotFound(err) {
|
||||||
|
t.Fatal("Desired pod not exist")
|
||||||
|
}
|
||||||
|
t.Log("pod uid conflict test success")
|
||||||
|
}
|
||||||
|
|
||||||
func newPodSpec() corev1.PodSpec {
|
func newPodSpec() corev1.PodSpec {
|
||||||
return corev1.PodSpec{
|
return corev1.PodSpec{
|
||||||
Containers: []corev1.Container{
|
Containers: []corev1.Container{
|
||||||
|
|||||||
@@ -364,10 +364,15 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
|
|||||||
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
|
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
|
||||||
log.G(ctx).Error(err)
|
log.G(ctx).Error(err)
|
||||||
} else {
|
} else {
|
||||||
|
k8sPod, ok := pod.(*corev1.Pod)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
ctx = span.WithField(ctx, "key", key)
|
ctx = span.WithField(ctx, "key", key)
|
||||||
pc.knownPods.Delete(key)
|
pc.knownPods.Delete(key)
|
||||||
pc.syncPodsFromKubernetes.Enqueue(ctx, key)
|
pc.syncPodsFromKubernetes.Enqueue(ctx, key)
|
||||||
// If this pod was in the deletion queue, forget about it
|
// If this pod was in the deletion queue, forget about it
|
||||||
|
key = fmt.Sprintf("%v/%v", key, k8sPod.UID)
|
||||||
pc.deletePodsFromKubernetes.Forget(ctx, key)
|
pc.deletePodsFromKubernetes.Forget(ctx, key)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -501,6 +506,8 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod,
|
|||||||
if pod.DeletionTimestamp != nil && !running(&pod.Status) {
|
if pod.DeletionTimestamp != nil && !running(&pod.Status) {
|
||||||
log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running")
|
log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running")
|
||||||
pc.deletePodsFromKubernetes.EnqueueWithoutRateLimit(ctx, key)
|
pc.deletePodsFromKubernetes.EnqueueWithoutRateLimit(ctx, key)
|
||||||
|
key = fmt.Sprintf("%v/%v", key, pod.UID)
|
||||||
|
pc.deletePodsFromKubernetes.EnqueueWithoutRateLimit(ctx, key)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
obj, ok := pc.knownPods.Load(key)
|
obj, ok := pc.knownPods.Load(key)
|
||||||
@@ -536,6 +543,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
key = fmt.Sprintf("%v/%v", key, pod.UID)
|
||||||
pc.deletePodsFromKubernetes.EnqueueWithoutRateLimitWithDelay(ctx, key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
|
pc.deletePodsFromKubernetes.EnqueueWithoutRateLimitWithDelay(ctx, key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user