Solve the notification on startup pod status notification race condition
This solves the race condition as described in https://github.com/virtual-kubelet/virtual-kubelet/issues/836. It does this by checking two conditions when the possible race condition is detected. If we receive a pod notification from the provider, and it is not in our known pods list: 1. Is our cache in-sync? 2. Is it known to our pod lister? The first case can happen because of the order we start the provider and sync our caches. The second case can happen because even if the cache returns synced, it does not mean all of the call backs on the informer have quiesced. This slightly changes the behaviour of notifyPods to that it can block (especially at startup). We can solve this later by using something like a fair (ticket?) lock.
This commit is contained in:
56
node/pod.go
56
node/pod.go
@@ -16,6 +16,8 @@ package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
pkgerrors "github.com/pkg/errors"
|
||||
@@ -234,21 +236,47 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
|
||||
// enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd
|
||||
// prior to enqueuePodStatusUpdate.
|
||||
func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
|
||||
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
|
||||
log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key")
|
||||
} else {
|
||||
if obj, ok := pc.knownPods.Load(key); ok {
|
||||
kpod := obj.(*knownPod)
|
||||
kpod.Lock()
|
||||
if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) {
|
||||
kpod.Unlock()
|
||||
return
|
||||
}
|
||||
kpod.lastPodStatusReceivedFromProvider = pod
|
||||
kpod.Unlock()
|
||||
q.AddRateLimited(key)
|
||||
}
|
||||
// TODO (Sargun): Make this asynchronousish. Right now, if we are not cache synced, and we receive notifications from
|
||||
// the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation.
|
||||
l := log.G(ctx).WithField("method", "enqueuePodStatusUpdate")
|
||||
key, err := cache.MetaNamespaceKeyFunc(pod)
|
||||
if err != nil {
|
||||
l.WithError(err).Error("Error getting pod meta namespace key")
|
||||
return
|
||||
}
|
||||
// This doesn't wait for all of the callbacks to finish. We should check if the pod exists in K8s. If the pod
|
||||
// does not exist in K8s, then we can bail. Alternatively, if the
|
||||
var obj interface{}
|
||||
var ok bool
|
||||
retry:
|
||||
obj, ok = pc.knownPods.Load(key)
|
||||
if !ok {
|
||||
// Blind wait for sync. If we haven't synced yet, that's okay too.
|
||||
cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced)
|
||||
|
||||
_, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name)
|
||||
if errors.IsNotFound(err) {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
l.WithError(err).Error("Received error from pod lister while trying to see if pod exists")
|
||||
return
|
||||
}
|
||||
// err is nil, and therefore the pod exists in k8s, but does not exist in our known pods map. Sleep
|
||||
// and retry for somewhere between 1 and 1000 microseconds.
|
||||
time.Sleep(time.Microsecond * time.Duration(rand.Intn(1000)))
|
||||
goto retry
|
||||
}
|
||||
|
||||
kpod := obj.(*knownPod)
|
||||
kpod.Lock()
|
||||
if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) {
|
||||
kpod.Unlock()
|
||||
return
|
||||
}
|
||||
kpod.lastPodStatusReceivedFromProvider = pod
|
||||
kpod.Unlock()
|
||||
q.AddRateLimited(key)
|
||||
}
|
||||
|
||||
func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) {
|
||||
|
||||
@@ -53,6 +53,7 @@ func newTestController() *TestController {
|
||||
done: make(chan struct{}),
|
||||
ready: make(chan struct{}),
|
||||
podsInformer: iFactory.Core().V1().Pods(),
|
||||
podsLister: iFactory.Core().V1().Pods().Lister(),
|
||||
},
|
||||
mock: p,
|
||||
client: fk8s,
|
||||
|
||||
@@ -83,7 +83,8 @@ type PodNotifier interface {
|
||||
// fashion. The provided pod's PodStatus should be up to date when
|
||||
// this function is called.
|
||||
//
|
||||
// NotifyPods will not block callers.
|
||||
// NotifyPods may block callers. The NotifyPods function may be called
|
||||
// concurrently.
|
||||
NotifyPods(context.Context, func(*corev1.Pod))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user