From ccb6713b86fc6d8a3ae915b154d81abeb3ef8123 Mon Sep 17 00:00:00 2001
From: Sargun Dhillon
Date: Tue, 13 Aug 2019 11:08:15 -0700
Subject: [PATCH] Move location of event handler registration

This moves the event handler registration to after the cache is in sync,
which also lets us use the log object from the context rather than the
global logger.

The race condition of the cache starting while the event handler is being
added won't exist, because we wait for the cache to start up and go in
sync before adding the handler.
---
 node/podcontroller.go | 48 ++++++++++++++++++++++---------------------
 1 file changed, 25 insertions(+), 23 deletions(-)

diff --git a/node/podcontroller.go b/node/podcontroller.go
index 434df976d..ca788cb0f 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -173,11 +173,31 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 		k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
 	}
 
-	// Set up event handlers for when Pod resources change.
+	return pc, nil
+}
+
+// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
+// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
+func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
+	defer pc.k8sQ.ShutDown()
+
+	podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
+	pc.runSyncFromProvider(ctx, podStatusQueue)
+	defer podStatusQueue.ShutDown()
+
+	// Wait for the caches to be synced *before* starting to do work.
+	if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
+		return pkgerrors.New("failed to wait for caches to sync")
+	}
+	log.G(ctx).Info("Pod cache in-sync")
+
+	// Set up event handlers for when Pod resources change. Since the pod cache is in-sync, the informer will generate
+	// synthetic add events at this point. It again avoids the race condition of adding handlers while the cache is
+	// syncing.
 	pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(pod interface{}) {
 			if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
-				log.L.Error(err)
+				log.G(ctx).Error(err)
 			} else {
 				pc.k8sQ.AddRateLimited(key)
 			}
@@ -196,45 +216,27 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 			}
 			// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
 			if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
-				log.L.Error(err)
+				log.G(ctx).Error(err)
 			} else {
 				pc.k8sQ.AddRateLimited(key)
 			}
 		},
 		DeleteFunc: func(pod interface{}) {
 			if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
-				log.L.Error(err)
+				log.G(ctx).Error(err)
 			} else {
 				pc.k8sQ.AddRateLimited(key)
 			}
 		},
 	})
 
-	return pc, nil
-}
-
-// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
-// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
-func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
-	defer pc.k8sQ.ShutDown()
-
-	podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
-	pc.runSyncFromProvider(ctx, podStatusQueue)
-	pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
-	defer podStatusQueue.ShutDown()
-
-	// Wait for the caches to be synced *before* starting workers.
-	if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
-		return pkgerrors.New("failed to wait for caches to sync")
-	}
-	log.G(ctx).Info("Pod cache in-sync")
-
 	// Perform a reconciliation step that deletes any dangling pods from the provider.
 	// This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
 	// If by any reason the provider fails to delete a dangling pod, it will stay in the provider and deletion won't be retried.
 	pc.deleteDanglingPods(ctx, podSyncWorkers)
 
 	log.G(ctx).Info("starting workers")
+	pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
 	for id := 0; id < podSyncWorkers; id++ {
 		workerID := strconv.Itoa(id)
 		go wait.Until(func() {
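
For reference, here is a minimal, standalone sketch of the ordering this patch adopts: start the informer, wait for its cache to sync, and only then register the event handlers that feed the work queue. It is written against plain client-go; the clientset construction, the 30-second resync period, and the "pods" queue name are illustrative assumptions rather than virtual-kubelet code.

// sketch.go: wait for the informer cache to sync before registering handlers.
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/workqueue"
)

func run(ctx context.Context, client kubernetes.Interface) error {
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	podInformer := factory.Core().V1().Pods().Informer()

	// Illustrative queue name; the real controller uses "syncPodsFromKubernetes".
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pods")
	defer queue.ShutDown()

	// Start the informer first so its cache can fill.
	factory.Start(ctx.Done())

	// Block until the pod cache is in sync before doing any work.
	if ok := cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	// Registering the handler only after the sync: the informer replays the
	// already-cached pods as add notifications, so nothing is missed and the
	// handler never races with the initial cache fill.
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
				queue.AddRateLimited(key)
			}
		},
	})

	<-ctx.Done()
	return nil
}

func main() {
	// Illustrative setup: load the default kubeconfig from the home directory.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	_ = run(context.Background(), client)
}

Because a newly registered handler is fed the current cache contents as add notifications, registering after the sync loses no pods; the handler simply observes the synced state first and live updates afterwards.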