Setup event handler at Pod Controller creation time

This seems to avoid a race conditions where at pod informer
startup time, the reactor doesn't properly get setup.

It also refactors the root command example to start up
the informers after everything is wired up.
This commit is contained in:
Sargun Dhillon
2019-07-26 13:38:06 -07:00
parent 28dac027ce
commit 4d60fc2049
2 changed files with 25 additions and 21 deletions

View File

@@ -102,9 +102,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
serviceInformer := scmInformerFactory.Core().V1().Services()
go podInformerFactory.Start(ctx.Done())
go scmInformerFactory.Start(ctx.Done())
rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister())
if err != nil {
return errors.Wrap(err, "could not create resource manager")
@@ -194,6 +191,9 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
return errors.Wrap(err, "error setting up pod controller")
}
go podInformerFactory.Start(ctx.Done())
go scmInformerFactory.Start(ctx.Done())
cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig)
if err != nil {
return err

View File

@@ -94,6 +94,8 @@ type PodController struct {
client corev1client.PodsGetter
resourceManager *manager.ResourceManager
k8sQ workqueue.RateLimitingInterface
}
// PodControllerConfig is used to configure a new PodController.
@@ -147,7 +149,7 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
return nil, pkgerrors.Wrap(err, "could not create resource manager")
}
return &PodController{
pc := &PodController{
client: cfg.PodClient,
podsInformer: cfg.PodInformer,
podsLister: cfg.PodInformer.Lister(),
@@ -155,19 +157,8 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
resourceManager: rm,
ready: make(chan struct{}),
recorder: cfg.EventRecorder,
}, nil
}
// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
k8sQ := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes")
defer k8sQ.ShutDown()
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
pc.runSyncFromProvider(ctx, podStatusQueue)
defer podStatusQueue.ShutDown()
k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
}
// Set up event handlers for when Pod resources change.
pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -175,7 +166,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
log.L.Error(err)
} else {
k8sQ.AddRateLimited(key)
pc.k8sQ.AddRateLimited(key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
@@ -194,18 +185,31 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
log.L.Error(err)
} else {
k8sQ.AddRateLimited(key)
pc.k8sQ.AddRateLimited(key)
}
},
DeleteFunc: func(pod interface{}) {
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
log.L.Error(err)
} else {
k8sQ.AddRateLimited(key)
pc.k8sQ.AddRateLimited(key)
}
},
})
return pc, nil
}
// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
defer pc.k8sQ.ShutDown()
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
pc.runSyncFromProvider(ctx, podStatusQueue)
pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
defer podStatusQueue.ShutDown()
// Wait for the caches to be synced *before* starting workers.
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
return pkgerrors.New("failed to wait for caches to sync")
@@ -221,7 +225,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
for id := 0; id < podSyncWorkers; id++ {
go wait.Until(func() {
// Use the worker's "index" as its ID so we can use it for tracing.
pc.runWorker(ctx, strconv.Itoa(id), k8sQ)
pc.runWorker(ctx, strconv.Itoa(id), pc.k8sQ)
}, time.Second, ctx.Done())
}