Wait for Workers to exit prior to returning from PodController.Run
This changes the behaviour slightly: rather than returning immediately on context cancellation, Run now shuts down the work queues and waits for the workers to finish processing their current items before returning to the caller.
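In sketch form, the shutdown sequence now follows the pattern below (an illustrative reduction with made-up helper names such as run and process, not the exact code from the diff):

package example

import (
    "context"
    "sync"

    "k8s.io/client-go/util/workqueue"
)

// run mirrors the new Run flow: start a fixed number of workers, block until
// the context is cancelled, shut the queue down, and only return once every
// worker has finished its current item.
func run(ctx context.Context, q workqueue.RateLimitingInterface, workers int, process func(workqueue.RateLimitingInterface) bool) {
    wg := sync.WaitGroup{}
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // process is expected to return false once q has been shut down
            // and drained, which ends this worker's loop.
            for process(q) {
            }
        }()
    }

    <-ctx.Done() // block until the caller cancels
    q.ShutDown() // stop accepting new items; workers drain what is queued
    wg.Wait()    // return only after all workers have exited
}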
@@ -20,7 +20,6 @@ import (
     "reflect"
     "strconv"
     "sync"
-    "time"
 
     pkgerrors "github.com/pkg/errors"
     "github.com/virtual-kubelet/virtual-kubelet/errdefs"
@@ -29,7 +28,6 @@ import (
     "github.com/virtual-kubelet/virtual-kubelet/trace"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/errors"
-    "k8s.io/apimachinery/pkg/util/wait"
     corev1informers "k8s.io/client-go/informers/core/v1"
     corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
     corev1listers "k8s.io/client-go/listers/core/v1"
@@ -203,10 +201,13 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 // Run will set up the event handlers for types we are interested in, as well
 // as syncing informer caches and starting workers. It will block until the
 // context is cancelled, at which point it will shutdown the work queue and
-// wait for workers to finish processing their current work items.
+// wait for workers to finish processing their current work items prior to
+// returning.
 //
 // Once this returns, you should not re-use the controller.
 func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
+    // Shutdowns are idempotent, so we can call it multiple times. This is in case we have to bail out early for some reason.
+
     defer func() {
         pc.k8sQ.ShutDown()
 
@@ -273,13 +274,25 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
     pc.deleteDanglingPods(ctx, podSyncWorkers)
 
     log.G(ctx).Info("starting workers")
-    pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
-    for id := 0; id < podSyncWorkers; id++ {
-        workerID := strconv.Itoa(id)
-        go wait.Until(func() {
-            // Use the worker's "index" as its ID so we can use it for tracing.
-            pc.runWorker(ctx, workerID, pc.k8sQ)
-        }, time.Second, ctx.Done())
+    wg := sync.WaitGroup{}
+
+    // Use the worker's "index" as its ID so we can use it for tracing.
+    for id := 0; id < podSyncWorkers; id++ {
+        wg.Add(1)
+        workerID := strconv.Itoa(id)
+        go func() {
+            defer wg.Done()
+            pc.runSyncPodStatusFromProviderWorker(ctx, workerID, podStatusQueue)
+        }()
+    }
+
+    for id := 0; id < podSyncWorkers; id++ {
+        wg.Add(1)
+        workerID := strconv.Itoa(id)
+        go func() {
+            defer wg.Done()
+            pc.runSyncPodsFromKubernetesWorker(ctx, workerID, pc.k8sQ)
+        }()
     }
 
     close(pc.ready)
@@ -287,7 +300,10 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
     log.G(ctx).Info("started workers")
     <-ctx.Done()
     log.G(ctx).Info("shutting down workers")
+    pc.k8sQ.ShutDown()
+    podStatusQueue.ShutDown()
 
+    wg.Wait()
     return nil
 }
 
@@ -311,8 +327,9 @@ func (pc *PodController) Err() error {
     return pc.err
 }
 
-// runWorker is a long-running function that will continually call the processNextWorkItem function in order to read and process an item on the work queue.
-func (pc *PodController) runWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
+// runSyncPodsFromKubernetesWorker is a long-running function that will continually call the processNextWorkItem function
+// in order to read and process an item on the work queue that is generated by the pod informer.
+func (pc *PodController) runSyncPodsFromKubernetesWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
     for pc.processNextWorkItem(ctx, workerID, q) {
     }
 }
@@ -16,7 +16,6 @@ package node
 
 import (
     "context"
-    "strconv"
     "time"
 
     pkgerrors "github.com/pkg/errors"
@@ -92,16 +91,7 @@ func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, han
     return true
 }
 
-func (pc *PodController) runProviderSyncWorkers(ctx context.Context, q workqueue.RateLimitingInterface, numWorkers int) {
-    for i := 0; i < numWorkers; i++ {
-        go func(index int) {
-            workerID := strconv.Itoa(index)
-            pc.runProviderSyncWorker(ctx, workerID, q)
-        }(i)
-    }
-}
-
-func (pc *PodController) runProviderSyncWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
+func (pc *PodController) runSyncPodStatusFromProviderWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
     for pc.processPodStatusUpdate(ctx, workerID, q) {
     }
 }
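With this change, Run returning is the signal that every sync worker has stopped. A minimal caller-side sketch, assuming pc was built with NewPodController (configuration elided) and that the package lives at github.com/virtual-kubelet/virtual-kubelet/node:

package example

import (
    "context"
    "log"

    "github.com/virtual-kubelet/virtual-kubelet/node"
)

// runController blocks in Run until ctx is cancelled; because Run now waits
// for its workers, nothing is still processing pods once it returns.
func runController(ctx context.Context, pc *node.PodController) {
    if err := pc.Run(ctx, 10); err != nil {
        log.Printf("pod controller exited: %v", err)
    }
    // Safe to tear down shared resources here: all workers have drained
    // their in-flight items.
}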