From fedffd6f2c8df6297734479a5787adb8d2122c98 Mon Sep 17 00:00:00 2001
From: wadecai
Date: Sun, 21 Jun 2020 14:57:16 +0800
Subject: [PATCH 1/2] Add parameters to support changing the work queue QPS

---
 .../internal/commands/root/flag.go |  3 +++
 .../internal/commands/root/opts.go |  8 ++++++
 .../internal/commands/root/root.go |  1 +
 go.mod                             |  1 +
 node/lifecycle_test.go             |  1 +
 node/podcontroller.go              | 26 ++++++++++++++++---
 6 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/cmd/virtual-kubelet/internal/commands/root/flag.go b/cmd/virtual-kubelet/internal/commands/root/flag.go
index 1ef7946e1..4b63f5296 100644
--- a/cmd/virtual-kubelet/internal/commands/root/flag.go
+++ b/cmd/virtual-kubelet/internal/commands/root/flag.go
@@ -87,6 +87,9 @@ func installFlags(flags *pflag.FlagSet, c *Opts) {
 	flags.DurationVar(&c.StreamCreationTimeout, "stream-creation-timeout", c.StreamCreationTimeout,
 		"stream-creation-timeout is the maximum time for streaming connection, default 30s.")
 
+	flags.IntVar(&c.WorkQueueRetryQPS, "workqueue-retry-qps", c.WorkQueueRetryQPS,
+		"workqueue-retry-qps is the rate limit for retrying objects in a workqueue, default 10.")
+
 	flagset := flag.NewFlagSet("klog", flag.PanicOnError)
 	klog.InitFlags(flagset)
 	flagset.VisitAll(func(f *flag.Flag) {
diff --git a/cmd/virtual-kubelet/internal/commands/root/opts.go b/cmd/virtual-kubelet/internal/commands/root/opts.go
index 3b70ae1e6..7aa5fe0c4 100644
--- a/cmd/virtual-kubelet/internal/commands/root/opts.go
+++ b/cmd/virtual-kubelet/internal/commands/root/opts.go
@@ -40,6 +40,7 @@ const (
 	DefaultTaintKey              = "virtual-kubelet.io/provider"
 	DefaultStreamIdleTimeout     = 30 * time.Second
 	DefaultStreamCreationTimeout = 30 * time.Second
+	DefaultWorkQueueRetryQPS     = 10
 )
 
 // Opts stores all the options for configuring the root virtual-kubelet command.
@@ -92,6 +93,9 @@ type Opts struct {
 	// StreamCreationTimeout is the maximum time for streaming connection
 	StreamCreationTimeout time.Duration
 
+	// WorkQueueRetryQPS is the default QPS limit for retries on the k8s workqueue
+	WorkQueueRetryQPS int
+
 	Version string
 }
 
@@ -167,5 +171,9 @@ func SetDefaultOpts(c *Opts) error {
 		c.StreamCreationTimeout = DefaultStreamCreationTimeout
 	}
 
+	if c.WorkQueueRetryQPS == 0 {
+		c.WorkQueueRetryQPS = DefaultWorkQueueRetryQPS
+	}
+
 	return nil
 }
diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go
index 6cb1dbb22..fd813782e 100644
--- a/cmd/virtual-kubelet/internal/commands/root/root.go
+++ b/cmd/virtual-kubelet/internal/commands/root/root.go
@@ -185,6 +185,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 		SecretInformer:    secretInformer,
 		ConfigMapInformer: configMapInformer,
 		ServiceInformer:   serviceInformer,
+		WorkQueueRetryQPS: c.WorkQueueRetryQPS,
 	})
 	if err != nil {
 		return errors.Wrap(err, "error setting up pod controller")
diff --git a/go.mod b/go.mod
index 9c57aa0ac..a3b593f8b 100644
--- a/go.mod
+++ b/go.mod
@@ -21,6 +21,7 @@ require (
 	github.com/spf13/pflag v1.0.5
 	go.opencensus.io v0.21.0
 	golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456
+	golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
 	gotest.tools v2.2.0+incompatible
 	k8s.io/api v0.0.0
 	k8s.io/apimachinery v0.0.0
diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go
index 6af38c037..b9af36c8c 100644
--- a/node/lifecycle_test.go
+++ b/node/lifecycle_test.go
@@ -289,6 +289,7 @@ func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunct
 			ConfigMapInformer: configMapInformer,
 			SecretInformer:    secretInformer,
 			ServiceInformer:   serviceInformer,
+			WorkQueueRetryQPS: 10,
 		},
 	}
 
diff --git a/node/podcontroller.go b/node/podcontroller.go
index da405baff..7f680c0d6 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -27,6 +27,7 @@ import (
 	"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
 	"github.com/virtual-kubelet/virtual-kubelet/log"
 	"github.com/virtual-kubelet/virtual-kubelet/trace"
+	"golang.org/x/time/rate"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	corev1informers "k8s.io/client-go/informers/core/v1"
@@ -127,6 +128,8 @@ type PodController struct {
 	// This is used since `pc.Run()` is typically called in a goroutine and managing
 	// this can be non-trivial for callers.
 	err error
+	// qps is the default QPS limit for retries
+	qps int
 }
 
 type knownPod struct {
@@ -158,10 +161,26 @@ type PodControllerConfig struct {
 	ConfigMapInformer corev1informers.ConfigMapInformer
 	SecretInformer    corev1informers.SecretInformer
 	ServiceInformer   corev1informers.ServiceInformer
+
+	// WorkQueueRetryQPS is the default QPS limit for retries
+	WorkQueueRetryQPS int
+}
+
+// controllerRateLimiter provides both overall and per-item rate limiting.
+// The overall limiter is a token bucket and the per-item limiter is exponential.
+func controllerRateLimiter(qps int) workqueue.RateLimiter {
+	return workqueue.NewMaxOfRateLimiter(
+		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
+		// This is only for retry speed and it's only the overall factor (not per item)
+		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(qps), qps*10)},
+	)
 }
 
 // NewPodController creates a new pod controller with the provided config.
 func NewPodController(cfg PodControllerConfig) (*PodController, error) {
+	if cfg.WorkQueueRetryQPS == 0 {
+		cfg.WorkQueueRetryQPS = 10
+	}
 	if cfg.PodClient == nil {
 		return nil, errdefs.InvalidInput("missing core client")
 	}
@@ -198,8 +217,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 		ready:     make(chan struct{}),
 		done:      make(chan struct{}),
 		recorder:  cfg.EventRecorder,
-		k8sQ:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
-		deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"),
+		k8sQ:      workqueue.NewNamedRateLimitingQueue(controllerRateLimiter(cfg.WorkQueueRetryQPS), "syncPodsFromKubernetes"),
+		deletionQ: workqueue.NewNamedRateLimitingQueue(controllerRateLimiter(cfg.WorkQueueRetryQPS), "deletePodsFromKubernetes"),
+		qps:       cfg.WorkQueueRetryQPS,
 	}
 
 	return pc, nil
@@ -242,7 +262,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 	}
 	pc.provider = provider
 
-	podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
+	podStatusQueue := workqueue.NewNamedRateLimitingQueue(controllerRateLimiter(pc.qps), "syncPodStatusFromProvider")
 	provider.NotifyPods(ctx, func(pod *corev1.Pod) {
 		pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
 	})

From ca417d52395dc479d93b513816148e5c40c7a3e8 Mon Sep 17 00:00:00 2001
From: wadecai
Date: Fri, 26 Jun 2020 10:39:05 +0800
Subject: [PATCH 2/2] Expose the queue rate limiter

---
 .../internal/commands/root/flag.go |  3 --
 .../internal/commands/root/opts.go |  8 ----
 .../internal/commands/root/root.go |  1 -
 go.mod                             |  1 -
 node/lifecycle_test.go             |  1 -
 node/pod_test.go                   |  1 +
 node/podcontroller.go              | 41 +++++++------------
 7 files changed, 15 insertions(+), 41 deletions(-)

diff --git a/cmd/virtual-kubelet/internal/commands/root/flag.go b/cmd/virtual-kubelet/internal/commands/root/flag.go
index 4b63f5296..1ef7946e1 100644
--- a/cmd/virtual-kubelet/internal/commands/root/flag.go
+++ b/cmd/virtual-kubelet/internal/commands/root/flag.go
@@ -87,9 +87,6 @@ func installFlags(flags *pflag.FlagSet, c *Opts) {
 	flags.DurationVar(&c.StreamCreationTimeout, "stream-creation-timeout", c.StreamCreationTimeout,
 		"stream-creation-timeout is the maximum time for streaming connection, default 30s.")
 
-	flags.IntVar(&c.WorkQueueRetryQPS, "workqueue-retry-qps", c.WorkQueueRetryQPS,
-		"workqueue-retry-qps is the rate limit for retrying objects in a workqueue, default 10.")
-
 	flagset := flag.NewFlagSet("klog", flag.PanicOnError)
 	klog.InitFlags(flagset)
 	flagset.VisitAll(func(f *flag.Flag) {
diff --git a/cmd/virtual-kubelet/internal/commands/root/opts.go b/cmd/virtual-kubelet/internal/commands/root/opts.go
index 7aa5fe0c4..3b70ae1e6 100644
--- a/cmd/virtual-kubelet/internal/commands/root/opts.go
+++ b/cmd/virtual-kubelet/internal/commands/root/opts.go
@@ -40,7 +40,6 @@ const (
 	DefaultTaintKey              = "virtual-kubelet.io/provider"
 	DefaultStreamIdleTimeout     = 30 * time.Second
 	DefaultStreamCreationTimeout = 30 * time.Second
-	DefaultWorkQueueRetryQPS     = 10
 )
 
 // Opts stores all the options for configuring the root virtual-kubelet command.
@@ -93,9 +92,6 @@ type Opts struct {
 	// StreamCreationTimeout is the maximum time for streaming connection
 	StreamCreationTimeout time.Duration
 
-	// WorkQueueRetryQPS is the default QPS limit for retries on the k8s workqueue
-	WorkQueueRetryQPS int
-
 	Version string
 }
 
@@ -171,9 +167,5 @@ func SetDefaultOpts(c *Opts) error {
 		c.StreamCreationTimeout = DefaultStreamCreationTimeout
 	}
 
-	if c.WorkQueueRetryQPS == 0 {
-		c.WorkQueueRetryQPS = DefaultWorkQueueRetryQPS
-	}
-
 	return nil
 }
diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go
index fd813782e..6cb1dbb22 100644
--- a/cmd/virtual-kubelet/internal/commands/root/root.go
+++ b/cmd/virtual-kubelet/internal/commands/root/root.go
@@ -185,7 +185,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 		SecretInformer:    secretInformer,
 		ConfigMapInformer: configMapInformer,
 		ServiceInformer:   serviceInformer,
-		WorkQueueRetryQPS: c.WorkQueueRetryQPS,
 	})
 	if err != nil {
 		return errors.Wrap(err, "error setting up pod controller")
diff --git a/go.mod b/go.mod
index a3b593f8b..9c57aa0ac 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,6 @@ require (
 	github.com/spf13/pflag v1.0.5
 	go.opencensus.io v0.21.0
 	golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456
-	golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
 	gotest.tools v2.2.0+incompatible
 	k8s.io/api v0.0.0
 	k8s.io/apimachinery v0.0.0
diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go
index b9af36c8c..6af38c037 100644
--- a/node/lifecycle_test.go
+++ b/node/lifecycle_test.go
@@ -289,7 +289,6 @@ func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunct
 			ConfigMapInformer: configMapInformer,
 			SecretInformer:    secretInformer,
 			ServiceInformer:   serviceInformer,
-			WorkQueueRetryQPS: 10,
 		},
 	}
 
diff --git a/node/pod_test.go b/node/pod_test.go
index 3d8a986fe..f365b589c 100644
--- a/node/pod_test.go
+++ b/node/pod_test.go
@@ -49,6 +49,7 @@ func newTestController() *TestController {
 			recorder:     testutil.FakeEventRecorder(5),
 			k8sQ:         workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 			deletionQ:    workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+			podStatusQ:   workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 			done:         make(chan struct{}),
 			ready:        make(chan struct{}),
 			podsInformer: iFactory.Core().V1().Pods(),
diff --git a/node/podcontroller.go b/node/podcontroller.go
index 7f680c0d6..331928471 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -27,7 +27,6 @@ import (
 	"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
 	"github.com/virtual-kubelet/virtual-kubelet/log"
 	"github.com/virtual-kubelet/virtual-kubelet/trace"
-	"golang.org/x/time/rate"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	corev1informers "k8s.io/client-go/informers/core/v1"
@@ -109,6 +108,8 @@ type PodController struct {
 	// deletionQ is a queue on which pods are reconciled, and we check if pods are in API server after grace period
 	deletionQ workqueue.RateLimitingInterface
 
+	podStatusQ workqueue.RateLimitingInterface
+
 	// From the time of creation, to termination the knownPods map will contain the pods key
 	// (derived from Kubernetes' cache library) -> a *knownPod struct.
 	knownPods sync.Map
@@ -128,8 +129,6 @@ type PodController struct {
 	// This is used since `pc.Run()` is typically called in a goroutine and managing
 	// this can be non-trivial for callers.
 	err error
-	// qps is the default QPS limit for retries
-	qps int
 }
 
 type knownPod struct {
@@ -162,25 +161,12 @@ type PodControllerConfig struct {
 	SecretInformer    corev1informers.SecretInformer
 	ServiceInformer   corev1informers.ServiceInformer
 
-	// WorkQueueRetryQPS is the default QPS limit for retries
-	WorkQueueRetryQPS int
-}
-
-// controllerRateLimiter provides both overall and per-item rate limiting.
-// The overall limiter is a token bucket and the per-item limiter is exponential.
-func controllerRateLimiter(qps int) workqueue.RateLimiter {
-	return workqueue.NewMaxOfRateLimiter(
-		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
-		// This is only for retry speed and it's only the overall factor (not per item)
-		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(qps), qps*10)},
-	)
+	// RateLimiter defines the rate limit for the work queues
+	RateLimiter workqueue.RateLimiter
 }
 
 // NewPodController creates a new pod controller with the provided config.
 func NewPodController(cfg PodControllerConfig) (*PodController, error) {
-	if cfg.WorkQueueRetryQPS == 0 {
-		cfg.WorkQueueRetryQPS = 10
-	}
 	if cfg.PodClient == nil {
 		return nil, errdefs.InvalidInput("missing core client")
 	}
@@ -202,7 +188,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 	if cfg.Provider == nil {
 		return nil, errdefs.InvalidInput("missing provider")
 	}
-
+	if cfg.RateLimiter == nil {
+		cfg.RateLimiter = workqueue.DefaultControllerRateLimiter()
+	}
 	rm, err := manager.NewResourceManager(cfg.PodInformer.Lister(), cfg.SecretInformer.Lister(), cfg.ConfigMapInformer.Lister(), cfg.ServiceInformer.Lister())
 	if err != nil {
 		return nil, pkgerrors.Wrap(err, "could not create resource manager")
@@ -217,9 +205,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 		ready:     make(chan struct{}),
 		done:      make(chan struct{}),
 		recorder:  cfg.EventRecorder,
-		k8sQ:      workqueue.NewNamedRateLimitingQueue(controllerRateLimiter(cfg.WorkQueueRetryQPS), "syncPodsFromKubernetes"),
-		deletionQ: workqueue.NewNamedRateLimitingQueue(controllerRateLimiter(cfg.WorkQueueRetryQPS), "deletePodsFromKubernetes"),
-		qps:       cfg.WorkQueueRetryQPS,
+		k8sQ:       workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"),
+		deletionQ:  workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"),
+		podStatusQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
 	}
 
 	return pc, nil
@@ -262,13 +250,12 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 	}
 	pc.provider = provider
 
-	podStatusQueue := workqueue.NewNamedRateLimitingQueue(controllerRateLimiter(pc.qps), "syncPodStatusFromProvider")
 	provider.NotifyPods(ctx, func(pod *corev1.Pod) {
-		pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
+		pc.enqueuePodStatusUpdate(ctx, pc.podStatusQ, pod.DeepCopy())
 	})
 
 	go runProvider(ctx)
-	defer podStatusQueue.ShutDown()
+	defer pc.podStatusQ.ShutDown()
 
 	// Wait for the caches to be synced *before* starting to do work.
 	if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
@@ -328,7 +315,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 		workerID := strconv.Itoa(id)
 		go func() {
 			defer wg.Done()
-			pc.runSyncPodStatusFromProviderWorker(ctx, workerID, podStatusQueue)
+			pc.runSyncPodStatusFromProviderWorker(ctx, workerID, pc.podStatusQ)
 		}()
 	}
 
@@ -356,7 +343,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 	<-ctx.Done()
 	log.G(ctx).Info("shutting down workers")
 	pc.k8sQ.ShutDown()
-	podStatusQueue.ShutDown()
+	pc.podStatusQ.ShutDown()
 	pc.deletionQ.ShutDown()
 	wg.Wait()
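
After patch 2, the QPS-based limiter from patch 1 is no longer built inside the controller; a caller that still wants that behaviour can pass an equivalent limiter through the exposed RateLimiter field. The sketch below is illustrative only: retryRateLimiter and withRetryQPS are hypothetical helper names, the constants mirror the limiter removed in patch 2, and the rest of PodControllerConfig (PodClient, PodInformer, Provider, informers, ...) is assumed to be populated by the caller as before.

package main

import (
	"time"

	"github.com/virtual-kubelet/virtual-kubelet/node"
	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

// retryRateLimiter rebuilds the limiter that patch 1 hard-coded as
// controllerRateLimiter: per-item exponential backoff combined with an
// overall token bucket allowing `qps` retries per second (burst qps*10).
func retryRateLimiter(qps int) workqueue.RateLimiter {
	return workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(qps), qps*10)},
	)
}

// withRetryQPS plugs that limiter into the field exposed by patch 2.
// If RateLimiter is left nil, NewPodController falls back to
// workqueue.DefaultControllerRateLimiter().
func withRetryQPS(cfg node.PodControllerConfig, qps int) node.PodControllerConfig {
	cfg.RateLimiter = retryRateLimiter(qps)
	return cfg
}

A caller would then construct the controller with something like pc, err := node.NewPodController(withRetryQPS(cfg, 10)), which restores the patch-1 default of 10 QPS while keeping the rate-limiting policy entirely in the caller's hands.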