From 058c683c66bf281f021fca16e4f802c5f64fb9cd Mon Sep 17 00:00:00 2001 From: Paulo Pires Date: Thu, 29 Nov 2018 22:18:14 +0000 Subject: [PATCH] controller: use shared informers and a work queue Signed-off-by: Paulo Pires --- cmd/root.go | 46 +++++- manager/resource.go | 315 +++-------------------------------- manager/resource_test.go | 204 ++++++++++++++--------- vkubelet/pod.go | 207 +---------------------- vkubelet/podcontroller.go | 339 ++++++++++++++++++++++++++++++++++++++ vkubelet/vkubelet.go | 135 ++------------- 6 files changed, 543 insertions(+), 703 deletions(-) create mode 100644 vkubelet/podcontroller.go diff --git a/cmd/root.go b/cmd/root.go index 14bcd63d7..fd6107b91 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -23,25 +23,35 @@ import ( "strconv" "strings" "syscall" + "time" "github.com/Sirupsen/logrus" "github.com/cpuguy83/strongerrors" - homedir "github.com/mitchellh/go-homedir" + "github.com/mitchellh/go-homedir" "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opencensus.io/trace" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + kubeinformers "k8s.io/client-go/informers" + corev1informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers/register" - vkubelet "github.com/virtual-kubelet/virtual-kubelet/vkubelet" - "go.opencensus.io/trace" - corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet" ) const ( defaultDaemonPort = "10250" + // kubeSharedInformerFactoryDefaultResync is the default resync period used by the shared informer factories for Kubernetes resources. + // It is set to the same value used by the Kubelet, and can be overridden via the "--full-resync-period" flag. + // https://github.com/kubernetes/kubernetes/blob/v1.12.2/pkg/kubelet/apis/config/v1beta1/defaults.go#L51 + kubeSharedInformerFactoryDefaultResync = 1 * time.Minute ) var kubeletConfig string @@ -60,6 +70,8 @@ var k8sClient *kubernetes.Clientset var p providers.Provider var rm *manager.ResourceManager var apiConfig vkubelet.APIConfig +var podInformer corev1informers.PodInformer +var kubeSharedInformerFactoryResync time.Duration var podSyncWorkers int var userTraceExporters []string @@ -86,6 +98,7 @@ This allows users to schedule kubernetes workloads on nodes that aren't running ResourceManager: rm, APIConfig: apiConfig, PodSyncWorkers: podSyncWorkers, + PodInformer: podInformer, }) if err != nil { log.L.WithError(err).Fatal("Error initializing virtual kubelet") @@ -176,6 +189,8 @@ func init() { RootCmd.PersistentFlags().Var(mapVar(userTraceConfig.Tags), "trace-tag", "add tags to include with traces in key=value form") RootCmd.PersistentFlags().StringVar(&traceSampler, "trace-sample-rate", "", "set probability of tracing samples") + RootCmd.PersistentFlags().DurationVar(&kubeSharedInformerFactoryResync, "full-resync-period", kubeSharedInformerFactoryDefaultResync, "how often to perform a full resync of pods between kubernetes and the provider") + // Cobra also supports local flags, which will only run // when this action is called directly. 
// RootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") @@ -251,11 +266,30 @@ func initConfig() { logger.WithError(err).Fatal("Error creating kubernetes client") } - rm, err = manager.NewResourceManager(k8sClient) + // Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node. + podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(k8sClient, kubeSharedInformerFactoryResync, kubeinformers.WithNamespace(kubeNamespace), kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String() + })) + // Create a pod informer so we can pass its lister to the resource manager. + podInformer = podInformerFactory.Core().V1().Pods() + + // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors). + scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(k8sClient, kubeSharedInformerFactoryResync) + // Create a secret informer and a config map informer so we can pass their listers to the resource manager. + secretInformer := scmInformerFactory.Core().V1().Secrets() + configMapInformer := scmInformerFactory.Core().V1().ConfigMaps() + + // Create a new instance of the resource manager that uses the listers above for pods, secrets and config maps. + rm, err = manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister()) if err != nil { logger.WithError(err).Fatal("Error initializing resource manager") } + // Start the shared informer factory for pods. + go podInformerFactory.Start(context.Background().Done()) + // Start the shared informer factory for secrets and configmaps. + go scmInformerFactory.Start(context.Background().Done()) + daemonPortEnv := getEnv("KUBELET_PORT", defaultDaemonPort) daemonPort, err := strconv.ParseInt(daemonPortEnv, 10, 32) if err != nil { diff --git a/manager/resource.go b/manager/resource.go index 1033f768c..7d694895d 100644 --- a/manager/resource.go +++ b/manager/resource.go @@ -2,321 +2,50 @@ package manager import ( "sync" - "time" - "github.com/pkg/errors" "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" + "k8s.io/apimachinery/pkg/labels" + corev1listers "k8s.io/client-go/listers/core/v1" + + "github.com/virtual-kubelet/virtual-kubelet/log" ) -// ResourceManager works a cache for pods assigned to this virtual node within Kubernetes. -// New ResourceManagers should be created with the NewResourceManager() function. +// ResourceManager acts as a passthrough to a cache (lister) for pods assigned to the current node. +// It is also a passthrough to a cache (lister) for Kubernetes secrets and config maps. type ResourceManager struct { sync.RWMutex - k8sClient kubernetes.Interface - pods map[string]*v1.Pod - deletingPods map[string]*v1.Pod - configMapRef map[string]int64 - configMaps map[string]*v1.ConfigMap - secretRef map[string]int64 - secrets map[string]*v1.Secret + podLister corev1listers.PodLister + secretLister corev1listers.SecretLister + configMapLister corev1listers.ConfigMapLister } // NewResourceManager returns a ResourceManager with the internal maps initialized. 
-func NewResourceManager(k8sClient kubernetes.Interface) (*ResourceManager, error) { +func NewResourceManager(podLister corev1listers.PodLister, secretLister corev1listers.SecretLister, configMapLister corev1listers.ConfigMapLister) (*ResourceManager, error) { rm := ResourceManager{ - pods: make(map[string]*v1.Pod, 0), - deletingPods: make(map[string]*v1.Pod, 0), - configMapRef: make(map[string]int64, 0), - secretRef: make(map[string]int64, 0), - configMaps: make(map[string]*v1.ConfigMap, 0), - secrets: make(map[string]*v1.Secret, 0), - k8sClient: k8sClient, + podLister: podLister, + secretLister: secretLister, + configMapLister: configMapLister, } - - configW, err := rm.k8sClient.CoreV1().ConfigMaps(v1.NamespaceAll).Watch(metav1.ListOptions{}) - if err != nil { - return nil, errors.Wrap(err, "error getting config watch") - } - - secretsW, err := rm.k8sClient.CoreV1().Secrets(v1.NamespaceAll).Watch(metav1.ListOptions{}) - if err != nil { - return nil, errors.Wrap(err, "error getting secrets watch") - } - - go rm.watchConfigMaps(configW) - go rm.watchSecrets(secretsW) - - tick := time.Tick(5 * time.Minute) - go func() { - for range tick { - rm.Lock() - for n, c := range rm.secretRef { - if c <= 0 { - delete(rm.secretRef, n) - } - } - for n := range rm.secrets { - if _, ok := rm.secretRef[n]; !ok { - delete(rm.secrets, n) - } - } - for n, c := range rm.configMapRef { - if c <= 0 { - delete(rm.configMapRef, n) - } - } - for n := range rm.configMaps { - if _, ok := rm.configMapRef[n]; !ok { - delete(rm.configMaps, n) - } - } - rm.Unlock() - } - }() - return &rm, nil } -// SetPods clears the internal cache and populates it with the supplied pods. -func (rm *ResourceManager) SetPods(pods *v1.PodList) { - rm.Lock() - defer rm.Unlock() - - for k, p := range pods.Items { - podKey := rm.getStoreKey(p.Namespace, p.Name) - if p.DeletionTimestamp != nil { - rm.deletingPods[podKey] = &pods.Items[k] - } else { - rm.pods[podKey] = &pods.Items[k] - rm.incrementRefCounters(&p) - } - } -} - -// UpdatePod updates the supplied pod in the cache. -func (rm *ResourceManager) UpdatePod(p *v1.Pod) bool { - rm.Lock() - defer rm.Unlock() - - podKey := rm.getStoreKey(p.Namespace, p.Name) - if p.DeletionTimestamp != nil { - if old, ok := rm.pods[podKey]; ok { - rm.deletingPods[podKey] = p - - rm.decrementRefCounters(old) - delete(rm.pods, podKey) - - return true - } - - if _, ok := rm.deletingPods[podKey]; !ok { - return true - } - - return false - } - - if old, ok := rm.pods[podKey]; ok { - rm.decrementRefCounters(old) - rm.pods[podKey] = p - rm.incrementRefCounters(p) - - // NOTE(junjiez): no reconcile as we don't support update pod. - return false - } - - rm.pods[podKey] = p - rm.incrementRefCounters(p) - - return true -} - -// DeletePod removes the pod from the cache. -func (rm *ResourceManager) DeletePod(p *v1.Pod) bool { - rm.Lock() - defer rm.Unlock() - - podKey := rm.getStoreKey(p.Namespace, p.Name) - if old, ok := rm.pods[podKey]; ok { - rm.decrementRefCounters(old) - delete(rm.pods, podKey) - return true - } - - if _, ok := rm.deletingPods[podKey]; ok { - delete(rm.deletingPods, podKey) - } - - return false -} - -// GetPod retrieves the specified pod from the cache. It returns nil if a pod is not found. -func (rm *ResourceManager) GetPod(namespace, name string) *v1.Pod { - rm.RLock() - defer rm.RUnlock() - - if p, ok := rm.pods[rm.getStoreKey(namespace, name)]; ok { - return p - } - - return nil -} - // GetPods returns a list of all known pods assigned to this virtual node. 
func (rm *ResourceManager) GetPods() []*v1.Pod { - rm.RLock() - defer rm.RUnlock() - - pods := make([]*v1.Pod, 0, len(rm.pods)+len(rm.deletingPods)) - for _, p := range rm.pods { - pods = append(pods, p) + l, err := rm.podLister.Pods(v1.NamespaceAll).List(labels.Everything()) + if err == nil { + return l } - for _, p := range rm.deletingPods { - pods = append(pods, p) - } - - return pods + log.L.Errorf("failed to fetch pods from lister: %v", err) + return make([]*v1.Pod, 0) } -// GetConfigMap returns the specified ConfigMap from Kubernetes. It retrieves it from cache if there +// GetConfigMap retrieves the specified config map from the cache. func (rm *ResourceManager) GetConfigMap(name, namespace string) (*v1.ConfigMap, error) { - rm.Lock() - defer rm.Unlock() - - configMapKey := rm.getStoreKey(namespace, name) - if cm, ok := rm.configMaps[configMapKey]; ok { - return cm, nil - } - - var opts metav1.GetOptions - cm, err := rm.k8sClient.CoreV1().ConfigMaps(namespace).Get(name, opts) - if err != nil { - return nil, err - } - rm.configMaps[configMapKey] = cm - - return cm, err + return rm.configMapLister.ConfigMaps(namespace).Get(name) } -// GetSecret returns the specified ConfigMap from Kubernetes. It retrieves it from cache if there +// GetSecret retrieves the specified secret from Kubernetes. func (rm *ResourceManager) GetSecret(name, namespace string) (*v1.Secret, error) { - rm.Lock() - defer rm.Unlock() - - secretkey := rm.getStoreKey(namespace, name) - if secret, ok := rm.secrets[secretkey]; ok { - return secret, nil - } - - var opts metav1.GetOptions - secret, err := rm.k8sClient.CoreV1().Secrets(namespace).Get(name, opts) - if err != nil { - return nil, err - } - rm.secrets[secretkey] = secret - - return secret, err -} - -// watchConfigMaps monitors the kubernetes API for modifications and deletions of configmaps -// it evicts them from the internal cache -func (rm *ResourceManager) watchConfigMaps(w watch.Interface) { - - for { - select { - case ev, ok := <-w.ResultChan(): - if !ok { - return - } - - rm.Lock() - configMapkey := rm.getStoreKey(ev.Object.(*v1.ConfigMap).Namespace, ev.Object.(*v1.ConfigMap).Name) - switch ev.Type { - case watch.Modified: - delete(rm.configMaps, configMapkey) - case watch.Deleted: - delete(rm.configMaps, configMapkey) - } - rm.Unlock() - } - } -} - -// watchSecretes monitors the kubernetes API for modifications and deletions of secrets -// it evicts them from the internal cache -func (rm *ResourceManager) watchSecrets(w watch.Interface) { - - for { - select { - case ev, ok := <-w.ResultChan(): - if !ok { - return - } - - rm.Lock() - secretKey := rm.getStoreKey(ev.Object.(*v1.Secret).Namespace, ev.Object.(*v1.Secret).Name) - switch ev.Type { - case watch.Modified: - delete(rm.secrets, secretKey) - case watch.Deleted: - delete(rm.secrets, secretKey) - } - rm.Unlock() - } - } -} - -func (rm *ResourceManager) incrementRefCounters(p *v1.Pod) { - for _, c := range p.Spec.Containers { - for _, e := range c.Env { - if e.ValueFrom != nil && e.ValueFrom.ConfigMapKeyRef != nil { - configMapKey := rm.getStoreKey(p.Namespace, e.ValueFrom.ConfigMapKeyRef.Name) - rm.configMapRef[configMapKey]++ - } - - if e.ValueFrom != nil && e.ValueFrom.SecretKeyRef != nil { - secretKey := rm.getStoreKey(p.Namespace, e.ValueFrom.SecretKeyRef.Name) - rm.secretRef[secretKey]++ - } - } - } - - for _, v := range p.Spec.Volumes { - if v.VolumeSource.Secret != nil { - secretKey := rm.getStoreKey(p.Namespace, v.VolumeSource.Secret.SecretName) - rm.secretRef[secretKey]++ - } - } -} - -func 
(rm *ResourceManager) decrementRefCounters(p *v1.Pod) { - for _, c := range p.Spec.Containers { - for _, e := range c.Env { - if e.ValueFrom != nil && e.ValueFrom.ConfigMapKeyRef != nil { - configMapKey := rm.getStoreKey(p.Namespace, e.ValueFrom.ConfigMapKeyRef.Name) - rm.configMapRef[configMapKey]-- - } - - if e.ValueFrom != nil && e.ValueFrom.SecretKeyRef != nil { - secretKey := rm.getStoreKey(p.Namespace, e.ValueFrom.SecretKeyRef.Name) - rm.secretRef[secretKey]-- - } - } - } - - for _, v := range p.Spec.Volumes { - if v.VolumeSource.Secret != nil { - secretKey := rm.getStoreKey(p.Namespace, v.VolumeSource.Secret.SecretName) - rm.secretRef[secretKey]-- - } - } -} - -// getStoreKey return the key with namespace for store objects from different namespaces -func (rm *ResourceManager) getStoreKey(namespace, name string) string { - return namespace + "_" + name + return rm.secretLister.Secrets(namespace).Get(name) } diff --git a/manager/resource_test.go b/manager/resource_test.go index 0fd19162d..579547c9e 100644 --- a/manager/resource_test.go +++ b/manager/resource_test.go @@ -3,100 +3,156 @@ package manager import ( "testing" - "github.com/google/uuid" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/fake" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" ) -var ( - fakeClient kubernetes.Interface -) +// TestGetPods verifies that the resource manager acts as a passthrough to a pod lister. +func TestGetPods(t *testing.T) { + var ( + lsPods = []*v1.Pod{ + makePod("namespace-0", "name-0", "image-0"), + makePod("namespace-1", "name-1", "image-1"), + } + ) -func init() { - fakeClient = fake.NewSimpleClientset() -} + // Create a pod lister that will list the pods defined above. + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + for _, pod := range lsPods { + indexer.Add(pod) + } + podLister := corev1listers.NewPodLister(indexer) -func TestResourceManager(t *testing.T) { - pm, err := NewResourceManager(fakeClient) + // Create a new instance of the resource manager based on the pod lister. + rm, err := NewResourceManager(podLister, nil, nil) if err != nil { t.Fatal(err) } - pod1Name := "Pod1" - pod1Namespace := "Pod1Namespace" - pod1 := makePod(pod1Namespace, pod1Name) - pm.UpdatePod(pod1) - pods := pm.GetPods() - if len(pods) != 1 { - t.Errorf("Got %d, expected 1 pod", len(pods)) - } - gotPod1 := pm.GetPod(pod1Namespace, pod1Name) - if gotPod1.Name != pod1.Name { - t.Errorf("Got %s, wanted %s", gotPod1.Name, pod1.Name) + // Check that the resource manager returns two pods in the call to "GetPods". + rmPods := rm.GetPods() + if len(rmPods) != len(lsPods) { + t.Fatalf("expected %d pods, found %d", len(lsPods), len(rmPods)) } } -func TestResourceManagerDeletePod(t *testing.T) { - pm, err := NewResourceManager(fakeClient) +// TestGetSecret verifies that the resource manager acts as a passthrough to a secret lister. +func TestGetSecret(t *testing.T) { + var ( + lsSecrets = []*v1.Secret{ + makeSecret("namespace-0", "name-0", "key-0", "val-0"), + makeSecret("namespace-1", "name-1", "key-1", "val-1"), + } + ) + + // Create a secret lister that will list the secrets defined above. 
+ indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + for _, secret := range lsSecrets { + indexer.Add(secret) + } + secretLister := corev1listers.NewSecretLister(indexer) + + // Create a new instance of the resource manager based on the secret lister. + rm, err := NewResourceManager(nil, secretLister, nil) if err != nil { t.Fatal(err) } - pod1Name := "Pod1" - pod1Namespace := "Pod1Namespace" - pod1 := makePod(pod1Namespace, pod1Name) - pm.UpdatePod(pod1) - pods := pm.GetPods() - if len(pods) != 1 { - t.Errorf("Got %d, expected 1 pod", len(pods)) - } - pm.DeletePod(pod1) - pods = pm.GetPods() - if len(pods) != 0 { - t.Errorf("Got %d, expected 0 pods", len(pods)) - } -} -func makePod(namespace, name string) *v1.Pod { - pod := &v1.Pod{} - pod.Name = name - pod.Namespace = namespace - pod.UID = types.UID(uuid.New().String()) - return pod -} - -func TestResourceManagerUpdatePod(t *testing.T) { - pm, err := NewResourceManager(fakeClient) + // Get the secret with coordinates "namespace-0/name-0". + secret, err := rm.GetSecret("name-0", "namespace-0") if err != nil { t.Fatal(err) } - pod1Name := "Pod1" - pod1Namespace := "Pod1Namespace" - pod1 := makePod(pod1Namespace, pod1Name) - pm.UpdatePod(pod1) - - pods := pm.GetPods() - if len(pods) != 1 { - t.Errorf("Got %d, expected 1 pod", len(pods)) - } - gotPod1 := pm.GetPod(pod1Namespace, pod1Name) - if gotPod1.Name != pod1.Name { - t.Errorf("Got %s, wanted %s", gotPod1.Name, pod1.Name) + value := secret.Data["key-0"] + if string(value) != "val-0" { + t.Fatal("got unexpected value", string(value)) } - if gotPod1.Namespace != pod1.Namespace { - t.Errorf("Got %s, wanted %s", gotPod1.Namespace, pod1.Namespace) - } - pod1.Namespace = "POD2NAMESPACE" - pm.UpdatePod(pod1) - - gotPod1 = pm.GetPod(pod1Namespace, pod1Name) - if gotPod1.Name != pod1.Name { - t.Errorf("Got %s, wanted %s", gotPod1.Name, pod1.Name) - } - - if gotPod1.Namespace != pod1.Namespace { - t.Errorf("Got %s, wanted %s", gotPod1.Namespace, pod1.Namespace) + // Try to get a secret that does not exist, and make sure we've got a "not found" error as a response. + _, err = rm.GetSecret("name-X", "namespace-X") + if err == nil || !errors.IsNotFound(err) { + t.Fatalf("expected a 'not found' error, got %v", err) + } +} + +// TestGetConfigMap verifies that the resource manager acts as a passthrough to a config map lister. +func TestGetConfigMap(t *testing.T) { + var ( + lsConfigMaps = []*v1.ConfigMap{ + makeConfigMap("namespace-0", "name-0", "key-0", "val-0"), + makeConfigMap("namespace-1", "name-1", "key-1", "val-1"), + } + ) + + // Create a config map lister that will list the config maps defined above. + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + for _, secret := range lsConfigMaps { + indexer.Add(secret) + } + configMapLister := corev1listers.NewConfigMapLister(indexer) + + // Create a new instance of the resource manager based on the config map lister. + rm, err := NewResourceManager(nil, nil, configMapLister) + if err != nil { + t.Fatal(err) + } + + // Get the config map with coordinates "namespace-0/name-0". + configMap, err := rm.GetConfigMap("name-0", "namespace-0") + if err != nil { + t.Fatal(err) + } + value := configMap.Data["key-0"] + if value != "val-0" { + t.Fatal("got unexpected value", string(value)) + } + + // Try to get a configmap that does not exist, and make sure we've got a "not found" error as a response. 
+ _, err = rm.GetConfigMap("name-X", "namespace-X") + if err == nil || !errors.IsNotFound(err) { + t.Fatalf("expected a 'not found' error, got %v", err) + } +} + +func makeConfigMap(namespace, name, key, value string) *v1.ConfigMap { + return &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Data: map[string]string{ + key: value, + }, + } +} + +func makePod(namespace, name, image string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Image: image, + }, + }, + }, + } +} + +func makeSecret(namespace, name, key, value string) *v1.Secret { + return &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Data: map[string][]byte{ + key: []byte(value), + }, } } diff --git a/vkubelet/pod.go b/vkubelet/pod.go index f538fd0ab..b370f19e8 100644 --- a/vkubelet/pod.go +++ b/vkubelet/pod.go @@ -6,17 +6,13 @@ import ( "time" "github.com/cpuguy83/strongerrors/status/ocstatus" - pkgerrors "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/log" "go.opencensus.io/trace" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" + + "github.com/virtual-kubelet/virtual-kubelet/log" ) func addPodAttributes(span *trace.Span, pod *corev1.Pod) { @@ -29,136 +25,6 @@ func addPodAttributes(span *trace.Span, pod *corev1.Pod) { ) } -func (s *Server) onAddPod(ctx context.Context, obj interface{}) { - ctx, span := trace.StartSpan(ctx, "onAddPod") - defer span.End() - logger := log.G(ctx).WithField("method", "onAddPod") - - pod, ok := obj.(*corev1.Pod) - if !ok { - span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: fmt.Sprintf("Unexpected object from event: %T", obj)}) - logger.Errorf("obj is not of a valid type: %T", obj) - return - } - - addPodAttributes(span, pod) - - logger.Debugf("Receive added pod '%s/%s' ", pod.GetNamespace(), pod.GetName()) - - if s.resourceManager.UpdatePod(pod) { - span.Annotate(nil, "Add pod to synchronizer channel.") - select { - case <-ctx.Done(): - logger = logger.WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()) - logger.WithError(ctx.Err()).Debug("Cancel send pod event due to cancelled context") - return - case s.podCh <- &podNotification{pod: pod, ctx: ctx}: - } - } -} - -func (s *Server) onUpdatePod(ctx context.Context, obj interface{}) { - ctx, span := trace.StartSpan(ctx, "onUpdatePod") - defer span.End() - logger := log.G(ctx).WithField("method", "onUpdatePod") - - pod, ok := obj.(*corev1.Pod) - if !ok { - span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: fmt.Sprintf("Unexpected object from event: %T", obj)}) - logger.Errorf("obj is not of a valid type: %T", obj) - return - } - - addPodAttributes(span, pod) - - logger.Debugf("Receive updated pod '%s/%s'", pod.GetNamespace(), pod.GetName()) - - if s.resourceManager.UpdatePod(pod) { - span.Annotate(nil, "Add pod to synchronizer channel.") - select { - case <-ctx.Done(): - logger = logger.WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()) - logger.WithError(ctx.Err()).Debug("Cancel send pod event due to cancelled context") - return - case s.podCh <- &podNotification{pod: pod, ctx: ctx}: - } - } -} - -func (s *Server) onDeletePod(ctx 
context.Context, obj interface{}) { - ctx, span := trace.StartSpan(ctx, "onDeletePod") - defer span.End() - logger := log.G(ctx).WithField("method", "onDeletePod") - - pod, ok := obj.(*corev1.Pod) - if !ok { - delta, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: fmt.Sprintf("Unexpected object from event: %T", obj)}) - logger.Errorf("obj is not of a valid type: %T", obj) - return - } - - if pod, ok = delta.Obj.(*corev1.Pod); !ok { - span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: fmt.Sprintf("Unexpected object from event: %T", obj)}) - logger.Errorf("obj is not of a valid type: %T", obj) - return - } - } - - addPodAttributes(span, pod) - - logger.Debugf("Receive deleted pod '%s/%s'", pod.GetNamespace(), pod.GetName()) - - if s.resourceManager.DeletePod(pod) { - span.Annotate(nil, "Add pod to synchronizer channel.") - select { - case <-ctx.Done(): - logger = logger.WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()) - logger.WithError(ctx.Err()).Debug("Cancel send pod event due to cancelled context") - return - case s.podCh <- &podNotification{pod: pod, ctx: ctx}: - } - } -} - -func (s *Server) startPodSynchronizer(ctx context.Context, id int) { - logger := log.G(ctx).WithField("method", "startPodSynchronizer").WithField("podSynchronizer", id) - logger.Debug("Start pod synchronizer") - - for { - select { - case <-ctx.Done(): - logger.Info("Stop pod syncronizer") - return - case event := <-s.podCh: - s.syncPod(event.ctx, event.pod) - } - } -} - -func (s *Server) syncPod(ctx context.Context, pod *corev1.Pod) { - ctx, span := trace.StartSpan(ctx, "syncPod") - defer span.End() - logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()) - - addPodAttributes(span, pod) - - if pod.DeletionTimestamp != nil { - span.Annotate(nil, "Delete pod") - logger.Debugf("Deleting pod") - if err := s.deletePod(ctx, pod); err != nil { - logger.WithError(err).Error("Failed to delete pod") - } - } else { - span.Annotate(nil, "Create pod") - logger.Debugf("Creating pod") - if err := s.createPod(ctx, pod); err != nil { - logger.WithError(err).Errorf("Failed to create pod") - } - } -} - func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error { ctx, span := trace.StartSpan(ctx, "createPod") defer span.End() @@ -224,9 +90,6 @@ func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error { return fmt.Errorf("Failed to delete kubernetes pod: %s", err) } span.Annotate(nil, "Deleted pod from k8s") - - s.resourceManager.DeletePod(pod) - span.Annotate(nil, "Deleted pod from internal state") logger.Info("Pod deleted") } @@ -310,69 +173,3 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { }, "updated pod status in kubernetes") return nil } - -// watchForPodEvent waits for pod changes from kubernetes and updates the details accordingly in the local state. -// This returns after a single pod event. 
-func (s *Server) watchForPodEvent(ctx context.Context) error { - opts := metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("spec.nodeName", s.nodeName).String(), - } - - pods, err := s.k8sClient.CoreV1().Pods(s.namespace).List(opts) - if err != nil { - return pkgerrors.Wrap(err, "error getting pod list") - } - - s.resourceManager.SetPods(pods) - s.reconcile(ctx) - - opts.ResourceVersion = pods.ResourceVersion - - var controller cache.Controller - _, controller = cache.NewInformer( - - &cache.ListWatch{ - - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - if controller != nil { - opts.ResourceVersion = controller.LastSyncResourceVersion() - } - - return s.k8sClient.Core().Pods(s.namespace).List(opts) - }, - - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - if controller != nil { - opts.ResourceVersion = controller.LastSyncResourceVersion() - } - - return s.k8sClient.Core().Pods(s.namespace).Watch(opts) - }, - }, - - &corev1.Pod{}, - - time.Minute, - - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - s.onAddPod(ctx, obj) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - s.onUpdatePod(ctx, newObj) - }, - DeleteFunc: func(obj interface{}) { - s.onDeletePod(ctx, obj) - }, - }, - ) - - for i := 0; i < s.podSyncWorkers; i++ { - go s.startPodSynchronizer(ctx, i) - } - - log.G(ctx).Info("Start to run pod cache controller.") - controller.Run(ctx.Done()) - - return ctx.Err() -} diff --git a/vkubelet/podcontroller.go b/vkubelet/podcontroller.go new file mode 100644 index 000000000..967ede7cc --- /dev/null +++ b/vkubelet/podcontroller.go @@ -0,0 +1,339 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package vkubelet + +import ( + "context" + "fmt" + "sync" + "time" + + pkgerrors "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + + "github.com/virtual-kubelet/virtual-kubelet/log" +) + +const ( + // maxRetries is the number of times we try to process a given key before permanently forgetting it. + maxRetries = 20 +) + +// PodController is the controller implementation for Pod resources. +type PodController struct { + // server is the instance to which this controller belongs. + server *Server + // podsInformer is an informer for Pod resources. + podsInformer v1.PodInformer + // podsLister is able to list/get Pod resources from a shared informer's store. + podsLister corev1listers.PodLister + + // workqueue is a rate limited work queue. + // This is used to queue work to be processed instead of performing it as soon as a change happens. 
+ // This means we can ensure we only process a fixed number of resources at a time, and it makes it easy to ensure we are never processing the same item simultaneously in two different workers.
+ workqueue workqueue.RateLimitingInterface
+ // recorder is an event recorder for recording Event resources to the Kubernetes API.
+ recorder record.EventRecorder
+}
+
+// NewPodController returns a new instance of PodController.
+func NewPodController(server *Server) *PodController {
+ // Create an event broadcaster.
+ eventBroadcaster := record.NewBroadcaster()
+ eventBroadcaster.StartLogging(log.L.Infof)
+ eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: server.k8sClient.CoreV1().Events("")})
+ recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "pod-controller"})
+
+ // Create an instance of PodController having a work queue that uses the rate limiter created above.
+ pc := &PodController{
+ server: server,
+ podsInformer: server.podInformer,
+ podsLister: server.podInformer.Lister(),
+ workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pods"),
+ recorder: recorder,
+ }
+
+ // Set up event handlers for when Pod resources change.
+ pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(pod interface{}) {
+ if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
+ runtime.HandleError(err)
+ } else {
+ pc.workqueue.AddRateLimited(key)
+ }
+ },
+ UpdateFunc: func(_, pod interface{}) {
+ if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
+ runtime.HandleError(err)
+ } else {
+ pc.workqueue.AddRateLimited(key)
+ }
+ },
+ DeleteFunc: func(pod interface{}) {
+ if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
+ runtime.HandleError(err)
+ } else {
+ pc.workqueue.AddRateLimited(key)
+ }
+ },
+ })
+
+ // Return the instance of PodController back to the caller.
+ return pc
+}
+
+// Run syncs the informer caches, performs an initial cleanup of pods dangling in the provider, and then starts the workers.
+// It blocks until the context is cancelled, at which point it shuts down the work queue and waits for workers to finish processing their current work items.
+func (pc *PodController) Run(ctx context.Context, threadiness int) error {
+ defer runtime.HandleCrash()
+ defer pc.workqueue.ShutDown()
+
+ // Wait for the caches to be synced before starting workers.
+ if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
+ return pkgerrors.New("failed to wait for caches to sync")
+ }
+
+ // Perform a reconciliation step that deletes any dangling pods from the provider.
+ // This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
+ // If for any reason the provider fails to delete a dangling pod, it will stay in the provider and deletion won't be retried.
+ pc.deleteDanglingPods(ctx)
+
+ // Launch the requested number of workers to process Pod resources.
+ log.G(ctx).Info("starting workers")
+ for i := 0; i < threadiness; i++ {
+ go wait.Until(pc.runWorker, time.Second, ctx.Done())
+ }
+
+ log.G(ctx).Info("started workers")
+ <-ctx.Done()
+ log.G(ctx).Info("shutting down workers")
+
+ return nil
+}
+
+// runWorker is a long-running function that continually calls processNextWorkItem in order to read and process items on the work queue.
+func (pc *PodController) runWorker() {
+ for pc.processNextWorkItem() {
+ }
+}
+
+// processNextWorkItem reads a single work item off the work queue and attempts to process it by calling the syncHandler.
+func (pc *PodController) processNextWorkItem() bool {
+ obj, shutdown := pc.workqueue.Get()
+
+ if shutdown {
+ return false
+ }
+
+ // We wrap this block in a func so we can defer pc.workqueue.Done.
+ err := func(obj interface{}) error {
+ // We call Done here so the work queue knows we have finished processing this item.
+ // We also must remember to call Forget if we do not want this work item being re-queued.
+ // For example, we do not call Forget if a transient error occurs.
+ // Instead, the item is put back on the work queue and attempted again after a back-off period.
+ defer pc.workqueue.Done(obj)
+ var key string
+ var ok bool
+ // We expect strings to come off the work queue.
+ // These are of the form namespace/name.
+ // We do this as the delayed nature of the work queue means the items in the informer cache may actually be more up to date than when the item was initially put onto the work queue.
+ if key, ok = obj.(string); !ok {
+ // As the item in the work queue is actually invalid, we call Forget here, else we'd go into a loop of attempting to process a work item that is invalid.
+ pc.workqueue.Forget(obj)
+ runtime.HandleError(pkgerrors.Errorf("expected string in work queue but got %#v", obj))
+ return nil
+ }
+ // Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced.
+ if err := pc.syncHandler(key); err != nil {
+ if pc.workqueue.NumRequeues(key) < maxRetries {
+ // Put the item back on the work queue to handle any transient errors.
+ log.L.Errorf("requeuing %q due to failed sync", key)
+ pc.workqueue.AddRateLimited(key)
+ return nil
+ }
+ // We've exceeded the maximum retries, so we must forget the key.
+ pc.workqueue.Forget(key)
+ return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key)
+ }
+ // Finally, if no error occurs we Forget this item so it does not get queued again until another change happens.
+ pc.workqueue.Forget(obj)
+ return nil
+ }(obj)
+
+ if err != nil {
+ runtime.HandleError(err)
+ return true
+ }
+
+ return true
+}
+
+// syncHandler compares the actual state with the desired state, and attempts to converge the two.
+func (pc *PodController) syncHandler(key string) error {
+ // Convert the namespace/name string into a distinct namespace and name.
+ namespace, name, err := cache.SplitMetaNamespaceKey(key)
+ if err != nil {
+ // Log the error but do not requeue the key, as it is invalid.
+ runtime.HandleError(pkgerrors.Wrapf(err, "invalid resource key: %q", key))
+ return nil
+ }
+
+ // Create a context to pass to the provider.
+ ctx := context.Background()
+
+ // Get the Pod resource with this namespace/name.
+ pod, err := pc.podsLister.Pods(namespace).Get(name)
+ if err != nil {
+ if !errors.IsNotFound(err) {
+ // We've failed to fetch the pod from the lister, but the error is not a 404.
+ // Hence, we add the key back to the work queue so we can retry processing it later.
+ return pkgerrors.Wrapf(err, "failed to fetch pod with key %q from lister", key)
+ }
+ // At this point we know the Pod resource doesn't exist, which most probably means it was deleted.
+ // Hence, we must delete it from the provider if it still exists there.
+ return pc.deletePodInProvider(ctx, namespace, name)
+ }
+ // At this point we know the Pod resource has either been created or updated (which includes being marked for deletion).
+ return pc.syncPodInProvider(ctx, pod)
+}
+
+// syncPodInProvider attempts to reconcile the state of a pod by comparing its Kubernetes representation with the provider's representation.
+func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) error {
+ // Reconstruct the pod's key.
+ key := metaKey(pod)
+
+ // Check whether the pod has been marked for deletion.
+ // If it has, delete it in the provider.
+ if pod.DeletionTimestamp != nil {
+ // Delete the pod.
+ if err := pc.server.deletePod(ctx, pod); err != nil {
+ return pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", key)
+ }
+ return nil
+ }
+
+ // Ignore the pod if it is in the "Failed" state.
+ if pod.Status.Phase == corev1.PodFailed {
+ log.G(ctx).Warnf("skipping sync of pod %q in %q phase", key, pod.Status.Phase)
+ return nil
+ }
+
+ // Check if the pod is already known by the provider.
+ // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found, while some others don't.
+ // Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod).
+ pp, _ := pc.server.provider.GetPod(ctx, pod.Namespace, pod.Name)
+ if pp != nil {
+ // The pod has already been created in the provider.
+ // Hence, we return since pod updates are not yet supported.
+ return nil
+ }
+ // Create the pod in the provider.
+ if err := pc.server.createPod(ctx, pod); err != nil {
+ return pkgerrors.Wrapf(err, "failed to create pod %q in the provider", key)
+ }
+ return nil
+}
+
+// deletePodInProvider checks whether the pod with the specified namespace and name is still known to the provider, and deletes it if it is.
+// This function is meant to be called only when a given Pod resource has already been deleted from Kubernetes.
+func (pc *PodController) deletePodInProvider(ctx context.Context, namespace, name string) error {
+ // Reconstruct the pod's key.
+ key := metaKeyFromNamespaceName(namespace, name)
+
+ // Grab the pod as known by the provider.
+ // Since this function is only called when the Pod resource has already been deleted from Kubernetes, we must get it from the provider so we can call "deletePod".
+ // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found, while some others don't.
+ // Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod).
+ pod, _ := pc.server.provider.GetPod(ctx, namespace, name)
+ if pod == nil {
+ // The provider is not aware of the pod, so we just exit.
+ return nil
+ }
+
+ // Delete the pod.
+ if err := pc.server.deletePod(ctx, pod); err != nil {
+ return pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", key)
+ }
+ return nil
+}
+
+// deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them.
+func (pc *PodController) deleteDanglingPods(ctx context.Context) error {
+ // Grab the list of pods known to the provider.
+ pps, err := pc.server.provider.GetPods(ctx)
+ if err != nil {
+ return pkgerrors.Wrap(err, "failed to fetch the list of pods from the provider")
+ }
+
+ // Create a slice to hold the pods we will be deleting from the provider.
+ ptd := make([]*corev1.Pod, 0) + + // Iterate over the pods known to the provider, marking for deletion those that don't exist in Kubernetes. + // Take on this opportunity to populate the list of key that correspond to pods known to the provider. + for _, pp := range pps { + if _, err := pc.podsLister.Pods(pp.Namespace).Get(pp.Name); err != nil { + if errors.IsNotFound(err) { + // The current pod does not exist in Kubernetes, so we mark it for deletion. + ptd = append(ptd, pp) + continue + } + // For some reason we couldn't fetch the pod from the lister, so we propagate the error. + return pkgerrors.Wrap(err, "failed to fetch pod from the lister") + } + } + + var wg sync.WaitGroup + wg.Add(len(ptd)) + + // Iterate over the slice of pods to be deleted and delete them in the provider. + for _, pod := range ptd { + go func(pod *corev1.Pod) { + if err := pc.server.deletePod(ctx, pod); err != nil { + log.G(ctx).Errorf("failed to delete pod %q in provider", metaKey(pod)) + } else { + log.G(ctx).Infof("deleted leaked pod %q in provider", metaKey(pod)) + } + wg.Done() + }(pod) + } + + // Wait for all pods to be deleted. + wg.Wait() + return nil +} + +// metaKey returns the "namespace/name" key for the specified pod. +// If the key cannot be computed, "(unknown)" is returned. +func metaKey(pod *corev1.Pod) string { + k, err := cache.MetaNamespaceKeyFunc(pod) + if err != nil { + return "(unknown)" + } + return k +} + +// metaKeyFromNamespaceName returns the "namespace/name" key for the pod identified by the specified namespace and name. +func metaKeyFromNamespaceName(namespace, name string) string { + return fmt.Sprintf("%s/%s", namespace, name) +} diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index 7246c9656..7f2af9213 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -6,12 +6,14 @@ import ( "time" pkgerrors "github.com/pkg/errors" + "go.opencensus.io/trace" + corev1 "k8s.io/api/core/v1" + corev1informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" - "go.opencensus.io/trace" - corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes" ) const ( @@ -28,6 +30,7 @@ type Server struct { resourceManager *manager.ResourceManager podSyncWorkers int podCh chan *podNotification + podInformer corev1informers.PodInformer } // Config is used to configure a new server. @@ -41,6 +44,7 @@ type Config struct { ResourceManager *manager.ResourceManager Taint *corev1.Taint PodSyncWorkers int + PodInformer corev1informers.PodInformer } // APIConfig is used to configure the API server of the virtual kubelet. @@ -66,6 +70,7 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) { provider: cfg.Provider, podSyncWorkers: cfg.PodSyncWorkers, podCh: make(chan *podNotification, cfg.PodSyncWorkers), + podInformer: cfg.PodInformer, } ctx = log.WithLogger(ctx, log.G(ctx)) @@ -120,127 +125,7 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) { return s, nil } -// Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster. -// Run will block until Stop is called or a SIGINT or SIGTERM signal is received. +// Run creates and starts an instance of the pod controller, blocking until it stops. 
func (s *Server) Run(ctx context.Context) error { - if err := s.watchForPodEvent(ctx); err != nil { - if pkgerrors.Cause(err) == context.Canceled { - return err - } - log.G(ctx).Error(err) - } - - return nil -} - -// reconcile is the main reconciliation loop that compares differences between Kubernetes and -// the active provider and reconciles the differences. -func (s *Server) reconcile(ctx context.Context) { - ctx, span := trace.StartSpan(ctx, "reconcile") - defer span.End() - - logger := log.G(ctx) - logger.Debug("Start reconcile") - defer logger.Debug("End reconcile") - - providerPods, err := s.provider.GetPods(ctx) - if err != nil { - logger.WithError(err).Error("Error getting pod list from provider") - return - } - - var deletePods []*corev1.Pod - for _, pod := range providerPods { - // Delete pods that don't exist in Kubernetes - if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil || p.DeletionTimestamp != nil { - deletePods = append(deletePods, pod) - } - } - span.Annotate(nil, "Got provider pods") - - var failedDeleteCount int64 - for _, pod := range deletePods { - logger := logger.WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()) - logger.Debug("Deleting pod") - if err := s.deletePod(ctx, pod); err != nil { - logger.WithError(err).Error("Error deleting pod") - failedDeleteCount++ - continue - } - } - span.Annotate( - []trace.Attribute{ - trace.Int64Attribute("expected_delete_pods_count", int64(len(deletePods))), - trace.Int64Attribute("failed_delete_pods_count", failedDeleteCount), - }, - "Cleaned up stale provider pods", - ) - - pods := s.resourceManager.GetPods() - - var createPods []*corev1.Pod - cleanupPods := deletePods[:0] - - for _, pod := range pods { - var providerPod *corev1.Pod - for _, p := range providerPods { - if p.Namespace == pod.Namespace && p.Name == pod.Name { - providerPod = p - break - } - } - - // Delete pod if DeletionTimestamp is set - if pod.DeletionTimestamp != nil { - cleanupPods = append(cleanupPods, pod) - continue - } - - if providerPod == nil && - pod.DeletionTimestamp == nil && - pod.Status.Phase != corev1.PodSucceeded && - pod.Status.Phase != corev1.PodFailed && - pod.Status.Reason != podStatusReasonProviderFailed { - createPods = append(createPods, pod) - } - } - - var failedCreateCount int64 - for _, pod := range createPods { - logger := logger.WithField("pod", pod.Name) - logger.Debug("Creating pod") - if err := s.createPod(ctx, pod); err != nil { - failedCreateCount++ - logger.WithError(err).Error("Error creating pod") - continue - } - } - span.Annotate( - []trace.Attribute{ - trace.Int64Attribute("expected_created_pods", int64(len(createPods))), - trace.Int64Attribute("failed_pod_creates", failedCreateCount), - }, - "Created pods in provider", - ) - - var failedCleanupCount int64 - for _, pod := range cleanupPods { - logger := logger.WithField("pod", pod.Name) - log.Trace(logger, "Pod pending deletion") - var err error - if err = s.deletePod(ctx, pod); err != nil { - logger.WithError(err).Error("Error deleting pod") - failedCleanupCount++ - continue - } - log.Trace(logger, "Pod deletion complete") - } - - span.Annotate( - []trace.Attribute{ - trace.Int64Attribute("expected_cleaned_up_pods", int64(len(cleanupPods))), - trace.Int64Attribute("cleaned_up_pod_failures", failedCleanupCount), - }, - "Cleaned up provider pods marked for deletion", - ) + return NewPodController(s).Run(ctx, s.podSyncWorkers) }
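
The sketches below are not part of the patch; they are small, self-contained Go snippets (function and variable names are invented) that spell out the client-go patterns the patch relies on.

First, the node-scoped pod informer set up in cmd/root.go: a shared informer factory built with NewSharedInformerFactoryWithOptions and a tweak function that applies a spec.nodeName field selector, so the local cache only ever holds pods bound to this virtual node. A minimal sketch, assuming an already-constructed clientset:

    package example

    import (
        "time"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/fields"
        "k8s.io/apimachinery/pkg/util/wait"
        kubeinformers "k8s.io/client-go/informers"
        corev1informers "k8s.io/client-go/informers/core/v1"
        "k8s.io/client-go/kubernetes"
    )

    // newNodeScopedPodInformer returns a pod informer that only lists and watches
    // pods scheduled to the given node, keeping the cache and the watch traffic small.
    func newNodeScopedPodInformer(client kubernetes.Interface, nodeName string, resync time.Duration) corev1informers.PodInformer {
        factory := kubeinformers.NewSharedInformerFactoryWithOptions(
            client,
            resync,
            kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
                // Restrict the list/watch to pods bound to this node.
                options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String()
            }),
        )
        // The informer must be requested before Start so the factory knows to run it.
        podInformer := factory.Core().V1().Pods()
        // Start is non-blocking; it launches every informer requested so far.
        factory.Start(wait.NeverStop)
        return podInformer
    }

The resync parameter corresponds to the new --full-resync-period flag; operators can override the one-minute default with, for example, --full-resync-period=5m.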
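
The ResourceManager is now a thin passthrough to listers, so GetPods, GetSecret and GetConfigMap are served from the informer caches instead of hitting the API server. One consequence for callers, shown in the hypothetical helper below (configMapOrNil is not part of the codebase), is that a missing object surfaces as a regular Kubernetes "not found" API error:

    package example

    import (
        corev1 "k8s.io/api/core/v1"
        apierrors "k8s.io/apimachinery/pkg/api/errors"

        "github.com/virtual-kubelet/virtual-kubelet/manager"
    )

    // configMapOrNil reads a config map through the lister-backed ResourceManager.
    // Reads are served from the informer cache, so a missing object comes back as
    // a standard "not found" API error rather than triggering a live GET.
    func configMapOrNil(rm *manager.ResourceManager, name, namespace string) (*corev1.ConfigMap, error) {
        cm, err := rm.GetConfigMap(name, namespace)
        if apierrors.IsNotFound(err) {
            // Not in the cache; once the informer has synced, this also means
            // the object does not exist in the cluster.
            return nil, nil
        }
        if err != nil {
            return nil, err
        }
        return cm, nil
    }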
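
The new tests in manager/resource_test.go exercise the listers directly by seeding a cache.Indexer by hand. That pattern generalizes into a small helper, sketched here under an invented name, for any unit test that needs a lister without spinning up an informer or a fake API server:

    package example

    import (
        corev1 "k8s.io/api/core/v1"
        corev1listers "k8s.io/client-go/listers/core/v1"
        "k8s.io/client-go/tools/cache"
    )

    // newFakePodLister seeds an indexer with the given pods and wraps it in a
    // lister, so lister-backed code can be tested against fixed fixtures.
    func newFakePodLister(pods ...*corev1.Pod) corev1listers.PodLister {
        indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
            cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
        })
        for _, pod := range pods {
            // Add only fails for objects without valid metadata; surface it anyway.
            if err := indexer.Add(pod); err != nil {
                panic(err)
            }
        }
        return corev1listers.NewPodLister(indexer)
    }

With such a helper, NewResourceManager(newFakePodLister(pod1, pod2), nil, nil) would mirror what TestGetPods builds inline.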
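
The subtlest part of podcontroller.go is the work-queue contract in processNextWorkItem: Done must always be called, Forget is called on success (or when giving up), and AddRateLimited re-queues transient failures with back-off until maxRetries is reached. The function below is a condensed restatement of that contract, not a replacement for the controller code; queue, maxRetries and sync are placeholders:

    package example

    import (
        "log"

        "k8s.io/client-go/util/workqueue"
    )

    // processOne drains a single key from a rate-limited work queue and applies
    // the Done/Forget/AddRateLimited contract: Done always, Forget on success or
    // permanent failure, AddRateLimited to retry transient failures with back-off.
    func processOne(queue workqueue.RateLimitingInterface, maxRetries int, sync func(key string) error) bool {
        item, shutdown := queue.Get()
        if shutdown {
            return false
        }
        // Done must be called for every item handed out by Get, otherwise the
        // queue considers it still in flight and never hands it out again.
        defer queue.Done(item)

        key, ok := item.(string)
        if !ok {
            // Unexpected item type: forget it so it is not retried forever.
            queue.Forget(item)
            return true
        }
        if err := sync(key); err != nil {
            if queue.NumRequeues(key) < maxRetries {
                // Transient failure: requeue with rate limiting (back-off).
                queue.AddRateLimited(key)
                return true
            }
            // Too many attempts: drop the rate-limiting history and give up.
            queue.Forget(key)
            log.Printf("giving up on %q: %v", key, err)
            return true
        }
        // Success: clear the rate limiter so future failures start fresh.
        queue.Forget(key)
        return true
    }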
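
Everything that flows through the queue is a plain namespace/name string. The round trip below shows the helpers involved: cache.MetaNamespaceKeyFunc on the producer side (with cache.DeletionHandlingMetaNamespaceKeyFunc additionally unwrapping DeletedFinalStateUnknown tombstones for delete events) and cache.SplitMetaNamespaceKey on the consumer side. A minimal example:

    package example

    import (
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/tools/cache"
    )

    // keyRoundTrip builds a "namespace/name" key from a pod and splits it back
    // into its coordinates, mirroring what the event handlers and syncHandler do.
    func keyRoundTrip() (namespace, name string, err error) {
        pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"}}

        key, err := cache.MetaNamespaceKeyFunc(pod) // "default/nginx"
        if err != nil {
            return "", "", err
        }
        return cache.SplitMetaNamespaceKey(key)
    }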
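
NewPodController wires up an event broadcaster and recorder, but the patch itself does not emit any events yet. If sync failures were later surfaced as pod events, the call would look roughly like the hypothetical helper below (the reason string and message are invented):

    package example

    import (
        corev1 "k8s.io/api/core/v1"
        "k8s.io/client-go/tools/record"
    )

    // reportSyncFailure attaches a Warning event to a pod via the controller's
    // recorder; such events show up in `kubectl describe pod`.
    func reportSyncFailure(recorder record.EventRecorder, pod *corev1.Pod, err error) {
        recorder.Eventf(pod, corev1.EventTypeWarning, "ProviderSyncFailed",
            "failed to sync pod with the provider: %v", err)
    }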
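
Several comments in podcontroller.go note that providers disagree on how GetPod signals a missing pod (nil pod, error, or both), which is why the controller ignores the error and branches on the pod being nil. A wrapper that makes this convention explicit might look like the following sketch (podInProvider is an invented name, and it assumes the providers.Provider interface of this tree):

    package example

    import (
        "context"

        corev1 "k8s.io/api/core/v1"

        "github.com/virtual-kubelet/virtual-kubelet/providers"
    )

    // podInProvider wraps provider.GetPod so that "not found" always comes back
    // as (nil, nil), regardless of whether a given provider signals absence with
    // a nil pod, an error, or both.
    func podInProvider(ctx context.Context, p providers.Provider, namespace, name string) (*corev1.Pod, error) {
        pod, err := p.GetPod(ctx, namespace, name)
        if pod == nil {
            // Treat any "no pod" answer uniformly, swallowing provider-specific
            // not-found errors.
            return nil, nil
        }
        return pod, err
    }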