From 4d60fc20493829b579bc0c07458ad40e337e7530 Mon Sep 17 00:00:00 2001
From: Sargun Dhillon
Date: Fri, 26 Jul 2019 13:38:06 -0700
Subject: [PATCH] Set up event handler at Pod Controller creation time

This seems to avoid a race condition where, at pod informer startup time,
the reactor doesn't get set up properly. It also refactors the root command
example to start the informers only after everything is wired up.
---
 .../internal/commands/root/root.go |  6 +--
 node/podcontroller.go              | 40 ++++++++++---------
 2 files changed, 25 insertions(+), 21 deletions(-)

diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go
index c7bd555d5..baaa99eca 100644
--- a/cmd/virtual-kubelet/internal/commands/root/root.go
+++ b/cmd/virtual-kubelet/internal/commands/root/root.go
@@ -102,9 +102,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 	configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
 	serviceInformer := scmInformerFactory.Core().V1().Services()
 
-	go podInformerFactory.Start(ctx.Done())
-	go scmInformerFactory.Start(ctx.Done())
-
 	rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister())
 	if err != nil {
 		return errors.Wrap(err, "could not create resource manager")
@@ -194,6 +191,9 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 		return errors.Wrap(err, "error setting up pod controller")
 	}
 
+	go podInformerFactory.Start(ctx.Done())
+	go scmInformerFactory.Start(ctx.Done())
+
 	cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig)
 	if err != nil {
 		return err
diff --git a/node/podcontroller.go b/node/podcontroller.go
index fdbb89734..82a69550b 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -94,6 +94,8 @@ type PodController struct {
 	client corev1client.PodsGetter
 
 	resourceManager *manager.ResourceManager
+
+	k8sQ workqueue.RateLimitingInterface
 }
 
 // PodControllerConfig is used to configure a new PodController.
@@ -147,7 +149,7 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 		return nil, pkgerrors.Wrap(err, "could not create resource manager")
 	}
 
-	return &PodController{
+	pc := &PodController{
 		client:          cfg.PodClient,
 		podsInformer:    cfg.PodInformer,
 		podsLister:      cfg.PodInformer.Lister(),
@@ -155,19 +157,8 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 		resourceManager: rm,
 		ready:           make(chan struct{}),
 		recorder:        cfg.EventRecorder,
-	}, nil
-}
-
-// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
-// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
-func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
-	k8sQ := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes")
-	defer k8sQ.ShutDown()
-
-	podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
-	pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
-	pc.runSyncFromProvider(ctx, podStatusQueue)
-	defer podStatusQueue.ShutDown()
+		k8sQ:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
+	}
 
 	// Set up event handlers for when Pod resources change.
 	pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -175,7 +166,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
 			if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
 				log.L.Error(err)
 			} else {
-				k8sQ.AddRateLimited(key)
+				pc.k8sQ.AddRateLimited(key)
 			}
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
@@ -194,18 +185,31 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
 			if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
 				log.L.Error(err)
 			} else {
-				k8sQ.AddRateLimited(key)
+				pc.k8sQ.AddRateLimited(key)
 			}
 		},
 		DeleteFunc: func(pod interface{}) {
 			if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
 				log.L.Error(err)
 			} else {
-				k8sQ.AddRateLimited(key)
+				pc.k8sQ.AddRateLimited(key)
 			}
 		},
 	})
 
+	return pc, nil
+}
+
+// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
+// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
+func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
+	defer pc.k8sQ.ShutDown()
+
+	podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
+	pc.runSyncFromProvider(ctx, podStatusQueue)
+	pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
+	defer podStatusQueue.ShutDown()
+
 	// Wait for the caches to be synced *before* starting workers.
 	if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
 		return pkgerrors.New("failed to wait for caches to sync")
@@ -221,7 +225,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
 	for id := 0; id < podSyncWorkers; id++ {
 		go wait.Until(func() {
 			// Use the worker's "index" as its ID so we can use it for tracing.
-			pc.runWorker(ctx, strconv.Itoa(id), k8sQ)
+			pc.runWorker(ctx, strconv.Itoa(id), pc.k8sQ)
 		}, time.Second, ctx.Done())
 	}
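
For reference, the ordering this patch moves to is the usual client-go pattern: register event handlers on the informer first, start the shared informer factory second, then wait for the cache to sync before any worker drains the queue. The sketch below illustrates that ordering with a hypothetical watchPods helper; it assumes only the stock client-go informers, cache, and workqueue packages and is not code taken from this repository.

package example

import (
	"context"
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// watchPods is a hypothetical helper (not part of virtual-kubelet) showing the
// ordering the patch enforces: handlers first, factory start second, cache sync third.
func watchPods(ctx context.Context, client kubernetes.Interface) (workqueue.RateLimitingInterface, error) {
	factory := informers.NewSharedInformerFactory(client, time.Minute)
	podInformer := factory.Core().V1().Pods()

	// 1. Create the queue and register handlers before anything starts, so the
	//    "add" events produced by the informer's initial list are not missed.
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pods")
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
				queue.AddRateLimited(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
				queue.AddRateLimited(key)
			}
		},
	})

	// 2. Start the informer machinery only after the handlers are wired up.
	factory.Start(ctx.Done())

	// 3. Block until the initial list has been delivered before workers drain the queue.
	if ok := cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced); !ok {
		return nil, fmt.Errorf("failed to wait for pod cache to sync")
	}
	return queue, nil
}

Because the handlers are attached before the factory starts, the informer's initial list is delivered to them rather than silently dropped, which appears to be the startup window the commit message refers to.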