From 5c2b682cdce0c99117c5d3e14069ac57379fa25d Mon Sep 17 00:00:00 2001
From: Sargun Dhillon
Date: Tue, 6 Aug 2019 20:36:50 -0700
Subject: [PATCH] Array of minor fixups to lifecycle tests

* Fix the deletion test to actually test that the pod is deleted
* Fix the update pods test to update a value which is allowed to be updated
* Shut down watches after tests
* Do not delete pod statuses on DeletePod in mock_test. This intentionally
  leaks pod statuses, but it makes the situation a lot less complicated
  around handling race conditions with the GetPodStatus callback.
---
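Note for reviewers (not applied with the patch): testCreateStartDeleteScenario now
takes a wait function, so each scenario decides which watch event ends it. A minimal
sketch of a caller, mirroring isPodDeletedPermanentlyFunc from the diff below; it
assumes the test package's wireUpSystem/system helpers and import aliases:

    assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) {
        // Treat the API server reporting the pod as Deleted as the end of the scenario.
        waitForDeleted := func(ctx context.Context, w watch.Interface) error {
            _, err := watchutils.UntilWithoutRetry(ctx, w, func(ev watch.Event) (bool, error) {
                return ev.Type == watch.Deleted, nil
            })
            return err
        }
        testCreateStartDeleteScenario(ctx, t, s, waitForDeleted)
    }))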
 node/lifecycle_test.go | 129 ++++++++++++++++++++++++++++-------------
 node/mock_test.go      |  39 ++++++++-----
 2 files changed, 113 insertions(+), 55 deletions(-)

diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go
index 3bccdf02e..79c021946 100644
--- a/node/lifecycle_test.go
+++ b/node/lifecycle_test.go
@@ -8,10 +8,13 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
+	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
 	"github.com/virtual-kubelet/virtual-kubelet/log"
 	logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
 	"gotest.tools/assert"
+	is "gotest.tools/assert/cmp"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
@@ -90,11 +93,26 @@ func TestPodLifecycle(t *testing.T) {
 
 	ctx = log.WithLogger(ctx, log.L)
 
-	t.Run("createStartDeleteScenario", func(t *testing.T) {
-		t.Run("mockProvider", func(t *testing.T) {
+	// isPodDeletedPermanentlyFunc is a condition function that waits until the pod is _deleted_, which is the
+	// VK's action once the pod has been deleted from the provider
+	isPodDeletedPermanentlyFunc := func(ctx context.Context, watcher watch.Interface) error {
+		_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) {
+			log.G(ctx).WithField("event", ev).Info("got event")
+			// TODO(Sargun): The pod should have transitioned into some status around failed / succeeded
+			// prior to being deleted.
+			// In addition, we should check if the deletion timestamp gets set
+			return ev.Type == watch.Deleted, nil
+		})
+		return watchErr
+	}
+
+	// createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting
+	// it gracefully
+	t.Run("createStartDeleteScenario", func(t *testing.T) {
+
+		t.Run("mockProvider", func(t *testing.T) {
 			assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) {
-				testCreateStartDeleteScenario(ctx, t, s)
+				testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
 			}))
 		})
 
@@ -103,13 +121,51 @@ func TestPodLifecycle(t *testing.T) {
 		}
 		t.Run("mockV0Provider", func(t *testing.T) {
 			assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
-				testCreateStartDeleteScenario(ctx, t, s)
+				testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
 			}))
 		})
 	})
 
+	// createStartDeleteScenarioWithDeletionErrorNotFound tests the flow when the pod cannot be found in the
+	// provider for some reason
+	t.Run("createStartDeleteScenarioWithDeletionErrorNotFound", func(t *testing.T) {
+		mp := newMockProvider()
+		mp.errorOnDelete = errdefs.NotFound("not found")
+		assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+			testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
+		}))
+	})
+
+	// createStartDeleteScenarioWithDeletionRandomError tests the flow when the provider fails to delete
+	// the pod
+	t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) {
+		mp := newMockProvider()
+		deletionFunc := func(ctx context.Context, watcher watch.Interface) error {
+			// Wait until the provider has seen two delete attempts, i.e. the pod controller retried
+			// after the first delete failed with errorOnDelete.
+			select {
+			case <-mp.attemptedDeletes:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+			select {
+			case <-mp.attemptedDeletes:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+			return nil
+		}
+		mp.errorOnDelete = errors.New("random error")
+		assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+			testCreateStartDeleteScenario(ctx, t, s, deletionFunc)
+			pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
+			assert.NilError(t, err)
+			assert.Assert(t, is.Len(pods.Items, 1))
+			assert.Assert(t, pods.Items[0].DeletionTimestamp != nil)
+		}))
+	})
+
+	// danglingPodScenario covers the case where a pod already exists in the provider prior to the pod
+	// controller starting, and ensures the pod controller deletes the pod before continuing.
 	t.Run("danglingPodScenario", func(t *testing.T) {
-		t.Parallel()
 		t.Run("mockProvider", func(t *testing.T) {
 			mp := newMockProvider()
 			assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
@@ -129,8 +185,8 @@ func TestPodLifecycle(t *testing.T) {
 		})
 	})
 
+	// failedPodScenario ensures that the VK ignores pods that failed prior to the PC starting up
 	t.Run("failedPodScenario", func(t *testing.T) {
-		t.Parallel()
 		t.Run("mockProvider", func(t *testing.T) {
 			mp := newMockProvider()
 			assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
@@ -149,8 +205,8 @@ func TestPodLifecycle(t *testing.T) {
 		})
 	})
 
+	// succeededPodScenario ensures that the VK ignores pods that succeeded prior to the PC starting up.
t.Run("succeededPodScenario", func(t *testing.T) { - t.Parallel() t.Run("mockProvider", func(t *testing.T) { mp := newMockProvider() assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { @@ -167,8 +223,9 @@ func TestPodLifecycle(t *testing.T) { }) }) + // updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated + // to the provider t.Run("updatePodWhileRunningScenario", func(t *testing.T) { - t.Parallel() t.Run("mockProvider", func(t *testing.T) { mp := newMockProvider() assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { @@ -309,8 +366,8 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo } -func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system) { - t.Parallel() +func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error) { + ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -325,7 +382,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system) // Setup a watch (prior to pod creation, and pod controller startup) watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions) assert.NilError(t, err) - + defer watcher.Stop() // This ensures that the pod is created. go func() { _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, @@ -355,6 +412,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system) // Setup a watch to check if the pod is in running watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions) assert.NilError(t, err) + defer watcher.Stop() go func() { _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, // Wait for the pod to be started @@ -369,35 +427,26 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system) // Start the pod controller podControllerErrCh := s.start(ctx) - // Wait for pod to be in running + // Wait for the pod to go into running select { case <-ctx.Done(): t.Fatalf("Context ended early: %s", ctx.Err().Error()) - case err = <-podControllerErrCh: - assert.NilError(t, err) - t.Fatal("Pod controller exited prematurely without error") case err = <-watchErrCh: assert.NilError(t, err) - + case err = <-podControllerErrCh: + assert.NilError(t, err) + t.Fatal("Pod controller terminated early") } // Setup a watch prior to pod deletion watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions) assert.NilError(t, err) + defer watcher.Stop() go func() { - _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, - // Wait for the pod to be started - func(ev watch.Event) (bool, error) { - log.G(ctx).WithField("event", ev).Info("got event") - // TODO(Sargun): The pod should have transitioned into some status around failed / succeeded - // prior to being deleted. - // In addition, we should check if the deletion timestamp gets set - return ev.Type == watch.Deleted, nil - }) - watchErrCh <- watchErr + watchErrCh <- waitFunction(ctx, watcher) }() - // Delete the pod + // Delete the pod via deletiontimestamp // 1. 
 	currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(p.Name, metav1.GetOptions{})
@@ -446,14 +495,16 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
 	// Setup a watch to check if the pod is in running
 	watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
 	assert.NilError(t, err)
+	defer watcher.Stop()
 	go func() {
-		_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
+		newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
 			// Wait for the pod to be started
 			func(ev watch.Event) (bool, error) {
 				pod := ev.Object.(*corev1.Pod)
 				return pod.Status.Phase == corev1.PodRunning, nil
 			})
-
+		// This deepcopy is required to please the race detector
+		p = newPod.Object.(*corev1.Pod).DeepCopy()
 		watchErrCh <- watchErr
 	}()
 
@@ -473,9 +524,16 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
 	}
 
 	// Update the pod
+	version, err := strconv.Atoi(p.ResourceVersion)
+	if err != nil {
+		t.Fatalf("Could not parse pod's resource version: %s", err.Error())
+	}
 
-	bumpResourceVersion(p)
-	p.Spec.SchedulerName = "joe"
+	p.ResourceVersion = strconv.Itoa(version + 1)
+	var activeDeadlineSeconds int64 = 300
+	p.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
+
+	log.G(ctx).WithField("pod", p).Info("Updating pod")
 	_, err = s.client.CoreV1().Pods(p.Namespace).Update(p)
 	assert.NilError(t, err)
 	for atomic.LoadUint64(&m.updates) == 0 {
@@ -525,15 +583,6 @@ func randomizeName(pod *corev1.Pod) {
 	pod.Name = name
 }
 
-func bumpResourceVersion(pod *corev1.Pod) {
-	version, err := strconv.Atoi(pod.ResourceVersion)
-	if err != nil {
-		panic(err)
-	}
-
-	pod.ResourceVersion = strconv.Itoa(version + 1)
-}
-
 func newPod(podmodifiers ...podModifier) *corev1.Pod {
 	pod := &corev1.Pod{
 		TypeMeta: metav1.TypeMeta{
diff --git a/node/mock_test.go b/node/mock_test.go
index d0835ca53..76189167c 100644
--- a/node/mock_test.go
+++ b/node/mock_test.go
@@ -19,15 +19,16 @@ var (
 )
 
 type mockV0Provider struct {
-	creates uint64
-	updates uint64
-	deletes uint64
+	creates          uint64
+	updates          uint64
+	deletes          uint64
+	attemptedDeletes chan struct{}
 
 	errorOnDelete error
 
-	pods      sync.Map
-	startTime time.Time
-	notifier  func(*v1.Pod)
+	pods         sync.Map
+	startTime    time.Time
+	realNotifier func(*v1.Pod)
 }
 
 type mockProvider struct {
@@ -37,22 +38,28 @@ type mockProvider struct {
 // NewMockProviderMockConfig creates a new mockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface
 func newMockV0Provider() *mockV0Provider {
 	provider := mockV0Provider{
-		startTime: time.Now(),
-		// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
-		// it will be set, and then we'll call a real underlying implementation.
-		// This makes it easier in the sense we don't need to wrap each method.
-		notifier: func(*v1.Pod) {},
+		startTime:        time.Now(),
+		attemptedDeletes: make(chan struct{}, maxRetries+1),
 	}
+	// By default the notifier is a no-op. In the event we've implemented the PodNotifier interface,
+	// realNotifier will be set, and then notifier will call the real underlying implementation.
+	// This makes it easier in the sense we don't need to wrap each method.
 	return &provider
 }
 
 // NewMockProviderMockConfig creates a new MockProvider with the given config
 func newMockProvider() *mockProvider {
-
 	return &mockProvider{mockV0Provider: newMockV0Provider()}
 }
 
+// notifier calls the callback that we got from the pod controller to notify it of updates (if it is set)
+func (p *mockV0Provider) notifier(pod *v1.Pod) {
+	if p.realNotifier != nil {
+		p.realNotifier(pod)
+	}
+}
+
 // CreatePod accepts a Pod definition and stores it in memory.
 func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
 	log.G(ctx).Infof("receive CreatePod %q", pod.Name)
@@ -125,6 +132,7 @@ func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
 
 func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
 	log.G(ctx).Infof("receive DeletePod %q", pod.Name)
+	p.attemptedDeletes <- struct{}{}
 	if p.errorOnDelete != nil {
 		return p.errorOnDelete
 	}
@@ -140,7 +148,7 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
 	}
 
 	now := metav1.Now()
-	p.pods.Delete(key)
+
 	pod.Status.Phase = v1.PodSucceeded
 	pod.Status.Reason = "MockProviderPodDeleted"
 
@@ -157,7 +165,8 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
 	}
 
 	p.notifier(pod)
-
+	// TODO (Sargun): Eventually delete the pod from the map. We cannot right now, because GetPodStatus can / will
+	// be called momentarily later.
 	return nil
 }
 
@@ -206,7 +215,7 @@ func (p *mockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
 // NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
 // within the provider.
 func (p *mockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
-	p.notifier = notifier
+	p.realNotifier = notifier
 }
 
 func buildKeyFromNames(namespace string, name string) (string, error) {