From 3f83588e599075aee7958620f62e78fdeb97152e Mon Sep 17 00:00:00 2001 From: Robbie Zhang Date: Tue, 31 Jul 2018 13:31:00 -0700 Subject: [PATCH] Reduce ACI API calls (#282) * Reduce ACI API calls Reduce reconcile calls and API calls in reconcile * Fix the pod status update issue * Revert a few unnecessary changes --- manager/resource.go | 60 ++++++++++++++++------------ manager/resource_test.go | 6 +-- providers/azure/client/aci/delete.go | 5 --- vkubelet/vkubelet.go | 47 +++++++++++++--------- 4 files changed, 66 insertions(+), 52 deletions(-) diff --git a/manager/resource.go b/manager/resource.go index 6e7b990f0..41fb1d9fa 100644 --- a/manager/resource.go +++ b/manager/resource.go @@ -18,6 +18,7 @@ type ResourceManager struct { k8sClient kubernetes.Interface pods map[string]*v1.Pod + deletingPods map[string]*v1.Pod configMapRef map[string]int64 configMaps map[string]*v1.ConfigMap secretRef map[string]int64 @@ -28,6 +29,7 @@ type ResourceManager struct { func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager { rm := ResourceManager{ pods: make(map[string]*v1.Pod, 0), + deletingPods: make(map[string]*v1.Pod, 0), configMapRef: make(map[string]int64, 0), secretRef: make(map[string]int64, 0), configMaps: make(map[string]*v1.ConfigMap, 0), @@ -81,53 +83,52 @@ func (rm *ResourceManager) SetPods(pods *v1.PodList) { rm.secrets = make(map[string]*v1.Secret, len(pods.Items)) for k, p := range pods.Items { - if p.Status.Phase == v1.PodSucceeded { - continue - } rm.pods[rm.getStoreKey(p.Namespace, p.Name)] = &pods.Items[k] rm.incrementRefCounters(&p) } } -// AddPod adds a pod to the internal cache. -func (rm *ResourceManager) AddPod(p *v1.Pod) { - rm.Lock() - defer rm.Unlock() - if p.Status.Phase == v1.PodSucceeded { - return - } - - podKey := rm.getStoreKey(p.Namespace, p.Name) - if _, ok := rm.pods[podKey]; ok { - rm.UpdatePod(p) - return - } - - rm.pods[podKey] = p - rm.incrementRefCounters(p) -} - // UpdatePod updates the supplied pod in the cache. 
-func (rm *ResourceManager) UpdatePod(p *v1.Pod) { +func (rm *ResourceManager) UpdatePod(p *v1.Pod) bool { rm.Lock() defer rm.Unlock() podKey := rm.getStoreKey(p.Namespace, p.Name) - if p.Status.Phase == v1.PodSucceeded { - delete(rm.pods, podKey) + if p.DeletionTimestamp != nil { + if old, ok := rm.pods[podKey]; ok { + rm.deletingPods[podKey] = p + + rm.decrementRefCounters(old) + delete(rm.pods, podKey) + + return true + } + + if _, ok := rm.deletingPods[podKey]; ok { + return false + } + + return false } if old, ok := rm.pods[podKey]; ok { rm.decrementRefCounters(old) + rm.pods[podKey] = p + rm.incrementRefCounters(p) + + // NOTE(junjiez): no reconcile as we don't support update pod. + return false } - rm.incrementRefCounters(p) rm.pods[podKey] = p + rm.incrementRefCounters(p) + + return true } // DeletePod removes the pod from the cache. -func (rm *ResourceManager) DeletePod(p *v1.Pod) { +func (rm *ResourceManager) DeletePod(p *v1.Pod) bool { rm.Lock() defer rm.Unlock() @@ -135,7 +136,14 @@ func (rm *ResourceManager) DeletePod(p *v1.Pod) { if old, ok := rm.pods[podKey]; ok { rm.decrementRefCounters(old) delete(rm.pods, podKey) + return true } + + if _, ok := rm.deletingPods[podKey]; ok { + delete(rm.deletingPods, podKey) + } + + return false } // GetPod retrieves the specified pod from the cache. It returns nil if a pod is not found. 
diff --git a/manager/resource_test.go b/manager/resource_test.go index 1546a89e5..111d6fc90 100644 --- a/manager/resource_test.go +++ b/manager/resource_test.go @@ -23,7 +23,7 @@ func TestResourceManager(t *testing.T) { pod1Name := "Pod1" pod1Namespace := "Pod1Namespace" pod1 := makePod(pod1Namespace, pod1Name) - pm.AddPod(pod1) + pm.UpdatePod(pod1) pods := pm.GetPods() if len(pods) != 1 { @@ -40,7 +40,7 @@ func TestResourceManagerDeletePod(t *testing.T) { pod1Name := "Pod1" pod1Namespace := "Pod1Namespace" pod1 := makePod(pod1Namespace, pod1Name) - pm.AddPod(pod1) + pm.UpdatePod(pod1) pods := pm.GetPods() if len(pods) != 1 { t.Errorf("Got %d, expected 1 pod", len(pods)) @@ -65,7 +65,7 @@ func TestResourceManagerUpdatePod(t *testing.T) { pod1Name := "Pod1" pod1Namespace := "Pod1Namespace" pod1 := makePod(pod1Namespace, pod1Name) - pm.AddPod(pod1) + pm.UpdatePod(pod1) pods := pm.GetPods() if len(pods) != 1 { diff --git a/providers/azure/client/aci/delete.go b/providers/azure/client/aci/delete.go index 5386825f0..de23a7e7c 100644 --- a/providers/azure/client/aci/delete.go +++ b/providers/azure/client/aci/delete.go @@ -46,10 +46,5 @@ func (c *Client) DeleteContainerGroup(resourceGroup, containerGroupName string) return err } - // 204 No Content means the specified container group was not found. 
- if resp.StatusCode == http.StatusNoContent { - return fmt.Errorf("Container group with name %q was not found", containerGroupName) - } - return nil } diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index 6a36f62fe..a1f5af32c 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -251,15 +251,19 @@ func (s *Server) Run() error { } log.Println("Pod watcher event is received:", ev.Type) + reconcile := false switch ev.Type { case watch.Added: - s.resourceManager.AddPod(ev.Object.(*corev1.Pod)) + reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod)) case watch.Modified: - s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod)) + reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod)) case watch.Deleted: - s.resourceManager.DeletePod(ev.Object.(*corev1.Pod)) + reconcile = s.resourceManager.DeletePod(ev.Object.(*corev1.Pod)) + } + + if reconcile { + s.reconcile() } - s.reconcile() } } @@ -310,6 +314,7 @@ func (s *Server) updateNode() { // reconcile is the main reconciliation loop that compares differences between Kubernetes and // the active provider and reconciles the differences. 
func (s *Server) reconcile() { + log.Println("Start reconcile.") providerPods, err := s.provider.GetPods() if err != nil { log.Println(err) @@ -318,7 +323,8 @@ func (s *Server) reconcile() { for _, pod := range providerPods { // Delete pods that don't exist in Kubernetes - if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil { + if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil || p.DeletionTimestamp != nil { + log.Printf("Deleting pod '%s'\n", pod.Name) if err := s.deletePod(pod); err != nil { log.Printf("Error deleting pod '%s': %s\n", pod.Name, err) continue @@ -329,21 +335,25 @@ func (s *Server) reconcile() { // Create any pods for k8s pods that don't exist in the provider pods := s.resourceManager.GetPods() for _, pod := range pods { - p, err := s.provider.GetPod(pod.Namespace, pod.Name) - if err != nil { - log.Printf("Error retrieving pod '%s' from provider: %s\n", pod.Name, err) + var providerPod *corev1.Pod + for _, p := range providerPods { + if p.Namespace == pod.Namespace && p.Name == pod.Name { + providerPod = p + break; + } } - if pod.DeletionTimestamp == nil && pod.Status.Phase != corev1.PodFailed && p == nil { + if pod.DeletionTimestamp == nil && pod.Status.Phase != corev1.PodFailed && providerPod == nil { + log.Printf("Creating pod '%s'\n", pod.Name) if err := s.createPod(pod); err != nil { log.Printf("Error creating pod '%s': %s\n", pod.Name, err) continue } - log.Printf("Pod '%s' created.\n", pod.Name) } - // Delete pod if DeletionTimestamp set + // Delete pod if DeletionTimestamp is set if pod.DeletionTimestamp != nil { + log.Printf("Pod '%s' is pending deletion.\n", pod.Name) var err error if err = s.deletePod(pod); err != nil { log.Printf("Error deleting pod '%s': %s\n", pod.Name, err) @@ -373,25 +383,30 @@ func (s *Server) createPod(pod *corev1.Pod) error { return origErr } + log.Printf("Pod '%s' created.\n", pod.Name) + return nil } func (s *Server) deletePod(pod *corev1.Pod) error { var delErr error if delErr 
= s.provider.DeletePod(pod); delErr != nil && errors.IsNotFound(delErr) { - return fmt.Errorf("Error deleting pod '%s': %s", pod.Name, delErr) + return delErr } if !errors.IsNotFound(delErr) { var grace int64 if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) { if errors.IsNotFound(err) { + log.Printf("Pod '%s' doesn't exist.\n", pod.Name) return nil } return fmt.Errorf("Failed to delete kubernetes pod: %s", err) } + s.resourceManager.DeletePod(pod) + log.Printf("Pod '%s' deleted.\n", pod.Name) } @@ -403,17 +418,13 @@ func (s *Server) updatePodStatuses() { // Update all the pods with the provider status. pods := s.resourceManager.GetPods() for _, pod := range pods { - if pod.DeletionTimestamp != nil && pod.Status.Phase == corev1.PodSucceeded { - continue - } - - if pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == PodStatusReason_ProviderFailed { + if pod.Status.Phase == corev1.PodSucceeded || (pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == PodStatusReason_ProviderFailed) { continue } status, err := s.provider.GetPodStatus(pod.Namespace, pod.Name) if err != nil { - log.Printf("Error retrieving pod '%s' status from provider: %s\n", pod.Name, err) + log.Printf("Error retrieving pod '%s' in namespace '%s' status from provider: %s\n", pod.Name, pod.Namespace, err) return }