Add tests around updates
This makes sure the update function works correctly after the pod is running if the podspec is changed. Upon writing the test, I realized we were accessing the variables outside of the goroutine that the workers with tests were running in, and we had no locks. Therefore, I converted all of those numbers to use atomics.
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -165,6 +166,16 @@ func TestPodLifecycle(t *testing.T) {
|
|||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("updatePodWhileRunningScenario", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
t.Run("mockProvider", func(t *testing.T) {
|
||||||
|
mp := newMockProvider()
|
||||||
|
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
|
||||||
|
testUpdatePodWhileRunningScenario(ctx, t, s, mp)
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type testFunction func(ctx context.Context, s *system)
|
type testFunction func(ctx context.Context, s *system)
|
||||||
@@ -415,6 +426,62 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) {
|
||||||
|
t.Parallel()
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
p := newPod()
|
||||||
|
|
||||||
|
listOptions := metav1.ListOptions{
|
||||||
|
FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(),
|
||||||
|
}
|
||||||
|
|
||||||
|
watchErrCh := make(chan error)
|
||||||
|
|
||||||
|
// Create a Pod
|
||||||
|
_, e := s.client.CoreV1().Pods(testNamespace).Create(p)
|
||||||
|
assert.NilError(t, e)
|
||||||
|
|
||||||
|
// Setup a watch to check if the pod is in running
|
||||||
|
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
|
||||||
|
assert.NilError(t, err)
|
||||||
|
go func() {
|
||||||
|
_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
|
||||||
|
// Wait for the pod to be started
|
||||||
|
func(ev watch.Event) (bool, error) {
|
||||||
|
pod := ev.Object.(*corev1.Pod)
|
||||||
|
return pod.Status.Phase == corev1.PodRunning, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
watchErrCh <- watchErr
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Start the pod controller
|
||||||
|
podControllerErrCh := s.start(ctx)
|
||||||
|
|
||||||
|
// Wait for pod to be in running
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Fatalf("Context ended early: %s", ctx.Err().Error())
|
||||||
|
case err = <-podControllerErrCh:
|
||||||
|
assert.NilError(t, err)
|
||||||
|
t.Fatal("Pod controller exited prematurely without error")
|
||||||
|
case err = <-watchErrCh:
|
||||||
|
assert.NilError(t, err)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the pod
|
||||||
|
|
||||||
|
bumpResourceVersion(p)
|
||||||
|
p.Spec.SchedulerName = "joe"
|
||||||
|
_, err = s.client.CoreV1().Pods(p.Namespace).Update(p)
|
||||||
|
assert.NilError(t, err)
|
||||||
|
for atomic.LoadUint64(&m.updates) == 0 {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkCreatePods(b *testing.B) {
|
func BenchmarkCreatePods(b *testing.B) {
|
||||||
sl := logrus.StandardLogger()
|
sl := logrus.StandardLogger()
|
||||||
sl.SetLevel(logrus.ErrorLevel)
|
sl.SetLevel(logrus.ErrorLevel)
|
||||||
@@ -458,6 +525,15 @@ func randomizeName(pod *corev1.Pod) {
|
|||||||
pod.Name = name
|
pod.Name = name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func bumpResourceVersion(pod *corev1.Pod) {
|
||||||
|
version, err := strconv.Atoi(pod.ResourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pod.ResourceVersion = strconv.Itoa(version + 1)
|
||||||
|
}
|
||||||
|
|
||||||
func newPod(podmodifiers ...podModifier) *corev1.Pod {
|
func newPod(podmodifiers ...podModifier) *corev1.Pod {
|
||||||
pod := &corev1.Pod{
|
pod := &corev1.Pod{
|
||||||
TypeMeta: metav1.TypeMeta{
|
TypeMeta: metav1.TypeMeta{
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
|
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
|
||||||
@@ -18,9 +19,9 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type mockV0Provider struct {
|
type mockV0Provider struct {
|
||||||
creates int
|
creates uint64
|
||||||
updates int
|
updates uint64
|
||||||
deletes int
|
deletes uint64
|
||||||
|
|
||||||
errorOnDelete error
|
errorOnDelete error
|
||||||
|
|
||||||
@@ -56,7 +57,7 @@ func newMockProvider() *mockProvider {
|
|||||||
func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
|
func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
|
||||||
log.G(ctx).Infof("receive CreatePod %q", pod.Name)
|
log.G(ctx).Infof("receive CreatePod %q", pod.Name)
|
||||||
|
|
||||||
p.creates++
|
atomic.AddUint64(&p.creates, 1)
|
||||||
key, err := buildKey(pod)
|
key, err := buildKey(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -108,7 +109,7 @@ func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
|
|||||||
func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
|
func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
|
||||||
log.G(ctx).Infof("receive UpdatePod %q", pod.Name)
|
log.G(ctx).Infof("receive UpdatePod %q", pod.Name)
|
||||||
|
|
||||||
p.updates++
|
atomic.AddUint64(&p.updates, 1)
|
||||||
key, err := buildKey(pod)
|
key, err := buildKey(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -128,7 +129,7 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
|
|||||||
return p.errorOnDelete
|
return p.errorOnDelete
|
||||||
}
|
}
|
||||||
|
|
||||||
p.deletes++
|
atomic.AddUint64(&p.deletes, 1)
|
||||||
key, err := buildKey(pod)
|
key, err := buildKey(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ package node
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
pkgerrors "github.com/pkg/errors"
|
pkgerrors "github.com/pkg/errors"
|
||||||
@@ -152,8 +153,8 @@ func TestPodCreateNewPod(t *testing.T) {
|
|||||||
|
|
||||||
assert.Check(t, is.Nil(err))
|
assert.Check(t, is.Nil(err))
|
||||||
// createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist
|
// createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist
|
||||||
assert.Check(t, is.Equal(svr.mock.creates, 1))
|
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1)))
|
||||||
assert.Check(t, is.Equal(svr.mock.updates, 0))
|
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPodUpdateExisting(t *testing.T) {
|
func TestPodUpdateExisting(t *testing.T) {
|
||||||
@@ -179,8 +180,8 @@ func TestPodUpdateExisting(t *testing.T) {
|
|||||||
|
|
||||||
err := svr.provider.CreatePod(context.Background(), pod)
|
err := svr.provider.CreatePod(context.Background(), pod)
|
||||||
assert.Check(t, is.Nil(err))
|
assert.Check(t, is.Nil(err))
|
||||||
assert.Check(t, is.Equal(svr.mock.creates, 1))
|
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1)))
|
||||||
assert.Check(t, is.Equal(svr.mock.updates, 0))
|
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0)))
|
||||||
|
|
||||||
pod2 := &corev1.Pod{}
|
pod2 := &corev1.Pod{}
|
||||||
pod2.ObjectMeta.Namespace = "default"
|
pod2.ObjectMeta.Namespace = "default"
|
||||||
@@ -204,8 +205,8 @@ func TestPodUpdateExisting(t *testing.T) {
|
|||||||
assert.Check(t, is.Nil(err))
|
assert.Check(t, is.Nil(err))
|
||||||
|
|
||||||
// createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed
|
// createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed
|
||||||
assert.Check(t, is.Equal(svr.mock.creates, 1))
|
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1)))
|
||||||
assert.Check(t, is.Equal(svr.mock.updates, 1))
|
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(1)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPodNoSpecChange(t *testing.T) {
|
func TestPodNoSpecChange(t *testing.T) {
|
||||||
@@ -231,15 +232,15 @@ func TestPodNoSpecChange(t *testing.T) {
|
|||||||
|
|
||||||
err := svr.mock.CreatePod(context.Background(), pod)
|
err := svr.mock.CreatePod(context.Background(), pod)
|
||||||
assert.Check(t, is.Nil(err))
|
assert.Check(t, is.Nil(err))
|
||||||
assert.Check(t, is.Equal(svr.mock.creates, 1))
|
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1)))
|
||||||
assert.Check(t, is.Equal(svr.mock.updates, 0))
|
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0)))
|
||||||
|
|
||||||
err = svr.createOrUpdatePod(context.Background(), pod)
|
err = svr.createOrUpdatePod(context.Background(), pod)
|
||||||
assert.Check(t, is.Nil(err))
|
assert.Check(t, is.Nil(err))
|
||||||
|
|
||||||
// createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change
|
// createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change
|
||||||
assert.Check(t, is.Equal(svr.mock.creates, 1))
|
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1)))
|
||||||
assert.Check(t, is.Equal(svr.mock.updates, 0))
|
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPodDelete(t *testing.T) {
|
func TestPodDelete(t *testing.T) {
|
||||||
@@ -279,7 +280,7 @@ func TestPodDelete(t *testing.T) {
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
err = c.createOrUpdatePod(ctx, p) // make sure it's actually created
|
err = c.createOrUpdatePod(ctx, p) // make sure it's actually created
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
assert.Check(t, is.Equal(c.mock.creates, 1))
|
assert.Check(t, is.Equal(atomic.LoadUint64(&c.mock.creates), uint64(1)))
|
||||||
|
|
||||||
err = c.deletePod(ctx, pod.Namespace, pod.Name)
|
err = c.deletePod(ctx, pod.Namespace, pod.Name)
|
||||||
assert.Equal(t, pkgerrors.Cause(err), err)
|
assert.Equal(t, pkgerrors.Cause(err), err)
|
||||||
@@ -288,7 +289,7 @@ func TestPodDelete(t *testing.T) {
|
|||||||
if tc.delErr == nil {
|
if tc.delErr == nil {
|
||||||
expectDeletes = 1
|
expectDeletes = 1
|
||||||
}
|
}
|
||||||
assert.Check(t, is.Equal(c.mock.deletes, expectDeletes))
|
assert.Check(t, is.Equal(atomic.LoadUint64(&c.mock.deletes), uint64(expectDeletes)))
|
||||||
|
|
||||||
expectDeleted := tc.delErr == nil || errdefs.IsNotFound(tc.delErr)
|
expectDeleted := tc.delErr == nil || errdefs.IsNotFound(tc.delErr)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user