diff --git a/node/env_test.go b/node/env_test.go
index 4c73bcf90..7a3448351 100644
--- a/node/env_test.go
+++ b/node/env_test.go
@@ -16,6 +16,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/kubernetes"
 	klogv2 "k8s.io/klog/v2"
 	"sigs.k8s.io/controller-runtime/pkg/envtest"
@@ -42,9 +43,11 @@ func TestEnvtest(t *testing.T) {
 	t.Run("E2ERunWithLeases", wrapE2ETest(ctx, env, func(ctx context.Context, t *testing.T, environment *envtest.Environment) {
 		testNodeE2ERun(t, env, true)
 	}))
+
+	t.Run("E2EPodStatusUpdate", wrapE2ETest(ctx, env, testPodStatusUpdate))
 }
 
-func nodeNameForTest(t *testing.T) string {
+func kubernetesNameForTest(t *testing.T) string {
 	name := t.Name()
 	name = strings.ToLower(name)
 	name = strings.ReplaceAll(name, "/", "-")
@@ -67,6 +70,72 @@
 	}
 }
 
+func testPodStatusUpdate(ctx context.Context, t *testing.T, env *envtest.Environment) {
+	provider := newMockProvider()
+
+	clientset, err := kubernetes.NewForConfig(env.Config)
+	assert.NilError(t, err)
+	pods := clientset.CoreV1().Pods(testNamespace)
+
+	assert.NilError(t, wireUpSystemWithClient(ctx, provider, clientset, func(ctx context.Context, s *system) {
+		p := newPod(forRealAPIServer, nameBasedOnTest(t))
+		// Against a real API server, we don't set the resource version.
+		p.ResourceVersion = ""
+		newPod, err := pods.Create(ctx, p, metav1.CreateOptions{})
+		assert.NilError(t, err)
+
+		key, err := buildKey(newPod)
+		assert.NilError(t, err)
+
+		listOptions := metav1.ListOptions{
+			FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(),
+		}
+
+		// Set up a watch to check whether the pod reaches the running phase.
+		watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
+		assert.NilError(t, err)
+		defer watcher.Stop()
+		// Start the pod controller.
+		assert.NilError(t, s.start(ctx))
+		var serverPod *corev1.Pod
+		for {
+			select {
+			case <-ctx.Done():
+				t.Fatalf("Context ended early: %s", ctx.Err().Error())
+			case ev := <-watcher.ResultChan():
+				serverPod = ev.Object.(*corev1.Pod)
+				if serverPod.Status.Phase == corev1.PodRunning {
+					goto running
+				}
+			}
+		}
+	running:
+		t.Log("Observed pod in running state")
+
+		providerPod, ok := provider.pods.Load(key)
+		assert.Assert(t, ok)
+		providerPodCopy := providerPod.(*corev1.Pod).DeepCopy()
+		providerPodCopy.Status = serverPod.Status
+		if providerPodCopy.Annotations == nil {
+			providerPodCopy.Annotations = make(map[string]string, 1)
+		}
+		providerPodCopy.Annotations["testannotation"] = "testvalue"
+		provider.notifier(providerPodCopy)
+
+		for {
+			select {
+			case <-ctx.Done():
+				t.Fatalf("Context ended early: %s", ctx.Err().Error())
+			case ev := <-watcher.ResultChan():
+				annotations := ev.Object.(*corev1.Pod).Annotations
+				if annotations != nil && annotations["testannotation"] == "testvalue" {
+					return
+				}
+			}
+		}
+	}))
+}
+
 func testNodeE2ERun(t *testing.T, env *envtest.Environment, withLeases bool) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -81,7 +150,7 @@
 
 	testNode := &corev1.Node{
 		ObjectMeta: metav1.ObjectMeta{
-			Name: nodeNameForTest(t),
+			Name: kubernetesNameForTest(t),
 		},
 	}
 
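The two event loops in `testPodStatusUpdate` above share the same watch-until-condition shape, down to the `goto` used to break out of the first one. A small helper along these lines could express that pattern once; it is not part of the patch, and the name `watchUntilPod` and its signature are illustrative only:

```go
package node

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/watch"
)

// watchUntilPod drains a watch.Interface until cond returns true for an
// observed pod, the watch channel closes, or the context is cancelled.
// It returns the pod that satisfied cond.
func watchUntilPod(ctx context.Context, w watch.Interface, cond func(*corev1.Pod) bool) (*corev1.Pod, error) {
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case ev, ok := <-w.ResultChan():
			if !ok {
				return nil, fmt.Errorf("watch channel closed before condition was met")
			}
			pod, ok := ev.Object.(*corev1.Pod)
			if !ok {
				// Ignore non-pod objects (e.g. Status objects on watch errors).
				continue
			}
			if cond(pod) {
				return pod, nil
			}
		}
	}
}
```

With such a helper, the first loop would reduce to `serverPod, err := watchUntilPod(ctx, watcher, func(p *corev1.Pod) bool { return p.Status.Phase == corev1.PodRunning })`, and the `goto` disappears.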
diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go
index 2c8a951ed..e455117a7 100644
--- a/node/lifecycle_test.go
+++ b/node/lifecycle_test.go
@@ -628,6 +628,17 @@ func randomizeName(pod *corev1.Pod) {
 	pod.Name = name
 }
 
+func forRealAPIServer(pod *corev1.Pod) {
+	pod.ResourceVersion = ""
+	pod.ObjectMeta.UID = ""
+}
+
+func nameBasedOnTest(t *testing.T) podModifier {
+	return func(pod *corev1.Pod) {
+		pod.Name = kubernetesNameForTest(t)
+	}
+}
+
 func newPod(podmodifiers ...podModifier) *corev1.Pod {
 	var terminationGracePeriodSeconds int64 = 30
 	pod := &corev1.Pod{
diff --git a/node/pod.go b/node/pod.go
index e323b468e..bd538c5cb 100644
--- a/node/pod.go
+++ b/node/pod.go
@@ -215,14 +215,7 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
 	}
 	kPod := obj.(*knownPod)
 	kPod.Lock()
-	podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
-	if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) && podFromProvider.DeletionTimestamp == nil {
-		kPod.lastPodStatusUpdateSkipped = true
-		kPod.Unlock()
-		return nil
-	}
-	kPod.lastPodStatusUpdateSkipped = false
 	kPod.Unlock()
 	// Pod deleted by provider due some reasons. e.g. a K8s provider, pod created by deployment would be evicted when node is not ready.
 	// If we do not delete pod in K8s, deployment would not create a new one.
@@ -326,9 +319,11 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, pod *corev1
 	kpod := obj.(*knownPod)
 	kpod.Lock()
 	if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) {
+		kpod.lastPodStatusUpdateSkipped = true
 		kpod.Unlock()
 		return
 	}
+	kpod.lastPodStatusUpdateSkipped = false
 	kpod.lastPodStatusReceivedFromProvider = pod
 	kpod.Unlock()
 	pc.syncPodStatusFromProvider.Enqueue(ctx, key)
diff --git a/node/podcontroller.go b/node/podcontroller.go
index 4add45566..d1731bf65 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -339,7 +339,11 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 			kPod := obj.(*knownPod)
 			kPod.Lock()
-			if kPod.lastPodStatusUpdateSkipped && !cmp.Equal(newPod.Status, kPod.lastPodStatusReceivedFromProvider.Status) {
+			if kPod.lastPodStatusUpdateSkipped &&
+				(!cmp.Equal(newPod.Status, kPod.lastPodStatusReceivedFromProvider.Status) ||
+					!cmp.Equal(newPod.Annotations, kPod.lastPodStatusReceivedFromProvider.Annotations) ||
+					!cmp.Equal(newPod.Labels, kPod.lastPodStatusReceivedFromProvider.Labels) ||
+					!cmp.Equal(newPod.Finalizers, kPod.lastPodStatusReceivedFromProvider.Finalizers)) {
 				// The last pod from the provider -> kube api server was skipped, but we see they no longer match.
 				// This means that the pod in API server was changed by someone else [this can be okay], but we skipped
 				// a status update on our side because we compared the status received from the provider to the status
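Taken together, the last two files move the "skipped update" bookkeeping: `updatePodStatus` no longer decides whether to skip (its status-only early return is deleted), `enqueuePodStatusUpdate` records a skip whenever the provider resends a pod identical to the last one, and the update handler in `Run` replays a skipped update when the API-server copy drifts in any field the controller propagates, not just status. A condensed, standalone sketch of that invariant follows; the name `trackedPod` is illustrative, and locking and the work queue are omitted:

```go
package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
	corev1 "k8s.io/api/core/v1"
)

// trackedPod condenses the knownPod bookkeeping touched by this patch.
type trackedPod struct {
	lastPodStatusReceivedFromProvider *corev1.Pod
	lastPodStatusUpdateSkipped        bool
}

// enqueue mirrors the enqueuePodStatusUpdate hunk: an update identical to the
// last one received from the provider is skipped, and that skip is recorded.
// It reports whether a sync should be enqueued.
func (tp *trackedPod) enqueue(pod *corev1.Pod) bool {
	if cmp.Equal(tp.lastPodStatusReceivedFromProvider, pod) {
		tp.lastPodStatusUpdateSkipped = true
		return false // nothing new to sync
	}
	tp.lastPodStatusUpdateSkipped = false
	tp.lastPodStatusReceivedFromProvider = pod
	return true
}

// shouldResync mirrors the widened check in PodController.Run: a previously
// skipped update must be replayed if the API-server copy has drifted in any
// propagated field (status, annotations, labels, finalizers).
func (tp *trackedPod) shouldResync(newPod *corev1.Pod) bool {
	last := tp.lastPodStatusReceivedFromProvider
	return tp.lastPodStatusUpdateSkipped &&
		(!cmp.Equal(newPod.Status, last.Status) ||
			!cmp.Equal(newPod.Annotations, last.Annotations) ||
			!cmp.Equal(newPod.Labels, last.Labels) ||
			!cmp.Equal(newPod.Finalizers, last.Finalizers))
}

func main() {
	tp := &trackedPod{lastPodStatusReceivedFromProvider: &corev1.Pod{}}
	fmt.Println(tp.enqueue(&corev1.Pod{})) // false: identical pod, update skipped

	annotated := &corev1.Pod{}
	annotated.Annotations = map[string]string{"testannotation": "testvalue"}
	fmt.Println(tp.shouldResync(annotated)) // true: annotation drift now triggers a resync
}
```

Under the old status-only comparison, the annotation drift in the `main` example would have been missed, which is exactly the case `testPodStatusUpdate` exercises end to end.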