Merge pull request #732 from sargun/move-around-reactor
Move location of eventhandler registration
This commit is contained in:
@@ -173,11 +173,31 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
|
|||||||
k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
|
k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set up event handlers for when Pod resources change.
|
return pc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
|
||||||
|
// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
|
||||||
|
func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
|
||||||
|
defer pc.k8sQ.ShutDown()
|
||||||
|
|
||||||
|
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
|
||||||
|
pc.runSyncFromProvider(ctx, podStatusQueue)
|
||||||
|
defer podStatusQueue.ShutDown()
|
||||||
|
|
||||||
|
// Wait for the caches to be synced *before* starting to do work.
|
||||||
|
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
|
||||||
|
return pkgerrors.New("failed to wait for caches to sync")
|
||||||
|
}
|
||||||
|
log.G(ctx).Info("Pod cache in-sync")
|
||||||
|
|
||||||
|
// Set up event handlers for when Pod resources change. Since the pod cache is in-sync, the informer will generate
|
||||||
|
// synthetic add events at this point. It again avoids the race condition of adding handlers while the cache is
|
||||||
|
// syncing.
|
||||||
pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: func(pod interface{}) {
|
AddFunc: func(pod interface{}) {
|
||||||
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
|
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
|
||||||
log.L.Error(err)
|
log.G(ctx).Error(err)
|
||||||
} else {
|
} else {
|
||||||
pc.k8sQ.AddRateLimited(key)
|
pc.k8sQ.AddRateLimited(key)
|
||||||
}
|
}
|
||||||
@@ -196,45 +216,27 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
|
|||||||
}
|
}
|
||||||
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
|
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
|
||||||
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
|
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
|
||||||
log.L.Error(err)
|
log.G(ctx).Error(err)
|
||||||
} else {
|
} else {
|
||||||
pc.k8sQ.AddRateLimited(key)
|
pc.k8sQ.AddRateLimited(key)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
DeleteFunc: func(pod interface{}) {
|
DeleteFunc: func(pod interface{}) {
|
||||||
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
|
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
|
||||||
log.L.Error(err)
|
log.G(ctx).Error(err)
|
||||||
} else {
|
} else {
|
||||||
pc.k8sQ.AddRateLimited(key)
|
pc.k8sQ.AddRateLimited(key)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
return pc, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
|
|
||||||
// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
|
|
||||||
func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
|
|
||||||
defer pc.k8sQ.ShutDown()
|
|
||||||
|
|
||||||
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
|
|
||||||
pc.runSyncFromProvider(ctx, podStatusQueue)
|
|
||||||
pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
|
|
||||||
defer podStatusQueue.ShutDown()
|
|
||||||
|
|
||||||
// Wait for the caches to be synced *before* starting workers.
|
|
||||||
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
|
|
||||||
return pkgerrors.New("failed to wait for caches to sync")
|
|
||||||
}
|
|
||||||
log.G(ctx).Info("Pod cache in-sync")
|
|
||||||
|
|
||||||
// Perform a reconciliation step that deletes any dangling pods from the provider.
|
// Perform a reconciliation step that deletes any dangling pods from the provider.
|
||||||
// This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
|
// This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
|
||||||
// If by any reason the provider fails to delete a dangling pod, it will stay in the provider and deletion won't be retried.
|
// If by any reason the provider fails to delete a dangling pod, it will stay in the provider and deletion won't be retried.
|
||||||
pc.deleteDanglingPods(ctx, podSyncWorkers)
|
pc.deleteDanglingPods(ctx, podSyncWorkers)
|
||||||
|
|
||||||
log.G(ctx).Info("starting workers")
|
log.G(ctx).Info("starting workers")
|
||||||
|
pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
|
||||||
for id := 0; id < podSyncWorkers; id++ {
|
for id := 0; id < podSyncWorkers; id++ {
|
||||||
workerID := strconv.Itoa(id)
|
workerID := strconv.Itoa(id)
|
||||||
go wait.Until(func() {
|
go wait.Until(func() {
|
||||||
|
|||||||
Reference in New Issue
Block a user