Move location of eventhandler registration
This moves the event handler registration until after the cache is in-sync. It makes it so we can use the log object from the context, rather than having to use the global logger The cache race condition of the cache starting while the reactor is being added wont exist because we wait for the cache to startup / go in sync prior to adding it.
This commit is contained in:
@@ -173,11 +173,31 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
|
||||
k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
|
||||
}
|
||||
|
||||
// Set up event handlers for when Pod resources change.
|
||||
return pc, nil
|
||||
}
|
||||
|
||||
// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
|
||||
// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
|
||||
func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
|
||||
defer pc.k8sQ.ShutDown()
|
||||
|
||||
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
|
||||
pc.runSyncFromProvider(ctx, podStatusQueue)
|
||||
defer podStatusQueue.ShutDown()
|
||||
|
||||
// Wait for the caches to be synced *before* starting to do work.
|
||||
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
|
||||
return pkgerrors.New("failed to wait for caches to sync")
|
||||
}
|
||||
log.G(ctx).Info("Pod cache in-sync")
|
||||
|
||||
// Set up event handlers for when Pod resources change. Since the pod cache is in-sync, the informer will generate
|
||||
// synthetic add events at this point. It again avoids the race condition of adding handlers while the cache is
|
||||
// syncing.
|
||||
pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(pod interface{}) {
|
||||
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
|
||||
log.L.Error(err)
|
||||
log.G(ctx).Error(err)
|
||||
} else {
|
||||
pc.k8sQ.AddRateLimited(key)
|
||||
}
|
||||
@@ -196,45 +216,27 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
|
||||
}
|
||||
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
|
||||
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
|
||||
log.L.Error(err)
|
||||
log.G(ctx).Error(err)
|
||||
} else {
|
||||
pc.k8sQ.AddRateLimited(key)
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(pod interface{}) {
|
||||
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
|
||||
log.L.Error(err)
|
||||
log.G(ctx).Error(err)
|
||||
} else {
|
||||
pc.k8sQ.AddRateLimited(key)
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
return pc, nil
|
||||
}
|
||||
|
||||
// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
|
||||
// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
|
||||
func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
|
||||
defer pc.k8sQ.ShutDown()
|
||||
|
||||
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
|
||||
pc.runSyncFromProvider(ctx, podStatusQueue)
|
||||
pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
|
||||
defer podStatusQueue.ShutDown()
|
||||
|
||||
// Wait for the caches to be synced *before* starting workers.
|
||||
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
|
||||
return pkgerrors.New("failed to wait for caches to sync")
|
||||
}
|
||||
log.G(ctx).Info("Pod cache in-sync")
|
||||
|
||||
// Perform a reconciliation step that deletes any dangling pods from the provider.
|
||||
// This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
|
||||
// If by any reason the provider fails to delete a dangling pod, it will stay in the provider and deletion won't be retried.
|
||||
pc.deleteDanglingPods(ctx, podSyncWorkers)
|
||||
|
||||
log.G(ctx).Info("starting workers")
|
||||
pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
|
||||
for id := 0; id < podSyncWorkers; id++ {
|
||||
workerID := strconv.Itoa(id)
|
||||
go wait.Until(func() {
|
||||
|
||||
Reference in New Issue
Block a user