Merge pull request #851 from virtual-kubelet/race-condition-2nd
86  node/pod.go
@@ -16,6 +16,8 @@ package node
 
 import (
     "context"
+    "fmt"
+    "time"
 
     "github.com/google/go-cmp/cmp"
     pkgerrors "github.com/pkg/errors"
@@ -24,6 +26,7 @@ import (
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/util/workqueue"
 )
@@ -36,6 +39,10 @@ const (
     podEventDeleteSuccess = "ProviderDeleteSuccess"
     podEventUpdateFailed  = "ProviderUpdateFailed"
     podEventUpdateSuccess = "ProviderUpdateSuccess"
+
+    // 151 milliseconds is just chosen as a small prime number to retry between
+    // attempts to get a notification from the provider to VK.
+    notificationRetryPeriod = 151 * time.Millisecond
 )
 
 func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
@@ -234,21 +241,72 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
 // enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd
 // prior to enqueuePodStatusUpdate.
 func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
-    if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
-        log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key")
-    } else {
-        if obj, ok := pc.knownPods.Load(key); ok {
-            kpod := obj.(*knownPod)
-            kpod.Lock()
-            if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) {
-                kpod.Unlock()
-                return
-            }
-            kpod.lastPodStatusReceivedFromProvider = pod
-            kpod.Unlock()
-            q.AddRateLimited(key)
-        }
-    }
+    ctx, cancel := context.WithTimeout(ctx, notificationRetryPeriod*maxRetries)
+    defer cancel()
+
+    ctx, span := trace.StartSpan(ctx, "enqueuePodStatusUpdate")
+    defer span.End()
+    ctx = span.WithField(ctx, "method", "enqueuePodStatusUpdate")
+
+    // TODO (Sargun): Make this asynchronous-ish. Right now, if we are not cache synced and we receive notifications
+    // from the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation.
+    key, err := cache.MetaNamespaceKeyFunc(pod)
+    if err != nil {
+        log.G(ctx).WithError(err).Error("Error getting pod meta namespace key")
+        span.SetStatus(err)
+        return
+    }
+    ctx = span.WithField(ctx, "key", key)
+
+    var obj interface{}
+    err = wait.PollImmediateUntil(notificationRetryPeriod, func() (bool, error) {
+        var ok bool
+        obj, ok = pc.knownPods.Load(key)
+        if ok {
+            return true, nil
+        }
+
+        // Blind sync. A partial sync is better than nothing. If this returns false, the poll loop should not be
+        // invoked again, as it means the context has timed out.
+        if !cache.WaitForNamedCacheSync("enqueuePodStatusUpdate", ctx.Done(), pc.podsInformer.Informer().HasSynced) {
+            log.G(ctx).Warn("enqueuePodStatusUpdate proceeding with unsynced cache")
+        }
+
+        // The only transient error that the pod lister returns is "not found". The only case where "not found"
+        // should happen while the pod *actually* exists is the above -- where we haven't been able to finish the
+        // sync before the context times out.
+        // The other class of errors is non-transient.
+        _, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name)
+        if err != nil {
+            return false, err
+        }
+
+        // err is nil, and therefore the pod exists in k8s but does not yet exist in our known pods map. This likely
+        // means that we're in some kind of startup synchronization issue where the provider knows about a pod (as it
+        // performs recovery) that we do not yet know about.
+        return false, nil
+    }, ctx.Done())
+
+    if err != nil {
+        if errors.IsNotFound(err) {
+            err = fmt.Errorf("Pod %q not found in pod lister: %w", key, err)
+            log.G(ctx).WithError(err).Debug("Not enqueuing pod status update")
+        } else {
+            log.G(ctx).WithError(err).Warn("Not enqueuing pod status update due to error from pod lister")
+        }
+        span.SetStatus(err)
+        return
+    }
+
+    kpod := obj.(*knownPod)
+    kpod.Lock()
+    if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) {
+        kpod.Unlock()
+        return
+    }
+    kpod.lastPodStatusReceivedFromProvider = pod
+    kpod.Unlock()
+    q.AddRateLimited(key)
 }
 
 func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) {
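The hunk above replaces the old fire-and-forget lookup of knownPods with a bounded retry: the controller now polls for the pod key for up to notificationRetryPeriod*maxRetries before giving up. Below is a minimal, self-contained sketch (not part of this commit) of that wait.PollImmediateUntil pattern in isolation; the knownPods map, the pod key, and the timings are invented for illustration.

// Sketch of the retry pattern used by the new enqueuePodStatusUpdate: PollImmediateUntil runs
// the condition immediately and then once per interval until it returns true, returns an error,
// or the stop channel (here, ctx.Done()) closes.
package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    var knownPods sync.Map      // stand-in for PodController.knownPods
    const key = "default/nginx" // hypothetical namespace/name key

    // Simulate the pod becoming known a bit later, e.g. once startup sync catches up.
    go func() {
        time.Sleep(300 * time.Millisecond)
        knownPods.Store(key, struct{}{})
    }()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    err := wait.PollImmediateUntil(151*time.Millisecond, func() (bool, error) {
        _, ok := knownPods.Load(key)
        return ok, nil // true ends the poll; a non-nil error would end it too
    }, ctx.Done())
    fmt.Println("poll finished, err =", err)
}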
@@ -53,6 +53,7 @@ func newTestController() *TestController
             done:         make(chan struct{}),
             ready:        make(chan struct{}),
             podsInformer: iFactory.Core().V1().Pods(),
+            podsLister:   iFactory.Core().V1().Pods().Lister(),
         },
         mock:   p,
         client: fk8s,
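In the test wiring above, the new podsLister is derived from the same shared informer factory that already backs podsInformer. The sketch below shows that wiring in isolation; the fake clientset and the newPodLister helper are invented stand-ins, not part of this commit.

// Sketch: informer and lister sharing one informer cache, as in the test controller setup.
package example

import (
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
    listersv1 "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
)

func newPodLister(stopCh <-chan struct{}) listersv1.PodLister {
    client := fake.NewSimpleClientset()
    iFactory := informers.NewSharedInformerFactory(client, time.Minute)

    // podsInformer and podsLister are backed by the same shared informer cache.
    podsInformer := iFactory.Core().V1().Pods()
    podsLister := podsInformer.Lister()

    iFactory.Start(stopCh)
    // Block until the pod informer cache has synced before trusting lister results.
    cache.WaitForCacheSync(stopCh, podsInformer.Informer().HasSynced)

    return podsLister
}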
@@ -83,7 +83,8 @@ type PodNotifier interface
     // fashion. The provided pod's PodStatus should be up to date when
     // this function is called.
     //
-    // NotifyPods will not block callers.
+    // NotifyPods must not block the caller since it is only used to register the callback.
+    // The callback passed into `NotifyPods` may block when called.
     NotifyPods(context.Context, func(*corev1.Pod))
 }
 
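The doc-comment change above tightens the PodNotifier contract: NotifyPods itself must return immediately, while the callback it registers is allowed to block. Below is a hypothetical provider sketch (not from this commit) that satisfies that contract; the exampleProvider type and its statusUpdates channel are invented for illustration.

// Sketch: register the callback without blocking, then invoke it from the provider's own goroutine.
package example

import (
    "context"

    corev1 "k8s.io/api/core/v1"
)

type exampleProvider struct {
    statusUpdates chan *corev1.Pod // fed by the provider's own reconcile loop
}

// NotifyPods must not block the caller: it captures the callback and starts a goroutine.
func (p *exampleProvider) NotifyPods(ctx context.Context, cb func(*corev1.Pod)) {
    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            case pod := <-p.statusUpdates:
                cb(pod) // the callback itself may block; that is now explicitly allowed
            }
        }
    }()
}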