diff --git a/node/pod_test.go b/node/pod_test.go index 3d8a986fe..f365b589c 100644 --- a/node/pod_test.go +++ b/node/pod_test.go @@ -49,6 +49,7 @@ func newTestController() *TestController { recorder: testutil.FakeEventRecorder(5), k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), deletionQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + podStatusQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), done: make(chan struct{}), ready: make(chan struct{}), podsInformer: iFactory.Core().V1().Pods(), diff --git a/node/podcontroller.go b/node/podcontroller.go index da405baff..331928471 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -108,6 +108,8 @@ type PodController struct { // deletionQ is a queue on which pods are reconciled, and we check if pods are in API server after grace period deletionQ workqueue.RateLimitingInterface + podStatusQ workqueue.RateLimitingInterface + // From the time of creation, to termination the knownPods map will contain the pods key // (derived from Kubernetes' cache library) -> a *knownPod struct. knownPods sync.Map @@ -158,6 +160,9 @@ type PodControllerConfig struct { ConfigMapInformer corev1informers.ConfigMapInformer SecretInformer corev1informers.SecretInformer ServiceInformer corev1informers.ServiceInformer + + // RateLimiter defines the rate limit of work queue + RateLimiter workqueue.RateLimiter } // NewPodController creates a new pod controller with the provided config. @@ -183,7 +188,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) { if cfg.Provider == nil { return nil, errdefs.InvalidInput("missing provider") } - + if cfg.RateLimiter == nil { + cfg.RateLimiter = workqueue.DefaultControllerRateLimiter() + } rm, err := manager.NewResourceManager(cfg.PodInformer.Lister(), cfg.SecretInformer.Lister(), cfg.ConfigMapInformer.Lister(), cfg.ServiceInformer.Lister()) if err != nil { return nil, pkgerrors.Wrap(err, "could not create resource manager") @@ -198,8 +205,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) { ready: make(chan struct{}), done: make(chan struct{}), recorder: cfg.EventRecorder, - k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"), - deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"), + k8sQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"), + deletionQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"), + podStatusQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"), } return pc, nil @@ -242,13 +250,12 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er } pc.provider = provider - podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider") provider.NotifyPods(ctx, func(pod *corev1.Pod) { - pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy()) + pc.enqueuePodStatusUpdate(ctx, pc.podStatusQ, pod.DeepCopy()) }) go runProvider(ctx) - defer podStatusQueue.ShutDown() + defer pc.podStatusQ.ShutDown() // Wait for the caches to be synced *before* starting to do work. if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok { @@ -308,7 +315,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er workerID := strconv.Itoa(id) go func() { defer wg.Done() - pc.runSyncPodStatusFromProviderWorker(ctx, workerID, podStatusQueue) + pc.runSyncPodStatusFromProviderWorker(ctx, workerID, pc.podStatusQ) }() } @@ -336,7 +343,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er <-ctx.Done() log.G(ctx).Info("shutting down workers") pc.k8sQ.ShutDown() - podStatusQueue.ShutDown() + pc.podStatusQ.ShutDown() pc.deletionQ.ShutDown() wg.Wait()