Merge pull request #763 from sargun/wait-for-worker-shutdown-v2

Wait for Workers to exit prior to returning from PodController.Run
Brian Goff (committed via GitHub), 2019-09-12 14:33:59 -07:00
2 changed files with 28 additions and 21 deletions
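
At a high level, the change makes Run own the lifetime of its workers: every worker goroutine is registered with a sync.WaitGroup, both work queues are shut down once the context is cancelled, and Run only returns after wg.Wait() has unblocked. Below is a minimal, self-contained sketch of that ordering, with a plain channel standing in for the work queue; the names runController and processItem are hypothetical and not part of virtual-kubelet.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func processItem(id int, item string) {
    fmt.Printf("worker %d handled %q\n", id, item)
}

// runController blocks until ctx is cancelled, then waits for every worker
// goroutine to drain the queue and exit before returning.
func runController(ctx context.Context, workers int, workCh chan string) error {
    var wg sync.WaitGroup
    for id := 0; id < workers; id++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            // Workers exit once the queue (here: a channel) is closed and drained.
            for item := range workCh {
                processItem(id, item)
            }
        }(id)
    }

    <-ctx.Done()  // block until the caller cancels
    close(workCh) // analogous to shutting down the work queues: no new items, drain the rest
    wg.Wait()     // the fix: do not return while workers are still running
    return nil
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()

    workCh := make(chan string, 4)
    workCh <- "pod/a"
    workCh <- "pod/b"

    _ = runController(ctx, 2, workCh)
    fmt.Println("Run returned only after every worker exited")
}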


@@ -20,7 +20,6 @@ import (
"reflect"
"strconv"
"sync"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
@@ -29,7 +28,6 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
@@ -203,10 +201,13 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until the
// context is cancelled, at which point it will shutdown the work queue and
// wait for workers to finish processing their current work items.
// wait for workers to finish processing their current work items prior to
// returning.
//
// Once this returns, you should not re-use the controller.
func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
// Shutdowns are idempotent, so we can call it multiple times. This is in case we have to bail out early for some reason.
defer func() {
pc.k8sQ.ShutDown()
@@ -273,13 +274,25 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
pc.deleteDanglingPods(ctx, podSyncWorkers)
log.G(ctx).Info("starting workers")
pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
wg := sync.WaitGroup{}
// Use the worker's "index" as its ID so we can use it for tracing.
for id := 0; id < podSyncWorkers; id++ {
wg.Add(1)
workerID := strconv.Itoa(id)
go wait.Until(func() {
// Use the worker's "index" as its ID so we can use it for tracing.
pc.runWorker(ctx, workerID, pc.k8sQ)
}, time.Second, ctx.Done())
go func() {
defer wg.Done()
pc.runSyncPodStatusFromProviderWorker(ctx, workerID, podStatusQueue)
}()
}
for id := 0; id < podSyncWorkers; id++ {
wg.Add(1)
workerID := strconv.Itoa(id)
go func() {
defer wg.Done()
pc.runSyncPodsFromKubernetesWorker(ctx, workerID, pc.k8sQ)
}()
}
close(pc.ready)
@@ -287,7 +300,10 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
log.G(ctx).Info("started workers")
<-ctx.Done()
log.G(ctx).Info("shutting down workers")
pc.k8sQ.ShutDown()
podStatusQueue.ShutDown()
wg.Wait()
return nil
}
@@ -311,8 +327,9 @@ func (pc *PodController) Err() error {
return pc.err
}
// runWorker is a long-running function that will continually call the processNextWorkItem function in order to read and process an item on the work queue.
func (pc *PodController) runWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
// runSyncPodsFromKubernetesWorker is a long-running function that will continually call the processNextWorkItem function
// in order to read and process an item on the work queue that is generated by the pod informer.
func (pc *PodController) runSyncPodsFromKubernetesWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
for pc.processNextWorkItem(ctx, workerID, q) {
}
}
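
For callers of the node package, the practical effect is that Run's return now doubles as a completion signal: once it comes back, no sync workers are left running against the queues or the provider. A hedged usage sketch under that reading; only NewPodController, PodControllerConfig, and Run are taken from this diff, while the import path and the worker count of 10 are assumptions.

package example

import (
    "context"

    "github.com/virtual-kubelet/virtual-kubelet/node"
)

// runPodController drives a PodController until ctx is cancelled and does not
// return until the controller has fully torn itself down.
func runPodController(ctx context.Context, cfg node.PodControllerConfig) error {
    pc, err := node.NewPodController(cfg)
    if err != nil {
        return err
    }

    // Run blocks until ctx is cancelled; both work queues are shut down and
    // every sync worker has exited before this call returns.
    if err := pc.Run(ctx, 10); err != nil { // 10: an arbitrary pod-sync worker count
        return err
    }

    // Safe to release shared resources here, and (per the new doc comment)
    // the controller should not be reused.
    return nil
}

The second changed file, below, renames the provider-status worker helpers so their names describe what they sync.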


@@ -16,7 +16,6 @@ package node
import (
"context"
"strconv"
"time"
pkgerrors "github.com/pkg/errors"
@@ -92,16 +91,7 @@ func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, han
return true
}
func (pc *PodController) runProviderSyncWorkers(ctx context.Context, q workqueue.RateLimitingInterface, numWorkers int) {
for i := 0; i < numWorkers; i++ {
go func(index int) {
workerID := strconv.Itoa(index)
pc.runProviderSyncWorker(ctx, workerID, q)
}(i)
}
}
func (pc *PodController) runProviderSyncWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
func (pc *PodController) runSyncPodStatusFromProviderWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
for pc.processPodStatusUpdate(ctx, workerID, q) {
}
}
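
The renamed helpers are thin loops: runSyncPodsFromKubernetesWorker repeatedly calls processNextWorkItem, and runSyncPodStatusFromProviderWorker repeatedly calls processPodStatusUpdate. The reason these loops (and therefore wg.Wait() in Run) terminate is a property of client-go's workqueue: after ShutDown, Get keeps handing out already-queued items and reports shutdown only once the queue is drained. A small self-contained sketch of that behaviour; handleKey and the wiring in main are hypothetical, not the controller's real handlers.

package main

import (
    "fmt"
    "sync"

    "k8s.io/client-go/util/workqueue"
)

func handleKey(workerID, key string) {
    fmt.Printf("worker %s processed %s\n", workerID, key)
}

// processNextWorkItem mirrors the shape of the controller's loop body: it
// returns false only once the queue has been shut down and drained.
func processNextWorkItem(workerID string, q workqueue.RateLimitingInterface) bool {
    item, shutdown := q.Get()
    if shutdown {
        return false
    }
    defer q.Done(item)
    handleKey(workerID, item.(string))
    return true
}

func main() {
    q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    q.Add("default/nginx")

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        // Same shape as the renamed worker functions above.
        for processNextWorkItem("0", q) {
        }
    }()

    q.ShutDown() // queued items are still delivered; Get reports shutdown once drained
    wg.Wait()    // unblocks only after the worker loop has exited
}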