From c4582ccfbc9360abca02832f9ba5eb962d61b30f Mon Sep 17 00:00:00 2001
From: Sargun Dhillon
Date: Mon, 15 Feb 2021 17:34:01 -0800
Subject: [PATCH] Allow providers to update pod statuses

We had added an optimization that dedupes pod status updates coming from the
provider. That comparison only looked at the status itself, and ignored other
fields, such as annotations, labels, and finalizers, that can be updated along
with the status. Because the details of subresource updating are rather API
server centric, I wrote an envtest that checks for this behaviour.

Signed-off-by: Sargun Dhillon
---
 node/env_test.go       | 73 ++++++++++++++++++++++++++++++++++++++++--
 node/lifecycle_test.go | 11 +++++++
 node/pod.go            |  9 ++----
 node/podcontroller.go  |  6 +++-
 4 files changed, 89 insertions(+), 10 deletions(-)

diff --git a/node/env_test.go b/node/env_test.go
index 4c73bcf90..7a3448351 100644
--- a/node/env_test.go
+++ b/node/env_test.go
@@ -16,6 +16,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/kubernetes"
 	klogv2 "k8s.io/klog/v2"
 	"sigs.k8s.io/controller-runtime/pkg/envtest"
@@ -42,9 +43,11 @@ func TestEnvtest(t *testing.T) {
 	t.Run("E2ERunWithLeases", wrapE2ETest(ctx, env, func(ctx context.Context, t *testing.T, environment *envtest.Environment) {
 		testNodeE2ERun(t, env, true)
 	}))
+
+	t.Run("E2EPodStatusUpdate", wrapE2ETest(ctx, env, testPodStatusUpdate))
 }
 
-func nodeNameForTest(t *testing.T) string {
+func kubernetesNameForTest(t *testing.T) string {
 	name := t.Name()
 	name = strings.ToLower(name)
 	name = strings.ReplaceAll(name, "/", "-")
@@ -67,6 +70,72 @@ func wrapE2ETest(ctx context.Context, env *envtest.Environment, f func(context.C
 	}
 }
 
+func testPodStatusUpdate(ctx context.Context, t *testing.T, env *envtest.Environment) {
+	provider := newMockProvider()
+
+	clientset, err := kubernetes.NewForConfig(env.Config)
+	assert.NilError(t, err)
+	pods := clientset.CoreV1().Pods(testNamespace)
+
+	assert.NilError(t, wireUpSystemWithClient(ctx, provider, clientset, func(ctx context.Context, s *system) {
+		p := newPod(forRealAPIServer, nameBasedOnTest(t))
+		// When running against a real API server, we don't set the resource version.
+		p.ResourceVersion = ""
+		newPod, err := pods.Create(ctx, p, metav1.CreateOptions{})
+		assert.NilError(t, err)
+
+		key, err := buildKey(newPod)
+		assert.NilError(t, err)
+
+		listOptions := metav1.ListOptions{
+			FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(),
+		}
+
+		// Set up a watch so we can see when the pod reaches the running phase.
+		watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
+		assert.NilError(t, err)
+		defer watcher.Stop()
+		// Start the pod controller
+		assert.NilError(t, s.start(ctx))
+		var serverPod *corev1.Pod
+		for {
+			select {
+			case <-ctx.Done():
+				t.Fatalf("Context ended early: %s", ctx.Err().Error())
+			case ev := <-watcher.ResultChan():
+				serverPod = ev.Object.(*corev1.Pod)
+				if serverPod.Status.Phase == corev1.PodRunning {
+					goto running
+				}
+			}
+		}
+	running:
+		t.Log("Observed pod in running state")
+
+		providerPod, ok := provider.pods.Load(key)
+		assert.Assert(t, ok)
+		providerPodCopy := providerPod.(*corev1.Pod).DeepCopy()
+		providerPodCopy.Status = serverPod.Status
+		if providerPodCopy.Annotations == nil {
+			providerPodCopy.Annotations = make(map[string]string, 1)
+		}
+		providerPodCopy.Annotations["testannotation"] = "testvalue"
+		provider.notifier(providerPodCopy)
+
+		for {
+			select {
+			case <-ctx.Done():
+				t.Fatalf("Context ended early: %s", ctx.Err().Error())
+			case ev := <-watcher.ResultChan():
+				annotations := ev.Object.(*corev1.Pod).Annotations
+				if annotations != nil && annotations["testannotation"] == "testvalue" {
+					return
+				}
+			}
+		}
+	}))
+}
+
 func testNodeE2ERun(t *testing.T, env *envtest.Environment, withLeases bool) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -81,7 +150,7 @@ func testNodeE2ERun(t *testing.T, env *envtest.Environment, withLeases bool) {
 
 	testNode := &corev1.Node{
 		ObjectMeta: metav1.ObjectMeta{
-			Name: nodeNameForTest(t),
+			Name: kubernetesNameForTest(t),
 		},
 	}
 
diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go
index 2c8a951ed..e455117a7 100644
--- a/node/lifecycle_test.go
+++ b/node/lifecycle_test.go
@@ -628,6 +628,17 @@ func randomizeName(pod *corev1.Pod) {
 	pod.Name = name
 }
 
+func forRealAPIServer(pod *corev1.Pod) {
+	pod.ResourceVersion = ""
+	pod.ObjectMeta.UID = ""
+}
+
+func nameBasedOnTest(t *testing.T) podModifier {
+	return func(pod *corev1.Pod) {
+		pod.Name = kubernetesNameForTest(t)
+	}
+}
+
 func newPod(podmodifiers ...podModifier) *corev1.Pod {
 	var terminationGracePeriodSeconds int64 = 30
 	pod := &corev1.Pod{
diff --git a/node/pod.go b/node/pod.go
index e323b468e..bd538c5cb 100644
--- a/node/pod.go
+++ b/node/pod.go
@@ -215,14 +215,7 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
 	}
 	kPod := obj.(*knownPod)
 	kPod.Lock()
-	podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
-	if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) && podFromProvider.DeletionTimestamp == nil {
-		kPod.lastPodStatusUpdateSkipped = true
-		kPod.Unlock()
-		return nil
-	}
-	kPod.lastPodStatusUpdateSkipped = false
 	kPod.Unlock()
 	// Pod deleted by provider due some reasons. e.g. a K8s provider, pod created by deployment would be evicted when node is not ready.
 	// If we do not delete pod in K8s, deployment would not create a new one.
@@ -326,9 +319,11 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, pod *corev1
 	kpod := obj.(*knownPod)
 	kpod.Lock()
 	if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) {
+		kpod.lastPodStatusUpdateSkipped = true
 		kpod.Unlock()
 		return
 	}
+	kpod.lastPodStatusUpdateSkipped = false
 	kpod.lastPodStatusReceivedFromProvider = pod
 	kpod.Unlock()
 	pc.syncPodStatusFromProvider.Enqueue(ctx, key)
diff --git a/node/podcontroller.go b/node/podcontroller.go
index 4add45566..d1731bf65 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -339,7 +339,11 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 
 			kPod := obj.(*knownPod)
 			kPod.Lock()
-			if kPod.lastPodStatusUpdateSkipped && !cmp.Equal(newPod.Status, kPod.lastPodStatusReceivedFromProvider.Status) {
+			if kPod.lastPodStatusUpdateSkipped &&
+				(!cmp.Equal(newPod.Status, kPod.lastPodStatusReceivedFromProvider.Status) ||
+					!cmp.Equal(newPod.Annotations, kPod.lastPodStatusReceivedFromProvider.Annotations) ||
+					!cmp.Equal(newPod.Labels, kPod.lastPodStatusReceivedFromProvider.Labels) ||
+					!cmp.Equal(newPod.Finalizers, kPod.lastPodStatusReceivedFromProvider.Finalizers)) {
 				// The last pod from the provider -> kube api server was skipped, but we see they no longer match.
 				// This means that the pod in API server was changed by someone else [this can be okay], but we skipped
 				// a status update on our side because we compared the status received from the provider to the status
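
The change in enqueuePodStatusUpdate and Run amounts to widening the "is this a duplicate?" check from the status alone to the status plus the metadata fields that can ride along with a status update. The standalone sketch below illustrates that comparison; it is not part of the patch, the helper name statusUpdateNeeded is invented for illustration, and it only assumes github.com/google/go-cmp and the k8s.io/api/core/v1 types already used by this code.

package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
	corev1 "k8s.io/api/core/v1"
)

// statusUpdateNeeded mirrors the widened comparison: a provider update is only
// treated as a duplicate when the status and the metadata fields that can
// change alongside it (annotations, labels, finalizers) are all unchanged.
func statusUpdateNeeded(fromKubernetes, fromProvider *corev1.Pod) bool {
	return !cmp.Equal(fromKubernetes.Status, fromProvider.Status) ||
		!cmp.Equal(fromKubernetes.Annotations, fromProvider.Annotations) ||
		!cmp.Equal(fromKubernetes.Labels, fromProvider.Labels) ||
		!cmp.Equal(fromKubernetes.Finalizers, fromProvider.Finalizers)
}

func main() {
	inAPIServer := &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodRunning}}

	// The provider reports the same status but also sets an annotation, just
	// like the new envtest does.
	fromProvider := inAPIServer.DeepCopy()
	fromProvider.Annotations = map[string]string{"testannotation": "testvalue"}

	// Comparing only the status reports "no change", which is what caused the
	// update to be skipped before this patch.
	fmt.Println(cmp.Equal(inAPIServer.Status, fromProvider.Status)) // true
	// The widened comparison notices the annotation and lets the update through.
	fmt.Println(statusUpdateNeeded(inAPIServer, fromProvider)) // true
}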