From 82452a73a5e0fc98a3cfedca41cd4cb05ed5277b Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Tue, 2 Feb 2021 11:40:58 -0800 Subject: [PATCH] Split out rate limiter per workqueue If you share a ratelimiter between workqueues, it breaks. WQ1: Starts processing item (When) WQ1: Fails to process item (When) WQ1: Fails to process item (When) WQ1: Fails to process item (When) --- At this point we've backed off a bit --- WQ2: Starts processing item (with same key, When) WQ2: Succeeds at processing item (Forget) WQ1: Fails to process item (When) ---> THIS RESULTS IN AN ERROR This results in an error because it "forgot" the previous rate limit. --- node/pod_test.go | 21 +++++++++++++++------ node/podcontroller.go | 24 +++++++++++++++++------- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/node/pod_test.go b/node/pod_test.go index e392cb220..7bf73b194 100644 --- a/node/pod_test.go +++ b/node/pod_test.go @@ -44,11 +44,6 @@ func newTestController() *TestController { rm := testutil.FakeResourceManager() p := newMockProvider() iFactory := kubeinformers.NewSharedInformerFactoryWithOptions(fk8s, 10*time.Minute) - rateLimiter := workqueue.NewMaxOfRateLimiter( - // The default upper bound is 1000 seconds. Let's not use that. - workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond), - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, - ) podController, err := NewPodController(PodControllerConfig{ PodClient: fk8s.CoreV1(), PodInformer: iFactory.Core().V1().Pods(), @@ -57,7 +52,21 @@ func newTestController() *TestController { ConfigMapInformer: iFactory.Core().V1().ConfigMaps(), SecretInformer: iFactory.Core().V1().Secrets(), ServiceInformer: iFactory.Core().V1().Services(), - RateLimiter: rateLimiter, + SyncPodsFromKubernetesRateLimiter: workqueue.NewMaxOfRateLimiter( + // The default upper bound is 1000 seconds. Let's not use that. 
+ workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond), + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), + SyncPodStatusFromProviderRateLimiter: workqueue.NewMaxOfRateLimiter( + // The default upper bound is 1000 seconds. Let's not use that. + workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond), + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), + DeletePodsFromKubernetesRateLimiter: workqueue.NewMaxOfRateLimiter( + // The default upper bound is 1000 seconds. Let's not use that. + workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond), + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), }) if err != nil { diff --git a/node/podcontroller.go b/node/podcontroller.go index 27ffadf54..85f6ae150 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -176,8 +176,12 @@ type PodControllerConfig struct { SecretInformer corev1informers.SecretInformer ServiceInformer corev1informers.ServiceInformer - // RateLimiter defines the rate limit of work queue - RateLimiter workqueue.RateLimiter + // SyncPodsFromKubernetesRateLimiter defines the rate limit for the SyncPodsFromKubernetes queue + SyncPodsFromKubernetesRateLimiter workqueue.RateLimiter + // DeletePodsFromKubernetesRateLimiter defines the rate limit for the DeletePodsFromKubernetes queue + DeletePodsFromKubernetesRateLimiter workqueue.RateLimiter + // SyncPodStatusFromProviderRateLimiter defines the rate limit for the SyncPodStatusFromProvider queue + SyncPodStatusFromProviderRateLimiter workqueue.RateLimiter // Add custom filtering for pod informer event handlers // Use this for cases where the pod informer handles more than pods assigned to this node @@ -210,8 +214,14 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) { if cfg.Provider == nil { return nil, 
errdefs.InvalidInput("missing provider") } - if cfg.RateLimiter == nil { - cfg.RateLimiter = workqueue.DefaultControllerRateLimiter() + if cfg.SyncPodsFromKubernetesRateLimiter == nil { + cfg.SyncPodsFromKubernetesRateLimiter = workqueue.DefaultControllerRateLimiter() + } + if cfg.DeletePodsFromKubernetesRateLimiter == nil { + cfg.DeletePodsFromKubernetesRateLimiter = workqueue.DefaultControllerRateLimiter() + } + if cfg.SyncPodStatusFromProviderRateLimiter == nil { + cfg.SyncPodStatusFromProviderRateLimiter = workqueue.DefaultControllerRateLimiter() } rm, err := manager.NewResourceManager(cfg.PodInformer.Lister(), cfg.SecretInformer.Lister(), cfg.ConfigMapInformer.Lister(), cfg.ServiceInformer.Lister()) if err != nil { @@ -230,9 +240,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) { podEventFilterFunc: cfg.PodEventFilterFunc, } - pc.syncPodsFromKubernetes = queue.New(cfg.RateLimiter, "syncPodsFromKubernetes", pc.syncPodFromKubernetesHandler) - pc.deletePodsFromKubernetes = queue.New(cfg.RateLimiter, "deletePodsFromKubernetes", pc.deletePodsFromKubernetesHandler) - pc.syncPodStatusFromProvider = queue.New(cfg.RateLimiter, "syncPodStatusFromProvider", pc.syncPodStatusFromProviderHandler) + pc.syncPodsFromKubernetes = queue.New(cfg.SyncPodsFromKubernetesRateLimiter, "syncPodsFromKubernetes", pc.syncPodFromKubernetesHandler) + pc.deletePodsFromKubernetes = queue.New(cfg.DeletePodsFromKubernetesRateLimiter, "deletePodsFromKubernetes", pc.deletePodsFromKubernetesHandler) + pc.syncPodStatusFromProvider = queue.New(cfg.SyncPodStatusFromProviderRateLimiter, "syncPodStatusFromProvider", pc.syncPodStatusFromProviderHandler) return pc, nil }