From 12625131b5dc187895b95305cb027a4c7added5f Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Tue, 14 Jul 2020 12:01:40 -0700 Subject: [PATCH 1/3] Solve the notification on startup pod status notification race condition This solves the race condition as described in https://github.com/virtual-kubelet/virtual-kubelet/issues/836. It does this by checking two conditions when the possible race condition is detected. If we receive a pod notification from the provider, and it is not in our known pods list: 1. Is our cache in-sync? 2. Is it known to our pod lister? The first case can happen because of the order we start the provider and sync our caches. The second case can happen because even if the cache returns synced, it does not mean all of the call backs on the informer have quiesced. This slightly changes the behaviour of notifyPods to that it can block (especially at startup). We can solve this later by using something like a fair (ticket?) lock. --- node/pod.go | 56 ++++++++++++++++++++++++++++++++----------- node/pod_test.go | 1 + node/podcontroller.go | 3 ++- 3 files changed, 45 insertions(+), 15 deletions(-) diff --git a/node/pod.go b/node/pod.go index e2a720226..b0190935c 100644 --- a/node/pod.go +++ b/node/pod.go @@ -16,6 +16,8 @@ package node import ( "context" + "math/rand" + "time" "github.com/google/go-cmp/cmp" pkgerrors "github.com/pkg/errors" @@ -234,21 +236,47 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes // enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd // prior to enqueuePodStatusUpdate. func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) { - if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil { - log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key") - } else { - if obj, ok := pc.knownPods.Load(key); ok { - kpod := obj.(*knownPod) - kpod.Lock() - if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) { - kpod.Unlock() - return - } - kpod.lastPodStatusReceivedFromProvider = pod - kpod.Unlock() - q.AddRateLimited(key) - } + // TODO (Sargun): Make this asynchronousish. Right now, if we are not cache synced, and we receive notifications from + // the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation. + l := log.G(ctx).WithField("method", "enqueuePodStatusUpdate") + key, err := cache.MetaNamespaceKeyFunc(pod) + if err != nil { + l.WithError(err).Error("Error getting pod meta namespace key") + return } + // This doesn't wait for all of the callbacks to finish. We should check if the pod exists in K8s. If the pod + // does not exist in K8s, then we can bail. Alternatively, if the + var obj interface{} + var ok bool +retry: + obj, ok = pc.knownPods.Load(key) + if !ok { + // Blind wait for sync. If we haven't synced yet, that's okay too. + cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced) + + _, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name) + if errors.IsNotFound(err) { + return + } + if err != nil { + l.WithError(err).Error("Received error from pod lister while trying to see if pod exists") + return + } + // err is nil, and therefore the pod exists in k8s, but does not exist in our known pods map. Sleep + // and retry for somewhere between 1 and 1000 microseconds. + time.Sleep(time.Microsecond * time.Duration(rand.Intn(1000))) + goto retry + } + + kpod := obj.(*knownPod) + kpod.Lock() + if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) { + kpod.Unlock() + return + } + kpod.lastPodStatusReceivedFromProvider = pod + kpod.Unlock() + q.AddRateLimited(key) } func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) { diff --git a/node/pod_test.go b/node/pod_test.go index f365b589c..5eadd7854 100644 --- a/node/pod_test.go +++ b/node/pod_test.go @@ -53,6 +53,7 @@ func newTestController() *TestController { done: make(chan struct{}), ready: make(chan struct{}), podsInformer: iFactory.Core().V1().Pods(), + podsLister: iFactory.Core().V1().Pods().Lister(), }, mock: p, client: fk8s, diff --git a/node/podcontroller.go b/node/podcontroller.go index 331928471..6a2574398 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -83,7 +83,8 @@ type PodNotifier interface { // fashion. The provided pod's PodStatus should be up to date when // this function is called. // - // NotifyPods will not block callers. + // NotifyPods may block callers. The NotifyPods function may be called + // concurrently. NotifyPods(context.Context, func(*corev1.Pod)) } From 1e9e055e89b6abdd6832a35d72cf525b46c1ea76 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Tue, 14 Jul 2020 18:46:22 -0700 Subject: [PATCH 2/3] Address concerns with PR Also, just use Kubernetes waiter library. --- node/pod.go | 62 ++++++++++++++++++++++++++++--------------- node/podcontroller.go | 4 +-- 2 files changed, 43 insertions(+), 23 deletions(-) diff --git a/node/pod.go b/node/pod.go index b0190935c..3a93aba0b 100644 --- a/node/pod.go +++ b/node/pod.go @@ -16,7 +16,7 @@ package node import ( "context" - "math/rand" + "fmt" "time" "github.com/google/go-cmp/cmp" @@ -26,6 +26,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) @@ -236,36 +237,55 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes // enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd // prior to enqueuePodStatusUpdate. func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) { - // TODO (Sargun): Make this asynchronousish. Right now, if we are not cache synced, and we receive notifications from - // the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation. - l := log.G(ctx).WithField("method", "enqueuePodStatusUpdate") + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + ctx, span := trace.StartSpan(ctx, "enqueuePodStatusUpdate") + defer span.End() + ctx = span.WithField(ctx, "method", "enqueuePodStatusUpdate") + + // TODO (Sargun): Make this asynchronousish. Right now, if we are not cache synced, and we receive notifications + // from the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation. key, err := cache.MetaNamespaceKeyFunc(pod) if err != nil { - l.WithError(err).Error("Error getting pod meta namespace key") + log.G(ctx).WithError(err).Error("Error getting pod meta namespace key") + span.SetStatus(err) return } - // This doesn't wait for all of the callbacks to finish. We should check if the pod exists in K8s. If the pod - // does not exist in K8s, then we can bail. Alternatively, if the + ctx = span.WithField(ctx, "key", key) + var obj interface{} - var ok bool -retry: - obj, ok = pc.knownPods.Load(key) - if !ok { - // Blind wait for sync. If we haven't synced yet, that's okay too. + // 151 milliseconds is arbitrarily chosen here as a small prime number. + err = wait.PollImmediateUntil(151*time.Millisecond, func() (bool, error) { + var ok bool + obj, ok = pc.knownPods.Load(key) + if ok { + return true, nil + } + + // Blind sync. Ignore return code. cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced) _, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name) - if errors.IsNotFound(err) { - return - } if err != nil { - l.WithError(err).Error("Received error from pod lister while trying to see if pod exists") - return + return false, err } - // err is nil, and therefore the pod exists in k8s, but does not exist in our known pods map. Sleep - // and retry for somewhere between 1 and 1000 microseconds. - time.Sleep(time.Microsecond * time.Duration(rand.Intn(1000))) - goto retry + + // err is nil, and therefore the pod exists in k8s, but does not exist in our known pods map. This likely means + // that we're in some kind of startup synchronization issue where the provider knows about a pod (as it performs + // recover, that we do not yet know about). + return false, nil + }, ctx.Done()) + + if err != nil { + if errors.IsNotFound(err) { + err = fmt.Errorf("Pod %q not found in pod lister: %w", key, err) + log.G(ctx).WithError(err).Warn("Not enqueuing pod status update") + } else { + log.G(ctx).WithError(err).Warn("Not enqueuing pod status update due to error from pod lister") + } + span.SetStatus(err) + return } kpod := obj.(*knownPod) diff --git a/node/podcontroller.go b/node/podcontroller.go index 6a2574398..fe4902271 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -83,8 +83,8 @@ type PodNotifier interface { // fashion. The provided pod's PodStatus should be up to date when // this function is called. // - // NotifyPods may block callers. The NotifyPods function may be called - // concurrently. + // NotifyPods must not block the caller since it is only used to register the callback. + // The callback passed into `NotifyPods` may block when called. NotifyPods(context.Context, func(*corev1.Pod)) } From 4258c46746cc40a89c08630433b100f151dea942 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Wed, 22 Jul 2020 18:56:05 -0700 Subject: [PATCH 3/3] Enhance / cleanup enqueuePodStatusUpdate polling in retry loop --- node/pod.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/node/pod.go b/node/pod.go index 3a93aba0b..7fa15f3af 100644 --- a/node/pod.go +++ b/node/pod.go @@ -39,6 +39,10 @@ const ( podEventDeleteSuccess = "ProviderDeleteSuccess" podEventUpdateFailed = "ProviderUpdateFailed" podEventUpdateSuccess = "ProviderUpdateSuccess" + + // 151 milliseconds is just chosen as a small prime number to retry between + // attempts to get a notification from the provider to VK + notificationRetryPeriod = 151 * time.Millisecond ) func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context { @@ -237,7 +241,7 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes // enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd // prior to enqueuePodStatusUpdate. func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithTimeout(ctx, notificationRetryPeriod*maxRetries) defer cancel() ctx, span := trace.StartSpan(ctx, "enqueuePodStatusUpdate") @@ -255,17 +259,23 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue ctx = span.WithField(ctx, "key", key) var obj interface{} - // 151 milliseconds is arbitrarily chosen here as a small prime number. - err = wait.PollImmediateUntil(151*time.Millisecond, func() (bool, error) { + err = wait.PollImmediateUntil(notificationRetryPeriod, func() (bool, error) { var ok bool obj, ok = pc.knownPods.Load(key) if ok { return true, nil } - // Blind sync. Ignore return code. - cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced) + // Blind sync. Partial sync is better than nothing. If this returns false, the poll loop should not be invoked + // again as it means the context has timed out. + if !cache.WaitForNamedCacheSync("enqueuePodStatusUpdate", ctx.Done(), pc.podsInformer.Informer().HasSynced) { + log.G(ctx).Warn("enqueuePodStatusUpdate proceeding with unsynced cache") + } + // The only transient error that pod lister returns is not found. The only case where not found + // should happen, and the pod *actually* exists is the above -- where we haven't been able to finish sync + // before context times out. + // The other class of errors is non-transient _, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name) if err != nil { return false, err @@ -280,7 +290,7 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue if err != nil { if errors.IsNotFound(err) { err = fmt.Errorf("Pod %q not found in pod lister: %w", key, err) - log.G(ctx).WithError(err).Warn("Not enqueuing pod status update") + log.G(ctx).WithError(err).Debug("Not enqueuing pod status update") } else { log.G(ctx).WithError(err).Warn("Not enqueuing pod status update due to error from pod lister") }