Expose the queue rate limiter

commit ca417d5239
parent fedffd6f2c
Author: wadecai
Date:   2020-06-26 10:39:05 +08:00

7 changed files with 15 additions and 41 deletions


@@ -289,7 +289,6 @@ func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunct
 		ConfigMapInformer: configMapInformer,
 		SecretInformer:    secretInformer,
 		ServiceInformer:   serviceInformer,
-		WorkQueueRetryQPS: 10,
 	},
 }


@@ -49,6 +49,7 @@ func newTestController() *TestController {
 		recorder:   testutil.FakeEventRecorder(5),
 		k8sQ:       workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 		deletionQ:  workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		podStatusQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 		done:       make(chan struct{}),
 		ready:      make(chan struct{}),
 		podsInformer: iFactory.Core().V1().Pods(),


@@ -27,7 +27,6 @@ import (
 	"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
 	"github.com/virtual-kubelet/virtual-kubelet/log"
 	"github.com/virtual-kubelet/virtual-kubelet/trace"
-	"golang.org/x/time/rate"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	corev1informers "k8s.io/client-go/informers/core/v1"
@@ -109,6 +108,8 @@ type PodController struct {
 	// deletionQ is a queue on which pods are reconciled, where we check whether pods are still in the API server after the grace period
 	deletionQ  workqueue.RateLimitingInterface
+	podStatusQ workqueue.RateLimitingInterface
 	// From the time of creation to termination, the knownPods map maps the pod's key
 	// (derived from Kubernetes' cache library) to a *knownPod struct.
 	knownPods sync.Map
@@ -128,8 +129,6 @@ type PodController struct {
 	// This is used since `pc.Run()` is typically called in a goroutine and managing
 	// this can be non-trivial for callers.
 	err error
-	// qps is the default qps limit when retrying
-	qps int
 }

 type knownPod struct {
@@ -162,25 +161,12 @@ type PodControllerConfig struct {
 	SecretInformer  corev1informers.SecretInformer
 	ServiceInformer corev1informers.ServiceInformer
-	// WorkQueueRetryQPS is the default qps limit when retrying
-	WorkQueueRetryQPS int
-}
-
-// controllerRateLimiter has both overall and per-item rate limiting.
-// The overall limiter is a token bucket and the per-item limiter is exponential.
-func controllerRateLimiter(qps int) workqueue.RateLimiter {
-	return workqueue.NewMaxOfRateLimiter(
-		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
-		// This only affects retry speed, and it is only the overall factor (not per item)
-		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(qps), qps*10)},
-	)
+	// RateLimiter defines the rate limiting behaviour of the work queues
+	RateLimiter workqueue.RateLimiter
 }

 // NewPodController creates a new pod controller with the provided config.
 func NewPodController(cfg PodControllerConfig) (*PodController, error) {
-	if cfg.WorkQueueRetryQPS == 0 {
-		cfg.WorkQueueRetryQPS = 10
-	}
 	if cfg.PodClient == nil {
 		return nil, errdefs.InvalidInput("missing core client")
 	}
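
The controllerRateLimiter helper is deleted rather than exported, so callers that tuned WorkQueueRetryQPS now construct an equivalent limiter themselves and hand it to the new RateLimiter field. A minimal sketch of rebuilding the removed behaviour outside the package, using only the workqueue and rate APIs the old code already used (the qpsRateLimiter name is illustrative, not part of the repo):

        import (
            "time"

            "golang.org/x/time/rate"
            "k8s.io/client-go/util/workqueue"
        )

        // qpsRateLimiter mirrors the deleted controllerRateLimiter: the maximum
        // of a per-item exponential backoff and an overall token bucket.
        func qpsRateLimiter(qps int) workqueue.RateLimiter {
            return workqueue.NewMaxOfRateLimiter(
                // Per-item exponential failure backoff, 5ms base delay up to 1000s.
                workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
                // Overall bucket: qps steady-state rate with a 10x burst, as before.
                &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(qps), qps*10)},
            )
        }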
@@ -202,7 +188,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 	if cfg.Provider == nil {
 		return nil, errdefs.InvalidInput("missing provider")
 	}
+	if cfg.RateLimiter == nil {
+		cfg.RateLimiter = workqueue.DefaultControllerRateLimiter()
+	}
 	rm, err := manager.NewResourceManager(cfg.PodInformer.Lister(), cfg.SecretInformer.Lister(), cfg.ConfigMapInformer.Lister(), cfg.ServiceInformer.Lister())
 	if err != nil {
 		return nil, pkgerrors.Wrap(err, "could not create resource manager")
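
When cfg.RateLimiter is left nil, the controller falls back to client-go's DefaultControllerRateLimiter. In the client-go of this era that default composed the same two limiters with a fixed 10 qps / 100 burst bucket, so the out-of-the-box retry behaviour matches the old WorkQueueRetryQPS default of 10:

        // From k8s.io/client-go/util/workqueue (circa 2020), for reference:
        func DefaultControllerRateLimiter() RateLimiter {
            return NewMaxOfRateLimiter(
                NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
                &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
            )
        }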
@@ -217,9 +205,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 		ready:    make(chan struct{}),
 		done:     make(chan struct{}),
 		recorder: cfg.EventRecorder,
-		k8sQ:      workqueue.NewNamedRateLimitingQueue(controllerRateLimiter(cfg.WorkQueueRetryQPS), "syncPodsFromKubernetes"),
-		deletionQ: workqueue.NewNamedRateLimitingQueue(controllerRateLimiter(cfg.WorkQueueRetryQPS), "deletePodsFromKubernetes"),
-		qps:       cfg.WorkQueueRetryQPS,
+		k8sQ:       workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"),
+		deletionQ:  workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"),
+		podStatusQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
 	}

 	return pc, nil
@@ -262,13 +250,12 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
 	}
 	pc.provider = provider

-	podStatusQueue := workqueue.NewNamedRateLimitingQueue(controllerRateLimiter(pc.qps), "syncPodStatusFromProvider")
 	provider.NotifyPods(ctx, func(pod *corev1.Pod) {
-		pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
+		pc.enqueuePodStatusUpdate(ctx, pc.podStatusQ, pod.DeepCopy())
 	})
 	go runProvider(ctx)
-	defer podStatusQueue.ShutDown()
+	defer pc.podStatusQ.ShutDown()

 	// Wait for the caches to be synced *before* starting to do work.
 	if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
@@ -328,7 +315,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
 		workerID := strconv.Itoa(id)
 		go func() {
 			defer wg.Done()
-			pc.runSyncPodStatusFromProviderWorker(ctx, workerID, podStatusQueue)
+			pc.runSyncPodStatusFromProviderWorker(ctx, workerID, pc.podStatusQ)
 		}()
 	}
@@ -356,7 +343,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
 	<-ctx.Done()
 	log.G(ctx).Info("shutting down workers")
 	pc.k8sQ.ShutDown()
-	podStatusQueue.ShutDown()
+	pc.podStatusQ.ShutDown()
 	pc.deletionQ.ShutDown()

 	wg.Wait()
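
Taken together, a single injected limiter is now shared by all three named queues (syncPodsFromKubernetes, deletePodsFromKubernetes, syncPodStatusFromProvider), so an overall token bucket budgets retries across all of them rather than per queue. A hedged wiring sketch, assuming the controller package is imported as node and that the client, informers, recorder, and provider are already built (every name outside PodControllerConfig's fields is a placeholder):

        pc, err := node.NewPodController(node.PodControllerConfig{
            PodClient:         client.CoreV1(),
            PodInformer:       podInformer,
            EventRecorder:     recorder,
            Provider:          provider,
            ConfigMapInformer: configMapInformer,
            SecretInformer:    secretInformer,
            ServiceInformer:   serviceInformer,
            // Shared by the sync, deletion, and pod-status queues; leave nil to
            // fall back to workqueue.DefaultControllerRateLimiter().
            RateLimiter: qpsRateLimiter(50), // from the earlier sketch
        })
        if err != nil {
            return err
        }
        go func() {
            // Run blocks until the context is cancelled; it is typically
            // launched in a goroutine, which is why PodController keeps an
            // err field for callers.
            _ = pc.Run(ctx, 10) // 10 pod sync workers
        }()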