Support custom filter for pod event handlers
This allows users who have a shared informer that is *not* filtering on node name to supply a filter for event handlers, so that events do not fire for pods not scheduled to the node.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
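A minimal sketch of the intended usage, assuming the controller's home in virtual-kubelet's node package (the import path, the nodeName value, and the helper names are placeholders, not part of this commit):

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"

	"github.com/virtual-kubelet/virtual-kubelet/node"
)

// nodeNameFilter only lets event handlers run for pods scheduled to this node.
func nodeNameFilter(nodeName string) node.PodEventFilterFunc {
	return func(_ context.Context, pod *corev1.Pod) bool {
		return pod.Spec.NodeName == nodeName
	}
}

// newController wires the filter into the controller config. The remaining
// required fields (client, provider, informers, ...) are elided here.
func newController(cfg node.PodControllerConfig) (*node.PodController, error) {
	cfg.PodEventFilterFunc = nodeNameFilter("my-virtual-node")
	return node.NewPodController(cfg)
}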
@@ -88,6 +88,12 @@ type PodNotifier interface {
 	NotifyPods(context.Context, func(*corev1.Pod))
 }
 
+// PodEventFilterFunc is used to filter pod events received from Kubernetes.
+//
+// A filter that returns true means the event handler will be run.
+// A filter that returns false means the event handler will *not* be run.
+type PodEventFilterFunc func(context.Context, *corev1.Pod) bool
+
 // PodController is the controller implementation for Pod resources.
 type PodController struct {
 	provider PodLifecycleHandler
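Because a PodEventFilterFunc is an ordinary function value, callers can compose several checks before handing one to the controller. A hypothetical combinator, not part of this commit (it reuses the imports from the sketch above):

// andFilters runs the event handler only when every supplied filter returns true.
func andFilters(filters ...node.PodEventFilterFunc) node.PodEventFilterFunc {
	return func(ctx context.Context, pod *corev1.Pod) bool {
		for _, f := range filters {
			if !f(ctx, pod) {
				return false
			}
		}
		return true
	}
}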
@@ -115,6 +121,8 @@ type PodController struct {
 	// (derived from Kubernetes' cache library) -> a *knownPod struct.
 	knownPods sync.Map
 
+	podEventFilterFunc PodEventFilterFunc
+
 	// ready is a channel which will be closed once the pod controller is fully up and running.
 	// this channel will never be closed if there is an error on startup.
 	ready chan struct{}
@@ -148,6 +156,8 @@ type PodControllerConfig struct {
 
 	// PodInformer is used as a local cache for pods
 	// This should be configured to only look at pods scheduled to the node which the controller will be managing
+	// If the informer does not filter based on node, then you must provide a `PodEventFilterFunc` parameter so event handlers
+	// can filter pods not assigned to this node.
 	PodInformer corev1informers.PodInformer
 
 	EventRecorder record.EventRecorder
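For comparison, the setup the comment above recommends, an informer already scoped to one node, makes the filter unnecessary. A sketch using client-go's shared informer factory (nodeName is a placeholder; a resync period of 0 disables periodic resync):

package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/informers"
	corev1informers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
)

// nodeScopedPodInformer returns a PodInformer that only sees pods whose
// spec.nodeName matches this node; the caller still has to Start the factory.
func nodeScopedPodInformer(client kubernetes.Interface, nodeName string) corev1informers.PodInformer {
	factory := informers.NewSharedInformerFactoryWithOptions(client, 0,
		informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
			opts.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String()
		}))
	return factory.Core().V1().Pods()
}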
@@ -156,7 +166,7 @@ type PodControllerConfig struct {
 
 	// Informers used for filling details for things like downward API in pod spec.
 	//
-	// We are using informers here instead of listeners because we'll need the
+	// We are using informers here instead of listers because we'll need the
 	// informer for certain features (like notifications for updated ConfigMaps)
 	ConfigMapInformer corev1informers.ConfigMapInformer
 	SecretInformer corev1informers.SecretInformer
@@ -164,6 +174,13 @@ type PodControllerConfig struct {
 
 	// RateLimiter defines the rate limit of work queue
 	RateLimiter workqueue.RateLimiter
+
+	// PodEventFilterFunc adds custom filtering for pod informer event handlers.
+	// Use this for cases where the pod informer handles more than pods assigned to this node.
+	//
+	// For example, if the pod informer is not filtering based on pod.Spec.NodeName, you should
+	// set that filter here so the pod controller does not handle events for pods assigned to other nodes.
+	PodEventFilterFunc PodEventFilterFunc
 }
 
 // NewPodController creates a new pod controller with the provided config.
@@ -198,17 +215,18 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 	}
 
 	pc := &PodController{
-		client:          cfg.PodClient,
-		podsInformer:    cfg.PodInformer,
-		podsLister:      cfg.PodInformer.Lister(),
-		provider:        cfg.Provider,
-		resourceManager: rm,
-		ready:           make(chan struct{}),
-		done:            make(chan struct{}),
-		recorder:        cfg.EventRecorder,
-		k8sQ:            workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"),
-		deletionQ:       workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"),
-		podStatusQ:      workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
+		client:             cfg.PodClient,
+		podsInformer:       cfg.PodInformer,
+		podsLister:         cfg.PodInformer.Lister(),
+		provider:           cfg.Provider,
+		resourceManager:    rm,
+		ready:              make(chan struct{}),
+		done:               make(chan struct{}),
+		recorder:           cfg.EventRecorder,
+		k8sQ:               workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"),
+		deletionQ:          workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"),
+		podStatusQ:         workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
+		podEventFilterFunc: cfg.PodEventFilterFunc,
 	}
 
 	return pc, nil
@@ -267,7 +285,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
 	// Set up event handlers for when Pod resources change. Since the pod cache is in-sync, the informer will generate
 	// synthetic add events at this point. It again avoids the race condition of adding handlers while the cache is
 	// syncing.
-	pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+
+	var eventHandler cache.ResourceEventHandler = cache.ResourceEventHandlerFuncs{
 		AddFunc: func(pod interface{}) {
 			if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
 				log.G(ctx).Error(err)
@@ -300,7 +319,22 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
 				pc.deletionQ.Forget(key)
 			}
 		},
-	})
+	}
+
+	if pc.podEventFilterFunc != nil {
+		eventHandler = cache.FilteringResourceEventHandler{
+			FilterFunc: func(obj interface{}) bool {
+				p, ok := obj.(*corev1.Pod)
+				if !ok {
+					return false
+				}
+				return pc.podEventFilterFunc(ctx, p)
+			},
+			Handler: eventHandler,
+		}
+	}
+
+	pc.podsInformer.Informer().AddEventHandler(eventHandler)
+
 	// Perform a reconciliation step that deletes any dangling pods from the provider.
 	// This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
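The wiring above delegates the actual filtering to client-go's cache.FilteringResourceEventHandler, which also gives the filter sensible semantics at the boundary: an update whose new object starts passing the filter is delivered as an add, and an update whose old object passed but whose new object does not is delivered as a delete. A standalone sketch of that behavior (strings stand in for pods; it assumes a client-go release contemporary with this change, where OnAdd takes a single argument):

package main

import (
	"fmt"
	"strings"

	"k8s.io/client-go/tools/cache"
)

func main() {
	// Keep only objects on "node-a", a stand-in for checking pod.Spec.NodeName.
	h := cache.FilteringResourceEventHandler{
		FilterFunc: func(obj interface{}) bool {
			s, ok := obj.(string)
			return ok && strings.HasPrefix(s, "node-a/")
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc:    func(obj interface{}) { fmt.Println("add:", obj) },
			UpdateFunc: func(o, n interface{}) { fmt.Println("update:", o, "->", n) },
			DeleteFunc: func(obj interface{}) { fmt.Println("delete:", obj) },
		},
	}

	h.OnAdd("node-a/pod1")                   // passes the filter: "add: node-a/pod1"
	h.OnAdd("node-b/pod1")                   // filtered out: no handler runs
	h.OnUpdate("node-a/pod1", "node-a/pod1") // both pass: delivered as an update
	h.OnUpdate("node-b/pod2", "node-a/pod2") // newly passing: delivered as an add
	h.OnUpdate("node-a/pod2", "node-b/pod2") // no longer passing: delivered as a delete
}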