Add parameters to support change work queue qps
This commit is contained in:
@@ -87,6 +87,9 @@ func installFlags(flags *pflag.FlagSet, c *Opts) {
|
||||
flags.DurationVar(&c.StreamCreationTimeout, "stream-creation-timeout", c.StreamCreationTimeout,
|
||||
"stream-creation-timeout is the maximum time for streaming connection, default 30s.")
|
||||
|
||||
flags.IntVar(&c.WorkQueueRetryQPS, "workqueue-retry-qps", c.WorkQueueRetryQPS,
|
||||
"workqueue-retry-qps is the rate limit when objects in a workqueue, default 10.")
|
||||
|
||||
flagset := flag.NewFlagSet("klog", flag.PanicOnError)
|
||||
klog.InitFlags(flagset)
|
||||
flagset.VisitAll(func(f *flag.Flag) {
|
||||
|
||||
@@ -40,6 +40,7 @@ const (
|
||||
DefaultTaintKey = "virtual-kubelet.io/provider"
|
||||
DefaultStreamIdleTimeout = 30 * time.Second
|
||||
DefaultStreamCreationTimeout = 30 * time.Second
|
||||
DefaultWorkQueueRetryQPS = 10
|
||||
)
|
||||
|
||||
// Opts stores all the options for configuring the root virtual-kubelet command.
|
||||
@@ -92,6 +93,9 @@ type Opts struct {
|
||||
// StreamCreationTimeout is the maximum time for streaming connection
|
||||
StreamCreationTimeout time.Duration
|
||||
|
||||
// WorkQueueRetryQPS is the default qps limit when retry for k8s workqueue
|
||||
WorkQueueRetryQPS int
|
||||
|
||||
Version string
|
||||
}
|
||||
|
||||
@@ -167,5 +171,9 @@ func SetDefaultOpts(c *Opts) error {
|
||||
c.StreamCreationTimeout = DefaultStreamCreationTimeout
|
||||
}
|
||||
|
||||
if c.WorkQueueRetryQPS == 0 {
|
||||
c.WorkQueueRetryQPS = DefaultWorkQueueRetryQPS
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -185,6 +185,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
|
||||
SecretInformer: secretInformer,
|
||||
ConfigMapInformer: configMapInformer,
|
||||
ServiceInformer: serviceInformer,
|
||||
WorkQueueRetryQPS: c.WorkQueueRetryQPS,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error setting up pod controller")
|
||||
|
||||
1
go.mod
1
go.mod
@@ -21,6 +21,7 @@ require (
|
||||
github.com/spf13/pflag v1.0.5
|
||||
go.opencensus.io v0.21.0
|
||||
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
|
||||
gotest.tools v2.2.0+incompatible
|
||||
k8s.io/api v0.0.0
|
||||
k8s.io/apimachinery v0.0.0
|
||||
|
||||
@@ -289,6 +289,7 @@ func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunct
|
||||
ConfigMapInformer: configMapInformer,
|
||||
SecretInformer: secretInformer,
|
||||
ServiceInformer: serviceInformer,
|
||||
WorkQueueRetryQPS: 10,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ import (
|
||||
"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/trace"
|
||||
"golang.org/x/time/rate"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
corev1informers "k8s.io/client-go/informers/core/v1"
|
||||
@@ -127,6 +128,8 @@ type PodController struct {
|
||||
// This is used since `pc.Run()` is typically called in a goroutine and managing
|
||||
// this can be non-trivial for callers.
|
||||
err error
|
||||
// qps is the default qps limit when retry
|
||||
qps int
|
||||
}
|
||||
|
||||
type knownPod struct {
|
||||
@@ -158,10 +161,26 @@ type PodControllerConfig struct {
|
||||
ConfigMapInformer corev1informers.ConfigMapInformer
|
||||
SecretInformer corev1informers.SecretInformer
|
||||
ServiceInformer corev1informers.ServiceInformer
|
||||
|
||||
// WorkQueueRetryQPS is the default qps limit when retry
|
||||
WorkQueueRetryQPS int
|
||||
}
|
||||
|
||||
// controllerRateLimiter has
|
||||
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
|
||||
func controllerRateLimiter(qps int) workqueue.RateLimiter {
|
||||
return workqueue.NewMaxOfRateLimiter(
|
||||
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
|
||||
// This is only for retry speed and its only the overall factor (not per item)
|
||||
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(qps), qps*10)},
|
||||
)
|
||||
}
|
||||
|
||||
// NewPodController creates a new pod controller with the provided config.
|
||||
func NewPodController(cfg PodControllerConfig) (*PodController, error) {
|
||||
if cfg.WorkQueueRetryQPS == 0 {
|
||||
cfg.WorkQueueRetryQPS = 10
|
||||
}
|
||||
if cfg.PodClient == nil {
|
||||
return nil, errdefs.InvalidInput("missing core client")
|
||||
}
|
||||
@@ -198,8 +217,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
|
||||
ready: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
recorder: cfg.EventRecorder,
|
||||
k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
|
||||
deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"),
|
||||
k8sQ: workqueue.NewNamedRateLimitingQueue(controllerRateLimiter(cfg.WorkQueueRetryQPS), "syncPodsFromKubernetes"),
|
||||
deletionQ: workqueue.NewNamedRateLimitingQueue(controllerRateLimiter(cfg.WorkQueueRetryQPS), "deletePodsFromKubernetes"),
|
||||
qps: cfg.WorkQueueRetryQPS,
|
||||
}
|
||||
|
||||
return pc, nil
|
||||
@@ -242,7 +262,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
|
||||
}
|
||||
pc.provider = provider
|
||||
|
||||
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
|
||||
podStatusQueue := workqueue.NewNamedRateLimitingQueue(controllerRateLimiter(pc.qps), "syncPodStatusFromProvider")
|
||||
provider.NotifyPods(ctx, func(pod *corev1.Pod) {
|
||||
pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user