diff --git a/node/pod.go b/node/pod.go index e2a720226..b0190935c 100644 --- a/node/pod.go +++ b/node/pod.go @@ -16,6 +16,8 @@ package node import ( "context" + "math/rand" + "time" "github.com/google/go-cmp/cmp" pkgerrors "github.com/pkg/errors" @@ -234,21 +236,47 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes // enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd // prior to enqueuePodStatusUpdate. func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) { - if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil { - log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key") - } else { - if obj, ok := pc.knownPods.Load(key); ok { - kpod := obj.(*knownPod) - kpod.Lock() - if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) { - kpod.Unlock() - return - } - kpod.lastPodStatusReceivedFromProvider = pod - kpod.Unlock() - q.AddRateLimited(key) - } + // TODO (Sargun): Make this asynchronousish. Right now, if we are not cache synced, and we receive notifications from + // the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation. + l := log.G(ctx).WithField("method", "enqueuePodStatusUpdate") + key, err := cache.MetaNamespaceKeyFunc(pod) + if err != nil { + l.WithError(err).Error("Error getting pod meta namespace key") + return } + // This doesn't wait for all of the callbacks to finish. We should check if the pod exists in K8s. If the pod + // does not exist in K8s, then we can bail. Alternatively, if the + var obj interface{} + var ok bool +retry: + obj, ok = pc.knownPods.Load(key) + if !ok { + // Blind wait for sync. If we haven't synced yet, that's okay too. + cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced) + + _, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name) + if errors.IsNotFound(err) { + return + } + if err != nil { + l.WithError(err).Error("Received error from pod lister while trying to see if pod exists") + return + } + // err is nil, and therefore the pod exists in k8s, but does not exist in our known pods map. Sleep + // and retry for somewhere between 1 and 1000 microseconds. + time.Sleep(time.Microsecond * time.Duration(rand.Intn(1000))) + goto retry + } + + kpod := obj.(*knownPod) + kpod.Lock() + if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) { + kpod.Unlock() + return + } + kpod.lastPodStatusReceivedFromProvider = pod + kpod.Unlock() + q.AddRateLimited(key) } func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) { diff --git a/node/pod_test.go b/node/pod_test.go index f365b589c..5eadd7854 100644 --- a/node/pod_test.go +++ b/node/pod_test.go @@ -53,6 +53,7 @@ func newTestController() *TestController { done: make(chan struct{}), ready: make(chan struct{}), podsInformer: iFactory.Core().V1().Pods(), + podsLister: iFactory.Core().V1().Pods().Lister(), }, mock: p, client: fk8s, diff --git a/node/podcontroller.go b/node/podcontroller.go index 331928471..6a2574398 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -83,7 +83,8 @@ type PodNotifier interface { // fashion. The provided pod's PodStatus should be up to date when // this function is called. // - // NotifyPods will not block callers. + // NotifyPods may block callers. The NotifyPods function may be called + // concurrently. NotifyPods(context.Context, func(*corev1.Pod)) }