Add namespace for storeKey in ResourceManager (#211)

* add namespace for storeKey in RM

* fix UT for add namespace
This commit is contained in:
Fei Xu
2018-05-30 01:45:03 +08:00
committed by Robbie Zhang
parent b4cb809968
commit 9c38c1bdfb
3 changed files with 57 additions and 35 deletions

View File

@@ -84,7 +84,7 @@ func (rm *ResourceManager) SetPods(pods *v1.PodList) {
if p.Status.Phase == v1.PodSucceeded {
continue
}
rm.pods[p.Name] = &pods.Items[k]
rm.pods[rm.getStoreKey(p.Namespace, p.Name)] = &pods.Items[k]
rm.incrementRefCounters(&p)
}
@@ -98,12 +98,13 @@ func (rm *ResourceManager) AddPod(p *v1.Pod) {
return
}
if _, ok := rm.pods[p.Name]; ok {
podKey := rm.getStoreKey(p.Namespace, p.Name)
if _, ok := rm.pods[podKey]; ok {
rm.UpdatePod(p)
return
}
rm.pods[p.Name] = p
rm.pods[podKey] = p
rm.incrementRefCounters(p)
}
@@ -112,16 +113,17 @@ func (rm *ResourceManager) UpdatePod(p *v1.Pod) {
rm.Lock()
defer rm.Unlock()
podKey := rm.getStoreKey(p.Namespace, p.Name)
if p.Status.Phase == v1.PodSucceeded {
delete(rm.pods, p.Name)
delete(rm.pods, podKey)
}
if old, ok := rm.pods[p.Name]; ok {
if old, ok := rm.pods[podKey]; ok {
rm.decrementRefCounters(old)
}
rm.incrementRefCounters(p)
rm.pods[p.Name] = p
rm.pods[podKey] = p
}
// DeletePod removes the pod from the cache.
@@ -129,18 +131,19 @@ func (rm *ResourceManager) DeletePod(p *v1.Pod) {
rm.Lock()
defer rm.Unlock()
if old, ok := rm.pods[p.Name]; ok {
podKey := rm.getStoreKey(p.Namespace, p.Name)
if old, ok := rm.pods[podKey]; ok {
rm.decrementRefCounters(old)
delete(rm.pods, p.Name)
delete(rm.pods, podKey)
}
}
// GetPod retrieves the specified pod from the cache. It returns nil if a pod is not found.
func (rm *ResourceManager) GetPod(name string) *v1.Pod {
func (rm *ResourceManager) GetPod(namespace, name string) *v1.Pod {
rm.RLock()
defer rm.RUnlock()
if p, ok := rm.pods[name]; ok {
if p, ok := rm.pods[rm.getStoreKey(namespace, name)]; ok {
return p
}
@@ -165,7 +168,8 @@ func (rm *ResourceManager) GetConfigMap(name, namespace string) (*v1.ConfigMap,
rm.Lock()
defer rm.Unlock()
if cm, ok := rm.configMaps[name]; ok {
configMapKey := rm.getStoreKey(namespace, name)
if cm, ok := rm.configMaps[configMapKey]; ok {
return cm, nil
}
@@ -174,7 +178,7 @@ func (rm *ResourceManager) GetConfigMap(name, namespace string) (*v1.ConfigMap,
if err != nil {
return nil, err
}
rm.configMaps[name] = cm
rm.configMaps[configMapKey] = cm
return cm, err
}
@@ -184,7 +188,8 @@ func (rm *ResourceManager) GetSecret(name, namespace string) (*v1.Secret, error)
rm.Lock()
defer rm.Unlock()
if secret, ok := rm.secrets[name]; ok {
secretkey := rm.getStoreKey(namespace, name)
if secret, ok := rm.secrets[secretkey]; ok {
return secret, nil
}
@@ -193,7 +198,7 @@ func (rm *ResourceManager) GetSecret(name, namespace string) (*v1.Secret, error)
if err != nil {
return nil, err
}
rm.secrets[name] = secret
rm.secrets[secretkey] = secret
return secret, err
}
@@ -215,11 +220,12 @@ func (rm *ResourceManager) watchConfigMaps() {
}
rm.Lock()
configMapkey := rm.getStoreKey(ev.Object.(*v1.ConfigMap).Namespace, ev.Object.(*v1.ConfigMap).Name)
switch ev.Type {
case watch.Modified:
delete(rm.configMaps, ev.Object.(*v1.ConfigMap).Name)
delete(rm.configMaps, configMapkey)
case watch.Deleted:
delete(rm.configMaps, ev.Object.(*v1.ConfigMap).Name)
delete(rm.configMaps, configMapkey)
}
rm.Unlock()
}
@@ -243,11 +249,12 @@ func (rm *ResourceManager) watchSecrets() {
}
rm.Lock()
secretKey := rm.getStoreKey(ev.Object.(*v1.Secret).Namespace, ev.Object.(*v1.Secret).Name)
switch ev.Type {
case watch.Modified:
delete(rm.secrets, ev.Object.(*v1.Secret).Name)
delete(rm.secrets, secretKey)
case watch.Deleted:
delete(rm.secrets, ev.Object.(*v1.Secret).Name)
delete(rm.secrets, secretKey)
}
rm.Unlock()
}
@@ -258,18 +265,21 @@ func (rm *ResourceManager) incrementRefCounters(p *v1.Pod) {
for _, c := range p.Spec.Containers {
for _, e := range c.Env {
if e.ValueFrom != nil && e.ValueFrom.ConfigMapKeyRef != nil {
rm.configMapRef[e.ValueFrom.ConfigMapKeyRef.Name]++
configMapKey := rm.getStoreKey(p.Namespace, e.ValueFrom.ConfigMapKeyRef.Name)
rm.configMapRef[configMapKey]++
}
if e.ValueFrom != nil && e.ValueFrom.SecretKeyRef != nil {
rm.secretRef[e.ValueFrom.SecretKeyRef.Name]++
secretKey := rm.getStoreKey(p.Namespace, e.ValueFrom.SecretKeyRef.Name)
rm.secretRef[secretKey]++
}
}
}
for _, v := range p.Spec.Volumes {
if v.VolumeSource.Secret != nil {
rm.secretRef[v.VolumeSource.Secret.SecretName]++
secretKey := rm.getStoreKey(p.Namespace, v.VolumeSource.Secret.SecretName)
rm.secretRef[secretKey]++
}
}
}
@@ -278,18 +288,26 @@ func (rm *ResourceManager) decrementRefCounters(p *v1.Pod) {
for _, c := range p.Spec.Containers {
for _, e := range c.Env {
if e.ValueFrom != nil && e.ValueFrom.ConfigMapKeyRef != nil {
rm.configMapRef[e.ValueFrom.ConfigMapKeyRef.Name]--
configMapKey := rm.getStoreKey(p.Namespace, e.ValueFrom.ConfigMapKeyRef.Name)
rm.configMapRef[configMapKey]--
}
if e.ValueFrom != nil && e.ValueFrom.SecretKeyRef != nil {
rm.secretRef[e.ValueFrom.SecretKeyRef.Name]--
secretKey := rm.getStoreKey(p.Namespace, e.ValueFrom.SecretKeyRef.Name)
rm.secretRef[secretKey]--
}
}
}
for _, v := range p.Spec.Volumes {
if v.VolumeSource.Secret != nil {
rm.secretRef[v.VolumeSource.Secret.SecretName]--
secretKey := rm.getStoreKey(p.Namespace, v.VolumeSource.Secret.SecretName)
rm.secretRef[secretKey]--
}
}
}
// getStoreKey return the key with namespace for store objects from different namespaces
func (rm *ResourceManager) getStoreKey(namespace, name string) string {
return namespace + "_" + name
}

View File

@@ -21,14 +21,15 @@ func init() {
func TestResourceManager(t *testing.T) {
pm := NewResourceManager(fakeClient)
pod1Name := "Pod1"
pod1 := makePod(pod1Name)
pod1Namespace := "Pod1Namespace"
pod1 := makePod(pod1Namespace, pod1Name)
pm.AddPod(pod1)
pods := pm.GetPods()
if len(pods) != 1 {
t.Errorf("Got %d, expected 1 pod", len(pods))
}
gotPod1 := pm.GetPod(pod1Name)
gotPod1 := pm.GetPod(pod1Namespace, pod1Name)
if gotPod1.Name != pod1.Name {
t.Errorf("Got %s, wanted %s", gotPod1.Name, pod1.Name)
}
@@ -37,7 +38,8 @@ func TestResourceManager(t *testing.T) {
func TestResourceManagerDeletePod(t *testing.T) {
pm := NewResourceManager(fakeClient)
pod1Name := "Pod1"
pod1 := makePod(pod1Name)
pod1Namespace := "Pod1Namespace"
pod1 := makePod(pod1Namespace, pod1Name)
pm.AddPod(pod1)
pods := pm.GetPods()
if len(pods) != 1 {
@@ -50,9 +52,10 @@ func TestResourceManagerDeletePod(t *testing.T) {
t.Errorf("Got %d, expected 0 pods", len(pods))
}
}
func makePod(name string) *v1.Pod {
func makePod(namespace, name string) *v1.Pod {
pod := &v1.Pod{}
pod.Name = name
pod.Namespace = namespace
pod.UID = types.UID(uuid.New().String())
return pod
}
@@ -60,25 +63,26 @@ func makePod(name string) *v1.Pod {
func TestResourceManagerUpdatePod(t *testing.T) {
pm := NewResourceManager(fakeClient)
pod1Name := "Pod1"
pod1 := makePod(pod1Name)
pod1Namespace := "Pod1Namespace"
pod1 := makePod(pod1Namespace, pod1Name)
pm.AddPod(pod1)
pods := pm.GetPods()
if len(pods) != 1 {
t.Errorf("Got %d, expected 1 pod", len(pods))
}
gotPod1 := pm.GetPod(pod1Name)
gotPod1 := pm.GetPod(pod1Namespace, pod1Name)
if gotPod1.Name != pod1.Name {
t.Errorf("Got %s, wanted %s", gotPod1.Name, pod1.Name)
}
if gotPod1.Namespace != "" {
t.Errorf("Got %s, wanted %s", gotPod1.Namespace, "<empty namespace>")
if gotPod1.Namespace != pod1.Namespace {
t.Errorf("Got %s, wanted %s", gotPod1.Namespace, pod1.Namespace)
}
pod1.Namespace = "POD1NAMESPACE"
pod1.Namespace = "POD2NAMESPACE"
pm.UpdatePod(pod1)
gotPod1 = pm.GetPod(pod1Name)
gotPod1 = pm.GetPod(pod1Namespace, pod1Name)
if gotPod1.Name != pod1.Name {
t.Errorf("Got %s, wanted %s", gotPod1.Name, pod1.Name)
}

View File

@@ -267,7 +267,7 @@ func (s *Server) reconcile() {
for _, pod := range providerPods {
// Delete pods that don't exist in Kubernetes
if p := s.resourceManager.GetPod(pod.Name); p == nil {
if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil {
if err := s.deletePod(pod); err != nil {
log.Printf("Error deleting pod '%s': %s\n", pod.Name, err)
continue