diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go index c7bd555d5..baaa99eca 100644 --- a/cmd/virtual-kubelet/internal/commands/root/root.go +++ b/cmd/virtual-kubelet/internal/commands/root/root.go @@ -102,9 +102,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { configMapInformer := scmInformerFactory.Core().V1().ConfigMaps() serviceInformer := scmInformerFactory.Core().V1().Services() - go podInformerFactory.Start(ctx.Done()) - go scmInformerFactory.Start(ctx.Done()) - rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister()) if err != nil { return errors.Wrap(err, "could not create resource manager") @@ -194,6 +191,9 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return errors.Wrap(err, "error setting up pod controller") } + go podInformerFactory.Start(ctx.Done()) + go scmInformerFactory.Start(ctx.Done()) + cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig) if err != nil { return err diff --git a/node/podcontroller.go b/node/podcontroller.go index fdbb89734..82a69550b 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -94,6 +94,8 @@ type PodController struct { client corev1client.PodsGetter resourceManager *manager.ResourceManager + + k8sQ workqueue.RateLimitingInterface } // PodControllerConfig is used to configure a new PodController. @@ -147,7 +149,7 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) { return nil, pkgerrors.Wrap(err, "could not create resource manager") } - return &PodController{ + pc := &PodController{ client: cfg.PodClient, podsInformer: cfg.PodInformer, podsLister: cfg.PodInformer.Lister(), @@ -155,19 +157,8 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) { resourceManager: rm, ready: make(chan struct{}), recorder: cfg.EventRecorder, - }, nil -} - -// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers. -// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items. -func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error { - k8sQ := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes") - defer k8sQ.ShutDown() - - podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider") - pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers) - pc.runSyncFromProvider(ctx, podStatusQueue) - defer podStatusQueue.ShutDown() + k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"), + } // Set up event handlers for when Pod resources change. pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -175,7 +166,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error { if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil { log.L.Error(err) } else { - k8sQ.AddRateLimited(key) + pc.k8sQ.AddRateLimited(key) } }, UpdateFunc: func(oldObj, newObj interface{}) { @@ -194,18 +185,31 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error { if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil { log.L.Error(err) } else { - k8sQ.AddRateLimited(key) + pc.k8sQ.AddRateLimited(key) } }, DeleteFunc: func(pod interface{}) { if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil { log.L.Error(err) } else { - k8sQ.AddRateLimited(key) + pc.k8sQ.AddRateLimited(key) } }, }) + return pc, nil +} + +// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers. +// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items. +func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error { + defer pc.k8sQ.ShutDown() + + podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider") + pc.runSyncFromProvider(ctx, podStatusQueue) + pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers) + defer podStatusQueue.ShutDown() + // Wait for the caches to be synced *before* starting workers. if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok { return pkgerrors.New("failed to wait for caches to sync") @@ -221,7 +225,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error { for id := 0; id < podSyncWorkers; id++ { go wait.Until(func() { // Use the worker's "index" as its ID so we can use it for tracing. - pc.runWorker(ctx, strconv.Itoa(id), k8sQ) + pc.runWorker(ctx, strconv.Itoa(id), pc.k8sQ) }, time.Second, ctx.Done()) }