diff --git a/node/pod.go b/node/pod.go
index 3a93aba0b..7fa15f3af 100644
--- a/node/pod.go
+++ b/node/pod.go
@@ -39,6 +39,10 @@ const (
 	podEventDeleteSuccess = "ProviderDeleteSuccess"
 	podEventUpdateFailed  = "ProviderUpdateFailed"
 	podEventUpdateSuccess = "ProviderUpdateSuccess"
+
+	// 151 milliseconds is chosen simply as a small prime number; it is the retry period
+	// between attempts to get a notification from the provider to VK.
+	notificationRetryPeriod = 151 * time.Millisecond
 )
 
 func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
@@ -237,7 +241,7 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
 // enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd
 // prior to enqueuePodStatusUpdate.
 func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
-	ctx, cancel := context.WithCancel(ctx)
+	ctx, cancel := context.WithTimeout(ctx, notificationRetryPeriod*maxRetries)
 	defer cancel()
 
 	ctx, span := trace.StartSpan(ctx, "enqueuePodStatusUpdate")
@@ -255,17 +259,23 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue
 	ctx = span.WithField(ctx, "key", key)
 
 	var obj interface{}
-	// 151 milliseconds is arbitrarily chosen here as a small prime number.
-	err = wait.PollImmediateUntil(151*time.Millisecond, func() (bool, error) {
+	err = wait.PollImmediateUntil(notificationRetryPeriod, func() (bool, error) {
 		var ok bool
 		obj, ok = pc.knownPods.Load(key)
 		if ok {
 			return true, nil
 		}
-		// Blind sync. Ignore return code.
-		cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced)
+		// Blind sync. A partial sync is better than nothing. If this returns false, the poll loop
+		// should not be invoked again, as it means the context has timed out.
+		if !cache.WaitForNamedCacheSync("enqueuePodStatusUpdate", ctx.Done(), pc.podsInformer.Informer().HasSynced) {
+			log.G(ctx).Warn("enqueuePodStatusUpdate proceeding with unsynced cache")
+		}
+		// The only transient error the pod lister returns is not found. The only case where not found
+		// should happen while the pod *actually* exists is the one above -- where we haven't been able
+		// to finish syncing before the context times out.
+		// All other errors are non-transient.
 		_, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name)
 		if err != nil {
 			return false, err
 		}
 		return false, nil
@@ -280,7 +290,7 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue
 	if err != nil {
 		if errors.IsNotFound(err) {
 			err = fmt.Errorf("Pod %q not found in pod lister: %w", key, err)
-			log.G(ctx).WithError(err).Warn("Not enqueuing pod status update")
+			log.G(ctx).WithError(err).Debug("Not enqueuing pod status update")
 		} else {
 			log.G(ctx).WithError(err).Warn("Not enqueuing pod status update due to error from pod lister")
 		}
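
Note on the retry pattern above: the patch bounds the whole enqueue with context.WithTimeout(ctx, notificationRetryPeriod*maxRetries), polls on notificationRetryPeriod, and treats a false return from WaitForNamedCacheSync as a signal that the deadline has already passed. Below is a minimal, self-contained sketch of that interplay, not the PR's code: maxRetries is referenced but not shown in these hunks, so its value here is assumed, and hasSynced is a hypothetical stand-in for pc.podsInformer.Informer().HasSynced that never reports synced, so the sketch exercises the unsynced-cache branch.

```go
// Sketch: timeout-bounded polling with a best-effort informer cache sync.
// Assumptions (not from the patch): the maxRetries value and the hasSynced stand-in.
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

const (
	notificationRetryPeriod = 151 * time.Millisecond
	maxRetries              = 3 // assumed value; the real constant is defined elsewhere in node/pod.go
)

func main() {
	// Total wait is bounded: the context expires after notificationRetryPeriod*maxRetries.
	ctx, cancel := context.WithTimeout(context.Background(), notificationRetryPeriod*maxRetries)
	defer cancel()

	// Hypothetical stand-in for pc.podsInformer.Informer().HasSynced that never
	// reports synced, forcing the "proceed with unsynced cache" branch.
	hasSynced := func() bool { return false }

	err := wait.PollImmediateUntil(notificationRetryPeriod, func() (bool, error) {
		// Blocks until the caches sync or ctx.Done() fires; false means the
		// overall deadline has already passed.
		if !cache.WaitForNamedCacheSync("enqueuePodStatusUpdate", ctx.Done(), hasSynced) {
			fmt.Println("proceeding with unsynced cache")
		}
		// (false, nil) asks for another attempt after notificationRetryPeriod;
		// (true, nil) stops successfully; a non-nil error aborts immediately.
		return false, nil
	}, ctx.Done())

	// With the stop channel already closed, PollImmediateUntil returns wait.ErrWaitTimeout.
	fmt.Println("poll ended:", err)
}
```

Because the condition function blocks inside WaitForNamedCacheSync until ctx.Done() fires, a false return really does mean the overall deadline has elapsed; PollImmediateUntil then observes the closed stop channel and returns wait.ErrWaitTimeout rather than scheduling another attempt, which is the behavior the new comment in the patch describes.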