diff --git a/node/podcontroller.go b/node/podcontroller.go
index 434df976d..ca788cb0f 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -173,11 +173,31 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 		k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
 	}
 
-	// Set up event handlers for when Pod resources change.
+	return pc, nil
+}
+
+// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
+// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
+func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
+	defer pc.k8sQ.ShutDown()
+
+	podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
+	pc.runSyncFromProvider(ctx, podStatusQueue)
+	defer podStatusQueue.ShutDown()
+
+	// Wait for the caches to be synced *before* starting to do work.
+	if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
+		return pkgerrors.New("failed to wait for caches to sync")
+	}
+	log.G(ctx).Info("Pod cache in-sync")
+
+	// Set up event handlers for when Pod resources change. Since the pod cache is in-sync, the informer will generate
+	// synthetic add events at this point. It again avoids the race condition of adding handlers while the cache is
+	// syncing.
 	pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(pod interface{}) {
 			if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
-				log.L.Error(err)
+				log.G(ctx).Error(err)
 			} else {
 				pc.k8sQ.AddRateLimited(key)
 			}
@@ -196,45 +216,27 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 			}
 			// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
 			if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
-				log.L.Error(err)
+				log.G(ctx).Error(err)
 			} else {
 				pc.k8sQ.AddRateLimited(key)
 			}
 		},
 		DeleteFunc: func(pod interface{}) {
 			if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
-				log.L.Error(err)
+				log.G(ctx).Error(err)
 			} else {
 				pc.k8sQ.AddRateLimited(key)
 			}
 		},
 	})
 
-	return pc, nil
-}
-
-// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
-// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
-func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
-	defer pc.k8sQ.ShutDown()
-
-	podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
-	pc.runSyncFromProvider(ctx, podStatusQueue)
-	pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
-	defer podStatusQueue.ShutDown()
-
-	// Wait for the caches to be synced *before* starting workers.
-	if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
-		return pkgerrors.New("failed to wait for caches to sync")
-	}
-	log.G(ctx).Info("Pod cache in-sync")
-
 	// Perform a reconciliation step that deletes any dangling pods from the provider.
 	// This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
 	// If by any reason the provider fails to delete a dangling pod, it will stay in the provider and deletion won't be retried.
 	pc.deleteDanglingPods(ctx, podSyncWorkers)
 
 	log.G(ctx).Info("starting workers")
+	pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
 	for id := 0; id < podSyncWorkers; id++ {
 		workerID := strconv.Itoa(id)
 		go wait.Until(func() {
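For reference, the sketch below (not part of this patch) shows how a caller might drive the reworked Run: it registers the informer handlers only after the pod cache has synced and then blocks until the context is cancelled, so the caller owns the context lifetime and the worker count. The import path and the runPodController helper are assumptions made for illustration; the PodController itself is assumed to come from NewPodController as before.

package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"

	// Assumed import path for the node package touched by this diff.
	"github.com/virtual-kubelet/virtual-kubelet/node"
)

// runPodController is a hypothetical helper illustrating the calling convention
// implied by the new signature: Run blocks until ctx is cancelled, at which
// point it shuts down its work queues and drains in-flight items.
func runPodController(pc *node.PodController, podSyncWorkers int) error {
	// Cancel the context on SIGINT/SIGTERM so Run can shut down cleanly.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	return pc.Run(ctx, podSyncWorkers)
}

func main() {
	var pc *node.PodController // assumed to be built via node.NewPodController(cfg) beforehand
	if err := runPodController(pc, 10); err != nil {
		log.Fatal(err)
	}
}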