Fix race between k8s and provider when deleting pod
This commit is contained in:
123
node/pod_test.go
123
node/pod_test.go
@@ -20,16 +20,20 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
|
||||
"golang.org/x/time/rate"
|
||||
"gotest.tools/assert"
|
||||
is "gotest.tools/assert/cmp"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
core "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
||||
testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
|
||||
)
|
||||
|
||||
type TestController struct {
|
||||
@@ -329,6 +333,123 @@ func TestPodStatusDelete(t *testing.T) {
|
||||
t.Logf("pod updated, container status: %+v, pod delete Time: %v", newPod.Status.ContainerStatuses[0].State.Terminated, newPod.DeletionTimestamp)
|
||||
}
|
||||
|
||||
func TestReCreatePodRace(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
c := newTestController()
|
||||
pod := &corev1.Pod{}
|
||||
pod.ObjectMeta.Namespace = "default"
|
||||
pod.ObjectMeta.Name = "nginx"
|
||||
pod.Spec = newPodSpec()
|
||||
pod.UID = "aaaaa"
|
||||
podCopy := pod.DeepCopy()
|
||||
podCopy.UID = "bbbbb"
|
||||
|
||||
// test conflict
|
||||
fk8s := &fake.Clientset{}
|
||||
c.client = fk8s
|
||||
c.PodController.client = fk8s.CoreV1()
|
||||
key := fmt.Sprintf("%s/%s/%s", pod.Namespace, pod.Name, pod.UID)
|
||||
c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
|
||||
c.deletePodsFromKubernetes.Enqueue(ctx, key)
|
||||
if err := c.podsInformer.Informer().GetStore().Add(pod); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c.client.AddReactor("delete", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||
name := action.(core.DeleteAction).GetName()
|
||||
t.Logf("deleted pod %s", name)
|
||||
return true, nil, errors.NewConflict(schema.GroupResource{Group: "", Resource: "pods"}, "nginx", fmt.Errorf("test conflict"))
|
||||
})
|
||||
c.client.AddReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||
name := action.(core.DeleteAction).GetName()
|
||||
t.Logf("get pod %s", name)
|
||||
return true, podCopy, nil
|
||||
})
|
||||
|
||||
err := c.deletePodsFromKubernetesHandler(ctx, key)
|
||||
if err != nil {
|
||||
t.Error("Failed")
|
||||
}
|
||||
p, err := c.client.CoreV1().Pods(podCopy.Namespace).Get(ctx, podCopy.Name, v1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Pod not exist, %v", err)
|
||||
}
|
||||
if p.UID != podCopy.UID {
|
||||
t.Errorf("Desired uid: %v, get: %v", podCopy.UID, p.UID)
|
||||
}
|
||||
t.Log("pod conflict test success")
|
||||
|
||||
// test not found
|
||||
c = newTestController()
|
||||
fk8s = &fake.Clientset{}
|
||||
c.client = fk8s
|
||||
c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
|
||||
c.deletePodsFromKubernetes.Enqueue(ctx, key)
|
||||
if err = c.podsInformer.Informer().GetStore().Add(pod); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c.client.AddReactor("delete", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||
name := action.(core.DeleteAction).GetName()
|
||||
t.Logf("deleted pod %s", name)
|
||||
return true, nil, errors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "nginx")
|
||||
})
|
||||
|
||||
c.client.AddReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||
name := action.(core.DeleteAction).GetName()
|
||||
t.Logf("get pod %s", name)
|
||||
return true, nil, errors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "nginx")
|
||||
})
|
||||
|
||||
err = c.deletePodsFromKubernetesHandler(ctx, key)
|
||||
if err != nil {
|
||||
t.Error("Failed")
|
||||
}
|
||||
_, err = c.client.CoreV1().Pods(podCopy.Namespace).Get(ctx, podCopy.Name, v1.GetOptions{})
|
||||
if err == nil {
|
||||
t.Log("delete success")
|
||||
return
|
||||
}
|
||||
if !errors.IsNotFound(err) {
|
||||
t.Fatal("Desired pod not exist")
|
||||
}
|
||||
t.Log("pod not found test success")
|
||||
|
||||
// test uid not equal before query
|
||||
c = newTestController()
|
||||
fk8s = &fake.Clientset{}
|
||||
c.client = fk8s
|
||||
c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
|
||||
c.deletePodsFromKubernetes.Enqueue(ctx, key)
|
||||
// add new pod
|
||||
if err = c.podsInformer.Informer().GetStore().Add(podCopy); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c.client.AddReactor("delete", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||
name := action.(core.DeleteAction).GetName()
|
||||
t.Logf("deleted pod %s", name)
|
||||
return true, nil, errors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "nginx")
|
||||
})
|
||||
|
||||
c.client.AddReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||
name := action.(core.DeleteAction).GetName()
|
||||
t.Logf("get pod %s", name)
|
||||
return true, nil, errors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "nginx")
|
||||
})
|
||||
|
||||
err = c.deletePodsFromKubernetesHandler(ctx, key)
|
||||
if err != nil {
|
||||
t.Error("Failed")
|
||||
}
|
||||
_, err = c.client.CoreV1().Pods(podCopy.Namespace).Get(ctx, podCopy.Name, v1.GetOptions{})
|
||||
if err == nil {
|
||||
t.Log("delete success")
|
||||
return
|
||||
}
|
||||
if !errors.IsNotFound(err) {
|
||||
t.Fatal("Desired pod not exist")
|
||||
}
|
||||
t.Log("pod uid conflict test success")
|
||||
}
|
||||
|
||||
func newPodSpec() corev1.PodSpec {
|
||||
return corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
|
||||
Reference in New Issue
Block a user