From 4a270fea08c53c6353972020d370a3567163008f Mon Sep 17 00:00:00 2001
From: Sargun Dhillon
Date: Wed, 24 Jul 2019 16:12:03 -0700
Subject: [PATCH] Add a test which tests the e2e lifecycle of the pod controller

This uses the mock provider, so I moved the mock provider to a location where
the node test can use it.
---
 node/lifecycle_test.go | 327 +++++++++++++++++++++++++++++++++++++++++
 node/mock_test.go      | 225 ++++++++++++++++++++++++++++
 node/pod_test.go       |  60 --------
 3 files changed, 552 insertions(+), 60 deletions(-)
 create mode 100644 node/lifecycle_test.go
 create mode 100644 node/mock_test.go

diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go
new file mode 100644
index 000000000..ec80560bc
--- /dev/null
+++ b/node/lifecycle_test.go
@@ -0,0 +1,327 @@
+package node
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "github.com/virtual-kubelet/virtual-kubelet/log"
+ logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
+ "gotest.tools/assert"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ kubeinformers "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes/fake"
+ corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
+ "k8s.io/client-go/tools/record"
+ watchutils "k8s.io/client-go/tools/watch"
+ "k8s.io/klog"
+)
+
+var (
+ _ record.EventRecorder = (*fakeDiscardingRecorder)(nil)
+)
+
+const (
+ // There might be a constant we can already leverage here
+ testNamespace = "default"
+ informerResyncPeriod = time.Duration(1 * time.Second)
+ testNodeName = "testnode"
+ podSyncWorkers = 3
+)
+
+func init() {
+ klog.InitFlags(nil)
+ // We need to set log.L because new spans derive their loggers from log.L
+ sl := logrus.StandardLogger()
+ sl.SetLevel(logrus.DebugLevel)
+ newLogger := logruslogger.FromLogrus(logrus.NewEntry(sl))
+ log.L = newLogger
+}
+
+// fakeDiscardingRecorder does not record events to the apiserver; it only logs them.
+type fakeDiscardingRecorder struct {
+ logger log.Logger
+}
+
+func (r *fakeDiscardingRecorder) Event(object runtime.Object, eventType, reason, message string) {
+ r.Eventf(object, eventType, reason, message)
+}
+
+func (r *fakeDiscardingRecorder) Eventf(object runtime.Object, eventType, reason, messageFmt string, args ...interface{}) {
+ r.logger.WithFields(map[string]interface{}{
+ "object": object,
+ "eventType": eventType,
+ "message": fmt.Sprintf(messageFmt, args...),
+ }).Infof("Received event")
+}
+
+func (r *fakeDiscardingRecorder) PastEventf(object runtime.Object, timestamp metav1.Time, eventType, reason, messageFmt string, args ...interface{}) {
+ r.logger.WithFields(map[string]interface{}{
+ "timestamp": timestamp.String(),
+ "object": object,
+ "eventType": eventType,
+ "message": fmt.Sprintf(messageFmt, args...),
+ }).Infof("Received past event")
+}
+
+func (r *fakeDiscardingRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventType, reason, messageFmt string, args ...interface{}) {
+ r.logger.WithFields(map[string]interface{}{
+ "object": object,
+ "annotations": annotations,
+ "eventType": eventType,
+ "message": fmt.Sprintf(messageFmt, args...),
+ }).Infof("Received annotated event")
+}
+
+func TestPodLifecycle(t *testing.T) {
+ // We don't do the usual defer cancel() here because t.Run is non-blocking, so the parent context may be cancelled
+ // before the children are finished, and there is no way to "join" and wait for them without using a waitgroup,
+ // at which point it wouldn't be much better.
+ ctx := context.Background()
+
+ ctx = log.WithLogger(ctx, log.L)
+
+ t.Run("createStartDeleteScenario", func(t *testing.T) {
+ t.Run("mockProvider", func(t *testing.T) {
+
+ assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) {
+ testPodLifecycle(ctx, t, s)
+ }))
+ })
+
+ t.Run("mockV0Provider", func(t *testing.T) {
+ assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
+ testPodLifecycle(ctx, t, s)
+ }))
+ })
+ })
+}
+
+type testFunction func(ctx context.Context, s *system)
+type system struct {
+ retChan chan error
+ pc *PodController
+ client *fake.Clientset
+ podControllerConfig PodControllerConfig
+}
+
+func (s *system) start(ctx context.Context) chan error {
+ podControllerErrChan := make(chan error)
+ go func() {
+ podControllerErrChan <- s.pc.Run(ctx, podSyncWorkers)
+ }()
+
+ // We need to wait either for the pod controller to become ready, or for it to fail before it becomes ready.
+ // If the context is cancelled, the startup will be aborted and the pod controller will return an error, so
+ // we don't need to wait on ctx.Done() separately.
+ select {
+ case <-s.pc.Ready():
+ // This listens for an error, or an exit, in the future.
+ go func() {
+ podControllerErr := <-podControllerErrChan
+ s.retChan <- podControllerErr
+ }()
+ // If there is an error before things are ready, we need to forward it immediately
+ case podControllerErr := <-podControllerErrChan:
+ s.retChan <- podControllerErr
+ }
+ return s.retChan
+}
+
+func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunction) error {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ // Create the fake client.
+ client := fake.NewSimpleClientset() + + // This is largely copy and pasted code from the root command + podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions( + client, + informerResyncPeriod, + kubeinformers.WithNamespace(testNamespace), + ) + podInformer := podInformerFactory.Core().V1().Pods() + + scmInformerFactory := kubeinformers.NewSharedInformerFactory(client, informerResyncPeriod) + + eb := record.NewBroadcaster() + eb.StartLogging(log.G(ctx).Infof) + eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(testNamespace)}) + fakeRecorder := &fakeDiscardingRecorder{ + logger: log.G(ctx), + } + + secretInformer := scmInformerFactory.Core().V1().Secrets() + configMapInformer := scmInformerFactory.Core().V1().ConfigMaps() + serviceInformer := scmInformerFactory.Core().V1().Services() + sys := &system{ + client: client, + retChan: make(chan error, 1), + podControllerConfig: PodControllerConfig{ + PodClient: client.CoreV1(), + PodInformer: podInformer, + EventRecorder: fakeRecorder, + Provider: provider, + ConfigMapInformer: configMapInformer, + SecretInformer: secretInformer, + ServiceInformer: serviceInformer, + }, + } + + var err error + sys.pc, err = NewPodController(sys.podControllerConfig) + if err != nil { + return err + } + + go scmInformerFactory.Start(ctx.Done()) + go podInformerFactory.Start(ctx.Done()) + + f(ctx, sys) + + // Shutdown the pod controller, and wait for it to exit + cancel() + return <-sys.retChan +} + +func testPodLifecycle(ctx context.Context, t *testing.T, s *system) { + t.Parallel() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + p := corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + Namespace: testNamespace, + UID: "4f20ff31-7775-11e9-893d-000c29a24b34", + ResourceVersion: "100", + }, + Spec: corev1.PodSpec{ + NodeName: testNodeName, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + }, + } + + listOptions := metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(), + } + + watchErrCh := make(chan error) + + // Setup a watch (prior to pod creation, and pod controller startup) + watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions) + assert.NilError(t, err) + + // This ensures that the pod is created. + go func() { + _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, + // Wait for the pod to be created + // TODO(Sargun): Make this "smarter" about the status the pod is in. 
+ func(ev watch.Event) (bool, error) {
+ pod := ev.Object.(*corev1.Pod)
+ return pod.Name == p.ObjectMeta.Name, nil
+ })
+
+ watchErrCh <- watchErr
+ }()
+
+ // Create the Pod
+ _, e := s.client.CoreV1().Pods(testNamespace).Create(&p)
+ assert.NilError(t, e)
+
+ // This will return once the watch above has observed the pod being created, or the context ends
+ select {
+ case <-ctx.Done():
+ t.Fatalf("Context ended early: %s", ctx.Err().Error())
+ case err = <-watchErrCh:
+ assert.NilError(t, err)
+
+ }
+
+ // Setup a watch to check that the pod reaches the running phase
+ watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
+ assert.NilError(t, err)
+ go func() {
+ _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
+ // Wait for the pod to be started
+ func(ev watch.Event) (bool, error) {
+ pod := ev.Object.(*corev1.Pod)
+ return pod.Status.Phase == corev1.PodRunning, nil
+ })
+
+ watchErrCh <- watchErr
+ }()
+
+ // Start the pod controller
+ podControllerErrCh := s.start(ctx)
+
+ // Wait for the pod to be running
+ select {
+ case <-ctx.Done():
+ t.Fatalf("Context ended early: %s", ctx.Err().Error())
+ case err = <-podControllerErrCh:
+ assert.NilError(t, err)
+ t.Fatal("Pod controller exited prematurely without error")
+ case err = <-watchErrCh:
+ assert.NilError(t, err)
+
+ }
+
+ // Setup a watch prior to pod deletion
+ watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
+ assert.NilError(t, err)
+ go func() {
+ _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
+ // Wait for the pod to be deleted
+ func(ev watch.Event) (bool, error) {
+ log.G(ctx).WithField("event", ev).Info("got event")
+ // TODO(Sargun): The pod should have transitioned into some status around failed / succeeded
+ // prior to being deleted.
+ // In addition, we should check if the deletion timestamp gets set
+ return ev.Type == watch.Deleted, nil
+ })
+ watchErrCh <- watchErr
+ }()
+
+ // Delete the pod
+
+ // 1. Get the pod
+ currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(p.Name, metav1.GetOptions{})
+ assert.NilError(t, err)
+ // 2. Set the pod's deletion timestamp, version, and so on
+ curVersion, err := strconv.Atoi(currentPod.ResourceVersion)
+ assert.NilError(t, err)
+ currentPod.ResourceVersion = strconv.Itoa(curVersion + 1)
+ var deletionGracePeriod int64 = 30
+ currentPod.DeletionGracePeriodSeconds = &deletionGracePeriod
+ deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
+ currentPod.DeletionTimestamp = &deletionTimestamp
+ // 3. Update (overwrite) the pod
+ _, err = s.client.CoreV1().Pods(testNamespace).Update(currentPod)
+ assert.NilError(t, err)
+
+ select {
+ case <-ctx.Done():
+ t.Fatalf("Context ended early: %s", ctx.Err().Error())
+ case err = <-podControllerErrCh:
+ assert.NilError(t, err)
+ t.Fatal("Pod controller exited prematurely without error")
+ case err = <-watchErrCh:
+ assert.NilError(t, err)
+
+ }
+}
diff --git a/node/mock_test.go b/node/mock_test.go
new file mode 100644
index 000000000..62c713c80
--- /dev/null
+++ b/node/mock_test.go
@@ -0,0 +1,225 @@
+package node
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/virtual-kubelet/virtual-kubelet/errdefs"
+ "github.com/virtual-kubelet/virtual-kubelet/log"
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+var (
+ _ PodLifecycleHandler = (*mockV0Provider)(nil)
+ _ PodNotifier = (*mockProvider)(nil)
+)
+
+type mockV0Provider struct {
+ creates int
+ updates int
+ deletes int
+
+ errorOnDelete error
+
+ pods map[string]*v1.Pod
+ startTime time.Time
+ notifier func(*v1.Pod)
+}
+
+type mockProvider struct {
+ *mockV0Provider
+}
+
+// newMockV0Provider creates a new mockV0Provider. The legacy mock provider does not implement the newer, asynchronous PodNotifier interface
+func newMockV0Provider() *mockV0Provider {
+ provider := mockV0Provider{
+ pods: make(map[string]*v1.Pod),
+ startTime: time.Now(),
+ // By default, notifier is a no-op. If the provider implements the PodNotifier interface, NotifyPods will
+ // replace it with the real callback, which we then invoke.
+ // This makes it easier in the sense that we don't need to wrap each method.
+ notifier: func(*v1.Pod) {},
+ }
+
+ return &provider
+}
+
+// newMockProvider creates a new mockProvider, which also implements the PodNotifier interface
+func newMockProvider() *mockProvider {
+
+ return &mockProvider{mockV0Provider: newMockV0Provider()}
+}
+
+// CreatePod accepts a Pod definition and stores it in memory.
+func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
+ log.G(ctx).Infof("receive CreatePod %q", pod.Name)
+
+ p.creates++
+ key, err := buildKey(pod)
+ if err != nil {
+ return err
+ }
+
+ now := metav1.NewTime(time.Now())
+ pod.Status = v1.PodStatus{
+ Phase: v1.PodRunning,
+ HostIP: "1.2.3.4",
+ PodIP: "5.6.7.8",
+ StartTime: &now,
+ Conditions: []v1.PodCondition{
+ {
+ Type: v1.PodInitialized,
+ Status: v1.ConditionTrue,
+ },
+ {
+ Type: v1.PodReady,
+ Status: v1.ConditionTrue,
+ },
+ {
+ Type: v1.PodScheduled,
+ Status: v1.ConditionTrue,
+ },
+ },
+ }
+
+ for _, container := range pod.Spec.Containers {
+ pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
+ Name: container.Name,
+ Image: container.Image,
+ Ready: true,
+ RestartCount: 0,
+ State: v1.ContainerState{
+ Running: &v1.ContainerStateRunning{
+ StartedAt: now,
+ },
+ },
+ })
+ }
+
+ p.pods[key] = pod
+ p.notifier(pod)
+
+ return nil
+}
+
+// UpdatePod accepts a Pod definition and updates its reference.
+func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
+ log.G(ctx).Infof("receive UpdatePod %q", pod.Name)
+
+ p.updates++
+ key, err := buildKey(pod)
+ if err != nil {
+ return err
+ }
+
+ p.pods[key] = pod
+ p.notifier(pod)
+
+ return nil
+}
+
+// DeletePod deletes the specified pod out of memory.
+func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
+ log.G(ctx).Infof("receive DeletePod %q", pod.Name)
+
+ if p.errorOnDelete != nil {
+ return p.errorOnDelete
+ }
+
+ p.deletes++
+ key, err := buildKey(pod)
+ if err != nil {
+ return err
+ }
+
+ if _, exists := p.pods[key]; !exists {
+ return errdefs.NotFound("pod not found")
+ }
+
+ now := metav1.Now()
+ delete(p.pods, key)
+ pod.Status.Phase = v1.PodSucceeded
+ pod.Status.Reason = "MockProviderPodDeleted"
+
+ for idx := range pod.Status.ContainerStatuses {
+ pod.Status.ContainerStatuses[idx].Ready = false
+ pod.Status.ContainerStatuses[idx].State = v1.ContainerState{
+ Terminated: &v1.ContainerStateTerminated{
+ Message: "Mock provider terminated container upon deletion",
+ FinishedAt: now,
+ Reason: "MockProviderPodContainerDeleted",
+ StartedAt: pod.Status.ContainerStatuses[idx].State.Running.StartedAt,
+ },
+ }
+ }
+
+ p.notifier(pod)
+
+ return nil
+}
+
+// GetPod returns a pod by name that is stored in memory.
+func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
+ log.G(ctx).Infof("receive GetPod %q", name)
+
+ key, err := buildKeyFromNames(namespace, name)
+ if err != nil {
+ return nil, err
+ }
+
+ if pod, ok := p.pods[key]; ok {
+ return pod, nil
+ }
+ return nil, errdefs.NotFoundf("pod \"%s/%s\" is not known to the provider", namespace, name)
+}
+
+// GetPodStatus returns the status of a pod by name that is "running".
+// It returns a NotFound error if a pod by that name is not known to the provider.
+func (p *mockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
+ log.G(ctx).Infof("receive GetPodStatus %q", name)
+
+ pod, err := p.GetPod(ctx, namespace, name)
+ if err != nil {
+ return nil, err
+ }
+
+ return &pod.Status, nil
+}
+
+// GetPods returns a list of all pods known to be "running".
+func (p *mockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
+ log.G(ctx).Info("receive GetPods")
+
+ var pods []*v1.Pod
+
+ for _, pod := range p.pods {
+ pods = append(pods, pod)
+ }
+
+ return pods, nil
+}
+
+// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
+// within the provider.
+func (p *mockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
+ p.notifier = notifier
+}
+
+func buildKeyFromNames(namespace string, name string) (string, error) {
+ return fmt.Sprintf("%s-%s", namespace, name), nil
+}
+
+// buildKey is a helper for building the "key" for the provider's pod store.
+func buildKey(pod *v1.Pod) (string, error) { + if pod.ObjectMeta.Namespace == "" { + return "", fmt.Errorf("pod namespace not found") + } + + if pod.ObjectMeta.Name == "" { + return "", fmt.Errorf("pod name not found") + } + + return buildKeyFromNames(pod.ObjectMeta.Namespace, pod.ObjectMeta.Name) +} diff --git a/node/pod_test.go b/node/pod_test.go index 101532006..56eff411f 100644 --- a/node/pod_test.go +++ b/node/pod_test.go @@ -16,7 +16,6 @@ package node import ( "context" - "path" "testing" pkgerrors "github.com/pkg/errors" @@ -30,71 +29,12 @@ import ( "k8s.io/client-go/kubernetes/fake" ) -type mockProvider struct { - pods map[string]*corev1.Pod - - creates int - updates int - deletes int - - errorOnDelete error -} - -func (m *mockProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error { - m.pods[path.Join(pod.GetNamespace(), pod.GetName())] = pod - m.creates++ - return nil -} - -func (m *mockProvider) UpdatePod(ctx context.Context, pod *corev1.Pod) error { - m.pods[path.Join(pod.GetNamespace(), pod.GetName())] = pod - m.updates++ - return nil -} - -func (m *mockProvider) GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) { - p := m.pods[path.Join(namespace, name)] - if p == nil { - return nil, errdefs.NotFound("not found") - } - return p, nil -} - -func (m *mockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) { - p := m.pods[path.Join(namespace, name)] - if p == nil { - return nil, errdefs.NotFound("not found") - } - return &p.Status, nil -} - -func (m *mockProvider) DeletePod(ctx context.Context, p *corev1.Pod) error { - if m.errorOnDelete != nil { - return m.errorOnDelete - } - delete(m.pods, path.Join(p.GetNamespace(), p.GetName())) - m.deletes++ - return nil -} - -func (m *mockProvider) GetPods(_ context.Context) ([]*corev1.Pod, error) { - ls := make([]*corev1.Pod, 0, len(m.pods)) - for _, p := range ls { - ls = append(ls, p) - } - return ls, nil -} - type TestController struct { *PodController mock *mockProvider client *fake.Clientset } -func newMockProvider() *mockProvider { - return &mockProvider{pods: make(map[string]*corev1.Pod)} -} - func newTestController() *TestController { fk8s := fake.NewSimpleClientset()