From c0296b99fd5ae25f50d0c6d7da0866e8950c6c60 Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Tue, 28 Jul 2020 15:07:32 -0700
Subject: [PATCH] Support custom filter for pod event handlers

This allows users who have a shared informer that is *not* filtering
on node name to supply a filter for event handlers to ensure events do
not fire for pods not scheduled to the node.

Signed-off-by: Brian Goff
---
 node/nodeutil/filter.go      | 30 +++++++++++++++++
 node/nodeutil/filter_test.go | 17 ++++++++++
 node/podcontroller.go        | 62 ++++++++++++++++++++++++++++--------
 node/podcontroller_test.go   | 55 ++++++++++++++++++++++++++++++++
 4 files changed, 150 insertions(+), 14 deletions(-)
 create mode 100644 node/nodeutil/filter.go
 create mode 100644 node/nodeutil/filter_test.go

diff --git a/node/nodeutil/filter.go b/node/nodeutil/filter.go
new file mode 100644
index 000000000..026353f3c
--- /dev/null
+++ b/node/nodeutil/filter.go
@@ -0,0 +1,30 @@
+package nodeutil
+
+import (
+	"context"
+
+	"github.com/virtual-kubelet/virtual-kubelet/node"
+	v1 "k8s.io/api/core/v1"
+)
+
+// FilterPodsForNodeName creates an event filter function that filters pod events such that pod.Spec.NodeName matches the provided name.
+// Use the return value of this as the PodEventFilterFunc in PodControllerConfig.
+func FilterPodsForNodeName(name string) node.PodEventFilterFunc {
+	return func(_ context.Context, p *v1.Pod) bool {
+		return p.Spec.NodeName == name
+	}
+}
+
+// PodFilters turns a list of pod filters into a single filter.
+// When run, each filter in the list is tried in order until the first `true` result.
+// If no filter returns true, the combined filter returns false.
+func PodFilters(filters ...node.PodEventFilterFunc) node.PodEventFilterFunc {
+	return func(ctx context.Context, p *v1.Pod) bool {
+		for _, f := range filters {
+			if f(ctx, p) {
+				return true
+			}
+		}
+		return false
+	}
+}
diff --git a/node/nodeutil/filter_test.go b/node/nodeutil/filter_test.go
new file mode 100644
index 000000000..992de8525
--- /dev/null
+++ b/node/nodeutil/filter_test.go
@@ -0,0 +1,17 @@
+package nodeutil
+
+import (
+	"context"
+	"testing"
+
+	"gotest.tools/assert"
+	v1 "k8s.io/api/core/v1"
+)
+
+func TestFilterPodsForNodeName(t *testing.T) {
+	ctx := context.Background()
+	// Does not match a pod scheduled to a different node
+	assert.Check(t, !FilterPodsForNodeName(t.Name())(ctx, &v1.Pod{Spec: v1.PodSpec{NodeName: "NotOurNode"}}))
+	// Matches a pod scheduled to our node
+	assert.Check(t, FilterPodsForNodeName(t.Name())(ctx, &v1.Pod{Spec: v1.PodSpec{NodeName: t.Name()}}))
+}
diff --git a/node/podcontroller.go b/node/podcontroller.go
index fe4902271..ca03c57cb 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -88,6 +88,12 @@ type PodNotifier interface {
 	NotifyPods(context.Context, func(*corev1.Pod))
 }
 
+// PodEventFilterFunc is used to filter pod events received from Kubernetes.
+//
+// A filter that returns true means the event handler will be run.
+// A filter that returns false means the event handler will *not* be run.
+type PodEventFilterFunc func(context.Context, *corev1.Pod) bool
+
 // PodController is the controller implementation for Pod resources.
 type PodController struct {
 	provider PodLifecycleHandler
@@ -115,6 +121,8 @@ type PodController struct {
 	// (derived from Kubernetes' cache library) -> a *knownPod struct.
 	knownPods sync.Map
 
+	podEventFilterFunc PodEventFilterFunc
+
 	// ready is a channel which will be closed once the pod controller is fully up and running.
 	// this channel will never be closed if there is an error on startup.
 	ready chan struct{}
 
@@ -148,6 +156,8 @@ type PodControllerConfig struct {
 
 	// PodInformer is used as a local cache for pods
 	// This should be configured to only look at pods scheduled to the node which the controller will be managing
+	// If the informer does not filter based on node, then you must provide a `PodEventFilterFunc` parameter so event handlers
+	// can filter out pods not assigned to this node.
 	PodInformer corev1informers.PodInformer
 
 	EventRecorder record.EventRecorder
@@ -156,7 +166,7 @@ type PodControllerConfig struct {
 
 	// Informers used for filling details for things like downward API in pod spec.
 	//
-	// We are using informers here instead of listeners because we'll need the
+	// We are using informers here instead of listers because we'll need the
 	// informer for certain features (like notifications for updated ConfigMaps)
 	ConfigMapInformer corev1informers.ConfigMapInformer
 	SecretInformer    corev1informers.SecretInformer
@@ -164,6 +174,13 @@ type PodControllerConfig struct {
 
 	// RateLimiter defines the rate limit of work queue
 	RateLimiter workqueue.RateLimiter
+
+	// PodEventFilterFunc adds custom filtering for pod informer event handlers.
+	// Use this for cases where the pod informer handles more than just the pods assigned to this node.
+	//
+	// For example, if the pod informer is not filtering based on pod.Spec.NodeName, you should
+	// set that filter here so the pod controller does not handle events for pods assigned to other nodes.
+	PodEventFilterFunc PodEventFilterFunc
 }
 
 // NewPodController creates a new pod controller with the provided config.
@@ -198,17 +215,18 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 	}
 
 	pc := &PodController{
-		client:          cfg.PodClient,
-		podsInformer:    cfg.PodInformer,
-		podsLister:      cfg.PodInformer.Lister(),
-		provider:        cfg.Provider,
-		resourceManager: rm,
-		ready:           make(chan struct{}),
-		done:            make(chan struct{}),
-		recorder:        cfg.EventRecorder,
-		k8sQ:            workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"),
-		deletionQ:       workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"),
-		podStatusQ:      workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
+		client:             cfg.PodClient,
+		podsInformer:       cfg.PodInformer,
+		podsLister:         cfg.PodInformer.Lister(),
+		provider:           cfg.Provider,
+		resourceManager:    rm,
+		ready:              make(chan struct{}),
+		done:               make(chan struct{}),
+		recorder:           cfg.EventRecorder,
+		k8sQ:               workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"),
+		deletionQ:          workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"),
+		podStatusQ:         workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
+		podEventFilterFunc: cfg.PodEventFilterFunc,
 	}
 
 	return pc, nil
@@ -267,7 +285,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 	// Set up event handlers for when Pod resources change. Since the pod cache is in-sync, the informer will generate
 	// synthetic add events at this point. It again avoids the race condition of adding handlers while the cache is
 	// syncing.
-	pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+
+	var eventHandler cache.ResourceEventHandler = cache.ResourceEventHandlerFuncs{
 		AddFunc: func(pod interface{}) {
 			if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
 				log.G(ctx).Error(err)
@@ -300,7 +319,22 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 				pc.deletionQ.Forget(key)
 			}
 		},
-	})
+	}
+
+	if pc.podEventFilterFunc != nil {
+		eventHandler = cache.FilteringResourceEventHandler{
+			FilterFunc: func(obj interface{}) bool {
+				p, ok := obj.(*corev1.Pod)
+				if !ok {
+					return false
+				}
+				return pc.podEventFilterFunc(ctx, p)
+			},
+			Handler: eventHandler,
+		}
+	}
+
+	pc.podsInformer.Informer().AddEventHandler(eventHandler)
 
 	// Perform a reconciliation step that deletes any dangling pods from the provider.
 	// This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
diff --git a/node/podcontroller_test.go b/node/podcontroller_test.go
index c87d8c673..f8e986ae7 100644
--- a/node/podcontroller_test.go
+++ b/node/podcontroller_test.go
@@ -90,3 +90,58 @@ func TestCompareLabels(t *testing.T) {
 	}
 	assert.Assert(t, !podsEffectivelyEqual(p1, p2))
 }
+
+// TestPodEventFilter ensures that pod filters are run for each event
+func TestPodEventFilter(t *testing.T) {
+	tc := newTestController()
+
+	wait := make(chan struct{}, 3)
+	tc.podEventFilterFunc = func(_ context.Context, pod *corev1.Pod) bool {
+		wait <- struct{}{}
+		return true
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go tc.Run(ctx, 1)
+
+	ctxT, cancelT := context.WithTimeout(ctx, 30*time.Second)
+	defer cancelT()
+
+	select {
+	case <-ctxT.Done():
+		t.Fatal(ctxT.Err())
+	case <-tc.Done():
+		t.Fatal(tc.Err())
+	case <-tc.Ready():
+	}
+
+	pod := &corev1.Pod{}
+	pod.ObjectMeta.Namespace = "default"
+	pod.ObjectMeta.Name = "nginx"
+	pod.Spec = newPodSpec()
+
+	podC := tc.client.CoreV1().Pods(testNamespace)
+
+	_, err := podC.Create(ctx, pod, metav1.CreateOptions{})
+	assert.NilError(t, err)
+
+	pod.Annotations = map[string]string{"updated": "true"}
+	_, err = podC.Update(ctx, pod, metav1.UpdateOptions{})
+	assert.NilError(t, err)
+
+	err = podC.Delete(ctx, pod.Name, metav1.DeleteOptions{})
+	assert.NilError(t, err)
+
+	ctxT, cancelT = context.WithTimeout(ctx, 30*time.Second)
+	defer cancelT()
+	for i := 0; i < 3; i++ {
+		// check that the event filter fired for the create, update, and delete events
+		select {
+		case <-ctxT.Done():
+			t.Fatal(ctxT.Err())
+		case <-wait:
+		}
+	}
+}
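
Usage sketch (not part of the patch): a minimal example of how a caller whose shared informer is *not* filtered by node name might wire the new PodEventFilterFunc into PodControllerConfig. The informer-factory setup, the "my-node" name, and the omission of the remaining config fields (EventRecorder, RateLimiter, ConfigMapInformer, SecretInformer) are illustrative assumptions, not taken from this change.

	package example

	import (
		"time"

		"github.com/virtual-kubelet/virtual-kubelet/node"
		"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
		"k8s.io/client-go/informers"
		"k8s.io/client-go/kubernetes"
	)

	// newNodePodController builds a PodController whose event handlers only fire
	// for pods scheduled to "my-node", even though the informer watches all pods.
	func newNodePodController(client kubernetes.Interface, provider node.PodLifecycleHandler) (*node.PodController, error) {
		// A factory that is *not* filtered on spec.nodeName; the event filter
		// below keeps other nodes' pods from reaching the handlers.
		factory := informers.NewSharedInformerFactory(client, time.Minute)

		return node.NewPodController(node.PodControllerConfig{
			PodClient:   client.CoreV1(),
			PodInformer: factory.Core().V1().Pods(),
			Provider:    provider,
			// EventRecorder, RateLimiter, and the remaining informers are
			// omitted here for brevity.
			PodEventFilterFunc: nodeutil.FilterPodsForNodeName("my-node"),
		})
	}

Multiple filters can be combined with nodeutil.PodFilters, which runs each filter in order and returns true as soon as one of them matches.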