Merge branch 'master' into upgrade-k8s-v2

Pires
2019-08-13 10:43:00 +01:00
committed by GitHub
6 changed files with 130 additions and 65 deletions

View File

@@ -8,10 +8,13 @@ import (
"testing"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log"
logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@@ -90,11 +93,26 @@ func TestPodLifecycle(t *testing.T) {
ctx = log.WithLogger(ctx, log.L)
t.Run("createStartDeleteScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
// isPodDeletedPermanentlyFunc is a condition func that waits until the pod is _deleted_ from the API server,
// which is what the VK does once the pod has been deleted from the provider
isPodDeletedPermanentlyFunc := func(ctx context.Context, watcher watch.Interface) error {
_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) {
log.G(ctx).WithField("event", ev).Info("got event")
// TODO(Sargun): The pod should have transitioned into some status around failed / succeeded
// prior to being deleted.
// In addition, we should check if the deletion timestamp gets set
return ev.Type == watch.Deleted, nil
})
return watchErr
}
// createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting
// it gracefully
t.Run("createStartDeleteScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s)
testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
}))
})
@@ -103,13 +121,51 @@ func TestPodLifecycle(t *testing.T) {
}
t.Run("mockV0Provider", func(t *testing.T) {
assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s)
testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
}))
})
})
// createStartDeleteScenarioWithDeletionErrorNotFound tests the flow when the provider reports that the pod
// was not found at deletion time
t.Run("createStartDeleteScenarioWithDeletionErrorNotFound", func(t *testing.T) {
mp := newMockProvider()
mp.errorOnDelete = errdefs.NotFound("not found")
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
}))
})
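In the NotFound scenario above, the provider's DeletePod returns errdefs.NotFound, yet the test still waits with isPodDeletedPermanentlyFunc for the pod to disappear from the API server, i.e. a NotFound from the provider is treated as "already gone" rather than as a failure (the same classification the controller uses via errdefs.IsNotFound further down). A minimal, standalone sketch of that pattern; the helper and callback names here are made up for illustration:

```go
package main

import (
	"fmt"

	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
)

// deleteFromProvider treats "not found" as success: the pod is already gone
// in the provider, so there is nothing left to undo.
func deleteFromProvider(del func() error) error {
	if err := del(); err != nil && !errdefs.IsNotFound(err) {
		return err // a real failure; the caller should retry or surface it
	}
	return nil
}

func main() {
	// The mock in the test above is configured with exactly this error.
	err := deleteFromProvider(func() error { return errdefs.NotFound("not found") })
	fmt.Println("treated as success:", err == nil) // true

	err = deleteFromProvider(func() error { return fmt.Errorf("random error") })
	fmt.Println("treated as success:", err == nil) // false
}
```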
// createStartDeleteScenarioWithDeletionRandomError tests the flow when the provider fails to delete the pod
// with an arbitrary (non-NotFound) error
t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) {
mp := newMockProvider()
deletionFunc := func(ctx context.Context, watcher watch.Interface) error {
select {
case <-mp.attemptedDeletes:
case <-ctx.Done():
return ctx.Err()
}
select {
case <-mp.attemptedDeletes:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
mp.errorOnDelete = errors.New("random error")
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, deletionFunc)
pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
assert.NilError(t, err)
assert.Assert(t, is.Len(pods.Items, 1))
assert.Assert(t, pods.Items[0].DeletionTimestamp != nil)
}))
})
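The deletionFunc above blocks until the mock records two delete attempts on mp.attemptedDeletes (the buffered channel the provider's DeletePod feeds, see the provider changes in the next file), so the final assertions only run once the controller has retried at least once and the pod still exists with its DeletionTimestamp set. A small, self-contained sketch of that buffered-channel handshake, with hypothetical names:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fakeStore plays the role of the mock provider: every delete attempt is
// recorded on a buffered channel so the test can block until N attempts happen.
type fakeStore struct {
	attempts chan struct{}
}

func (f *fakeStore) Delete(ctx context.Context) error {
	select {
	case f.attempts <- struct{}{}:
	case <-ctx.Done():
		return ctx.Err()
	}
	return errors.New("random error") // always fail, forcing the caller to retry
}

func main() {
	f := &fakeStore{attempts: make(chan struct{}, 8)}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// A background "controller" keeps retrying the delete, like the pod
	// controller's work queue does in the real test.
	go func() {
		for i := 0; i < 3; i++ {
			_ = f.Delete(ctx)
		}
	}()

	// The "test" waits for two attempts before making its assertions.
	for i := 0; i < 2; i++ {
		select {
		case <-f.attempts:
		case <-ctx.Done():
			fmt.Println("timed out:", ctx.Err())
			return
		}
	}
	fmt.Println("observed at least two delete attempts")
}
```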
// danglingPodScenario covers a pod that was created in the provider before the pod controller started,
// and ensures the pod controller deletes it before continuing.
t.Run("danglingPodScenario", func(t *testing.T) {
t.Parallel()
t.Run("mockProvider", func(t *testing.T) {
mp := newMockProvider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
@@ -129,8 +185,8 @@ func TestPodLifecycle(t *testing.T) {
})
})
// failedPodScenario ensures that the VK ignores pods that failed before the pod controller started up
t.Run("failedPodScenario", func(t *testing.T) {
t.Parallel()
t.Run("mockProvider", func(t *testing.T) {
mp := newMockProvider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
@@ -149,8 +205,8 @@ func TestPodLifecycle(t *testing.T) {
})
})
// succeededPodScenario ensures that the VK ignores pods that succeeded before the pod controller started up.
t.Run("succeededPodScenario", func(t *testing.T) {
t.Parallel()
t.Run("mockProvider", func(t *testing.T) {
mp := newMockProvider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
@@ -167,8 +223,9 @@ func TestPodLifecycle(t *testing.T) {
})
})
// updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated
// to the provider
t.Run("updatePodWhileRunningScenario", func(t *testing.T) {
t.Parallel()
t.Run("mockProvider", func(t *testing.T) {
mp := newMockProvider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
@@ -309,8 +366,8 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo
}
func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system) {
t.Parallel()
func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -325,7 +382,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system)
// Setup a watch (prior to pod creation, and pod controller startup)
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
// This ensures that the pod is created.
go func() {
_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
@@ -355,6 +412,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system)
// Setup a watch to check if the pod is in running
watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
go func() {
_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
// Wait for the pod to be started
@@ -369,35 +427,26 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system)
// Start the pod controller
podControllerErrCh := s.start(ctx)
// Wait for pod to be in running
// Wait for the pod to go into running
select {
case <-ctx.Done():
t.Fatalf("Context ended early: %s", ctx.Err().Error())
case err = <-podControllerErrCh:
assert.NilError(t, err)
t.Fatal("Pod controller exited prematurely without error")
case err = <-watchErrCh:
assert.NilError(t, err)
case err = <-podControllerErrCh:
assert.NilError(t, err)
t.Fatal("Pod controller terminated early")
}
// Setup a watch prior to pod deletion
watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
go func() {
_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
// Wait for the pod to be started
func(ev watch.Event) (bool, error) {
log.G(ctx).WithField("event", ev).Info("got event")
// TODO(Sargun): The pod should have transitioned into some status around failed / succeeded
// prior to being deleted.
// In addition, we should check if the deletion timestamp gets set
return ev.Type == watch.Deleted, nil
})
watchErrCh <- watchErr
watchErrCh <- waitFunction(ctx, watcher)
}()
// Delete the pod
// Delete the pod gracefully, via its DeletionTimestamp
// 1. Get the pod
currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(p.Name, metav1.GetOptions{})
@@ -446,14 +495,16 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
// Setup a watch to check if the pod is in running
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
go func() {
_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
// Wait for the pod to be started
func(ev watch.Event) (bool, error) {
pod := ev.Object.(*corev1.Pod)
return pod.Status.Phase == corev1.PodRunning, nil
})
// This deepcopy is required to please the race detector
p = newPod.Object.(*corev1.Pod).DeepCopy()
watchErrCh <- watchErr
}()
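The DeepCopy above gives the test body its own pod object before it later mutates fields such as Spec.SchedulerName; without it, the test and the informer machinery would read and write the same object and the race detector would flag it, as the comment notes. A tiny sketch of the underlying aliasing issue with plain types (hypothetical names, not API machinery):

```go
package main

import "fmt"

// spec stands in for the parts of a pod the test later mutates.
type spec struct{ schedulerName string }

type pod struct{ spec spec }

func main() {
	cached := &pod{spec: spec{schedulerName: "default-scheduler"}}

	// Take an independent copy before mutating, analogous to DeepCopy on
	// real API objects that may still be read elsewhere.
	owned := *cached
	owned.spec.schedulerName = "joe"

	fmt.Println(cached.spec.schedulerName) // still "default-scheduler"
	fmt.Println(owned.spec.schedulerName)  // "joe"
}
```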
@@ -473,9 +524,16 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
}
// Update the pod
version, err := strconv.Atoi(p.ResourceVersion)
if err != nil {
t.Fatalf("Could not parse pod's resource version: %s", err.Error())
}
bumpResourceVersion(p)
p.Spec.SchedulerName = "joe"
p.ResourceVersion = strconv.Itoa(version + 1)
var activeDeadlineSeconds int64 = 300
p.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
log.G(ctx).WithField("pod", p).Info("Updating pod")
_, err = s.client.CoreV1().Pods(p.Namespace).Update(p)
assert.NilError(t, err)
for atomic.LoadUint64(&m.updates) == 0 {
@@ -525,15 +583,6 @@ func randomizeName(pod *corev1.Pod) {
pod.Name = name
}
func bumpResourceVersion(pod *corev1.Pod) {
version, err := strconv.Atoi(pod.ResourceVersion)
if err != nil {
panic(err)
}
pod.ResourceVersion = strconv.Itoa(version + 1)
}
func newPod(podmodifiers ...podModifier) *corev1.Pod {
pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{

View File

@@ -19,15 +19,16 @@ var (
)
type mockV0Provider struct {
creates uint64
updates uint64
deletes uint64
creates uint64
updates uint64
deletes uint64
attemptedDeletes chan struct{}
errorOnDelete error
pods sync.Map
startTime time.Time
notifier func(*v1.Pod)
pods sync.Map
startTime time.Time
realNotifier func(*v1.Pod)
}
type mockProvider struct {
@@ -37,22 +38,28 @@ type mockProvider struct {
// newMockV0Provider creates a new mockV0Provider. The mock legacy provider does not implement the new asynchronous PodNotifier interface
func newMockV0Provider() *mockV0Provider {
provider := mockV0Provider{
startTime: time.Now(),
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
// it will be set, and then we'll call a real underlying implementation.
// This makes it easier in the sense we don't need to wrap each method.
notifier: func(*v1.Pod) {},
startTime: time.Now(),
attemptedDeletes: make(chan struct{}, maxRetries+1),
}
// By default realNotifier is unset, so notifier() is a no-op. If the provider implements the PodNotifier
// interface, NotifyPods sets realNotifier and notifier() delegates to it.
// This keeps the nil check in one place, so we don't need to wrap each method.
return &provider
}
// newMockProvider creates a new mockProvider wrapping a mockV0Provider
func newMockProvider() *mockProvider {
return &mockProvider{mockV0Provider: newMockV0Provider()}
}
// notifier calls the callback that we got from the pod controller to notify it of updates (if it is set)
func (p *mockV0Provider) notifier(pod *v1.Pod) {
if p.realNotifier != nil {
p.realNotifier(pod)
}
}
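The realNotifier change above replaces the old no-op default callback with a single nil-checking wrapper method, so provider code can keep calling p.notifier(pod) unconditionally whether or not NotifyPods was ever invoked. A minimal, standalone sketch of that pattern (type and field names are made up for illustration):

```go
package main

import "fmt"

type pod struct{ name string }

// watcher holds an optional callback; callers never check it for nil themselves.
type watcher struct {
	realNotify func(*pod) // set only if a consumer registered a callback
}

// notify is the single place that guards against a missing callback.
func (w *watcher) notify(p *pod) {
	if w.realNotify != nil {
		w.realNotify(p)
	}
}

func main() {
	w := &watcher{}
	w.notify(&pod{name: "a"}) // safe no-op: nothing registered yet

	w.realNotify = func(p *pod) { fmt.Println("notified for", p.name) }
	w.notify(&pod{name: "b"}) // now delegates to the registered callback
}
```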
// CreatePod accepts a Pod definition and stores it in memory.
func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
log.G(ctx).Infof("receive CreatePod %q", pod.Name)
@@ -125,6 +132,7 @@ func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
log.G(ctx).Infof("receive DeletePod %q", pod.Name)
p.attemptedDeletes <- struct{}{}
if p.errorOnDelete != nil {
return p.errorOnDelete
}
@@ -140,7 +148,7 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
}
now := metav1.Now()
p.pods.Delete(key)
pod.Status.Phase = v1.PodSucceeded
pod.Status.Reason = "MockProviderPodDeleted"
@@ -157,7 +165,8 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
}
p.notifier(pod)
// TODO (Sargun): Eventually delete the pod from the map. We cannot right now, because GetPodStatus can / will
// be called momentarily later.
return nil
}
@@ -206,7 +215,7 @@ func (p *mockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
// within the provider.
func (p *mockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
p.notifier = notifier
p.realNotifier = notifier
}
func buildKeyFromNames(namespace string, name string) (string, error) {

View File

@@ -54,7 +54,11 @@ func testNodeRun(t *testing.T, enableLease bool) {
if enableLease {
opts = append(opts, WithNodeEnableLeaseV1Beta1(leases, nil))
}
node, err := NewNodeController(testP, testNode(t), nodes, opts...)
testNode := testNode(t)
// We have to refer to testNodeCopy during the course of the test. testNode is modified by the node controller,
// so referring to it directly would trigger the race detector.
testNodeCopy := testNode.DeepCopy()
node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error)
@@ -68,11 +72,11 @@ func testNodeRun(t *testing.T, enableLease bool) {
close(chErr)
}()
nw := makeWatch(t, nodes, node.n.Name)
nw := makeWatch(t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr := nw.ResultChan()
lw := makeWatch(t, leases, node.n.Name)
lw := makeWatch(t, leases, testNodeCopy.Name)
defer lw.Stop()
lr := lw.ResultChan()
@@ -105,7 +109,8 @@ func testNodeRun(t *testing.T, enableLease bool) {
leaseUpdates++
assert.Assert(t, cmp.Equal(l.Spec.HolderIdentity != nil, true))
assert.Check(t, cmp.Equal(*l.Spec.HolderIdentity, node.n.Name))
assert.NilError(t, err)
assert.Check(t, cmp.Equal(*l.Spec.HolderIdentity, testNodeCopy.Name))
if lBefore != nil {
assert.Check(t, before(lBefore.Spec.RenewTime.Time, l.Spec.RenewTime.Time))
}
@@ -125,14 +130,15 @@ func testNodeRun(t *testing.T, enableLease bool) {
}
// trigger an async node status update
n := node.n.DeepCopy()
n, err := nodes.Get(testNode.Name, metav1.GetOptions{})
assert.NilError(t, err)
newCondition := corev1.NodeCondition{
Type: corev1.NodeConditionType("UPDATED"),
LastTransitionTime: metav1.Now().Rfc3339Copy(),
}
n.Status.Conditions = append(n.Status.Conditions, newCondition)
nw = makeWatch(t, nodes, node.n.Name)
nw = makeWatch(t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr = nw.ResultChan()

View File

@@ -228,7 +228,7 @@ func (pc *PodController) updatePodStatus(ctx context.Context, pod *corev1.Pod) e
status, err := pc.provider.GetPodStatus(ctx, pod.Namespace, pod.Name)
if err != nil && !errdefs.IsNotFound(err) {
span.SetStatus(err)
return pkgerrors.Wrap(err, "error retreiving pod status")
return pkgerrors.Wrap(err, "error retrieving pod status")
}
// Update the pod's status
@@ -292,7 +292,7 @@ func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retE
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return pkgerrors.Wrap(err, "error spliting cache key")
return pkgerrors.Wrap(err, "error splitting cache key")
}
pod, err := pc.podsLister.Pods(namespace).Get(name)

View File

@@ -227,9 +227,10 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
log.G(ctx).Info("starting workers")
for id := 0; id < podSyncWorkers; id++ {
workerID := strconv.Itoa(id)
go wait.Until(func() {
// Use the worker's "index" as its ID so we can use it for tracing.
pc.runWorker(ctx, strconv.Itoa(id), pc.k8sQ)
pc.runWorker(ctx, workerID, pc.k8sQ)
}, time.Second, ctx.Done())
}
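The workerID change above converts the loop index to a string inside the loop body, so each goroutine captures its own per-iteration value (relevant before Go 1.22's loop-variable semantics) and carries a stable ID for tracing. A small, runnable sketch of the same pattern with wait.Until; the worker count and period here are arbitrary, not taken from the controller:

```go
package main

import (
	"context"
	"fmt"
	"strconv"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	for id := 0; id < 3; id++ {
		// Convert the loop variable before starting the goroutine so each
		// worker captures its own ID rather than a shared loop variable.
		workerID := strconv.Itoa(id)
		go wait.Until(func() {
			fmt.Println("worker", workerID, "running")
		}, time.Second, ctx.Done())
	}

	<-ctx.Done()
}
```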