Address concerns with PR

Also, just use Kubernetes waiter library.
This commit is contained in:
Sargun Dhillon
2020-07-14 18:46:22 -07:00
parent 12625131b5
commit 1e9e055e89
2 changed files with 43 additions and 23 deletions

View File

@@ -16,7 +16,7 @@ package node
import ( import (
"context" "context"
"math/rand" "fmt"
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
@@ -26,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
) )
@@ -236,36 +237,55 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
// enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd // enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd
// prior to enqueuePodStatusUpdate. // prior to enqueuePodStatusUpdate.
func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) { func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
// TODO (Sargun): Make this asynchronousish. Right now, if we are not cache synced, and we receive notifications from ctx, cancel := context.WithCancel(ctx)
// the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation. defer cancel()
l := log.G(ctx).WithField("method", "enqueuePodStatusUpdate")
ctx, span := trace.StartSpan(ctx, "enqueuePodStatusUpdate")
defer span.End()
ctx = span.WithField(ctx, "method", "enqueuePodStatusUpdate")
// TODO (Sargun): Make this asynchronousish. Right now, if we are not cache synced, and we receive notifications
// from the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation.
key, err := cache.MetaNamespaceKeyFunc(pod) key, err := cache.MetaNamespaceKeyFunc(pod)
if err != nil { if err != nil {
l.WithError(err).Error("Error getting pod meta namespace key") log.G(ctx).WithError(err).Error("Error getting pod meta namespace key")
span.SetStatus(err)
return return
} }
// This doesn't wait for all of the callbacks to finish. We should check if the pod exists in K8s. If the pod ctx = span.WithField(ctx, "key", key)
// does not exist in K8s, then we can bail. Alternatively, if the
var obj interface{} var obj interface{}
var ok bool // 151 milliseconds is arbitrarily chosen here as a small prime number.
retry: err = wait.PollImmediateUntil(151*time.Millisecond, func() (bool, error) {
obj, ok = pc.knownPods.Load(key) var ok bool
if !ok { obj, ok = pc.knownPods.Load(key)
// Blind wait for sync. If we haven't synced yet, that's okay too. if ok {
return true, nil
}
// Blind sync. Ignore return code.
cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced) cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced)
_, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name) _, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name)
if errors.IsNotFound(err) {
return
}
if err != nil { if err != nil {
l.WithError(err).Error("Received error from pod lister while trying to see if pod exists") return false, err
return
} }
// err is nil, and therefore the pod exists in k8s, but does not exist in our known pods map. Sleep
// and retry for somewhere between 1 and 1000 microseconds. // err is nil, and therefore the pod exists in k8s, but does not exist in our known pods map. This likely means
time.Sleep(time.Microsecond * time.Duration(rand.Intn(1000))) // that we're in some kind of startup synchronization issue where the provider knows about a pod (as it performs
goto retry // recover, that we do not yet know about).
return false, nil
}, ctx.Done())
if err != nil {
if errors.IsNotFound(err) {
err = fmt.Errorf("Pod %q not found in pod lister: %w", key, err)
log.G(ctx).WithError(err).Warn("Not enqueuing pod status update")
} else {
log.G(ctx).WithError(err).Warn("Not enqueuing pod status update due to error from pod lister")
}
span.SetStatus(err)
return
} }
kpod := obj.(*knownPod) kpod := obj.(*knownPod)

View File

@@ -83,8 +83,8 @@ type PodNotifier interface {
// fashion. The provided pod's PodStatus should be up to date when // fashion. The provided pod's PodStatus should be up to date when
// this function is called. // this function is called.
// //
// NotifyPods may block callers. The NotifyPods function may be called // NotifyPods must not block the caller since it is only used to register the callback.
// concurrently. // The callback passed into `NotifyPods` may block when called.
NotifyPods(context.Context, func(*corev1.Pod)) NotifyPods(context.Context, func(*corev1.Pod))
} }