Merge pull request #711 from sargun/avoid-startup-race

Setup event handler at Pod Controller creation time
This commit is contained in:
Sargun Dhillon
2019-07-29 09:37:28 -07:00
committed by GitHub
2 changed files with 25 additions and 21 deletions

View File

@@ -102,9 +102,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
serviceInformer := scmInformerFactory.Core().V1().Services()
go podInformerFactory.Start(ctx.Done())
go scmInformerFactory.Start(ctx.Done())
rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister())
if err != nil {
return errors.Wrap(err, "could not create resource manager")
@@ -194,6 +191,9 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
return errors.Wrap(err, "error setting up pod controller")
}
go podInformerFactory.Start(ctx.Done())
go scmInformerFactory.Start(ctx.Done())
cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig)
if err != nil {
return err

View File

@@ -94,6 +94,8 @@ type PodController struct {
client corev1client.PodsGetter
resourceManager *manager.ResourceManager
k8sQ workqueue.RateLimitingInterface
}
// PodControllerConfig is used to configure a new PodController.
@@ -147,7 +149,7 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
return nil, pkgerrors.Wrap(err, "could not create resource manager")
}
return &PodController{
pc := &PodController{
client: cfg.PodClient,
podsInformer: cfg.PodInformer,
podsLister: cfg.PodInformer.Lister(),
@@ -155,19 +157,8 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
resourceManager: rm,
ready: make(chan struct{}),
recorder: cfg.EventRecorder,
}, nil
}
// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
k8sQ := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes")
defer k8sQ.ShutDown()
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
pc.runSyncFromProvider(ctx, podStatusQueue)
defer podStatusQueue.ShutDown()
k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
}
// Set up event handlers for when Pod resources change.
pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -175,7 +166,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
log.L.Error(err)
} else {
k8sQ.AddRateLimited(key)
pc.k8sQ.AddRateLimited(key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
@@ -194,18 +185,31 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
log.L.Error(err)
} else {
k8sQ.AddRateLimited(key)
pc.k8sQ.AddRateLimited(key)
}
},
DeleteFunc: func(pod interface{}) {
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
log.L.Error(err)
} else {
k8sQ.AddRateLimited(key)
pc.k8sQ.AddRateLimited(key)
}
},
})
return pc, nil
}
// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
defer pc.k8sQ.ShutDown()
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
pc.runSyncFromProvider(ctx, podStatusQueue)
pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
defer podStatusQueue.ShutDown()
// Wait for the caches to be synced *before* starting workers.
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
return pkgerrors.New("failed to wait for caches to sync")
@@ -221,7 +225,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
for id := 0; id < podSyncWorkers; id++ {
go wait.Until(func() {
// Use the worker's "index" as its ID so we can use it for tracing.
pc.runWorker(ctx, strconv.Itoa(id), k8sQ)
pc.runWorker(ctx, strconv.Itoa(id), pc.k8sQ)
}, time.Second, ctx.Done())
}