From 750de3195ddb54c5a8398e76d90e6a3e660f03ef Mon Sep 17 00:00:00 2001
From: Yash Desai
Date: Wed, 3 Apr 2019 11:19:30 -0700
Subject: [PATCH] Resource manager: add service lister and remove unused lock. (#559)

* Remove unused lock from the resource manager.

* Add service lister to the resource manager.

This change adds a service lister to the resource manager, which will be used
to set the service environment variables. It also adds a ListServices method
to the resource manager and a simple test confirming it is a passthrough.
---
 cmd/root.go                   |  5 ++++-
 manager/resource.go           | 13 +++++++-----
 manager/resource_test.go      | 38 ++++++++++++++++++++++++++++++++---
 providers/azure/aci_test.go   |  2 +-
 test/util/kubernetes.go       | 10 +++++++++
 test/util/resource_manager.go |  8 +++++---
 6 files changed, 63 insertions(+), 13 deletions(-)

diff --git a/cmd/root.go b/cmd/root.go
index 4e10a0fcc..cac569f80 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -312,8 +312,11 @@ func initConfig() {
 	secretInformer := scmInformerFactory.Core().V1().Secrets()
 	configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
 
+	// Create a service informer so we can pass its lister to the resource manager.
+	serviceInformer := scmInformerFactory.Core().V1().Services()
+
 	// Create a new instance of the resource manager that uses the listers above for pods, secrets and config maps.
-	rm, err = manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister())
+	rm, err = manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister())
 	if err != nil {
 		logger.WithError(err).Fatal("Error initializing resource manager")
 	}
diff --git a/manager/resource.go b/manager/resource.go
index 45f1f5919..e8d220afc 100644
--- a/manager/resource.go
+++ b/manager/resource.go
@@ -1,8 +1,6 @@
 package manager
 
 import (
-	"sync"
-
 	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	corev1listers "k8s.io/client-go/listers/core/v1"
@@ -13,19 +11,19 @@
 // ResourceManager acts as a passthrough to a cache (lister) for pods assigned to the current node.
 // It is also a passthrough to a cache (lister) for Kubernetes secrets and config maps.
 type ResourceManager struct {
-	sync.RWMutex
-
 	podLister       corev1listers.PodLister
 	secretLister    corev1listers.SecretLister
 	configMapLister corev1listers.ConfigMapLister
+	serviceLister   corev1listers.ServiceLister
 }
 
 // NewResourceManager returns a ResourceManager with the internal maps initialized.
-func NewResourceManager(podLister corev1listers.PodLister, secretLister corev1listers.SecretLister, configMapLister corev1listers.ConfigMapLister) (*ResourceManager, error) {
+func NewResourceManager(podLister corev1listers.PodLister, secretLister corev1listers.SecretLister, configMapLister corev1listers.ConfigMapLister, serviceLister corev1listers.ServiceLister) (*ResourceManager, error) {
 	rm := ResourceManager{
 		podLister:       podLister,
 		secretLister:    secretLister,
 		configMapLister: configMapLister,
+		serviceLister:   serviceLister,
 	}
 	return &rm, nil
 }
@@ -49,3 +47,8 @@ func (rm *ResourceManager) GetConfigMap(name, namespace string) (*v1.ConfigMap,
 func (rm *ResourceManager) GetSecret(name, namespace string) (*v1.Secret, error) {
 	return rm.secretLister.Secrets(namespace).Get(name)
 }
+
+// ListServices retrieves the list of services from Kubernetes.
+func (rm *ResourceManager) ListServices() ([]*v1.Service, error) {
+	return rm.serviceLister.List(labels.Everything())
+}
diff --git a/manager/resource_test.go b/manager/resource_test.go
index 5628413bd..0c4ae235f 100644
--- a/manager/resource_test.go
+++ b/manager/resource_test.go
@@ -29,7 +29,7 @@ func TestGetPods(t *testing.T) {
 	podLister := corev1listers.NewPodLister(indexer)
 
 	// Create a new instance of the resource manager based on the pod lister.
-	rm, err := manager.NewResourceManager(podLister, nil, nil)
+	rm, err := manager.NewResourceManager(podLister, nil, nil, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -58,7 +58,7 @@ func TestGetSecret(t *testing.T) {
 	secretLister := corev1listers.NewSecretLister(indexer)
 
 	// Create a new instance of the resource manager based on the secret lister.
-	rm, err := manager.NewResourceManager(nil, secretLister, nil)
+	rm, err := manager.NewResourceManager(nil, secretLister, nil, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -97,7 +97,7 @@ func TestGetConfigMap(t *testing.T) {
 	configMapLister := corev1listers.NewConfigMapLister(indexer)
 
 	// Create a new instance of the resource manager based on the config map lister.
-	rm, err := manager.NewResourceManager(nil, nil, configMapLister)
+	rm, err := manager.NewResourceManager(nil, nil, configMapLister, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -118,3 +118,35 @@ func TestGetConfigMap(t *testing.T) {
 		t.Fatalf("expected a 'not found' error, got %v", err)
 	}
 }
+
+// TestListServices verifies that the resource manager acts as a passthrough to a service lister.
+func TestListServices(t *testing.T) {
+	var (
+		lsServices = []*v1.Service{
+			testutil.FakeService("namespace-0", "service-0"),
+			testutil.FakeService("namespace-1", "service-1"),
+		}
+	)
+
+	// Create a service lister that will list the services defined above.
+	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+	for _, service := range lsServices {
+		indexer.Add(service)
+	}
+	serviceLister := corev1listers.NewServiceLister(indexer)
+
+	// Create a new instance of the resource manager based on the service lister.
+	rm, err := manager.NewResourceManager(nil, nil, nil, serviceLister)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Check that the resource manager returns the services defined above in the call to "ListServices".
+	services, err := rm.ListServices()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(lsServices) != len(services) {
+		t.Fatalf("expected %d services, found %d", len(lsServices), len(services))
+	}
+}
diff --git a/providers/azure/aci_test.go b/providers/azure/aci_test.go
index f663dc356..a626d04e0 100644
--- a/providers/azure/aci_test.go
+++ b/providers/azure/aci_test.go
@@ -832,7 +832,7 @@ func createTestProvider(aadServerMocker *AADMock, aciServerMocker *ACIMock) (*ACI
 	os.Setenv("ACI_RESOURCE_GROUP", fakeResourceGroup)
 	os.Setenv("ACI_REGION", fakeRegion)
 
-	rm, err := manager.NewResourceManager(nil, nil, nil)
+	rm, err := manager.NewResourceManager(nil, nil, nil, nil)
 	if err != nil {
 		return nil, err
 	}
diff --git a/test/util/kubernetes.go b/test/util/kubernetes.go
index 3fbeae877..c973f5ead 100644
--- a/test/util/kubernetes.go
+++ b/test/util/kubernetes.go
@@ -54,3 +54,13 @@ func FakeSecret(namespace, name string, data map[string]string) *corev1.Secret {
 	}
 	return res
 }
+
+// FakeService returns a service with the specified namespace and name.
+func FakeService(namespace, name string) *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + } +} diff --git a/test/util/resource_manager.go b/test/util/resource_manager.go index 65abe43dc..ef8544fb2 100644 --- a/test/util/resource_manager.go +++ b/test/util/resource_manager.go @@ -17,22 +17,24 @@ import ( func FakeResourceManager(objects ...runtime.Object) *manager.ResourceManager { // Create a fake Kubernetes client that will list the specified objects. kubeClient := fake.NewSimpleClientset(objects...) - // Create a shared informer factory from where we can grab informers and listers for pods, configmaps and secrets. + // Create a shared informer factory from where we can grab informers and listers for pods, configmaps, secrets and services. kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second) // Grab informers for pods, configmaps and secrets. pInformer := kubeInformerFactory.Core().V1().Pods() mInformer := kubeInformerFactory.Core().V1().ConfigMaps() sInformer := kubeInformerFactory.Core().V1().Secrets() + svcInformer := kubeInformerFactory.Core().V1().Services() // Start all the required informers. go pInformer.Informer().Run(wait.NeverStop) go mInformer.Informer().Run(wait.NeverStop) go sInformer.Informer().Run(wait.NeverStop) + go svcInformer.Informer().Run(wait.NeverStop) // Wait for the caches to be synced. - if !cache.WaitForCacheSync(wait.NeverStop, pInformer.Informer().HasSynced, mInformer.Informer().HasSynced, sInformer.Informer().HasSynced) { + if !cache.WaitForCacheSync(wait.NeverStop, pInformer.Informer().HasSynced, mInformer.Informer().HasSynced, sInformer.Informer().HasSynced, svcInformer.Informer().HasSynced) { panic("failed to wait for caches to be synced") } // Create a new instance of the resource manager using the listers for pods, configmaps and secrets. - r, err := manager.NewResourceManager(pInformer.Lister(), sInformer.Lister(), mInformer.Lister()) + r, err := manager.NewResourceManager(pInformer.Lister(), sInformer.Lister(), mInformer.Lister(), svcInformer.Lister()) if err != nil { panic(err) }
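
Reviewer note, not part of the patch: the commit message says the new service lister will be used to set the service environment variables. Below is a minimal, hypothetical sketch of how the ListServices passthrough could be consumed for that purpose, building the conventional {NAME}_SERVICE_HOST / {NAME}_SERVICE_PORT variables. The serviceEnvVars helper, its package placement, and the exact wiring into the provider are assumptions, not part of this change.

package manager

import (
	"fmt"
	"strings"

	"k8s.io/api/core/v1"
)

// serviceEnvVars is a hypothetical helper that turns the services returned by
// ResourceManager.ListServices into Kubernetes-style service env vars.
func serviceEnvVars(services []*v1.Service) []v1.EnvVar {
	var envs []v1.EnvVar
	for _, svc := range services {
		// Headless services have no cluster IP and contribute no env vars.
		if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == v1.ClusterIPNone {
			continue
		}
		// A service named "foo-bar" yields FOO_BAR_SERVICE_HOST / FOO_BAR_SERVICE_PORT.
		name := strings.ToUpper(strings.Replace(svc.Name, "-", "_", -1))
		envs = append(envs, v1.EnvVar{Name: name + "_SERVICE_HOST", Value: svc.Spec.ClusterIP})
		if len(svc.Spec.Ports) > 0 {
			envs = append(envs, v1.EnvVar{
				Name:  name + "_SERVICE_PORT",
				Value: fmt.Sprintf("%d", svc.Spec.Ports[0].Port),
			})
		}
	}
	return envs
}

A provider could then append the result to each container's Env when it creates the pod; whether that happens in the resource manager or in the provider code is left to the follow-up change.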