From 0e1cc1566e781ca1ccb2ef848443029fd8c3b15a Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Mon, 15 Feb 2021 14:53:50 -0800 Subject: [PATCH 1/3] Create envtest wrapper Lift up a little bit of the common envtest code into a common wrapper function. --- node/env_test.go | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/node/env_test.go b/node/env_test.go index ae85ef0d5..4c73bcf90 100644 --- a/node/env_test.go +++ b/node/env_test.go @@ -25,6 +25,9 @@ func TestEnvtest(t *testing.T) { if !*enableEnvTest || os.Getenv("VK_ENVTEST") != "" { t.Skip("test only runs when -envtest is passed or if VK_ENVTEST is set to a non-empty value") } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + env := &envtest.Environment{} _, err := env.Start() assert.NilError(t, err) @@ -33,12 +36,12 @@ func TestEnvtest(t *testing.T) { }() t.Log("Env test environment ready") - t.Run("E2ERunWithoutLeases", func(t *testing.T) { + t.Run("E2ERunWithoutLeases", wrapE2ETest(ctx, env, func(ctx context.Context, t *testing.T, environment *envtest.Environment) { testNodeE2ERun(t, env, false) - }) - t.Run("E2ERunWithLeases", func(t *testing.T) { + })) + t.Run("E2ERunWithLeases", wrapE2ETest(ctx, env, func(ctx context.Context, t *testing.T, environment *envtest.Environment) { testNodeE2ERun(t, env, true) - }) + })) } func nodeNameForTest(t *testing.T) string { @@ -49,17 +52,25 @@ func nodeNameForTest(t *testing.T) string { return name } +func wrapE2ETest(ctx context.Context, env *envtest.Environment, f func(context.Context, *testing.T, *envtest.Environment)) func(*testing.T) { + return func(t *testing.T) { + log.G(ctx) + sl := logrus.StandardLogger() + sl.SetLevel(logrus.DebugLevel) + logger := logruslogger.FromLogrus(sl.WithField("test", t.Name())) + ctx = log.WithLogger(ctx, logger) + + // The following requires that E2E tests are performed *sequentially* + log.L = logger + klogv2.SetLogger(logrusr.NewLogger(sl)) + f(ctx, t, env) + } +} + func testNodeE2ERun(t *testing.T, env *envtest.Environment, withLeases bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - sl := logrus.StandardLogger() - sl.SetLevel(logrus.DebugLevel) - logger := logruslogger.FromLogrus(sl.WithField("test", t.Name())) - ctx = log.WithLogger(ctx, logger) - log.L = logger - klogv2.SetLogger(logrusr.NewLogger(sl)) - clientset, err := kubernetes.NewForConfig(env.Config) assert.NilError(t, err) nodes := clientset.CoreV1().Nodes() From 7feb17572089e04e73f944de1c2079e1def6aa5b Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Mon, 15 Feb 2021 15:14:01 -0800 Subject: [PATCH 2/3] Split up lifecycle test wireUpSystem function This splits up the wireUpSystem function into a chunk that makes it "client agnostic". It also removes the requirement that the client is faked. --- node/lifecycle_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go index 3adc2f8b3..2c8a951ed 100644 --- a/node/lifecycle_test.go +++ b/node/lifecycle_test.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/watch" kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" ktesting "k8s.io/client-go/testing" @@ -226,7 +227,7 @@ func TestPodLifecycle(t *testing.T) { type testFunction func(ctx context.Context, s *system) type system struct { pc *PodController - client *fake.Clientset + client kubernetes.Interface podControllerConfig PodControllerConfig } @@ -262,6 +263,13 @@ func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunct return false, nil, nil }) + return wireUpSystemWithClient(ctx, provider, client, f) +} + +func wireUpSystemWithClient(ctx context.Context, provider PodLifecycleHandler, client kubernetes.Interface, f testFunction) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + // This is largely copy and pasted code from the root command sharedInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions( client, From c4582ccfbc9360abca02832f9ba5eb962d61b30f Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Mon, 15 Feb 2021 17:34:01 -0800 Subject: [PATCH 3/3] Allow providers to update pod statuses We had added an optimization that made it so we dedupe pod status updates from the provider. This ignored two subfields that could be updated along with status. Because the details of subresource updating is a bit API server centric, I wrote an envtest which checks for this behaviour. Signed-off-by: Sargun Dhillon --- node/env_test.go | 73 ++++++++++++++++++++++++++++++++++++++++-- node/lifecycle_test.go | 11 +++++++ node/pod.go | 9 ++---- node/podcontroller.go | 6 +++- 4 files changed, 89 insertions(+), 10 deletions(-) diff --git a/node/env_test.go b/node/env_test.go index 4c73bcf90..7a3448351 100644 --- a/node/env_test.go +++ b/node/env_test.go @@ -16,6 +16,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes" klogv2 "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/envtest" @@ -42,9 +43,11 @@ func TestEnvtest(t *testing.T) { t.Run("E2ERunWithLeases", wrapE2ETest(ctx, env, func(ctx context.Context, t *testing.T, environment *envtest.Environment) { testNodeE2ERun(t, env, true) })) + + t.Run("E2EPodStatusUpdate", wrapE2ETest(ctx, env, testPodStatusUpdate)) } -func nodeNameForTest(t *testing.T) string { +func kubernetesNameForTest(t *testing.T) string { name := t.Name() name = strings.ToLower(name) name = strings.ReplaceAll(name, "/", "-") @@ -67,6 +70,72 @@ func wrapE2ETest(ctx context.Context, env *envtest.Environment, f func(context.C } } +func testPodStatusUpdate(ctx context.Context, t *testing.T, env *envtest.Environment) { + provider := newMockProvider() + + clientset, err := kubernetes.NewForConfig(env.Config) + assert.NilError(t, err) + pods := clientset.CoreV1().Pods(testNamespace) + + assert.NilError(t, wireUpSystemWithClient(ctx, provider, clientset, func(ctx context.Context, s *system) { + p := newPod(forRealAPIServer, nameBasedOnTest(t)) + // In real API server, we don't set the resource version + p.ResourceVersion = "" + newPod, err := pods.Create(ctx, p, metav1.CreateOptions{}) + assert.NilError(t, err) + + key, err := buildKey(newPod) + assert.NilError(t, err) + + listOptions := metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(), + } + + // Setup a watch to check if the pod is in running + watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions) + assert.NilError(t, err) + defer watcher.Stop() + // Start the pod controller + assert.NilError(t, s.start(ctx)) + var serverPod *corev1.Pod + for { + select { + case <-ctx.Done(): + t.Fatalf("Context ended early: %s", ctx.Err().Error()) + case ev := <-watcher.ResultChan(): + serverPod = ev.Object.(*corev1.Pod) + if serverPod.Status.Phase == corev1.PodRunning { + goto running + } + } + } + running: + t.Log("Observed pod in running state") + + providerPod, ok := provider.pods.Load(key) + assert.Assert(t, ok) + providerPodCopy := providerPod.(*corev1.Pod).DeepCopy() + providerPodCopy.Status = serverPod.Status + if providerPodCopy.Annotations == nil { + providerPodCopy.Annotations = make(map[string]string, 1) + } + providerPodCopy.Annotations["testannotation"] = "testvalue" + provider.notifier(providerPodCopy) + + for { + select { + case <-ctx.Done(): + t.Fatalf("Context ended early: %s", ctx.Err().Error()) + case ev := <-watcher.ResultChan(): + annotations := ev.Object.(*corev1.Pod).Annotations + if annotations != nil && annotations["testannotation"] == "testvalue" { + return + } + } + } + })) +} + func testNodeE2ERun(t *testing.T, env *envtest.Environment, withLeases bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -81,7 +150,7 @@ func testNodeE2ERun(t *testing.T, env *envtest.Environment, withLeases bool) { testNode := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ - Name: nodeNameForTest(t), + Name: kubernetesNameForTest(t), }, } diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go index 2c8a951ed..e455117a7 100644 --- a/node/lifecycle_test.go +++ b/node/lifecycle_test.go @@ -628,6 +628,17 @@ func randomizeName(pod *corev1.Pod) { pod.Name = name } +func forRealAPIServer(pod *corev1.Pod) { + pod.ResourceVersion = "" + pod.ObjectMeta.UID = "" +} + +func nameBasedOnTest(t *testing.T) podModifier { + return func(pod *corev1.Pod) { + pod.Name = kubernetesNameForTest(t) + } +} + func newPod(podmodifiers ...podModifier) *corev1.Pod { var terminationGracePeriodSeconds int64 = 30 pod := &corev1.Pod{ diff --git a/node/pod.go b/node/pod.go index e323b468e..bd538c5cb 100644 --- a/node/pod.go +++ b/node/pod.go @@ -215,14 +215,7 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes } kPod := obj.(*knownPod) kPod.Lock() - podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy() - if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) && podFromProvider.DeletionTimestamp == nil { - kPod.lastPodStatusUpdateSkipped = true - kPod.Unlock() - return nil - } - kPod.lastPodStatusUpdateSkipped = false kPod.Unlock() // Pod deleted by provider due some reasons. e.g. a K8s provider, pod created by deployment would be evicted when node is not ready. // If we do not delete pod in K8s, deployment would not create a new one. @@ -326,9 +319,11 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, pod *corev1 kpod := obj.(*knownPod) kpod.Lock() if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) { + kpod.lastPodStatusUpdateSkipped = true kpod.Unlock() return } + kpod.lastPodStatusUpdateSkipped = false kpod.lastPodStatusReceivedFromProvider = pod kpod.Unlock() pc.syncPodStatusFromProvider.Enqueue(ctx, key) diff --git a/node/podcontroller.go b/node/podcontroller.go index 4add45566..d1731bf65 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -339,7 +339,11 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er kPod := obj.(*knownPod) kPod.Lock() - if kPod.lastPodStatusUpdateSkipped && !cmp.Equal(newPod.Status, kPod.lastPodStatusReceivedFromProvider.Status) { + if kPod.lastPodStatusUpdateSkipped && + (!cmp.Equal(newPod.Status, kPod.lastPodStatusReceivedFromProvider.Status) || + !cmp.Equal(newPod.Annotations, kPod.lastPodStatusReceivedFromProvider.Annotations) || + !cmp.Equal(newPod.Labels, kPod.lastPodStatusReceivedFromProvider.Labels) || + !cmp.Equal(newPod.Finalizers, kPod.lastPodStatusReceivedFromProvider.Finalizers)) { // The last pod from the provider -> kube api server was skipped, but we see they no longer match. // This means that the pod in API server was changed by someone else [this can be okay], but we skipped // a status update on our side because we compared the status received from the provider to the status