Merge pull request #953 from sargun/break-up-ratelimiters

This commit is contained in:
Sargun Dhillon
2021-02-03 04:18:07 -08:00
committed by GitHub
2 changed files with 32 additions and 13 deletions

View File

@@ -44,11 +44,6 @@ func newTestController() *TestController {
rm := testutil.FakeResourceManager() rm := testutil.FakeResourceManager()
p := newMockProvider() p := newMockProvider()
iFactory := kubeinformers.NewSharedInformerFactoryWithOptions(fk8s, 10*time.Minute) iFactory := kubeinformers.NewSharedInformerFactoryWithOptions(fk8s, 10*time.Minute)
rateLimiter := workqueue.NewMaxOfRateLimiter(
// The default upper bound is 1000 seconds. Let's not use that.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
podController, err := NewPodController(PodControllerConfig{ podController, err := NewPodController(PodControllerConfig{
PodClient: fk8s.CoreV1(), PodClient: fk8s.CoreV1(),
PodInformer: iFactory.Core().V1().Pods(), PodInformer: iFactory.Core().V1().Pods(),
@@ -57,7 +52,21 @@ func newTestController() *TestController {
ConfigMapInformer: iFactory.Core().V1().ConfigMaps(), ConfigMapInformer: iFactory.Core().V1().ConfigMaps(),
SecretInformer: iFactory.Core().V1().Secrets(), SecretInformer: iFactory.Core().V1().Secrets(),
ServiceInformer: iFactory.Core().V1().Services(), ServiceInformer: iFactory.Core().V1().Services(),
RateLimiter: rateLimiter, SyncPodsFromKubernetesRateLimiter: workqueue.NewMaxOfRateLimiter(
// The default upper bound is 1000 seconds. Let's not use that.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
SyncPodStatusFromProviderRateLimiter: workqueue.NewMaxOfRateLimiter(
// The default upper bound is 1000 seconds. Let's not use that.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
DeletePodsFromKubernetesRateLimiter: workqueue.NewMaxOfRateLimiter(
// The default upper bound is 1000 seconds. Let's not use that.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
}) })
if err != nil { if err != nil {

View File

@@ -176,8 +176,12 @@ type PodControllerConfig struct {
SecretInformer corev1informers.SecretInformer SecretInformer corev1informers.SecretInformer
ServiceInformer corev1informers.ServiceInformer ServiceInformer corev1informers.ServiceInformer
// RateLimiter defines the rate limit of work queue // SyncPodsFromKubernetesRateLimiter defines the rate limit for the SyncPodsFromKubernetes queue
RateLimiter workqueue.RateLimiter SyncPodsFromKubernetesRateLimiter workqueue.RateLimiter
// DeletePodsFromKubernetesRateLimiter defines the rate limit for the DeletePodsFromKubernetesRateLimiter queue
DeletePodsFromKubernetesRateLimiter workqueue.RateLimiter
// SyncPodStatusFromProviderRateLimiter defines the rate limit for the SyncPodStatusFromProviderRateLimiter queue
SyncPodStatusFromProviderRateLimiter workqueue.RateLimiter
// Add custom filtering for pod informer event handlers // Add custom filtering for pod informer event handlers
// Use this for cases where the pod informer handles more than pods assigned to this node // Use this for cases where the pod informer handles more than pods assigned to this node
@@ -210,8 +214,14 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
if cfg.Provider == nil { if cfg.Provider == nil {
return nil, errdefs.InvalidInput("missing provider") return nil, errdefs.InvalidInput("missing provider")
} }
if cfg.RateLimiter == nil { if cfg.SyncPodsFromKubernetesRateLimiter == nil {
cfg.RateLimiter = workqueue.DefaultControllerRateLimiter() cfg.SyncPodsFromKubernetesRateLimiter = workqueue.DefaultControllerRateLimiter()
}
if cfg.DeletePodsFromKubernetesRateLimiter == nil {
cfg.DeletePodsFromKubernetesRateLimiter = workqueue.DefaultControllerRateLimiter()
}
if cfg.SyncPodStatusFromProviderRateLimiter == nil {
cfg.SyncPodStatusFromProviderRateLimiter = workqueue.DefaultControllerRateLimiter()
} }
rm, err := manager.NewResourceManager(cfg.PodInformer.Lister(), cfg.SecretInformer.Lister(), cfg.ConfigMapInformer.Lister(), cfg.ServiceInformer.Lister()) rm, err := manager.NewResourceManager(cfg.PodInformer.Lister(), cfg.SecretInformer.Lister(), cfg.ConfigMapInformer.Lister(), cfg.ServiceInformer.Lister())
if err != nil { if err != nil {
@@ -230,9 +240,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
podEventFilterFunc: cfg.PodEventFilterFunc, podEventFilterFunc: cfg.PodEventFilterFunc,
} }
pc.syncPodsFromKubernetes = queue.New(cfg.RateLimiter, "syncPodsFromKubernetes", pc.syncPodFromKubernetesHandler) pc.syncPodsFromKubernetes = queue.New(cfg.SyncPodsFromKubernetesRateLimiter, "syncPodsFromKubernetes", pc.syncPodFromKubernetesHandler)
pc.deletePodsFromKubernetes = queue.New(cfg.RateLimiter, "deletePodsFromKubernetes", pc.deletePodsFromKubernetesHandler) pc.deletePodsFromKubernetes = queue.New(cfg.DeletePodsFromKubernetesRateLimiter, "deletePodsFromKubernetes", pc.deletePodsFromKubernetesHandler)
pc.syncPodStatusFromProvider = queue.New(cfg.RateLimiter, "syncPodStatusFromProvider", pc.syncPodStatusFromProviderHandler) pc.syncPodStatusFromProvider = queue.New(cfg.SyncPodStatusFromProviderRateLimiter, "syncPodStatusFromProvider", pc.syncPodStatusFromProviderHandler)
return pc, nil return pc, nil
} }