Merge pull request #841 from cwdsuzhou/June/support_queue_define
This commit is contained in:
@@ -49,6 +49,7 @@ func newTestController() *TestController {
|
||||
recorder: testutil.FakeEventRecorder(5),
|
||||
k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
|
||||
deletionQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
|
||||
podStatusQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
|
||||
done: make(chan struct{}),
|
||||
ready: make(chan struct{}),
|
||||
podsInformer: iFactory.Core().V1().Pods(),
|
||||
|
||||
@@ -108,6 +108,8 @@ type PodController struct {
|
||||
// deletionQ is a queue on which pods are reconciled, and we check if pods are in API server after grace period
|
||||
deletionQ workqueue.RateLimitingInterface
|
||||
|
||||
podStatusQ workqueue.RateLimitingInterface
|
||||
|
||||
// From the time of creation, to termination the knownPods map will contain the pods key
|
||||
// (derived from Kubernetes' cache library) -> a *knownPod struct.
|
||||
knownPods sync.Map
|
||||
@@ -158,6 +160,9 @@ type PodControllerConfig struct {
|
||||
ConfigMapInformer corev1informers.ConfigMapInformer
|
||||
SecretInformer corev1informers.SecretInformer
|
||||
ServiceInformer corev1informers.ServiceInformer
|
||||
|
||||
// RateLimiter defines the rate limit of work queue
|
||||
RateLimiter workqueue.RateLimiter
|
||||
}
|
||||
|
||||
// NewPodController creates a new pod controller with the provided config.
|
||||
@@ -183,7 +188,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
|
||||
if cfg.Provider == nil {
|
||||
return nil, errdefs.InvalidInput("missing provider")
|
||||
}
|
||||
|
||||
if cfg.RateLimiter == nil {
|
||||
cfg.RateLimiter = workqueue.DefaultControllerRateLimiter()
|
||||
}
|
||||
rm, err := manager.NewResourceManager(cfg.PodInformer.Lister(), cfg.SecretInformer.Lister(), cfg.ConfigMapInformer.Lister(), cfg.ServiceInformer.Lister())
|
||||
if err != nil {
|
||||
return nil, pkgerrors.Wrap(err, "could not create resource manager")
|
||||
@@ -198,8 +205,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
|
||||
ready: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
recorder: cfg.EventRecorder,
|
||||
k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
|
||||
deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"),
|
||||
k8sQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"),
|
||||
deletionQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"),
|
||||
podStatusQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
|
||||
}
|
||||
|
||||
return pc, nil
|
||||
@@ -242,13 +250,12 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
|
||||
}
|
||||
pc.provider = provider
|
||||
|
||||
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
|
||||
provider.NotifyPods(ctx, func(pod *corev1.Pod) {
|
||||
pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
|
||||
pc.enqueuePodStatusUpdate(ctx, pc.podStatusQ, pod.DeepCopy())
|
||||
})
|
||||
go runProvider(ctx)
|
||||
|
||||
defer podStatusQueue.ShutDown()
|
||||
defer pc.podStatusQ.ShutDown()
|
||||
|
||||
// Wait for the caches to be synced *before* starting to do work.
|
||||
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
|
||||
@@ -308,7 +315,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
|
||||
workerID := strconv.Itoa(id)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
pc.runSyncPodStatusFromProviderWorker(ctx, workerID, podStatusQueue)
|
||||
pc.runSyncPodStatusFromProviderWorker(ctx, workerID, pc.podStatusQ)
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -336,7 +343,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
|
||||
<-ctx.Done()
|
||||
log.G(ctx).Info("shutting down workers")
|
||||
pc.k8sQ.ShutDown()
|
||||
podStatusQueue.ShutDown()
|
||||
pc.podStatusQ.ShutDown()
|
||||
pc.deletionQ.ShutDown()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
Reference in New Issue
Block a user