Enhance / clean up enqueuePodStatusUpdate polling in retry loop

commit 4258c46746
parent 1e9e055e89
Author: Sargun Dhillon
Date:   2020-07-22 18:56:05 -07:00


@@ -39,6 +39,10 @@ const (
 	podEventDeleteSuccess = "ProviderDeleteSuccess"
 	podEventUpdateFailed  = "ProviderUpdateFailed"
 	podEventUpdateSuccess = "ProviderUpdateSuccess"
+	// 151 milliseconds is chosen as a small prime number for the retry period
+	// between attempts to get a notification from the provider to VK.
+	notificationRetryPeriod = 151 * time.Millisecond
 )

 func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
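The new constant is multiplied by maxRetries (a pre-existing constant whose value is not part of this diff) to derive the overall timeout used further down. A minimal sketch of that arithmetic, with maxRetries assumed to be 20 purely for illustration:

package main

import (
	"fmt"
	"time"
)

const (
	// Mirrors the constant added in this commit.
	notificationRetryPeriod = 151 * time.Millisecond
	// Hypothetical value; the real maxRetries is defined elsewhere in the
	// package and is not shown in this diff.
	maxRetries = 20
)

func main() {
	// The total budget enqueuePodStatusUpdate gets via context.WithTimeout.
	fmt.Println(notificationRetryPeriod * maxRetries) // 3.02s with these numbers
}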
@@ -237,7 +241,7 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
 // enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd
 // prior to enqueuePodStatusUpdate.
 func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
-	ctx, cancel := context.WithCancel(ctx)
+	ctx, cancel := context.WithTimeout(ctx, notificationRetryPeriod*maxRetries)
 	defer cancel()

 	ctx, span := trace.StartSpan(ctx, "enqueuePodStatusUpdate")
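The switch from context.WithCancel to context.WithTimeout matters because a timeout context closes its Done() channel on its own once the deadline passes, which is what lets the poll loop below give up instead of blocking indefinitely. A self-contained sketch of that behavior (the 100ms value is arbitrary):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Unlike WithCancel, WithTimeout arranges for ctx.Done() to close by
	// itself once the deadline passes, even if cancel() is never called.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	<-ctx.Done()
	fmt.Println(ctx.Err()) // context.DeadlineExceeded
}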
@@ -255,17 +259,23 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue
 	ctx = span.WithField(ctx, "key", key)

 	var obj interface{}
-	// 151 milliseconds is arbitrarily chosen here as a small prime number.
-	err = wait.PollImmediateUntil(151*time.Millisecond, func() (bool, error) {
+	err = wait.PollImmediateUntil(notificationRetryPeriod, func() (bool, error) {
 		var ok bool
 		obj, ok = pc.knownPods.Load(key)
 		if ok {
 			return true, nil
 		}
-		// Blind sync. Ignore return code.
-		cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced)
+		// Blind sync. A partial sync is better than nothing. If this returns false, the poll loop should
+		// not be invoked again, as it means the context has timed out.
+		if !cache.WaitForNamedCacheSync("enqueuePodStatusUpdate", ctx.Done(), pc.podsInformer.Informer().HasSynced) {
+			log.G(ctx).Warn("enqueuePodStatusUpdate proceeding with unsynced cache")
+		}
+		// The only transient error the pod lister returns is NotFound. The only case where NotFound
+		// happens while the pod *actually* exists is the one above -- we haven't been able to finish
+		// the cache sync before the context times out.
+		// Every other class of error is non-transient.
 		_, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name)
 		if err != nil {
 			return false, err
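wait.PollImmediateUntil (from k8s.io/apimachinery/pkg/util/wait) runs the condition once immediately, then once per interval, until the condition returns true, returns an error, or the stop channel closes. Passing ctx.Done() as the stop channel is what ties the loop to the timeout set up above. A standalone sketch of those semantics, using a condition that never succeeds:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	attempts := 0
	// The condition never reports done, so the loop runs until ctx.Done()
	// closes and PollImmediateUntil returns wait.ErrWaitTimeout.
	err := wait.PollImmediateUntil(151*time.Millisecond, func() (bool, error) {
		attempts++
		return false, nil
	}, ctx.Done())
	fmt.Printf("attempts=%d err=%v\n", attempts, err) // err is wait.ErrWaitTimeout
}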
@@ -280,7 +290,7 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue
 	if err != nil {
 		if errors.IsNotFound(err) {
 			err = fmt.Errorf("Pod %q not found in pod lister: %w", key, err)
-			log.G(ctx).WithError(err).Warn("Not enqueuing pod status update")
+			log.G(ctx).WithError(err).Debug("Not enqueuing pod status update")
 		} else {
 			log.G(ctx).WithError(err).Warn("Not enqueuing pod status update due to error from pod lister")
 		}
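The demotion from Warn to Debug reflects that NotFound is the expected transient case described in the comments above, not a failure worth alerting on. For reference, a small sketch of how such a NotFound error can be produced and detected with k8s.io/apimachinery's errors package (the pod name is made up):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// Construct the same kind of error a lister returns for a missing pod.
	err := errors.NewNotFound(schema.GroupResource{Resource: "pods"}, "nginx")
	fmt.Println(errors.IsNotFound(err)) // true

	// Wrapping with %w keeps the original error available to callers.
	wrapped := fmt.Errorf("Pod %q not found in pod lister: %w", "default/nginx", err)
	fmt.Println(wrapped)
}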