diff --git a/root.go b/root.go index 14bcd63..ee4ea88 100644 --- a/root.go +++ b/root.go @@ -23,25 +23,35 @@ import ( "strconv" "strings" "syscall" + "time" "github.com/Sirupsen/logrus" "github.com/cpuguy83/strongerrors" - homedir "github.com/mitchellh/go-homedir" + "github.com/mitchellh/go-homedir" "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opencensus.io/trace" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + kubeinformers "k8s.io/client-go/informers" + corev1informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers/register" - vkubelet "github.com/virtual-kubelet/virtual-kubelet/vkubelet" - "go.opencensus.io/trace" - corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet" ) const ( defaultDaemonPort = "10250" + // kubeSharedInformerFactoryDefaultResync is the default resync period used by the shared informer factories for Kubernetes resources. + // It is set to the same value used by the Kubelet, and can be overridden via the "--full-resync-period" flag. + // https://github.com/kubernetes/kubernetes/blob/v1.12.2/pkg/kubelet/apis/config/v1beta1/defaults.go#L51 + kubeSharedInformerFactoryDefaultResync = 1 * time.Minute ) var kubeletConfig string @@ -60,12 +70,17 @@ var k8sClient *kubernetes.Clientset var p providers.Provider var rm *manager.ResourceManager var apiConfig vkubelet.APIConfig +var podInformer corev1informers.PodInformer +var kubeSharedInformerFactoryResync time.Duration var podSyncWorkers int var userTraceExporters []string var userTraceConfig = TracingExporterOptions{Tags: make(map[string]string)} var traceSampler string +// Create a root context to be used by the pod controller and by the shared informer factories. +var rootContext, rootContextCancel = context.WithCancel(context.Background()) + // RootCmd represents the base command when called without any subcommands var RootCmd = &cobra.Command{ Use: "virtual-kubelet", @@ -74,9 +89,9 @@ var RootCmd = &cobra.Command{ backend implementation allowing users to create kubernetes nodes without running the kubelet. This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`, Run: func(cmd *cobra.Command, args []string) { - ctx, cancel := context.WithCancel(context.Background()) + defer rootContextCancel() - f, err := vkubelet.New(ctx, vkubelet.Config{ + f, err := vkubelet.New(rootContext, vkubelet.Config{ Client: k8sClient, Namespace: kubeNamespace, NodeName: nodeName, @@ -86,6 +101,7 @@ This allows users to schedule kubernetes workloads on nodes that aren't running ResourceManager: rm, APIConfig: apiConfig, PodSyncWorkers: podSyncWorkers, + PodInformer: podInformer, }) if err != nil { log.L.WithError(err).Fatal("Error initializing virtual kubelet") @@ -95,10 +111,10 @@ This allows users to schedule kubernetes workloads on nodes that aren't running signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) go func() { <-sig - cancel() + rootContextCancel() }() - if err := f.Run(ctx); err != nil && errors.Cause(err) != context.Canceled { + if err := f.Run(rootContext); err != nil && errors.Cause(err) != context.Canceled { log.L.Fatal(err) } }, @@ -176,6 +192,8 @@ func init() { RootCmd.PersistentFlags().Var(mapVar(userTraceConfig.Tags), "trace-tag", "add tags to include with traces in key=value form") RootCmd.PersistentFlags().StringVar(&traceSampler, "trace-sample-rate", "", "set probability of tracing samples") + RootCmd.PersistentFlags().DurationVar(&kubeSharedInformerFactoryResync, "full-resync-period", kubeSharedInformerFactoryDefaultResync, "how often to perform a full resync of pods between kubernetes and the provider") + // Cobra also supports local flags, which will only run // when this action is called directly. // RootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") @@ -251,11 +269,30 @@ func initConfig() { logger.WithError(err).Fatal("Error creating kubernetes client") } - rm, err = manager.NewResourceManager(k8sClient) + // Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node. + podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(k8sClient, kubeSharedInformerFactoryResync, kubeinformers.WithNamespace(kubeNamespace), kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String() + })) + // Create a pod informer so we can pass its lister to the resource manager. + podInformer = podInformerFactory.Core().V1().Pods() + + // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors). + scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(k8sClient, kubeSharedInformerFactoryResync) + // Create a secret informer and a config map informer so we can pass their listers to the resource manager. + secretInformer := scmInformerFactory.Core().V1().Secrets() + configMapInformer := scmInformerFactory.Core().V1().ConfigMaps() + + // Create a new instance of the resource manager that uses the listers above for pods, secrets and config maps. + rm, err = manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister()) if err != nil { logger.WithError(err).Fatal("Error initializing resource manager") } + // Start the shared informer factory for pods. + go podInformerFactory.Start(rootContext.Done()) + // Start the shared informer factory for secrets and configmaps. + go scmInformerFactory.Start(rootContext.Done()) + daemonPortEnv := getEnv("KUBELET_PORT", defaultDaemonPort) daemonPort, err := strconv.ParseInt(daemonPortEnv, 10, 32) if err != nil {