From 4a270fea08c53c6353972020d370a3567163008f Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Wed, 24 Jul 2019 16:12:03 -0700 Subject: [PATCH 1/4] Add a test which tests the e2e lifecycle of the pod controller This uses the mock provider, so I moved the mock provider to a location where the node test can use it. --- node/lifecycle_test.go | 327 +++++++++++++++++++++++++++++++++++++++++ node/mock_test.go | 225 ++++++++++++++++++++++++++++ node/pod_test.go | 60 -------- 3 files changed, 552 insertions(+), 60 deletions(-) create mode 100644 node/lifecycle_test.go create mode 100644 node/mock_test.go diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go new file mode 100644 index 000000000..ec80560bc --- /dev/null +++ b/node/lifecycle_test.go @@ -0,0 +1,327 @@ +package node + +import ( + "context" + "fmt" + "strconv" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/virtual-kubelet/virtual-kubelet/log" + logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus" + "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" + watchutils "k8s.io/client-go/tools/watch" + "k8s.io/klog" +) + +var ( + _ record.EventRecorder = (*fakeDiscardingRecorder)(nil) +) + +const ( + // There might be a constant we can already leverage here + testNamespace = "default" + informerResyncPeriod = time.Duration(1 * time.Second) + testNodeName = "testnode" + podSyncWorkers = 3 +) + +func init() { + klog.InitFlags(nil) + // We neet to set log.L because new spans derive their loggers from log.L + sl := logrus.StandardLogger() + sl.SetLevel(logrus.DebugLevel) + newLogger := logruslogger.FromLogrus(logrus.NewEntry(sl)) + log.L = newLogger +} + +// fakeDiscardingRecorder discards all events. Silently. 
+type fakeDiscardingRecorder struct { + logger log.Logger +} + +func (r *fakeDiscardingRecorder) Event(object runtime.Object, eventType, reason, message string) { + r.Eventf(object, eventType, reason, message) +} + +func (r *fakeDiscardingRecorder) Eventf(object runtime.Object, eventType, reason, messageFmt string, args ...interface{}) { + r.logger.WithFields(map[string]interface{}{ + "object": object, + "eventType": eventType, + "message": fmt.Sprintf(messageFmt, args...), + }).Infof("Received event") +} + +func (r *fakeDiscardingRecorder) PastEventf(object runtime.Object, timestamp metav1.Time, eventType, reason, messageFmt string, args ...interface{}) { + r.logger.WithFields(map[string]interface{}{ + "timestamp": timestamp.String(), + "object": object, + "eventType": eventType, + "message": fmt.Sprintf(messageFmt, args...), + }).Infof("Received past event") +} + +func (r *fakeDiscardingRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventType, reason, messageFmt string, args ...interface{}) { + r.logger.WithFields(map[string]interface{}{ + "object": object, + "annotations": annotations, + "eventType": eventType, + "message": fmt.Sprintf(messageFmt, args...), + }).Infof("Received annotated event") +} + +func TestPodLifecycle(t *testing.T) { + // We don't do the defer cancel() thing here because t.Run is non-blocking, so the parent context may be cancelled + // before the children are finished and there is no way to do a "join" and wait for them without using a waitgroup, + // at which point, it doesn't seem much better. + ctx := context.Background() + + ctx = log.WithLogger(ctx, log.L) + + t.Run("createStartDeleteScenario", func(t *testing.T) { + t.Run("mockProvider", func(t *testing.T) { + + assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) { + testPodLifecycle(ctx, t, s) + })) + }) + + t.Run("mockV0Provider", func(t *testing.T) { + assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) { + testPodLifecycle(ctx, t, s) + })) + }) + }) +} + +type testFunction func(ctx context.Context, s *system) +type system struct { + retChan chan error + pc *PodController + client *fake.Clientset + podControllerConfig PodControllerConfig +} + +func (s *system) start(ctx context.Context) chan error { + podControllerErrChan := make(chan error) + go func() { + podControllerErrChan <- s.pc.Run(ctx, podSyncWorkers) + }() + + // We need to wait for the pod controller to start. If there is an error before the pod controller starts, or + // the context is cancelled. If the context is cancelled, the startup will be aborted, and the pod controller + // will return an error, so we don't need to wait on ctx.Done() + select { + case <-s.pc.Ready(): + // This listens for errors, or exits in the future. + go func() { + podControllerErr := <-podControllerErrChan + s.retChan <- podControllerErr + }() + // If there is an error before things are ready, we need to forward it immediately + case podControllerErr := <-podControllerErrChan: + s.retChan <- podControllerErr + } + return s.retChan +} + +func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunction) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Create the fake client. 
+ client := fake.NewSimpleClientset() + + // This is largely copy and pasted code from the root command + podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions( + client, + informerResyncPeriod, + kubeinformers.WithNamespace(testNamespace), + ) + podInformer := podInformerFactory.Core().V1().Pods() + + scmInformerFactory := kubeinformers.NewSharedInformerFactory(client, informerResyncPeriod) + + eb := record.NewBroadcaster() + eb.StartLogging(log.G(ctx).Infof) + eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(testNamespace)}) + fakeRecorder := &fakeDiscardingRecorder{ + logger: log.G(ctx), + } + + secretInformer := scmInformerFactory.Core().V1().Secrets() + configMapInformer := scmInformerFactory.Core().V1().ConfigMaps() + serviceInformer := scmInformerFactory.Core().V1().Services() + sys := &system{ + client: client, + retChan: make(chan error, 1), + podControllerConfig: PodControllerConfig{ + PodClient: client.CoreV1(), + PodInformer: podInformer, + EventRecorder: fakeRecorder, + Provider: provider, + ConfigMapInformer: configMapInformer, + SecretInformer: secretInformer, + ServiceInformer: serviceInformer, + }, + } + + var err error + sys.pc, err = NewPodController(sys.podControllerConfig) + if err != nil { + return err + } + + go scmInformerFactory.Start(ctx.Done()) + go podInformerFactory.Start(ctx.Done()) + + f(ctx, sys) + + // Shutdown the pod controller, and wait for it to exit + cancel() + return <-sys.retChan +} + +func testPodLifecycle(ctx context.Context, t *testing.T, s *system) { + t.Parallel() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + p := corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + Namespace: testNamespace, + UID: "4f20ff31-7775-11e9-893d-000c29a24b34", + ResourceVersion: "100", + }, + Spec: corev1.PodSpec{ + NodeName: testNodeName, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + }, + } + + listOptions := metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(), + } + + watchErrCh := make(chan error) + + // Setup a watch (prior to pod creation, and pod controller startup) + watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions) + assert.NilError(t, err) + + // This ensures that the pod is created. + go func() { + _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, + // Wait for the pod to be created + // TODO(Sargun): Make this "smarter" about the status the pod is in. 
+ func(ev watch.Event) (bool, error) { + pod := ev.Object.(*corev1.Pod) + return pod.Name == p.ObjectMeta.Name, nil + }) + + watchErrCh <- watchErr + }() + + // Create the Pod + _, e := s.client.CoreV1().Pods(testNamespace).Create(&p) + assert.NilError(t, e) + + // This will return once + select { + case <-ctx.Done(): + t.Fatalf("Context ended early: %s", ctx.Err().Error()) + case err = <-watchErrCh: + assert.NilError(t, err) + + } + + // Setup a watch to check if the pod is in running + watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions) + assert.NilError(t, err) + go func() { + _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, + // Wait for the pod to be started + func(ev watch.Event) (bool, error) { + pod := ev.Object.(*corev1.Pod) + return pod.Status.Phase == corev1.PodRunning, nil + }) + + watchErrCh <- watchErr + }() + + // Start the pod controller + podControllerErrCh := s.start(ctx) + + // Wait for pod to be in running + select { + case <-ctx.Done(): + t.Fatalf("Context ended early: %s", ctx.Err().Error()) + case err = <-podControllerErrCh: + assert.NilError(t, err) + t.Fatal("Pod controller exited prematurely without error") + case err = <-watchErrCh: + assert.NilError(t, err) + + } + + // Setup a watch prior to pod deletion + watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions) + assert.NilError(t, err) + go func() { + _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, + // Wait for the pod to be started + func(ev watch.Event) (bool, error) { + log.G(ctx).WithField("event", ev).Info("got event") + // TODO(Sargun): The pod should have transitioned into some status around failed / succeeded + // prior to being deleted. + // In addition, we should check if the deletion timestamp gets set + return ev.Type == watch.Deleted, nil + }) + watchErrCh <- watchErr + }() + + // Delete the pod + + // 1. Get the pod + currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(p.Name, metav1.GetOptions{}) + assert.NilError(t, err) + // 2. Set the pod's deletion timestamp, version, and so on + curVersion, err := strconv.Atoi(currentPod.ResourceVersion) + assert.NilError(t, err) + currentPod.ResourceVersion = strconv.Itoa(curVersion + 1) + var deletionGracePeriod int64 = 30 + currentPod.DeletionGracePeriodSeconds = &deletionGracePeriod + deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod))) + currentPod.DeletionTimestamp = &deletionTimestamp + // 3. 
Update (overwrite) the pod + _, err = s.client.CoreV1().Pods(testNamespace).Update(currentPod) + assert.NilError(t, err) + + select { + case <-ctx.Done(): + t.Fatalf("Context ended early: %s", ctx.Err().Error()) + case err = <-podControllerErrCh: + assert.NilError(t, err) + t.Fatal("Pod controller exited prematurely without error") + case err = <-watchErrCh: + assert.NilError(t, err) + + } +} diff --git a/node/mock_test.go b/node/mock_test.go new file mode 100644 index 000000000..62c713c80 --- /dev/null +++ b/node/mock_test.go @@ -0,0 +1,225 @@ +package node + +import ( + "context" + "fmt" + "time" + + "github.com/virtual-kubelet/virtual-kubelet/errdefs" + "github.com/virtual-kubelet/virtual-kubelet/log" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + _ PodLifecycleHandler = (*mockV0Provider)(nil) + _ PodNotifier = (*mockProvider)(nil) +) + +type mockV0Provider struct { + creates int + updates int + deletes int + + errorOnDelete error + + pods map[string]*v1.Pod + startTime time.Time + notifier func(*v1.Pod) +} + +type mockProvider struct { + *mockV0Provider +} + +// NewMockProviderMockConfig creates a new mockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface +func newMockV0Provider() *mockV0Provider { + provider := mockV0Provider{ + pods: make(map[string]*v1.Pod), + startTime: time.Now(), + // By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface, + // it will be set, and then we'll call a real underlying implementation. + // This makes it easier in the sense we don't need to wrap each method. + notifier: func(*v1.Pod) {}, + } + + return &provider +} + +// NewMockProviderMockConfig creates a new MockProvider with the given config +func newMockProvider() *mockProvider { + + return &mockProvider{mockV0Provider: newMockV0Provider()} +} + +// CreatePod accepts a Pod definition and stores it in memory. +func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { + log.G(ctx).Infof("receive CreatePod %q", pod.Name) + + p.creates++ + key, err := buildKey(pod) + if err != nil { + return err + } + + now := metav1.NewTime(time.Now()) + pod.Status = v1.PodStatus{ + Phase: v1.PodRunning, + HostIP: "1.2.3.4", + PodIP: "5.6.7.8", + StartTime: &now, + Conditions: []v1.PodCondition{ + { + Type: v1.PodInitialized, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + }, + } + + for _, container := range pod.Spec.Containers { + pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{ + Name: container.Name, + Image: container.Image, + Ready: true, + RestartCount: 0, + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{ + StartedAt: now, + }, + }, + }) + } + + p.pods[key] = pod + p.notifier(pod) + + return nil +} + +// UpdatePod accepts a Pod definition and updates its reference. +func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error { + log.G(ctx).Infof("receive UpdatePod %q", pod.Name) + + p.updates++ + key, err := buildKey(pod) + if err != nil { + return err + } + + p.pods[key] = pod + p.notifier(pod) + + return nil +} + +// DeletePod deletes the specified pod out of memory. 
+func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { + log.G(ctx).Infof("receive DeletePod %q", pod.Name) + + if p.errorOnDelete != nil { + return p.errorOnDelete + } + + p.deletes++ + key, err := buildKey(pod) + if err != nil { + return err + } + + if _, exists := p.pods[key]; !exists { + return errdefs.NotFound("pod not found") + } + + now := metav1.Now() + delete(p.pods, key) + pod.Status.Phase = v1.PodSucceeded + pod.Status.Reason = "MockProviderPodDeleted" + + for idx := range pod.Status.ContainerStatuses { + pod.Status.ContainerStatuses[idx].Ready = false + pod.Status.ContainerStatuses[idx].State = v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + Message: "Mock provider terminated container upon deletion", + FinishedAt: now, + Reason: "MockProviderPodContainerDeleted", + StartedAt: pod.Status.ContainerStatuses[idx].State.Running.StartedAt, + }, + } + } + + p.notifier(pod) + + return nil +} + +// GetPod returns a pod by name that is stored in memory. +func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) { + log.G(ctx).Infof("receive GetPod %q", name) + + key, err := buildKeyFromNames(namespace, name) + if err != nil { + return nil, err + } + + if pod, ok := p.pods[key]; ok { + return pod, nil + } + return nil, errdefs.NotFoundf("pod \"%s/%s\" is not known to the provider", namespace, name) +} + +// GetPodStatus returns the status of a pod by name that is "running". +// returns nil if a pod by that name is not found. +func (p *mockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { + log.G(ctx).Infof("receive GetPodStatus %q", name) + + pod, err := p.GetPod(ctx, namespace, name) + if err != nil { + return nil, err + } + + return &pod.Status, nil +} + +// GetPods returns a list of all pods known to be "running". +func (p *mockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) { + log.G(ctx).Info("receive GetPods") + + var pods []*v1.Pod + + for _, pod := range p.pods { + pods = append(pods, pod) + } + + return pods, nil +} + +// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done +// within the provider. +func (p *mockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) { + p.notifier = notifier +} + +func buildKeyFromNames(namespace string, name string) (string, error) { + return fmt.Sprintf("%s-%s", namespace, name), nil +} + +// buildKey is a helper for building the "key" for the providers pod store. 
+func buildKey(pod *v1.Pod) (string, error) { + if pod.ObjectMeta.Namespace == "" { + return "", fmt.Errorf("pod namespace not found") + } + + if pod.ObjectMeta.Name == "" { + return "", fmt.Errorf("pod name not found") + } + + return buildKeyFromNames(pod.ObjectMeta.Namespace, pod.ObjectMeta.Name) +} diff --git a/node/pod_test.go b/node/pod_test.go index 101532006..56eff411f 100644 --- a/node/pod_test.go +++ b/node/pod_test.go @@ -16,7 +16,6 @@ package node import ( "context" - "path" "testing" pkgerrors "github.com/pkg/errors" @@ -30,71 +29,12 @@ import ( "k8s.io/client-go/kubernetes/fake" ) -type mockProvider struct { - pods map[string]*corev1.Pod - - creates int - updates int - deletes int - - errorOnDelete error -} - -func (m *mockProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error { - m.pods[path.Join(pod.GetNamespace(), pod.GetName())] = pod - m.creates++ - return nil -} - -func (m *mockProvider) UpdatePod(ctx context.Context, pod *corev1.Pod) error { - m.pods[path.Join(pod.GetNamespace(), pod.GetName())] = pod - m.updates++ - return nil -} - -func (m *mockProvider) GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) { - p := m.pods[path.Join(namespace, name)] - if p == nil { - return nil, errdefs.NotFound("not found") - } - return p, nil -} - -func (m *mockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) { - p := m.pods[path.Join(namespace, name)] - if p == nil { - return nil, errdefs.NotFound("not found") - } - return &p.Status, nil -} - -func (m *mockProvider) DeletePod(ctx context.Context, p *corev1.Pod) error { - if m.errorOnDelete != nil { - return m.errorOnDelete - } - delete(m.pods, path.Join(p.GetNamespace(), p.GetName())) - m.deletes++ - return nil -} - -func (m *mockProvider) GetPods(_ context.Context) ([]*corev1.Pod, error) { - ls := make([]*corev1.Pod, 0, len(m.pods)) - for _, p := range ls { - ls = append(ls, p) - } - return ls, nil -} - type TestController struct { *PodController mock *mockProvider client *fake.Clientset } -func newMockProvider() *mockProvider { - return &mockProvider{pods: make(map[string]*corev1.Pod)} -} - func newTestController() *TestController { fk8s := fake.NewSimpleClientset() From ce38d72c0e575294c3fe1b78847774ded39ae2f7 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Fri, 26 Jul 2019 16:16:51 -0700 Subject: [PATCH 2/4] Add additional lifecycle tests * Don't scheduled failed, or succeeded pods * Delete dangling pods --- node/lifecycle_test.go | 153 +++++++++++++++++++++++++++++++++++------ 1 file changed, 131 insertions(+), 22 deletions(-) diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go index ec80560bc..4b9f73974 100644 --- a/node/lifecycle_test.go +++ b/node/lifecycle_test.go @@ -92,13 +92,75 @@ func TestPodLifecycle(t *testing.T) { t.Run("mockProvider", func(t *testing.T) { assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) { - testPodLifecycle(ctx, t, s) + testCreateStartDeleteScenario(ctx, t, s) })) }) + if testing.Short() { + return + } t.Run("mockV0Provider", func(t *testing.T) { assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) { - testPodLifecycle(ctx, t, s) + testCreateStartDeleteScenario(ctx, t, s) + })) + }) + }) + + t.Run("danglingPodScenario", func(t *testing.T) { + t.Parallel() + t.Run("mockProvider", func(t *testing.T) { + mp := newMockProvider() + assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { + 
testDanglingPodScenario(ctx, t, s, mp.mockV0Provider) + })) + }) + + if testing.Short() { + return + } + + t.Run("mockV0Provider", func(t *testing.T) { + mp := newMockV0Provider() + assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { + testDanglingPodScenario(ctx, t, s, mp) + })) + }) + }) + + t.Run("failedPodScenario", func(t *testing.T) { + t.Parallel() + t.Run("mockProvider", func(t *testing.T) { + mp := newMockProvider() + assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { + testFailedPodScenario(ctx, t, s) + })) + }) + + if testing.Short() { + return + } + + t.Run("mockV0Provider", func(t *testing.T) { + assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) { + testFailedPodScenario(ctx, t, s) + })) + }) + }) + + t.Run("succeededPodScenario", func(t *testing.T) { + t.Parallel() + t.Run("mockProvider", func(t *testing.T) { + mp := newMockProvider() + assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { + testSucceededPodScenario(ctx, t, s) + })) + }) + if testing.Short() { + return + } + t.Run("mockV0Provider", func(t *testing.T) { + assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) { + testSucceededPodScenario(ctx, t, s) })) }) }) @@ -192,29 +254,55 @@ func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunct return <-sys.retChan } -func testPodLifecycle(ctx context.Context, t *testing.T, s *system) { +func testFailedPodScenario(ctx context.Context, t *testing.T, s *system) { + testTerminalStatePodScenario(ctx, t, s, corev1.PodFailed) +} + +func testSucceededPodScenario(ctx context.Context, t *testing.T, s *system) { + testTerminalStatePodScenario(ctx, t, s, corev1.PodSucceeded) +} +func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system, state corev1.PodPhase) { + + t.Parallel() + + p1 := newPod() + p1.Status.Phase = state + // Create the Pod + _, e := s.client.CoreV1().Pods(testNamespace).Create(p1) + assert.NilError(t, e) + + // Start the pod controller + s.start(ctx) + + for s.pc.k8sQ.Len() > 0 { + } + + p2, err := s.client.CoreV1().Pods(testNamespace).Get(p1.Name, metav1.GetOptions{}) + assert.NilError(t, err) + + // Make sure the pods have not changed + assert.DeepEqual(t, p1, p2) +} + +func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mockV0Provider) { + t.Parallel() + + pod := newPod() + assert.NilError(t, m.CreatePod(ctx, pod)) + + // Start the pod controller + s.start(ctx) + + assert.Assert(t, m.deletes == 1) + +} + +func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system) { t.Parallel() ctx, cancel := context.WithCancel(ctx) defer cancel() - p := corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "my-pod", - Namespace: testNamespace, - UID: "4f20ff31-7775-11e9-893d-000c29a24b34", - ResourceVersion: "100", - }, - Spec: corev1.PodSpec{ - NodeName: testNodeName, - }, - Status: corev1.PodStatus{ - Phase: corev1.PodPending, - }, - } + p := newPod() listOptions := metav1.ListOptions{ FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(), @@ -240,7 +328,7 @@ func testPodLifecycle(ctx context.Context, t *testing.T, s *system) { }() // Create the Pod - _, e := s.client.CoreV1().Pods(testNamespace).Create(&p) + _, e := s.client.CoreV1().Pods(testNamespace).Create(p) assert.NilError(t, e) // This will return once @@ 
-325,3 +413,24 @@ func testPodLifecycle(ctx context.Context, t *testing.T, s *system) { } } + +func newPod() *corev1.Pod { + return &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + Namespace: testNamespace, + UID: "4f20ff31-7775-11e9-893d-000c29a24b34", + ResourceVersion: "100", + }, + Spec: corev1.PodSpec{ + NodeName: testNodeName, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + }, + } +} From bd8e39e3f9828cc43b8954b7c2ffb09a0288b1b4 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Sat, 27 Jul 2019 12:43:39 -0700 Subject: [PATCH 3/4] Add a benchmark for pod creation This adds a benchmark for pod creation and makes the mock_test provider actually work correctly in concurrent situations. --- go.mod | 1 + go.sum | 4 ++++ node/lifecycle_test.go | 52 ++++++++++++++++++++++++++++++++++++++++-- node/mock_test.go | 23 ++++++++++--------- 4 files changed, 67 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 9cd8eaa18..e72c2efb5 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/modern-go/reflect2 v1.0.1 // indirect github.com/onsi/ginkgo v1.8.0 // indirect github.com/onsi/gomega v1.5.0 // indirect + github.com/pborman/uuid v1.2.0 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pkg/errors v0.8.1 github.com/sirupsen/logrus v1.4.1 diff --git a/go.sum b/go.sum index 14f2a0d0a..60cebcfb4 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= +github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gnostic v0.0.0-20170426233943-68f4ded48ba9/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= github.com/googleapis/gnostic v0.1.0 h1:rVsPeBmXbYv4If/cumu1AzZPwV58q433hvONV1UEZoI= github.com/googleapis/gnostic v0.1.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= @@ -125,6 +127,8 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= +github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g= +github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go index 4b9f73974..059f3d571 100644 --- a/node/lifecycle_test.go +++ b/node/lifecycle_test.go @@ -15,6 +15,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/watch" kubeinformers 
"k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" @@ -414,8 +415,51 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system) } } -func newPod() *corev1.Pod { - return &corev1.Pod{ +func BenchmarkCreatePods(b *testing.B) { + sl := logrus.StandardLogger() + sl.SetLevel(logrus.ErrorLevel) + newLogger := logruslogger.FromLogrus(logrus.NewEntry(sl)) + + ctx := context.Background() + ctx = log.WithLogger(ctx, newLogger) + + assert.NilError(b, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) { + benchmarkCreatePods(ctx, b, s) + })) +} + +func benchmarkCreatePods(ctx context.Context, b *testing.B, s *system) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + errCh := s.start(ctx) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + pod := newPod(randomizeUID, randomizeName) + _, err := s.client.CoreV1().Pods(pod.Namespace).Create(pod) + assert.NilError(b, err) + select { + case err = <-errCh: + b.Fatalf("Benchmark terminated with error: %+v", err) + default: + } + } +} + +type podModifier func(*corev1.Pod) + +func randomizeUID(pod *corev1.Pod) { + pod.ObjectMeta.UID = uuid.NewUUID() +} + +func randomizeName(pod *corev1.Pod) { + name := fmt.Sprintf("pod-%s", uuid.NewUUID()) + pod.Name = name +} + +func newPod(podmodifiers ...podModifier) *corev1.Pod { + pod := &corev1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", APIVersion: "v1", @@ -433,4 +477,8 @@ func newPod() *corev1.Pod { Phase: corev1.PodPending, }, } + for _, modifier := range podmodifiers { + modifier(pod) + } + return pod } diff --git a/node/mock_test.go b/node/mock_test.go index 62c713c80..1ab237dd2 100644 --- a/node/mock_test.go +++ b/node/mock_test.go @@ -3,6 +3,7 @@ package node import ( "context" "fmt" + "sync" "time" "github.com/virtual-kubelet/virtual-kubelet/errdefs" @@ -23,7 +24,7 @@ type mockV0Provider struct { errorOnDelete error - pods map[string]*v1.Pod + pods sync.Map startTime time.Time notifier func(*v1.Pod) } @@ -35,7 +36,6 @@ type mockProvider struct { // NewMockProviderMockConfig creates a new mockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface func newMockV0Provider() *mockV0Provider { provider := mockV0Provider{ - pods: make(map[string]*v1.Pod), startTime: time.Now(), // By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface, // it will be set, and then we'll call a real underlying implementation. 
@@ -98,7 +98,7 @@ func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { }) } - p.pods[key] = pod + p.pods.Store(key, pod) p.notifier(pod) return nil @@ -114,7 +114,7 @@ func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error { return err } - p.pods[key] = pod + p.pods.Store(key, pod) p.notifier(pod) return nil @@ -134,12 +134,12 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) return err } - if _, exists := p.pods[key]; !exists { + if _, exists := p.pods.Load(key); !exists { return errdefs.NotFound("pod not found") } now := metav1.Now() - delete(p.pods, key) + p.pods.Delete(key) pod.Status.Phase = v1.PodSucceeded pod.Status.Reason = "MockProviderPodDeleted" @@ -169,8 +169,8 @@ func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (po return nil, err } - if pod, ok := p.pods[key]; ok { - return pod, nil + if pod, ok := p.pods.Load(key); ok { + return pod.(*v1.Pod), nil } return nil, errdefs.NotFoundf("pod \"%s/%s\" is not known to the provider", namespace, name) } @@ -194,9 +194,10 @@ func (p *mockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) { var pods []*v1.Pod - for _, pod := range p.pods { - pods = append(pods, pod) - } + p.pods.Range(func(key, pod interface{}) bool { + pods = append(pods, pod.(*v1.Pod)) + return true + }) return pods, nil } From 50bbc3d1d4ba5807b26d2e185c80acf306f328f6 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Mon, 29 Jul 2019 08:30:35 -0700 Subject: [PATCH 4/4] Add tests around updates This makes sure the update function works correctly after the pod is running if the podspec is changed. Upon writing the test, I realized we were accessing the variables outside of the goroutine that the workers with tests were running in, and we had no locks. Therefore, I converted all of those numbers to use atomics. 
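[Editor's aside, not part of this patch: the fix described in the message above is the standard sync/atomic counter pattern — writers bump the counter with atomic.AddUint64 and readers observe it with atomic.LoadUint64, so the test goroutine never reads the field unsynchronized. A minimal sketch, using an illustrative `counters` struct in place of the mock provider's fields:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counters mirrors the creates/updates/deletes fields on the mock provider.
type counters struct {
	creates uint64
	updates uint64
	deletes uint64
}

func main() {
	var c counters
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Worker goroutines increment atomically instead of c.creates++.
			atomic.AddUint64(&c.creates, 1)
		}()
	}
	wg.Wait()
	// The reader side uses an atomic load, matching the assertions below.
	fmt.Println(atomic.LoadUint64(&c.creates)) // prints 10
}
]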
--- node/lifecycle_test.go | 76 ++++++++++++++++++++++++++++++++++++++++++ node/mock_test.go | 13 ++++---- node/pod_test.go | 25 +++++++------- 3 files changed, 96 insertions(+), 18 deletions(-) diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go index 059f3d571..3bccdf02e 100644 --- a/node/lifecycle_test.go +++ b/node/lifecycle_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "sync/atomic" "testing" "time" @@ -165,6 +166,16 @@ func TestPodLifecycle(t *testing.T) { })) }) }) + + t.Run("updatePodWhileRunningScenario", func(t *testing.T) { + t.Parallel() + t.Run("mockProvider", func(t *testing.T) { + mp := newMockProvider() + assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { + testUpdatePodWhileRunningScenario(ctx, t, s, mp) + })) + }) + }) } type testFunction func(ctx context.Context, s *system) @@ -415,6 +426,62 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system) } } +func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) { + t.Parallel() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + p := newPod() + + listOptions := metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(), + } + + watchErrCh := make(chan error) + + // Create a Pod + _, e := s.client.CoreV1().Pods(testNamespace).Create(p) + assert.NilError(t, e) + + // Setup a watch to check if the pod is in running + watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions) + assert.NilError(t, err) + go func() { + _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, + // Wait for the pod to be started + func(ev watch.Event) (bool, error) { + pod := ev.Object.(*corev1.Pod) + return pod.Status.Phase == corev1.PodRunning, nil + }) + + watchErrCh <- watchErr + }() + + // Start the pod controller + podControllerErrCh := s.start(ctx) + + // Wait for pod to be in running + select { + case <-ctx.Done(): + t.Fatalf("Context ended early: %s", ctx.Err().Error()) + case err = <-podControllerErrCh: + assert.NilError(t, err) + t.Fatal("Pod controller exited prematurely without error") + case err = <-watchErrCh: + assert.NilError(t, err) + + } + + // Update the pod + + bumpResourceVersion(p) + p.Spec.SchedulerName = "joe" + _, err = s.client.CoreV1().Pods(p.Namespace).Update(p) + assert.NilError(t, err) + for atomic.LoadUint64(&m.updates) == 0 { + } +} + func BenchmarkCreatePods(b *testing.B) { sl := logrus.StandardLogger() sl.SetLevel(logrus.ErrorLevel) @@ -458,6 +525,15 @@ func randomizeName(pod *corev1.Pod) { pod.Name = name } +func bumpResourceVersion(pod *corev1.Pod) { + version, err := strconv.Atoi(pod.ResourceVersion) + if err != nil { + panic(err) + } + + pod.ResourceVersion = strconv.Itoa(version + 1) +} + func newPod(podmodifiers ...podModifier) *corev1.Pod { pod := &corev1.Pod{ TypeMeta: metav1.TypeMeta{ diff --git a/node/mock_test.go b/node/mock_test.go index 1ab237dd2..d0835ca53 100644 --- a/node/mock_test.go +++ b/node/mock_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/virtual-kubelet/virtual-kubelet/errdefs" @@ -18,9 +19,9 @@ var ( ) type mockV0Provider struct { - creates int - updates int - deletes int + creates uint64 + updates uint64 + deletes uint64 errorOnDelete error @@ -56,7 +57,7 @@ func newMockProvider() *mockProvider { func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { log.G(ctx).Infof("receive CreatePod %q", pod.Name) - p.creates++ + 
atomic.AddUint64(&p.creates, 1) key, err := buildKey(pod) if err != nil { return err @@ -108,7 +109,7 @@ func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error { log.G(ctx).Infof("receive UpdatePod %q", pod.Name) - p.updates++ + atomic.AddUint64(&p.updates, 1) key, err := buildKey(pod) if err != nil { return err @@ -128,7 +129,7 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) return p.errorOnDelete } - p.deletes++ + atomic.AddUint64(&p.deletes, 1) key, err := buildKey(pod) if err != nil { return err diff --git a/node/pod_test.go b/node/pod_test.go index 56eff411f..3006d23fb 100644 --- a/node/pod_test.go +++ b/node/pod_test.go @@ -16,6 +16,7 @@ package node import ( "context" + "sync/atomic" "testing" pkgerrors "github.com/pkg/errors" @@ -152,8 +153,8 @@ func TestPodCreateNewPod(t *testing.T) { assert.Check(t, is.Nil(err)) // createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist - assert.Check(t, is.Equal(svr.mock.creates, 1)) - assert.Check(t, is.Equal(svr.mock.updates, 0)) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0))) } func TestPodUpdateExisting(t *testing.T) { @@ -179,8 +180,8 @@ func TestPodUpdateExisting(t *testing.T) { err := svr.provider.CreatePod(context.Background(), pod) assert.Check(t, is.Nil(err)) - assert.Check(t, is.Equal(svr.mock.creates, 1)) - assert.Check(t, is.Equal(svr.mock.updates, 0)) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0))) pod2 := &corev1.Pod{} pod2.ObjectMeta.Namespace = "default" @@ -204,8 +205,8 @@ func TestPodUpdateExisting(t *testing.T) { assert.Check(t, is.Nil(err)) // createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed - assert.Check(t, is.Equal(svr.mock.creates, 1)) - assert.Check(t, is.Equal(svr.mock.updates, 1)) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(1))) } func TestPodNoSpecChange(t *testing.T) { @@ -231,15 +232,15 @@ func TestPodNoSpecChange(t *testing.T) { err := svr.mock.CreatePod(context.Background(), pod) assert.Check(t, is.Nil(err)) - assert.Check(t, is.Equal(svr.mock.creates, 1)) - assert.Check(t, is.Equal(svr.mock.updates, 0)) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0))) err = svr.createOrUpdatePod(context.Background(), pod) assert.Check(t, is.Nil(err)) // createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change - assert.Check(t, is.Equal(svr.mock.creates, 1)) - assert.Check(t, is.Equal(svr.mock.updates, 0)) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.creates), uint64(1))) + assert.Check(t, is.Equal(atomic.LoadUint64(&svr.mock.updates), uint64(0))) } func TestPodDelete(t *testing.T) { @@ -279,7 +280,7 @@ func TestPodDelete(t *testing.T) { ctx := context.Background() err = c.createOrUpdatePod(ctx, p) // make sure it's actually created assert.NilError(t, err) - assert.Check(t, is.Equal(c.mock.creates, 1)) + assert.Check(t, is.Equal(atomic.LoadUint64(&c.mock.creates), uint64(1))) err = c.deletePod(ctx, pod.Namespace, pod.Name) assert.Equal(t, pkgerrors.Cause(err), err) @@ -288,7 +289,7 @@ func 
TestPodDelete(t *testing.T) { if tc.delErr == nil { expectDeletes = 1 } - assert.Check(t, is.Equal(c.mock.deletes, expectDeletes)) + assert.Check(t, is.Equal(atomic.LoadUint64(&c.mock.deletes), uint64(expectDeletes))) expectDeleted := tc.delErr == nil || errdefs.IsNotFound(tc.delErr)
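[Editor's note: the lifecycle tests above all follow the same "register the watch first, then act, then block on the condition" pattern built from a fake clientset and watchutils.UntilWithoutRetry. A condensed, self-contained sketch of that pattern is below; it assumes the older client-go API pinned by this module (Watch/Create without a context argument), and the pod name "my-pod" and namespace "default" are taken from the tests for illustration only.

package main

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes/fake"
	watchutils "k8s.io/client-go/tools/watch"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	client := fake.NewSimpleClientset()

	// Register the watch before creating the pod so no event can be missed.
	w, err := client.CoreV1().Pods("default").Watch(metav1.ListOptions{})
	if err != nil {
		panic(err)
	}

	go func() {
		pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "my-pod", Namespace: "default"}}
		if _, err := client.CoreV1().Pods("default").Create(pod); err != nil {
			panic(err)
		}
	}()

	// Block until the condition is observed or ctx times out.
	_, err = watchutils.UntilWithoutRetry(ctx, w, func(ev watch.Event) (bool, error) {
		p, ok := ev.Object.(*corev1.Pod)
		return ok && p.Name == "my-pod", nil
	})
	fmt.Println("observed pod creation, err:", err)
}
]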