From fbed4ca70241a2f5af2921fa946b4b04742137d5 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Tue, 13 Aug 2019 08:43:20 -0700 Subject: [PATCH] Remove usage of atomics It turns out that running atomic.Read(...) in a tight loop breaks Golang. The goroutine would never yield control over the scheduler, so we ended up getting into a situation where the test would get stuck forever. This moves to a different model, in which there is a condition var, instead of atomics in loops. --- node/lifecycle_test.go | 18 ++---------- node/mock_test.go | 67 +++++++++++++++++++++++++++++++++++------- node/pod_test.go | 25 ++++++++-------- 3 files changed, 72 insertions(+), 38 deletions(-) diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go index 79c021946..484a5bd8c 100644 --- a/node/lifecycle_test.go +++ b/node/lifecycle_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "strconv" - "sync/atomic" "testing" "time" @@ -141,17 +140,7 @@ func TestPodLifecycle(t *testing.T) { t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) { mp := newMockProvider() deletionFunc := func(ctx context.Context, watcher watch.Interface) error { - select { - case <-mp.attemptedDeletes: - case <-ctx.Done(): - return ctx.Err() - } - select { - case <-mp.attemptedDeletes: - case <-ctx.Done(): - return ctx.Err() - } - return nil + return mp.attemptedDeletes.until(ctx, func(v int) bool { return v >= 2 }) } mp.errorOnDelete = errors.New("random error") assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { @@ -362,7 +351,7 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo // Start the pod controller s.start(ctx) - assert.Assert(t, m.deletes == 1) + assert.Assert(t, is.Equal(m.deletes.read(), 1)) } @@ -536,8 +525,7 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys log.G(ctx).WithField("pod", p).Info("Updating pod") _, err = s.client.CoreV1().Pods(p.Namespace).Update(p) assert.NilError(t, err) - for atomic.LoadUint64(&m.updates) == 0 { - } + assert.NilError(t, m.updates.until(ctx, func(v int) bool { return v > 0 })) } func BenchmarkCreatePods(b *testing.B) { diff --git a/node/mock_test.go b/node/mock_test.go index 76189167c..0ca3ff38c 100644 --- a/node/mock_test.go +++ b/node/mock_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "sync" - "sync/atomic" "time" "github.com/virtual-kubelet/virtual-kubelet/errdefs" @@ -18,11 +17,56 @@ var ( _ PodNotifier = (*mockProvider)(nil) ) +type waitableInt struct { + cond *sync.Cond + val int +} + +func newWaitableInt() *waitableInt { + return &waitableInt{ + cond: sync.NewCond(&sync.Mutex{}), + } +} + +func (w *waitableInt) read() int { + defer w.cond.L.Unlock() + w.cond.L.Lock() + return w.val +} + +func (w *waitableInt) until(ctx context.Context, f func(int) bool) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + go func() { + <-ctx.Done() + w.cond.Broadcast() + }() + + w.cond.L.Lock() + defer w.cond.L.Unlock() + + for !f(w.val) { + if err := ctx.Err(); err != nil { + return err + } + w.cond.Wait() + } + return nil +} + +func (w *waitableInt) increment() { + w.cond.L.Lock() + defer w.cond.L.Unlock() + w.val += 1 + w.cond.Broadcast() +} + type mockV0Provider struct { - creates uint64 - updates uint64 - deletes uint64 - attemptedDeletes chan struct{} + creates *waitableInt + updates *waitableInt + deletes *waitableInt + attemptedDeletes *waitableInt errorOnDelete error @@ -39,7 +83,10 @@ type mockProvider struct { func newMockV0Provider() *mockV0Provider { provider := mockV0Provider{ startTime: time.Now(), - attemptedDeletes: make(chan struct{}, maxRetries+1), + creates: newWaitableInt(), + updates: newWaitableInt(), + deletes: newWaitableInt(), + attemptedDeletes: newWaitableInt(), } // By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface, // it will be set, and then we'll call a real underlying implementation. @@ -64,7 +111,7 @@ func (p *mockV0Provider) notifier(pod *v1.Pod) { func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { log.G(ctx).Infof("receive CreatePod %q", pod.Name) - atomic.AddUint64(&p.creates, 1) + p.creates.increment() key, err := buildKey(pod) if err != nil { return err @@ -116,7 +163,7 @@ func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error { log.G(ctx).Infof("receive UpdatePod %q", pod.Name) - atomic.AddUint64(&p.updates, 1) + p.updates.increment() key, err := buildKey(pod) if err != nil { return err @@ -132,12 +179,12 @@ func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error { func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { log.G(ctx).Infof("receive DeletePod %q", pod.Name) - p.attemptedDeletes <- struct{}{} + p.attemptedDeletes.increment() if p.errorOnDelete != nil { return p.errorOnDelete } - atomic.AddUint64(&p.deletes, 1) + p.deletes.increment() key, err := buildKey(pod) if err != nil { return err diff --git a/node/pod_test.go b/node/pod_test.go index 3006d23fb..ce00a1b37 100644 --- a/node/pod_test.go +++ b/node/pod_test.go @@ -16,7 +16,6 @@ package node import ( "context" - "sync/atomic" "testing" pkgerrors "github.com/pkg/errors" @@ -153,8 +152,8 @@ func TestPodCreateNewPod(t *testing.T) { assert.Check(t, is.Nil(err)) // createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist - assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) - assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0))) + assert.Check(t, is.Equal(svr.mock.creates.read(), 1)) + assert.Check(t, is.Equal(svr.mock.updates.read(), 0)) } func TestPodUpdateExisting(t *testing.T) { @@ -180,8 +179,8 @@ func TestPodUpdateExisting(t *testing.T) { err := svr.provider.CreatePod(context.Background(), pod) assert.Check(t, is.Nil(err)) - assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) - assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0))) + assert.Check(t, is.Equal(svr.mock.creates.read(), 1)) + assert.Check(t, is.Equal(svr.mock.updates.read(), 0)) pod2 := &corev1.Pod{} pod2.ObjectMeta.Namespace = "default" @@ -205,8 +204,8 @@ func TestPodUpdateExisting(t *testing.T) { assert.Check(t, is.Nil(err)) // createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed - assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) - assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(1))) + assert.Check(t, is.Equal(svr.mock.creates.read(), 1)) + assert.Check(t, is.Equal(svr.mock.updates.read(), 1)) } func TestPodNoSpecChange(t *testing.T) { @@ -232,15 +231,15 @@ func TestPodNoSpecChange(t *testing.T) { err := svr.mock.CreatePod(context.Background(), pod) assert.Check(t, is.Nil(err)) - assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) - assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0))) + assert.Check(t, is.Equal(svr.mock.creates.read(), 1)) + assert.Check(t, is.Equal(svr.mock.updates.read(), 0)) err = svr.createOrUpdatePod(context.Background(), pod) assert.Check(t, is.Nil(err)) // createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change - assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) - assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0))) + assert.Check(t, is.Equal(svr.mock.creates.read(), 1)) + assert.Check(t, is.Equal(svr.mock.updates.read(), 0)) } func TestPodDelete(t *testing.T) { @@ -280,7 +279,7 @@ func TestPodDelete(t *testing.T) { ctx := context.Background() err = c.createOrUpdatePod(ctx, p) // make sure it's actually created assert.NilError(t, err) - assert.Check(t, is.Equal(atomic.LoadUint64(&c.mock.creates), uint64(1))) + assert.Check(t, is.Equal(c.mock.creates.read(), 1)) err = c.deletePod(ctx, pod.Namespace, pod.Name) assert.Equal(t, pkgerrors.Cause(err), err) @@ -289,7 +288,7 @@ func TestPodDelete(t *testing.T) { if tc.delErr == nil { expectDeletes = 1 } - assert.Check(t, is.Equal(atomic.LoadUint64(&c.mock.deletes), uint64(expectDeletes))) + assert.Check(t, is.Equal(c.mock.deletes.read(), expectDeletes)) expectDeleted := tc.delErr == nil || errdefs.IsNotFound(tc.delErr)