diff --git a/node/podcontroller.go b/node/podcontroller.go
index 99a641d13..d70e90360 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -20,7 +20,6 @@ import (
 	"reflect"
 	"strconv"
 	"sync"
-	"time"
 
 	pkgerrors "github.com/pkg/errors"
 	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
@@ -29,7 +28,6 @@ import (
 	"github.com/virtual-kubelet/virtual-kubelet/trace"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/apimachinery/pkg/util/wait"
 	corev1informers "k8s.io/client-go/informers/core/v1"
 	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
 	corev1listers "k8s.io/client-go/listers/core/v1"
@@ -203,10 +201,13 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 // Run will set up the event handlers for types we are interested in, as well
 // as syncing informer caches and starting workers. It will block until the
 // context is cancelled, at which point it will shutdown the work queue and
-// wait for workers to finish processing their current work items.
+// wait for workers to finish processing their current work items prior to
+// returning.
 //
 // Once this returns, you should not re-use the controller.
 func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
+	// Shutdowns are idempotent, so we can call them multiple times. This is in case we have to bail out early for some reason.
+
 	defer func() {
 		pc.k8sQ.ShutDown()
 
@@ -273,13 +274,25 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 	pc.deleteDanglingPods(ctx, podSyncWorkers)
 
 	log.G(ctx).Info("starting workers")
-	pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
+	wg := sync.WaitGroup{}
+
+	// Use the worker's "index" as its ID so we can use it for tracing.
 	for id := 0; id < podSyncWorkers; id++ {
+		wg.Add(1)
 		workerID := strconv.Itoa(id)
-		go wait.Until(func() {
-			// Use the worker's "index" as its ID so we can use it for tracing.
-			pc.runWorker(ctx, workerID, pc.k8sQ)
-		}, time.Second, ctx.Done())
+		go func() {
+			defer wg.Done()
+			pc.runSyncPodStatusFromProviderWorker(ctx, workerID, podStatusQueue)
+		}()
+	}
+
+	for id := 0; id < podSyncWorkers; id++ {
+		wg.Add(1)
+		workerID := strconv.Itoa(id)
+		go func() {
+			defer wg.Done()
+			pc.runSyncPodsFromKubernetesWorker(ctx, workerID, pc.k8sQ)
+		}()
 	}
 
 	close(pc.ready)
@@ -287,7 +300,10 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 	log.G(ctx).Info("started workers")
 	<-ctx.Done()
 	log.G(ctx).Info("shutting down workers")
-
+	pc.k8sQ.ShutDown()
+	podStatusQueue.ShutDown()
+
+	wg.Wait()
 	return nil
 }
 
@@ -311,8 +327,9 @@ func (pc *PodController) Err() error {
 	return pc.err
 }
 
-// runWorker is a long-running function that will continually call the processNextWorkItem function in order to read and process an item on the work queue.
-func (pc *PodController) runWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
+// runSyncPodsFromKubernetesWorker is a long-running function that will continually call the processNextWorkItem function
+// in order to read and process an item on the work queue that is generated by the pod informer.
+func (pc *PodController) runSyncPodsFromKubernetesWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
 	for pc.processNextWorkItem(ctx, workerID, q) {
 	}
 }
diff --git a/node/queue.go b/node/queue.go
index 891ef2edd..540d6a4ad 100644
--- a/node/queue.go
+++ b/node/queue.go
@@ -16,7 +16,6 @@ package node
 
 import (
 	"context"
-	"strconv"
 	"time"
 
 	pkgerrors "github.com/pkg/errors"
@@ -92,16 +91,7 @@ func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, han
 	return true
 }
 
-func (pc *PodController) runProviderSyncWorkers(ctx context.Context, q workqueue.RateLimitingInterface, numWorkers int) {
-	for i := 0; i < numWorkers; i++ {
-		go func(index int) {
-			workerID := strconv.Itoa(index)
-			pc.runProviderSyncWorker(ctx, workerID, q)
-		}(i)
-	}
-}
-
-func (pc *PodController) runProviderSyncWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
+func (pc *PodController) runSyncPodStatusFromProviderWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
 	for pc.processPodStatusUpdate(ctx, workerID, q) {
 	}
 }
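
For reference, a minimal sketch (not part of this patch) of how a caller might drive the controller under the Run semantics documented above: Run blocks until the context is cancelled, shuts down both work queues, waits for all workers to exit, and the controller must not be reused afterwards. The empty PodControllerConfig and the worker count of 10 below are placeholders only; a real config needs a pod client, informers, and a provider.

package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"

	"github.com/virtual-kubelet/virtual-kubelet/node"
)

func main() {
	// Cancel the context on SIGINT/SIGTERM; Run returns only after its queues
	// are shut down and every sync worker has finished.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	// Placeholder config: a real PodControllerConfig must be populated
	// (pod client, informers, provider, ...).
	pc, err := node.NewPodController(node.PodControllerConfig{})
	if err != nil {
		log.Fatal(err)
	}

	// Starts podSyncWorkers workers per queue; do not reuse pc after Run returns.
	if err := pc.Run(ctx, 10); err != nil {
		log.Fatal(err)
	}
}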