From 1e9e055e89b6abdd6832a35d72cf525b46c1ea76 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Tue, 14 Jul 2020 18:46:22 -0700 Subject: [PATCH] Address concerns with PR Also, just use Kubernetes waiter library. --- node/pod.go | 62 ++++++++++++++++++++++++++++--------------- node/podcontroller.go | 4 +-- 2 files changed, 43 insertions(+), 23 deletions(-) diff --git a/node/pod.go b/node/pod.go index b0190935c..3a93aba0b 100644 --- a/node/pod.go +++ b/node/pod.go @@ -16,7 +16,7 @@ package node import ( "context" - "math/rand" + "fmt" "time" "github.com/google/go-cmp/cmp" @@ -26,6 +26,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) @@ -236,36 +237,55 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes // enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd // prior to enqueuePodStatusUpdate. func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) { - // TODO (Sargun): Make this asynchronousish. Right now, if we are not cache synced, and we receive notifications from - // the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation. - l := log.G(ctx).WithField("method", "enqueuePodStatusUpdate") + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + ctx, span := trace.StartSpan(ctx, "enqueuePodStatusUpdate") + defer span.End() + ctx = span.WithField(ctx, "method", "enqueuePodStatusUpdate") + + // TODO (Sargun): Make this asynchronousish. Right now, if we are not cache synced, and we receive notifications + // from the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation. key, err := cache.MetaNamespaceKeyFunc(pod) if err != nil { - l.WithError(err).Error("Error getting pod meta namespace key") + log.G(ctx).WithError(err).Error("Error getting pod meta namespace key") + span.SetStatus(err) return } - // This doesn't wait for all of the callbacks to finish. We should check if the pod exists in K8s. If the pod - // does not exist in K8s, then we can bail. Alternatively, if the + ctx = span.WithField(ctx, "key", key) + var obj interface{} - var ok bool -retry: - obj, ok = pc.knownPods.Load(key) - if !ok { - // Blind wait for sync. If we haven't synced yet, that's okay too. + // 151 milliseconds is arbitrarily chosen here as a small prime number. + err = wait.PollImmediateUntil(151*time.Millisecond, func() (bool, error) { + var ok bool + obj, ok = pc.knownPods.Load(key) + if ok { + return true, nil + } + + // Blind sync. Ignore return code. cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced) _, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name) - if errors.IsNotFound(err) { - return - } if err != nil { - l.WithError(err).Error("Received error from pod lister while trying to see if pod exists") - return + return false, err } - // err is nil, and therefore the pod exists in k8s, but does not exist in our known pods map. Sleep - // and retry for somewhere between 1 and 1000 microseconds. - time.Sleep(time.Microsecond * time.Duration(rand.Intn(1000))) - goto retry + + // err is nil, and therefore the pod exists in k8s, but does not exist in our known pods map. This likely means + // that we're in some kind of startup synchronization issue where the provider knows about a pod (as it performs + // recover, that we do not yet know about). + return false, nil + }, ctx.Done()) + + if err != nil { + if errors.IsNotFound(err) { + err = fmt.Errorf("Pod %q not found in pod lister: %w", key, err) + log.G(ctx).WithError(err).Warn("Not enqueuing pod status update") + } else { + log.G(ctx).WithError(err).Warn("Not enqueuing pod status update due to error from pod lister") + } + span.SetStatus(err) + return } kpod := obj.(*knownPod) diff --git a/node/podcontroller.go b/node/podcontroller.go index 6a2574398..fe4902271 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -83,8 +83,8 @@ type PodNotifier interface { // fashion. The provided pod's PodStatus should be up to date when // this function is called. // - // NotifyPods may block callers. The NotifyPods function may be called - // concurrently. + // NotifyPods must not block the caller since it is only used to register the callback. + // The callback passed into `NotifyPods` may block when called. NotifyPods(context.Context, func(*corev1.Pod)) }