diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go index 059f3d571..3bccdf02e 100644 --- a/node/lifecycle_test.go +++ b/node/lifecycle_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "sync/atomic" "testing" "time" @@ -165,6 +166,16 @@ func TestPodLifecycle(t *testing.T) { })) }) }) + + t.Run("updatePodWhileRunningScenario", func(t *testing.T) { + t.Parallel() + t.Run("mockProvider", func(t *testing.T) { + mp := newMockProvider() + assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { + testUpdatePodWhileRunningScenario(ctx, t, s, mp) + })) + }) + }) } type testFunction func(ctx context.Context, s *system) @@ -415,6 +426,62 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system) } } +func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) { + t.Parallel() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + p := newPod() + + listOptions := metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(), + } + + watchErrCh := make(chan error) + + // Create a Pod + _, e := s.client.CoreV1().Pods(testNamespace).Create(p) + assert.NilError(t, e) + + // Setup a watch to check if the pod is in running + watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions) + assert.NilError(t, err) + go func() { + _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, + // Wait for the pod to be started + func(ev watch.Event) (bool, error) { + pod := ev.Object.(*corev1.Pod) + return pod.Status.Phase == corev1.PodRunning, nil + }) + + watchErrCh <- watchErr + }() + + // Start the pod controller + podControllerErrCh := s.start(ctx) + + // Wait for pod to be in running + select { + case <-ctx.Done(): + t.Fatalf("Context ended early: %s", ctx.Err().Error()) + case err = <-podControllerErrCh: + assert.NilError(t, err) + t.Fatal("Pod controller exited prematurely without error") + case err = <-watchErrCh: + assert.NilError(t, err) + + } + + // Update the pod + + bumpResourceVersion(p) + p.Spec.SchedulerName = "joe" + _, err = s.client.CoreV1().Pods(p.Namespace).Update(p) + assert.NilError(t, err) + for atomic.LoadUint64(&m.updates) == 0 { + } +} + func BenchmarkCreatePods(b *testing.B) { sl := logrus.StandardLogger() sl.SetLevel(logrus.ErrorLevel) @@ -458,6 +525,15 @@ func randomizeName(pod *corev1.Pod) { pod.Name = name } +func bumpResourceVersion(pod *corev1.Pod) { + version, err := strconv.Atoi(pod.ResourceVersion) + if err != nil { + panic(err) + } + + pod.ResourceVersion = strconv.Itoa(version + 1) +} + func newPod(podmodifiers ...podModifier) *corev1.Pod { pod := &corev1.Pod{ TypeMeta: metav1.TypeMeta{ diff --git a/node/mock_test.go b/node/mock_test.go index 1ab237dd2..d0835ca53 100644 --- a/node/mock_test.go +++ b/node/mock_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/virtual-kubelet/virtual-kubelet/errdefs" @@ -18,9 +19,9 @@ var ( ) type mockV0Provider struct { - creates int - updates int - deletes int + creates uint64 + updates uint64 + deletes uint64 errorOnDelete error @@ -56,7 +57,7 @@ func newMockProvider() *mockProvider { func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { log.G(ctx).Infof("receive CreatePod %q", pod.Name) - p.creates++ + atomic.AddUint64(&p.creates, 1) key, err := buildKey(pod) if err != nil { return err @@ -108,7 +109,7 @@ func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error { log.G(ctx).Infof("receive UpdatePod %q", pod.Name) - p.updates++ + atomic.AddUint64(&p.updates, 1) key, err := buildKey(pod) if err != nil { return err @@ -128,7 +129,7 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) return p.errorOnDelete } - p.deletes++ + atomic.AddUint64(&p.deletes, 1) key, err := buildKey(pod) if err != nil { return err diff --git a/node/pod_test.go b/node/pod_test.go index 56eff411f..3006d23fb 100644 --- a/node/pod_test.go +++ b/node/pod_test.go @@ -16,6 +16,7 @@ package node import ( "context" + "sync/atomic" "testing" pkgerrors "github.com/pkg/errors" @@ -152,8 +153,8 @@ func TestPodCreateNewPod(t *testing.T) { assert.Check(t, is.Nil(err)) // createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist - assert.Check(t, is.Equal(svr.mock.creates, 1)) - assert.Check(t, is.Equal(svr.mock.updates, 0)) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0))) } func TestPodUpdateExisting(t *testing.T) { @@ -179,8 +180,8 @@ func TestPodUpdateExisting(t *testing.T) { err := svr.provider.CreatePod(context.Background(), pod) assert.Check(t, is.Nil(err)) - assert.Check(t, is.Equal(svr.mock.creates, 1)) - assert.Check(t, is.Equal(svr.mock.updates, 0)) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0))) pod2 := &corev1.Pod{} pod2.ObjectMeta.Namespace = "default" @@ -204,8 +205,8 @@ func TestPodUpdateExisting(t *testing.T) { assert.Check(t, is.Nil(err)) // createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed - assert.Check(t, is.Equal(svr.mock.creates, 1)) - assert.Check(t, is.Equal(svr.mock.updates, 1)) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(1))) } func TestPodNoSpecChange(t *testing.T) { @@ -231,15 +232,15 @@ func TestPodNoSpecChange(t *testing.T) { err := svr.mock.CreatePod(context.Background(), pod) assert.Check(t, is.Nil(err)) - assert.Check(t, is.Equal(svr.mock.creates, 1)) - assert.Check(t, is.Equal(svr.mock.updates, 0)) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0))) err = svr.createOrUpdatePod(context.Background(), pod) assert.Check(t, is.Nil(err)) // createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change - assert.Check(t, is.Equal(svr.mock.creates, 1)) - assert.Check(t, is.Equal(svr.mock.updates, 0)) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0))) } func TestPodDelete(t *testing.T) { @@ -279,7 +280,7 @@ func TestPodDelete(t *testing.T) { ctx := context.Background() err = c.createOrUpdatePod(ctx, p) // make sure it's actually created assert.NilError(t, err) - assert.Check(t, is.Equal(c.mock.creates, 1)) + assert.Check(t, is.Equal(atomic.LoadUint64(&c.mock.creates), uint64(1))) err = c.deletePod(ctx, pod.Namespace, pod.Name) assert.Equal(t, pkgerrors.Cause(err), err) @@ -288,7 +289,7 @@ func TestPodDelete(t *testing.T) { if tc.delErr == nil { expectDeletes = 1 } - assert.Check(t, is.Equal(c.mock.deletes, expectDeletes)) + assert.Check(t, is.Equal(atomic.LoadUint64(&c.mock.deletes), uint64(expectDeletes))) expectDeleted := tc.delErr == nil || errdefs.IsNotFound(tc.delErr)