diff --git a/node/pod.go b/node/pod.go
index e2a720226..7fa15f3af 100644
--- a/node/pod.go
+++ b/node/pod.go
@@ -16,6 +16,8 @@ package node

 import (
 	"context"
+	"fmt"
+	"time"

 	"github.com/google/go-cmp/cmp"
 	pkgerrors "github.com/pkg/errors"
@@ -24,6 +26,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 )
@@ -36,6 +39,10 @@ const (
 	podEventDeleteSuccess = "ProviderDeleteSuccess"
 	podEventUpdateFailed  = "ProviderUpdateFailed"
 	podEventUpdateSuccess = "ProviderUpdateSuccess"
+
+	// notificationRetryPeriod is the retry period between attempts to deliver a
+	// notification from the provider to VK. 151ms is chosen simply as a small prime number.
+	notificationRetryPeriod = 151 * time.Millisecond
 )

 func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
@@ -234,21 +241,72 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
 // enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd
 // prior to enqueuePodStatusUpdate.
 func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
-	if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
-		log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key")
-	} else {
-		if obj, ok := pc.knownPods.Load(key); ok {
-			kpod := obj.(*knownPod)
-			kpod.Lock()
-			if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) {
-				kpod.Unlock()
-				return
-			}
-			kpod.lastPodStatusReceivedFromProvider = pod
-			kpod.Unlock()
-			q.AddRateLimited(key)
-		}
-	}
+	ctx, cancel := context.WithTimeout(ctx, notificationRetryPeriod*maxRetries)
+	defer cancel()
+
+	ctx, span := trace.StartSpan(ctx, "enqueuePodStatusUpdate")
+	defer span.End()
+	ctx = span.WithField(ctx, "method", "enqueuePodStatusUpdate")
+
+	// TODO (Sargun): Make this asynchronous-ish. Right now, if we are not cache synced, and we receive notifications
+	// from the provider for pods that do not yet exist in our known pods map, we can get into an awkward situation.
+	key, err := cache.MetaNamespaceKeyFunc(pod)
+	if err != nil {
+		log.G(ctx).WithError(err).Error("Error getting pod meta namespace key")
+		span.SetStatus(err)
+		return
+	}
+	ctx = span.WithField(ctx, "key", key)
+
+	var obj interface{}
+	err = wait.PollImmediateUntil(notificationRetryPeriod, func() (bool, error) {
+		var ok bool
+		obj, ok = pc.knownPods.Load(key)
+		if ok {
+			return true, nil
+		}
+
+		// Blind sync. A partial sync is better than nothing. If this returns false, the context has timed
+		// out, and the poll loop will not be invoked again.
+		if !cache.WaitForNamedCacheSync("enqueuePodStatusUpdate", ctx.Done(), pc.podsInformer.Informer().HasSynced) {
+			log.G(ctx).Warn("enqueuePodStatusUpdate proceeding with unsynced cache")
+		}

+		// The only transient error the pod lister returns is NotFound. The only case in which NotFound
+		// occurs while the pod *actually* exists is the one above, where we were unable to finish the
+		// cache sync before the context timed out.
+		// Every other class of error is non-transient.
+		_, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name)
+		if err != nil {
+			return false, err
+		}
+
+		// err is nil, so the pod exists in Kubernetes but not yet in our known pods map. This likely means
+		// that we are in some kind of startup synchronization race, where the provider knows about a pod
+		// (as it performs recovery) that we do not yet know about.
+		return false, nil
+	}, ctx.Done())
+
+	if err != nil {
+		if errors.IsNotFound(err) {
+			err = fmt.Errorf("pod %q not found in pod lister: %w", key, err)
+			log.G(ctx).WithError(err).Debug("Not enqueuing pod status update")
+		} else {
+			log.G(ctx).WithError(err).Warn("Not enqueuing pod status update due to error from pod lister")
+		}
+		span.SetStatus(err)
+		return
+	}
+
+	kpod := obj.(*knownPod)
+	kpod.Lock()
+	if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) {
+		kpod.Unlock()
+		return
+	}
+	kpod.lastPodStatusReceivedFromProvider = pod
+	kpod.Unlock()
+	q.AddRateLimited(key)
 }

 func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) {
diff --git a/node/pod_test.go b/node/pod_test.go
index f365b589c..5eadd7854 100644
--- a/node/pod_test.go
+++ b/node/pod_test.go
@@ -53,6 +53,7 @@ func newTestController() *TestController {
 			done:         make(chan struct{}),
 			ready:        make(chan struct{}),
 			podsInformer: iFactory.Core().V1().Pods(),
+			podsLister:   iFactory.Core().V1().Pods().Lister(),
 		},
 		mock:   p,
 		client: fk8s,
diff --git a/node/podcontroller.go b/node/podcontroller.go
index 331928471..fe4902271 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -83,7 +83,8 @@ type PodNotifier interface {
 	// fashion. The provided pod's PodStatus should be up to date when
 	// this function is called.
 	//
-	// NotifyPods will not block callers.
+	// NotifyPods must not block the caller, since it is only used to register the callback.
+	// The callback passed into `NotifyPods` may block when called.
 	NotifyPods(context.Context, func(*corev1.Pod))
 }
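
Note on the clarified NotifyPods contract above: the following is a minimal sketch, not part of this change, of a provider that satisfies it. The Provider type, its updates channel, and the statusLoop helper are hypothetical names; only the contract itself, that NotifyPods must return without blocking while the registered callback may block when invoked, comes from this diff.

	// Package example is a hypothetical provider illustrating the PodNotifier contract.
	package example

	import (
		"context"

		corev1 "k8s.io/api/core/v1"
	)

	// Provider is a hypothetical provider implementation; updates is assumed to be
	// fed by the provider's own reconciliation machinery.
	type Provider struct {
		notify  func(*corev1.Pod)
		updates chan *corev1.Pod
	}

	// NotifyPods only records the callback and starts the delivery loop; it
	// returns immediately and never blocks the registering PodController.
	func (p *Provider) NotifyPods(ctx context.Context, notifier func(*corev1.Pod)) {
		p.notify = notifier
		go p.statusLoop(ctx)
	}

	// statusLoop invokes the callback from the provider's own goroutine. The
	// callback may block (enqueuePodStatusUpdate can now wait for the pod to
	// appear in the known pods map), which is acceptable here but would stall
	// the controller if done synchronously inside NotifyPods.
	func (p *Provider) statusLoop(ctx context.Context) {
		for {
			select {
			case <-ctx.Done():
				return
			case pod := <-p.updates:
				p.notify(pod.DeepCopy()) // pods must be DeepCopy'd prior to enqueuePodStatusUpdate
			}
		}
	}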