Split out rate limiter per workqueue

If you share a ratelimiter between workqueues, it breaks.

WQ1: Starts processing item (When)
WQ1: Fails to process item (When)
WQ1: Fails to process item (When)
WQ1: Fails to process item (When)
--- At this point we've backed off a bit ---
WQ2: Starts processing item (with same key, When)
WQ2: Succeeds at processing item (Forget)
WQ1: Fails to process item (When) ---> THIS RESULTS IN AN ERROR

This results in an error because it "forgot" the previous
rate limit.
This commit is contained in:
Sargun Dhillon
2021-02-02 11:40:58 -08:00
parent 2fa03a15a2
commit 82452a73a5
2 changed files with 32 additions and 13 deletions

View File

@@ -44,11 +44,6 @@ func newTestController() *TestController {
rm := testutil.FakeResourceManager()
p := newMockProvider()
iFactory := kubeinformers.NewSharedInformerFactoryWithOptions(fk8s, 10*time.Minute)
rateLimiter := workqueue.NewMaxOfRateLimiter(
// The default upper bound is 1000 seconds. Let's not use that.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
podController, err := NewPodController(PodControllerConfig{
PodClient: fk8s.CoreV1(),
PodInformer: iFactory.Core().V1().Pods(),
@@ -57,7 +52,21 @@ func newTestController() *TestController {
ConfigMapInformer: iFactory.Core().V1().ConfigMaps(),
SecretInformer: iFactory.Core().V1().Secrets(),
ServiceInformer: iFactory.Core().V1().Services(),
RateLimiter: rateLimiter,
SyncPodsFromKubernetesRateLimiter: workqueue.NewMaxOfRateLimiter(
// The default upper bound is 1000 seconds. Let's not use that.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
SyncPodStatusFromProviderRateLimiter: workqueue.NewMaxOfRateLimiter(
// The default upper bound is 1000 seconds. Let's not use that.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
DeletePodsFromKubernetesRateLimiter: workqueue.NewMaxOfRateLimiter(
// The default upper bound is 1000 seconds. Let's not use that.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
})
if err != nil {

View File

@@ -176,8 +176,12 @@ type PodControllerConfig struct {
SecretInformer corev1informers.SecretInformer
ServiceInformer corev1informers.ServiceInformer
// RateLimiter defines the rate limit of work queue
RateLimiter workqueue.RateLimiter
// SyncPodsFromKubernetesRateLimiter defines the rate limit for the SyncPodsFromKubernetes queue
SyncPodsFromKubernetesRateLimiter workqueue.RateLimiter
// DeletePodsFromKubernetesRateLimiter defines the rate limit for the DeletePodsFromKubernetesRateLimiter queue
DeletePodsFromKubernetesRateLimiter workqueue.RateLimiter
// SyncPodStatusFromProviderRateLimiter defines the rate limit for the SyncPodStatusFromProviderRateLimiter queue
SyncPodStatusFromProviderRateLimiter workqueue.RateLimiter
// Add custom filtering for pod informer event handlers
// Use this for cases where the pod informer handles more than pods assigned to this node
@@ -210,8 +214,14 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
if cfg.Provider == nil {
return nil, errdefs.InvalidInput("missing provider")
}
if cfg.RateLimiter == nil {
cfg.RateLimiter = workqueue.DefaultControllerRateLimiter()
if cfg.SyncPodsFromKubernetesRateLimiter == nil {
cfg.SyncPodsFromKubernetesRateLimiter = workqueue.DefaultControllerRateLimiter()
}
if cfg.DeletePodsFromKubernetesRateLimiter == nil {
cfg.DeletePodsFromKubernetesRateLimiter = workqueue.DefaultControllerRateLimiter()
}
if cfg.SyncPodStatusFromProviderRateLimiter == nil {
cfg.SyncPodStatusFromProviderRateLimiter = workqueue.DefaultControllerRateLimiter()
}
rm, err := manager.NewResourceManager(cfg.PodInformer.Lister(), cfg.SecretInformer.Lister(), cfg.ConfigMapInformer.Lister(), cfg.ServiceInformer.Lister())
if err != nil {
@@ -230,9 +240,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
podEventFilterFunc: cfg.PodEventFilterFunc,
}
pc.syncPodsFromKubernetes = queue.New(cfg.RateLimiter, "syncPodsFromKubernetes", pc.syncPodFromKubernetesHandler)
pc.deletePodsFromKubernetes = queue.New(cfg.RateLimiter, "deletePodsFromKubernetes", pc.deletePodsFromKubernetesHandler)
pc.syncPodStatusFromProvider = queue.New(cfg.RateLimiter, "syncPodStatusFromProvider", pc.syncPodStatusFromProviderHandler)
pc.syncPodsFromKubernetes = queue.New(cfg.SyncPodsFromKubernetesRateLimiter, "syncPodsFromKubernetes", pc.syncPodFromKubernetesHandler)
pc.deletePodsFromKubernetes = queue.New(cfg.DeletePodsFromKubernetesRateLimiter, "deletePodsFromKubernetes", pc.deletePodsFromKubernetesHandler)
pc.syncPodStatusFromProvider = queue.New(cfg.SyncPodStatusFromProviderRateLimiter, "syncPodStatusFromProvider", pc.syncPodStatusFromProviderHandler)
return pc, nil
}