Merge branch 'master' into document-api
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -141,17 +140,7 @@ func TestPodLifecycle(t *testing.T) {
|
|||||||
t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) {
|
t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) {
|
||||||
mp := newMockProvider()
|
mp := newMockProvider()
|
||||||
deletionFunc := func(ctx context.Context, watcher watch.Interface) error {
|
deletionFunc := func(ctx context.Context, watcher watch.Interface) error {
|
||||||
select {
|
return mp.attemptedDeletes.until(ctx, func(v int) bool { return v >= 2 })
|
||||||
case <-mp.attemptedDeletes:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-mp.attemptedDeletes:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
mp.errorOnDelete = errors.New("random error")
|
mp.errorOnDelete = errors.New("random error")
|
||||||
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
|
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
|
||||||
@@ -344,6 +333,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
|
|||||||
s.start(ctx)
|
s.start(ctx)
|
||||||
|
|
||||||
for s.pc.k8sQ.Len() > 0 {
|
for s.pc.k8sQ.Len() > 0 {
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
p2, err := s.client.CoreV1().Pods(testNamespace).Get(p1.Name, metav1.GetOptions{})
|
p2, err := s.client.CoreV1().Pods(testNamespace).Get(p1.Name, metav1.GetOptions{})
|
||||||
@@ -362,7 +352,7 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo
|
|||||||
// Start the pod controller
|
// Start the pod controller
|
||||||
s.start(ctx)
|
s.start(ctx)
|
||||||
|
|
||||||
assert.Assert(t, m.deletes == 1)
|
assert.Assert(t, is.Equal(m.deletes.read(), 1))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -536,8 +526,7 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
|
|||||||
log.G(ctx).WithField("pod", p).Info("Updating pod")
|
log.G(ctx).WithField("pod", p).Info("Updating pod")
|
||||||
_, err = s.client.CoreV1().Pods(p.Namespace).Update(p)
|
_, err = s.client.CoreV1().Pods(p.Namespace).Update(p)
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
for atomic.LoadUint64(&m.updates) == 0 {
|
assert.NilError(t, m.updates.until(ctx, func(v int) bool { return v > 0 }))
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkCreatePods(b *testing.B) {
|
func BenchmarkCreatePods(b *testing.B) {
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
|
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
|
||||||
@@ -18,11 +17,56 @@ var (
|
|||||||
_ PodNotifier = (*mockProvider)(nil)
|
_ PodNotifier = (*mockProvider)(nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type waitableInt struct {
|
||||||
|
cond *sync.Cond
|
||||||
|
val int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newWaitableInt() *waitableInt {
|
||||||
|
return &waitableInt{
|
||||||
|
cond: sync.NewCond(&sync.Mutex{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *waitableInt) read() int {
|
||||||
|
defer w.cond.L.Unlock()
|
||||||
|
w.cond.L.Lock()
|
||||||
|
return w.val
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *waitableInt) until(ctx context.Context, f func(int) bool) error {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
w.cond.Broadcast()
|
||||||
|
}()
|
||||||
|
|
||||||
|
w.cond.L.Lock()
|
||||||
|
defer w.cond.L.Unlock()
|
||||||
|
|
||||||
|
for !f(w.val) {
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.cond.Wait()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *waitableInt) increment() {
|
||||||
|
w.cond.L.Lock()
|
||||||
|
defer w.cond.L.Unlock()
|
||||||
|
w.val += 1
|
||||||
|
w.cond.Broadcast()
|
||||||
|
}
|
||||||
|
|
||||||
type mockV0Provider struct {
|
type mockV0Provider struct {
|
||||||
creates uint64
|
creates *waitableInt
|
||||||
updates uint64
|
updates *waitableInt
|
||||||
deletes uint64
|
deletes *waitableInt
|
||||||
attemptedDeletes chan struct{}
|
attemptedDeletes *waitableInt
|
||||||
|
|
||||||
errorOnDelete error
|
errorOnDelete error
|
||||||
|
|
||||||
@@ -39,7 +83,10 @@ type mockProvider struct {
|
|||||||
func newMockV0Provider() *mockV0Provider {
|
func newMockV0Provider() *mockV0Provider {
|
||||||
provider := mockV0Provider{
|
provider := mockV0Provider{
|
||||||
startTime: time.Now(),
|
startTime: time.Now(),
|
||||||
attemptedDeletes: make(chan struct{}, maxRetries+1),
|
creates: newWaitableInt(),
|
||||||
|
updates: newWaitableInt(),
|
||||||
|
deletes: newWaitableInt(),
|
||||||
|
attemptedDeletes: newWaitableInt(),
|
||||||
}
|
}
|
||||||
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
|
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
|
||||||
// it will be set, and then we'll call a real underlying implementation.
|
// it will be set, and then we'll call a real underlying implementation.
|
||||||
@@ -64,7 +111,7 @@ func (p *mockV0Provider) notifier(pod *v1.Pod) {
|
|||||||
func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
|
func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
|
||||||
log.G(ctx).Infof("receive CreatePod %q", pod.Name)
|
log.G(ctx).Infof("receive CreatePod %q", pod.Name)
|
||||||
|
|
||||||
atomic.AddUint64(&p.creates, 1)
|
p.creates.increment()
|
||||||
key, err := buildKey(pod)
|
key, err := buildKey(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -116,7 +163,7 @@ func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
|
|||||||
func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
|
func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
|
||||||
log.G(ctx).Infof("receive UpdatePod %q", pod.Name)
|
log.G(ctx).Infof("receive UpdatePod %q", pod.Name)
|
||||||
|
|
||||||
atomic.AddUint64(&p.updates, 1)
|
p.updates.increment()
|
||||||
key, err := buildKey(pod)
|
key, err := buildKey(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -132,12 +179,12 @@ func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
|
|||||||
func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
|
func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
|
||||||
log.G(ctx).Infof("receive DeletePod %q", pod.Name)
|
log.G(ctx).Infof("receive DeletePod %q", pod.Name)
|
||||||
|
|
||||||
p.attemptedDeletes <- struct{}{}
|
p.attemptedDeletes.increment()
|
||||||
if p.errorOnDelete != nil {
|
if p.errorOnDelete != nil {
|
||||||
return p.errorOnDelete
|
return p.errorOnDelete
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic.AddUint64(&p.deletes, 1)
|
p.deletes.increment()
|
||||||
key, err := buildKey(pod)
|
key, err := buildKey(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ package node
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
pkgerrors "github.com/pkg/errors"
|
pkgerrors "github.com/pkg/errors"
|
||||||
@@ -153,8 +152,8 @@ func TestPodCreateNewPod(t *testing.T) {
|
|||||||
|
|
||||||
assert.Check(t, is.Nil(err))
|
assert.Check(t, is.Nil(err))
|
||||||
// createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist
|
// createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist
|
||||||
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1)))
|
assert.Check(t, is.Equal(svr.mock.creates.read(), 1))
|
||||||
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0)))
|
assert.Check(t, is.Equal(svr.mock.updates.read(), 0))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPodUpdateExisting(t *testing.T) {
|
func TestPodUpdateExisting(t *testing.T) {
|
||||||
@@ -180,8 +179,8 @@ func TestPodUpdateExisting(t *testing.T) {
|
|||||||
|
|
||||||
err := svr.provider.CreatePod(context.Background(), pod)
|
err := svr.provider.CreatePod(context.Background(), pod)
|
||||||
assert.Check(t, is.Nil(err))
|
assert.Check(t, is.Nil(err))
|
||||||
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1)))
|
assert.Check(t, is.Equal(svr.mock.creates.read(), 1))
|
||||||
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0)))
|
assert.Check(t, is.Equal(svr.mock.updates.read(), 0))
|
||||||
|
|
||||||
pod2 := &corev1.Pod{}
|
pod2 := &corev1.Pod{}
|
||||||
pod2.ObjectMeta.Namespace = "default"
|
pod2.ObjectMeta.Namespace = "default"
|
||||||
@@ -205,8 +204,8 @@ func TestPodUpdateExisting(t *testing.T) {
|
|||||||
assert.Check(t, is.Nil(err))
|
assert.Check(t, is.Nil(err))
|
||||||
|
|
||||||
// createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed
|
// createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed
|
||||||
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1)))
|
assert.Check(t, is.Equal(svr.mock.creates.read(), 1))
|
||||||
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(1)))
|
assert.Check(t, is.Equal(svr.mock.updates.read(), 1))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPodNoSpecChange(t *testing.T) {
|
func TestPodNoSpecChange(t *testing.T) {
|
||||||
@@ -232,15 +231,15 @@ func TestPodNoSpecChange(t *testing.T) {
|
|||||||
|
|
||||||
err := svr.mock.CreatePod(context.Background(), pod)
|
err := svr.mock.CreatePod(context.Background(), pod)
|
||||||
assert.Check(t, is.Nil(err))
|
assert.Check(t, is.Nil(err))
|
||||||
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1)))
|
assert.Check(t, is.Equal(svr.mock.creates.read(), 1))
|
||||||
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0)))
|
assert.Check(t, is.Equal(svr.mock.updates.read(), 0))
|
||||||
|
|
||||||
err = svr.createOrUpdatePod(context.Background(), pod)
|
err = svr.createOrUpdatePod(context.Background(), pod)
|
||||||
assert.Check(t, is.Nil(err))
|
assert.Check(t, is.Nil(err))
|
||||||
|
|
||||||
// createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change
|
// createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change
|
||||||
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1)))
|
assert.Check(t, is.Equal(svr.mock.creates.read(), 1))
|
||||||
assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0)))
|
assert.Check(t, is.Equal(svr.mock.updates.read(), 0))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPodDelete(t *testing.T) {
|
func TestPodDelete(t *testing.T) {
|
||||||
@@ -280,7 +279,7 @@ func TestPodDelete(t *testing.T) {
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
err = c.createOrUpdatePod(ctx, p) // make sure it's actually created
|
err = c.createOrUpdatePod(ctx, p) // make sure it's actually created
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
assert.Check(t, is.Equal(atomic.LoadUint64(&c.mock.creates), uint64(1)))
|
assert.Check(t, is.Equal(c.mock.creates.read(), 1))
|
||||||
|
|
||||||
err = c.deletePod(ctx, pod.Namespace, pod.Name)
|
err = c.deletePod(ctx, pod.Namespace, pod.Name)
|
||||||
assert.Equal(t, pkgerrors.Cause(err), err)
|
assert.Equal(t, pkgerrors.Cause(err), err)
|
||||||
@@ -289,7 +288,7 @@ func TestPodDelete(t *testing.T) {
|
|||||||
if tc.delErr == nil {
|
if tc.delErr == nil {
|
||||||
expectDeletes = 1
|
expectDeletes = 1
|
||||||
}
|
}
|
||||||
assert.Check(t, is.Equal(atomic.LoadUint64(&c.mock.deletes), uint64(expectDeletes)))
|
assert.Check(t, is.Equal(c.mock.deletes.read(), expectDeletes))
|
||||||
|
|
||||||
expectDeleted := tc.delErr == nil || errdefs.IsNotFound(tc.delErr)
|
expectDeleted := tc.delErr == nil || errdefs.IsNotFound(tc.delErr)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user