Resource manager: add service lister and remove unused lock. (#559)
* Remove unused lock from the resource manager.
* Add a service lister to the resource manager. The lister will be used to set the service environment variables. Also adds a ListServices method to the resource manager, plus a simple test confirming it is a passthrough.
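The commit message notes that the new lister will be used to set the service environment variables. As a rough sketch of that downstream use (not part of this change), a provider could feed the result of rm.ListServices() through a small helper like the one below to build kubelet-style FOO_SERVICE_HOST / FOO_SERVICE_PORT variables; the helper name and the exact variable format are illustrative assumptions.

// A minimal sketch, not part of this commit: one way the output of
// rm.ListServices() could be turned into kubelet-style service env vars.
// The helper name (serviceEnvVars) and the variable format are assumptions.
package main

import (
	"fmt"
	"strconv"
	"strings"

	v1 "k8s.io/api/core/v1"
)

// serviceEnvVars derives FOO_SERVICE_HOST / FOO_SERVICE_PORT pairs from a list
// of services such as the one returned by ResourceManager.ListServices.
func serviceEnvVars(services []*v1.Service) []v1.EnvVar {
	var env []v1.EnvVar
	for _, svc := range services {
		// Headless services have no virtual IP to advertise.
		if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == v1.ClusterIPNone {
			continue
		}
		prefix := strings.ToUpper(strings.Replace(svc.Name, "-", "_", -1))
		env = append(env, v1.EnvVar{Name: prefix + "_SERVICE_HOST", Value: svc.Spec.ClusterIP})
		if len(svc.Spec.Ports) > 0 {
			env = append(env, v1.EnvVar{Name: prefix + "_SERVICE_PORT", Value: strconv.Itoa(int(svc.Spec.Ports[0].Port))})
		}
	}
	return env
}

func main() {
	svc := &v1.Service{}
	svc.Name = "redis-master"
	svc.Spec.ClusterIP = "10.0.0.11"
	svc.Spec.Ports = []v1.ServicePort{{Port: 6379}}

	for _, e := range serviceEnvVars([]*v1.Service{svc}) {
		fmt.Printf("%s=%s\n", e.Name, e.Value)
	}
}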
@@ -312,8 +312,11 @@ func initConfig() {
 	secretInformer := scmInformerFactory.Core().V1().Secrets()
 	configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
 
+	// Create a service informer so we can pass its lister to the resource manager.
+	serviceInformer := scmInformerFactory.Core().V1().Services()
+
 	// Create a new instance of the resource manager that uses the listers above for pods, secrets and config maps.
-	rm, err = manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister())
+	rm, err = manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister())
 	if err != nil {
 		logger.WithError(err).Fatal("Error initializing resource manager")
 	}
@@ -1,8 +1,6 @@
 package manager
 
 import (
-	"sync"
-
 	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	corev1listers "k8s.io/client-go/listers/core/v1"
@@ -13,19 +11,19 @@ import (
 // ResourceManager acts as a passthrough to a cache (lister) for pods assigned to the current node.
 // It is also a passthrough to a cache (lister) for Kubernetes secrets and config maps.
 type ResourceManager struct {
-	sync.RWMutex
-
 	podLister       corev1listers.PodLister
 	secretLister    corev1listers.SecretLister
 	configMapLister corev1listers.ConfigMapLister
+	serviceLister   corev1listers.ServiceLister
 }
 
 // NewResourceManager returns a ResourceManager with the internal maps initialized.
-func NewResourceManager(podLister corev1listers.PodLister, secretLister corev1listers.SecretLister, configMapLister corev1listers.ConfigMapLister) (*ResourceManager, error) {
+func NewResourceManager(podLister corev1listers.PodLister, secretLister corev1listers.SecretLister, configMapLister corev1listers.ConfigMapLister, serviceLister corev1listers.ServiceLister) (*ResourceManager, error) {
 	rm := ResourceManager{
 		podLister:       podLister,
 		secretLister:    secretLister,
 		configMapLister: configMapLister,
+		serviceLister:   serviceLister,
 	}
 	return &rm, nil
 }
@@ -49,3 +47,8 @@ func (rm *ResourceManager) GetConfigMap(name, namespace string) (*v1.ConfigMap,
 func (rm *ResourceManager) GetSecret(name, namespace string) (*v1.Secret, error) {
 	return rm.secretLister.Secrets(namespace).Get(name)
 }
+
+// ListServices retrieves the list of services from Kubernetes.
+func (rm *ResourceManager) ListServices() ([]*v1.Service, error) {
+	return rm.serviceLister.List(labels.Everything())
+}
@@ -29,7 +29,7 @@ func TestGetPods(t *testing.T) {
 	podLister := corev1listers.NewPodLister(indexer)
 
 	// Create a new instance of the resource manager based on the pod lister.
-	rm, err := manager.NewResourceManager(podLister, nil, nil)
+	rm, err := manager.NewResourceManager(podLister, nil, nil, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -58,7 +58,7 @@ func TestGetSecret(t *testing.T) {
 	secretLister := corev1listers.NewSecretLister(indexer)
 
 	// Create a new instance of the resource manager based on the secret lister.
-	rm, err := manager.NewResourceManager(nil, secretLister, nil)
+	rm, err := manager.NewResourceManager(nil, secretLister, nil, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -97,7 +97,7 @@ func TestGetConfigMap(t *testing.T) {
 	configMapLister := corev1listers.NewConfigMapLister(indexer)
 
 	// Create a new instance of the resource manager based on the config map lister.
-	rm, err := manager.NewResourceManager(nil, nil, configMapLister)
+	rm, err := manager.NewResourceManager(nil, nil, configMapLister, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -118,3 +118,35 @@ func TestGetConfigMap(t *testing.T) {
 		t.Fatalf("expected a 'not found' error, got %v", err)
 	}
 }
+
+// TestListServices verifies that the resource manager acts as a passthrough to a service lister.
+func TestListServices(t *testing.T) {
+	var (
+		lsServices = []*v1.Service{
+			testutil.FakeService("namespace-0", "service-0"),
+			testutil.FakeService("namespace-1", "service-1"),
+		}
+	)
+
+	// Create a service lister that will list the services defined above.
+	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+	for _, service := range lsServices {
+		indexer.Add(service)
+	}
+	serviceLister := corev1listers.NewServiceLister(indexer)
+
+	// Create a new instance of the resource manager based on the service lister.
+	rm, err := manager.NewResourceManager(nil, nil, nil, serviceLister)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Check that the resource manager returns the expected services in the call to "ListServices".
+	services, err := rm.ListServices()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(lsServices) != len(services) {
+		t.Fatalf("expected %d services, found %d", len(lsServices), len(services))
+	}
+}
@@ -832,7 +832,7 @@ func createTestProvider(aadServerMocker *AADMock, aciServerMocker *ACIMock) (*ACI
 	os.Setenv("ACI_RESOURCE_GROUP", fakeResourceGroup)
 	os.Setenv("ACI_REGION", fakeRegion)
 
-	rm, err := manager.NewResourceManager(nil, nil, nil)
+	rm, err := manager.NewResourceManager(nil, nil, nil, nil)
 
 	if err != nil {
 		return nil, err
@@ -54,3 +54,13 @@ func FakeSecret(namespace, name string, data map[string]string) *corev1.Secret {
 	}
 	return res
 }
+
+// FakeService returns a service with the specified namespace and name.
+func FakeService(namespace, name string) *corev1.Service {
+	return &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: namespace,
+			Name:      name,
+		},
+	}
+}
@@ -17,22 +17,24 @@ import (
 func FakeResourceManager(objects ...runtime.Object) *manager.ResourceManager {
 	// Create a fake Kubernetes client that will list the specified objects.
 	kubeClient := fake.NewSimpleClientset(objects...)
-	// Create a shared informer factory from where we can grab informers and listers for pods, configmaps and secrets.
+	// Create a shared informer factory from where we can grab informers and listers for pods, configmaps, secrets and services.
 	kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second)
 	// Grab informers for pods, configmaps and secrets.
 	pInformer := kubeInformerFactory.Core().V1().Pods()
 	mInformer := kubeInformerFactory.Core().V1().ConfigMaps()
 	sInformer := kubeInformerFactory.Core().V1().Secrets()
+	svcInformer := kubeInformerFactory.Core().V1().Services()
 	// Start all the required informers.
 	go pInformer.Informer().Run(wait.NeverStop)
 	go mInformer.Informer().Run(wait.NeverStop)
 	go sInformer.Informer().Run(wait.NeverStop)
+	go svcInformer.Informer().Run(wait.NeverStop)
 	// Wait for the caches to be synced.
-	if !cache.WaitForCacheSync(wait.NeverStop, pInformer.Informer().HasSynced, mInformer.Informer().HasSynced, sInformer.Informer().HasSynced) {
+	if !cache.WaitForCacheSync(wait.NeverStop, pInformer.Informer().HasSynced, mInformer.Informer().HasSynced, sInformer.Informer().HasSynced, svcInformer.Informer().HasSynced) {
 		panic("failed to wait for caches to be synced")
 	}
 	// Create a new instance of the resource manager using the listers for pods, configmaps and secrets.
-	r, err := manager.NewResourceManager(pInformer.Lister(), sInformer.Lister(), mInformer.Lister())
+	r, err := manager.NewResourceManager(pInformer.Lister(), sInformer.Lister(), mInformer.Lister(), svcInformer.Lister())
 	if err != nil {
 		panic(err)
 	}