Merge pull request #866 from cpuguy83/add_event_filter_support
Support custom filter for pod event handlers
This commit is contained in:
30
node/nodeutil/filter.go
Normal file
30
node/nodeutil/filter.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package nodeutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// FilterPodsForNodeName creates an event filter function that filters pod events such that pod.Sepc.NodeName matches the provided name
|
||||
// Use the return value of this as the PodEventFilterFunc in PodControllerConfig
|
||||
func FilterPodsForNodeName(name string) node.PodEventFilterFunc {
|
||||
return func(_ context.Context, p *v1.Pod) bool {
|
||||
return p.Spec.NodeName == name
|
||||
}
|
||||
}
|
||||
|
||||
// PodFilters turns a list of pod filters into a single filter.
|
||||
// When run, each item in the list is itterated in order until the first `true` result.
|
||||
// If nothing returns true, the filter is false.
|
||||
func PodFilters(filters ...node.PodEventFilterFunc) node.PodEventFilterFunc {
|
||||
return func(ctx context.Context, p *v1.Pod) bool {
|
||||
for _, f := range filters {
|
||||
if f(ctx, p) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
17
node/nodeutil/filter_test.go
Normal file
17
node/nodeutil/filter_test.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package nodeutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"gotest.tools/assert"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
func TestFilterPodsForNodeName(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
// Doesn't match pod with wrong name
|
||||
assert.Check(t, !FilterPodsForNodeName(t.Name())(ctx, &v1.Pod{Spec: v1.PodSpec{NodeName: "NotOurNode"}}))
|
||||
// Match pod with wrong name
|
||||
assert.Check(t, FilterPodsForNodeName(t.Name())(ctx, &v1.Pod{Spec: v1.PodSpec{NodeName: t.Name()}}))
|
||||
}
|
||||
@@ -88,6 +88,12 @@ type PodNotifier interface {
|
||||
NotifyPods(context.Context, func(*corev1.Pod))
|
||||
}
|
||||
|
||||
// PodEventFilterFunc is used to filter pod events received from Kubernetes.
|
||||
//
|
||||
// Filters that return true means the event handler will be run
|
||||
// Filters that return false means the filter will *not* be run.
|
||||
type PodEventFilterFunc func(context.Context, *corev1.Pod) bool
|
||||
|
||||
// PodController is the controller implementation for Pod resources.
|
||||
type PodController struct {
|
||||
provider PodLifecycleHandler
|
||||
@@ -115,6 +121,8 @@ type PodController struct {
|
||||
// (derived from Kubernetes' cache library) -> a *knownPod struct.
|
||||
knownPods sync.Map
|
||||
|
||||
podEventFilterFunc PodEventFilterFunc
|
||||
|
||||
// ready is a channel which will be closed once the pod controller is fully up and running.
|
||||
// this channel will never be closed if there is an error on startup.
|
||||
ready chan struct{}
|
||||
@@ -148,6 +156,8 @@ type PodControllerConfig struct {
|
||||
|
||||
// PodInformer is used as a local cache for pods
|
||||
// This should be configured to only look at pods scheduled to the node which the controller will be managing
|
||||
// If the informer does not filter based on node, then you must provide a `PodEventFilterFunc` parameter so event handlers
|
||||
// can filter pods not assigned to this node.
|
||||
PodInformer corev1informers.PodInformer
|
||||
|
||||
EventRecorder record.EventRecorder
|
||||
@@ -156,7 +166,7 @@ type PodControllerConfig struct {
|
||||
|
||||
// Informers used for filling details for things like downward API in pod spec.
|
||||
//
|
||||
// We are using informers here instead of listeners because we'll need the
|
||||
// We are using informers here instead of listers because we'll need the
|
||||
// informer for certain features (like notifications for updated ConfigMaps)
|
||||
ConfigMapInformer corev1informers.ConfigMapInformer
|
||||
SecretInformer corev1informers.SecretInformer
|
||||
@@ -164,6 +174,13 @@ type PodControllerConfig struct {
|
||||
|
||||
// RateLimiter defines the rate limit of work queue
|
||||
RateLimiter workqueue.RateLimiter
|
||||
|
||||
// Add custom filtering for pod informer event handlers
|
||||
// Use this for cases where the pod informer handles more than pods assigned to this node
|
||||
//
|
||||
// For example, if the pod informer is not filtering based on pod.Spec.NodeName, you should
|
||||
// set that filter here so the pod controller does not handle events for pods assigned to other nodes.
|
||||
PodEventFilterFunc PodEventFilterFunc
|
||||
}
|
||||
|
||||
// NewPodController creates a new pod controller with the provided config.
|
||||
@@ -198,17 +215,18 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
|
||||
}
|
||||
|
||||
pc := &PodController{
|
||||
client: cfg.PodClient,
|
||||
podsInformer: cfg.PodInformer,
|
||||
podsLister: cfg.PodInformer.Lister(),
|
||||
provider: cfg.Provider,
|
||||
resourceManager: rm,
|
||||
ready: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
recorder: cfg.EventRecorder,
|
||||
k8sQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"),
|
||||
deletionQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"),
|
||||
podStatusQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
|
||||
client: cfg.PodClient,
|
||||
podsInformer: cfg.PodInformer,
|
||||
podsLister: cfg.PodInformer.Lister(),
|
||||
provider: cfg.Provider,
|
||||
resourceManager: rm,
|
||||
ready: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
recorder: cfg.EventRecorder,
|
||||
k8sQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"),
|
||||
deletionQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"),
|
||||
podStatusQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
|
||||
podEventFilterFunc: cfg.PodEventFilterFunc,
|
||||
}
|
||||
|
||||
return pc, nil
|
||||
@@ -267,7 +285,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
|
||||
// Set up event handlers for when Pod resources change. Since the pod cache is in-sync, the informer will generate
|
||||
// synthetic add events at this point. It again avoids the race condition of adding handlers while the cache is
|
||||
// syncing.
|
||||
pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
|
||||
var eventHandler cache.ResourceEventHandler = cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(pod interface{}) {
|
||||
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
|
||||
log.G(ctx).Error(err)
|
||||
@@ -300,7 +319,22 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
|
||||
pc.deletionQ.Forget(key)
|
||||
}
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
if pc.podEventFilterFunc != nil {
|
||||
eventHandler = cache.FilteringResourceEventHandler{
|
||||
FilterFunc: func(obj interface{}) bool {
|
||||
p, ok := obj.(*corev1.Pod)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return pc.podEventFilterFunc(ctx, p)
|
||||
},
|
||||
Handler: eventHandler,
|
||||
}
|
||||
}
|
||||
|
||||
pc.podsInformer.Informer().AddEventHandler(eventHandler)
|
||||
|
||||
// Perform a reconciliation step that deletes any dangling pods from the provider.
|
||||
// This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
|
||||
|
||||
@@ -90,3 +90,58 @@ func TestCompareLabels(t *testing.T) {
|
||||
}
|
||||
assert.Assert(t, !podsEffectivelyEqual(p1, p2))
|
||||
}
|
||||
|
||||
// TestPodEventFilter ensure that pod filters are run for each event
|
||||
func TestPodEventFilter(t *testing.T) {
|
||||
tc := newTestController()
|
||||
|
||||
wait := make(chan struct{}, 3)
|
||||
tc.podEventFilterFunc = func(_ context.Context, pod *corev1.Pod) bool {
|
||||
wait <- struct{}{}
|
||||
return true
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go tc.Run(ctx, 1)
|
||||
|
||||
ctxT, cancelT := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancelT()
|
||||
|
||||
select {
|
||||
case <-ctxT.Done():
|
||||
t.Fatal(ctxT.Err())
|
||||
case <-tc.Done():
|
||||
t.Fatal(tc.Err())
|
||||
case <-tc.Ready():
|
||||
}
|
||||
|
||||
pod := &corev1.Pod{}
|
||||
pod.ObjectMeta.Namespace = "default"
|
||||
pod.ObjectMeta.Name = "nginx"
|
||||
pod.Spec = newPodSpec()
|
||||
|
||||
podC := tc.client.CoreV1().Pods(testNamespace)
|
||||
|
||||
_, err := podC.Create(ctx, pod, metav1.CreateOptions{})
|
||||
assert.NilError(t, err)
|
||||
|
||||
pod.Annotations = map[string]string{"updated": "true"}
|
||||
_, err = podC.Update(ctx, pod, metav1.UpdateOptions{})
|
||||
assert.NilError(t, err)
|
||||
|
||||
err = podC.Delete(ctx, pod.Name, metav1.DeleteOptions{})
|
||||
assert.NilError(t, err)
|
||||
|
||||
ctxT, cancelT = context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancelT()
|
||||
for i := 0; i < 3; i++ {
|
||||
// check that the event filter fires
|
||||
select {
|
||||
case <-ctxT.Done():
|
||||
t.Fatal(ctxT.Err())
|
||||
case <-wait:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user