Reduce ACI API calls (#282)
* Reduce ACI API calls: reduce reconcile calls and the API calls made in reconcile
* Fix the pod status update issue
* Revert a few unnecessary changes
This commit is contained in:
@@ -18,6 +18,7 @@ type ResourceManager struct {
|
|||||||
k8sClient kubernetes.Interface
|
k8sClient kubernetes.Interface
|
||||||
|
|
||||||
pods map[string]*v1.Pod
|
pods map[string]*v1.Pod
|
||||||
|
deletingPods map[string]*v1.Pod
|
||||||
configMapRef map[string]int64
|
configMapRef map[string]int64
|
||||||
configMaps map[string]*v1.ConfigMap
|
configMaps map[string]*v1.ConfigMap
|
||||||
secretRef map[string]int64
|
secretRef map[string]int64
|
||||||
@@ -28,6 +29,7 @@ type ResourceManager struct {
|
|||||||
func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager {
|
func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager {
|
||||||
rm := ResourceManager{
|
rm := ResourceManager{
|
||||||
pods: make(map[string]*v1.Pod, 0),
|
pods: make(map[string]*v1.Pod, 0),
|
||||||
|
deletingPods: make(map[string]*v1.Pod, 0),
|
||||||
configMapRef: make(map[string]int64, 0),
|
configMapRef: make(map[string]int64, 0),
|
||||||
secretRef: make(map[string]int64, 0),
|
secretRef: make(map[string]int64, 0),
|
||||||
configMaps: make(map[string]*v1.ConfigMap, 0),
|
configMaps: make(map[string]*v1.ConfigMap, 0),
|
||||||
@@ -81,53 +83,52 @@ func (rm *ResourceManager) SetPods(pods *v1.PodList) {
|
|||||||
rm.secrets = make(map[string]*v1.Secret, len(pods.Items))
|
rm.secrets = make(map[string]*v1.Secret, len(pods.Items))
|
||||||
|
|
||||||
for k, p := range pods.Items {
|
for k, p := range pods.Items {
|
||||||
if p.Status.Phase == v1.PodSucceeded {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
rm.pods[rm.getStoreKey(p.Namespace, p.Name)] = &pods.Items[k]
|
rm.pods[rm.getStoreKey(p.Namespace, p.Name)] = &pods.Items[k]
|
||||||
|
|
||||||
rm.incrementRefCounters(&p)
|
rm.incrementRefCounters(&p)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddPod adds a pod to the internal cache.
|
|
||||||
func (rm *ResourceManager) AddPod(p *v1.Pod) {
|
|
||||||
rm.Lock()
|
|
||||||
defer rm.Unlock()
|
|
||||||
if p.Status.Phase == v1.PodSucceeded {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
podKey := rm.getStoreKey(p.Namespace, p.Name)
|
|
||||||
if _, ok := rm.pods[podKey]; ok {
|
|
||||||
rm.UpdatePod(p)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
rm.pods[podKey] = p
|
|
||||||
rm.incrementRefCounters(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdatePod updates the supplied pod in the cache.
|
// UpdatePod updates the supplied pod in the cache.
|
||||||
func (rm *ResourceManager) UpdatePod(p *v1.Pod) {
|
func (rm *ResourceManager) UpdatePod(p *v1.Pod) bool {
|
||||||
rm.Lock()
|
rm.Lock()
|
||||||
defer rm.Unlock()
|
defer rm.Unlock()
|
||||||
|
|
||||||
podKey := rm.getStoreKey(p.Namespace, p.Name)
|
podKey := rm.getStoreKey(p.Namespace, p.Name)
|
||||||
if p.Status.Phase == v1.PodSucceeded {
|
if p.DeletionTimestamp != nil {
|
||||||
delete(rm.pods, podKey)
|
if old, ok := rm.pods[podKey]; ok {
|
||||||
|
rm.deletingPods[podKey] = p
|
||||||
|
|
||||||
|
rm.decrementRefCounters(old)
|
||||||
|
delete(rm.pods, podKey)
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := rm.deletingPods[podKey]; ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if old, ok := rm.pods[podKey]; ok {
|
if old, ok := rm.pods[podKey]; ok {
|
||||||
rm.decrementRefCounters(old)
|
rm.decrementRefCounters(old)
|
||||||
|
rm.pods[podKey] = p
|
||||||
|
rm.incrementRefCounters(p)
|
||||||
|
|
||||||
|
// NOTE(junjiez): no reconcile as we don't support update pod.
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
rm.incrementRefCounters(p)
|
|
||||||
|
|
||||||
rm.pods[podKey] = p
|
rm.pods[podKey] = p
|
||||||
|
rm.incrementRefCounters(p)
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeletePod removes the pod from the cache.
|
// DeletePod removes the pod from the cache.
|
||||||
func (rm *ResourceManager) DeletePod(p *v1.Pod) {
|
func (rm *ResourceManager) DeletePod(p *v1.Pod) bool {
|
||||||
rm.Lock()
|
rm.Lock()
|
||||||
defer rm.Unlock()
|
defer rm.Unlock()
|
||||||
|
|
||||||
@@ -135,7 +136,14 @@ func (rm *ResourceManager) DeletePod(p *v1.Pod) {
|
|||||||
if old, ok := rm.pods[podKey]; ok {
|
if old, ok := rm.pods[podKey]; ok {
|
||||||
rm.decrementRefCounters(old)
|
rm.decrementRefCounters(old)
|
||||||
delete(rm.pods, podKey)
|
delete(rm.pods, podKey)
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, ok := rm.deletingPods[podKey]; ok {
|
||||||
|
delete(rm.deletingPods, podKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPod retrieves the specified pod from the cache. It returns nil if a pod is not found.
|
// GetPod retrieves the specified pod from the cache. It returns nil if a pod is not found.
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ func TestResourceManager(t *testing.T) {
|
|||||||
pod1Name := "Pod1"
|
pod1Name := "Pod1"
|
||||||
pod1Namespace := "Pod1Namespace"
|
pod1Namespace := "Pod1Namespace"
|
||||||
pod1 := makePod(pod1Namespace, pod1Name)
|
pod1 := makePod(pod1Namespace, pod1Name)
|
||||||
pm.AddPod(pod1)
|
pm.UpdatePod(pod1)
|
||||||
|
|
||||||
pods := pm.GetPods()
|
pods := pm.GetPods()
|
||||||
if len(pods) != 1 {
|
if len(pods) != 1 {
|
||||||
@@ -40,7 +40,7 @@ func TestResourceManagerDeletePod(t *testing.T) {
|
|||||||
pod1Name := "Pod1"
|
pod1Name := "Pod1"
|
||||||
pod1Namespace := "Pod1Namespace"
|
pod1Namespace := "Pod1Namespace"
|
||||||
pod1 := makePod(pod1Namespace, pod1Name)
|
pod1 := makePod(pod1Namespace, pod1Name)
|
||||||
pm.AddPod(pod1)
|
pm.UpdatePod(pod1)
|
||||||
pods := pm.GetPods()
|
pods := pm.GetPods()
|
||||||
if len(pods) != 1 {
|
if len(pods) != 1 {
|
||||||
t.Errorf("Got %d, expected 1 pod", len(pods))
|
t.Errorf("Got %d, expected 1 pod", len(pods))
|
||||||
@@ -65,7 +65,7 @@ func TestResourceManagerUpdatePod(t *testing.T) {
|
|||||||
pod1Name := "Pod1"
|
pod1Name := "Pod1"
|
||||||
pod1Namespace := "Pod1Namespace"
|
pod1Namespace := "Pod1Namespace"
|
||||||
pod1 := makePod(pod1Namespace, pod1Name)
|
pod1 := makePod(pod1Namespace, pod1Name)
|
||||||
pm.AddPod(pod1)
|
pm.UpdatePod(pod1)
|
||||||
|
|
||||||
pods := pm.GetPods()
|
pods := pm.GetPods()
|
||||||
if len(pods) != 1 {
|
if len(pods) != 1 {
|
||||||
|
|||||||
@@ -46,10 +46,5 @@ func (c *Client) DeleteContainerGroup(resourceGroup, containerGroupName string)
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 204 No Content means the specified container group was not found.
|
|
||||||
if resp.StatusCode == http.StatusNoContent {
|
|
||||||
return fmt.Errorf("Container group with name %q was not found", containerGroupName)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -251,15 +251,19 @@ func (s *Server) Run() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Println("Pod watcher event is received:", ev.Type)
|
log.Println("Pod watcher event is received:", ev.Type)
|
||||||
|
reconcile := false
|
||||||
switch ev.Type {
|
switch ev.Type {
|
||||||
case watch.Added:
|
case watch.Added:
|
||||||
s.resourceManager.AddPod(ev.Object.(*corev1.Pod))
|
reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
|
||||||
case watch.Modified:
|
case watch.Modified:
|
||||||
s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
|
reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
|
||||||
case watch.Deleted:
|
case watch.Deleted:
|
||||||
s.resourceManager.DeletePod(ev.Object.(*corev1.Pod))
|
reconcile = s.resourceManager.DeletePod(ev.Object.(*corev1.Pod))
|
||||||
|
}
|
||||||
|
|
||||||
|
if reconcile {
|
||||||
|
s.reconcile()
|
||||||
}
|
}
|
||||||
s.reconcile()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -310,6 +314,7 @@ func (s *Server) updateNode() {
|
|||||||
// reconcile is the main reconciliation loop that compares differences between Kubernetes and
|
// reconcile is the main reconciliation loop that compares differences between Kubernetes and
|
||||||
// the active provider and reconciles the differences.
|
// the active provider and reconciles the differences.
|
||||||
func (s *Server) reconcile() {
|
func (s *Server) reconcile() {
|
||||||
|
log.Println("Start reconcile.")
|
||||||
providerPods, err := s.provider.GetPods()
|
providerPods, err := s.provider.GetPods()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
@@ -318,7 +323,8 @@ func (s *Server) reconcile() {
|
|||||||
|
|
||||||
for _, pod := range providerPods {
|
for _, pod := range providerPods {
|
||||||
// Delete pods that don't exist in Kubernetes
|
// Delete pods that don't exist in Kubernetes
|
||||||
if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil {
|
if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil || p.DeletionTimestamp != nil {
|
||||||
|
log.Printf("Deleting pod '%s'\n", pod.Name)
|
||||||
if err := s.deletePod(pod); err != nil {
|
if err := s.deletePod(pod); err != nil {
|
||||||
log.Printf("Error deleting pod '%s': %s\n", pod.Name, err)
|
log.Printf("Error deleting pod '%s': %s\n", pod.Name, err)
|
||||||
continue
|
continue
|
||||||
@@ -329,21 +335,25 @@ func (s *Server) reconcile() {
|
|||||||
// Create any pods for k8s pods that don't exist in the provider
|
// Create any pods for k8s pods that don't exist in the provider
|
||||||
pods := s.resourceManager.GetPods()
|
pods := s.resourceManager.GetPods()
|
||||||
for _, pod := range pods {
|
for _, pod := range pods {
|
||||||
p, err := s.provider.GetPod(pod.Namespace, pod.Name)
|
var providerPod *corev1.Pod
|
||||||
if err != nil {
|
for _, p := range providerPods {
|
||||||
log.Printf("Error retrieving pod '%s' from provider: %s\n", pod.Name, err)
|
if p.Namespace == pod.Namespace && p.Name == pod.Name {
|
||||||
|
providerPod = p
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if pod.DeletionTimestamp == nil && pod.Status.Phase != corev1.PodFailed && p == nil {
|
if pod.DeletionTimestamp == nil && pod.Status.Phase != corev1.PodFailed && providerPod == nil {
|
||||||
|
log.Printf("Creating pod '%s'\n", pod.Name)
|
||||||
if err := s.createPod(pod); err != nil {
|
if err := s.createPod(pod); err != nil {
|
||||||
log.Printf("Error creating pod '%s': %s\n", pod.Name, err)
|
log.Printf("Error creating pod '%s': %s\n", pod.Name, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Printf("Pod '%s' created.\n", pod.Name)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete pod if DeletionTimestamp set
|
// Delete pod if DeletionTimestamp is set
|
||||||
if pod.DeletionTimestamp != nil {
|
if pod.DeletionTimestamp != nil {
|
||||||
|
log.Printf("Pod '%s' is pending deletion.\n", pod.Name)
|
||||||
var err error
|
var err error
|
||||||
if err = s.deletePod(pod); err != nil {
|
if err = s.deletePod(pod); err != nil {
|
||||||
log.Printf("Error deleting pod '%s': %s\n", pod.Name, err)
|
log.Printf("Error deleting pod '%s': %s\n", pod.Name, err)
|
||||||
@@ -373,25 +383,30 @@ func (s *Server) createPod(pod *corev1.Pod) error {
|
|||||||
return origErr
|
return origErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Printf("Pod '%s' created.\n", pod.Name)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) deletePod(pod *corev1.Pod) error {
|
func (s *Server) deletePod(pod *corev1.Pod) error {
|
||||||
var delErr error
|
var delErr error
|
||||||
if delErr = s.provider.DeletePod(pod); delErr != nil && errors.IsNotFound(delErr) {
|
if delErr = s.provider.DeletePod(pod); delErr != nil && errors.IsNotFound(delErr) {
|
||||||
return fmt.Errorf("Error deleting pod '%s': %s", pod.Name, delErr)
|
return delErr
|
||||||
}
|
}
|
||||||
|
|
||||||
if !errors.IsNotFound(delErr) {
|
if !errors.IsNotFound(delErr) {
|
||||||
var grace int64
|
var grace int64
|
||||||
if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) {
|
if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) {
|
||||||
if errors.IsNotFound(err) {
|
if errors.IsNotFound(err) {
|
||||||
|
log.Printf("Pod '%s' doesn't exist.\n", pod.Name)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("Failed to delete kubernetes pod: %s", err)
|
return fmt.Errorf("Failed to delete kubernetes pod: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.resourceManager.DeletePod(pod)
|
||||||
|
|
||||||
log.Printf("Pod '%s' deleted.\n", pod.Name)
|
log.Printf("Pod '%s' deleted.\n", pod.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -403,17 +418,13 @@ func (s *Server) updatePodStatuses() {
|
|||||||
// Update all the pods with the provider status.
|
// Update all the pods with the provider status.
|
||||||
pods := s.resourceManager.GetPods()
|
pods := s.resourceManager.GetPods()
|
||||||
for _, pod := range pods {
|
for _, pod := range pods {
|
||||||
if pod.DeletionTimestamp != nil && pod.Status.Phase == corev1.PodSucceeded {
|
if pod.Status.Phase == corev1.PodSucceeded || (pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == PodStatusReason_ProviderFailed) {
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == PodStatusReason_ProviderFailed {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
status, err := s.provider.GetPodStatus(pod.Namespace, pod.Name)
|
status, err := s.provider.GetPodStatus(pod.Namespace, pod.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error retrieving pod '%s' status from provider: %s\n", pod.Name, err)
|
log.Printf("Error retrieving pod '%s' in namespace '%s' status from provider: %s\n", pod.Name, pod.Namespace, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user