From 57e8ee4b51e386da78548f9eede6cc42b78587d2 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Fri, 19 Apr 2019 17:02:39 -0700 Subject: [PATCH] Refactor CLI initialization (#562) This cleans up the CLI code significantly. Also makes some of this re-usable for providers who want to do so. This also removes the main.go from the top of the tree of the repro, instead moving it into cmd/virtual-kubelet. This allows us to better utilize the package namespace (and e.g. mv the `vkubelet` package to the top of the tree). --- census_jaeger.go | 37 -- census_ocagent.go | 37 -- census_test.go | 45 -- census_tracing.go | 69 --- client.go | 35 -- providers.go | 55 --- root.go | 404 ------------------ taint.go | 56 --- version.go | 46 -- .../commands/providers/provider.go | 54 +++ virtual-kubelet/commands/root/flag.go | 90 ++++ .../commands/root/http.go | 31 +- .../commands/root/node.go | 57 ++- virtual-kubelet/commands/root/opts.go | 143 +++++++ virtual-kubelet/commands/root/root.go | 204 +++++++++ virtual-kubelet/commands/root/tracing.go | 102 +++++ virtual-kubelet/commands/version/version.go | 34 ++ virtual-kubelet/main.go | 68 +++ 18 files changed, 771 insertions(+), 796 deletions(-) delete mode 100644 census_jaeger.go delete mode 100644 census_ocagent.go delete mode 100644 census_test.go delete mode 100644 census_tracing.go delete mode 100644 client.go delete mode 100644 providers.go delete mode 100644 root.go delete mode 100644 taint.go delete mode 100644 version.go create mode 100644 virtual-kubelet/commands/providers/provider.go create mode 100644 virtual-kubelet/commands/root/flag.go rename http.go => virtual-kubelet/commands/root/http.go (76%) rename node.go => virtual-kubelet/commands/root/node.go (51%) create mode 100644 virtual-kubelet/commands/root/opts.go create mode 100644 virtual-kubelet/commands/root/root.go create mode 100644 virtual-kubelet/commands/root/tracing.go create mode 100644 virtual-kubelet/commands/version/version.go create mode 100644 virtual-kubelet/main.go diff --git a/census_jaeger.go b/census_jaeger.go deleted file mode 100644 index d4d3882..0000000 --- a/census_jaeger.go +++ /dev/null @@ -1,37 +0,0 @@ -// +build !no_jaeger_exporter - -package cmd - -import ( - "errors" - "os" - - "go.opencensus.io/exporter/jaeger" - "go.opencensus.io/trace" -) - -func init() { - RegisterTracingExporter("jaeger", NewJaegerExporter) -} - -// NewJaegerExporter creates a new opencensus tracing exporter. -func NewJaegerExporter(opts TracingExporterOptions) (trace.Exporter, error) { - jOpts := jaeger.Options{ - Endpoint: os.Getenv("JAEGER_ENDPOINT"), - AgentEndpoint: os.Getenv("JAEGER_AGENT_ENDPOINT"), - Username: os.Getenv("JAEGER_USER"), - Password: os.Getenv("JAEGER_PASSWORD"), - Process: jaeger.Process{ - ServiceName: opts.ServiceName, - }, - } - - if jOpts.Endpoint == "" && jOpts.AgentEndpoint == "" { - return nil, errors.New("Must specify either JAEGER_ENDPOINT or JAEGER_AGENT_ENDPOINT") - } - - for k, v := range opts.Tags { - jOpts.Process.Tags = append(jOpts.Process.Tags, jaeger.StringTag(k, v)) - } - return jaeger.NewExporter(jOpts) -} diff --git a/census_ocagent.go b/census_ocagent.go deleted file mode 100644 index 6ad1558..0000000 --- a/census_ocagent.go +++ /dev/null @@ -1,37 +0,0 @@ -// +build !no_ocagent_exporter - -package cmd - -import ( - "os" - - "contrib.go.opencensus.io/exporter/ocagent" - "github.com/cpuguy83/strongerrors" - "github.com/pkg/errors" - "go.opencensus.io/trace" -) - -func init() { - RegisterTracingExporter("ocagent", NewOCAgentExporter) -} - -// NewOCAgentExporter creates a new opencensus tracing exporter using the opencensus agent forwarder. -func NewOCAgentExporter(opts TracingExporterOptions) (trace.Exporter, error) { - agentOpts := append([]ocagent.ExporterOption{}, ocagent.WithServiceName(opts.ServiceName)) - - if endpoint := os.Getenv("OCAGENT_ENDPOINT"); endpoint != "" { - agentOpts = append(agentOpts, ocagent.WithAddress(endpoint)) - } else { - return nil, strongerrors.InvalidArgument(errors.New("must set endpoint address in OCAGENT_ENDPOINT")) - } - - switch os.Getenv("OCAGENT_INSECURE") { - case "0", "no", "n", "off", "": - case "1", "yes", "y", "on": - agentOpts = append(agentOpts, ocagent.WithInsecure()) - default: - return nil, strongerrors.InvalidArgument(errors.New("invalid value for OCAGENT_INSECURE")) - } - - return ocagent.NewExporter(agentOpts...) -} diff --git a/census_test.go b/census_test.go deleted file mode 100644 index d06c16d..0000000 --- a/census_test.go +++ /dev/null @@ -1,45 +0,0 @@ -package cmd - -import ( - "testing" - - "github.com/cpuguy83/strongerrors" - - "go.opencensus.io/trace" -) - -func TestGetTracingExporter(t *testing.T) { - defer delete(tracingExporters, "mock") - - mockExporterFn := func(_ TracingExporterOptions) (trace.Exporter, error) { - return nil, nil - } - - _, err := GetTracingExporter("notexist", TracingExporterOptions{}) - if !strongerrors.IsNotFound(err) { - t.Fatalf("expected not found error, got: %v", err) - } - - RegisterTracingExporter("mock", mockExporterFn) - - if _, err := GetTracingExporter("mock", TracingExporterOptions{}); err != nil { - t.Fatal(err) - } -} - -func TestAvailableExporters(t *testing.T) { - defer delete(tracingExporters, "mock") - - mockExporterFn := func(_ TracingExporterOptions) (trace.Exporter, error) { - return nil, nil - } - RegisterTracingExporter("mock", mockExporterFn) - - for _, e := range AvailableTraceExporters() { - if e == "mock" { - return - } - } - - t.Fatal("could not find mock exporter in list of registered exporters") -} diff --git a/census_tracing.go b/census_tracing.go deleted file mode 100644 index 8ffdd58..0000000 --- a/census_tracing.go +++ /dev/null @@ -1,69 +0,0 @@ -package cmd - -import ( - "context" - "net/http" - "os" - - "github.com/cpuguy83/strongerrors" - "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/log" - "go.opencensus.io/trace" - "go.opencensus.io/zpages" -) - -var ( - tracingExporters = make(map[string]TracingExporterInitFunc) - - reservedTagNames = map[string]bool{ - "operatingSystem": true, - "provider": true, - "nodeName": true, - } -) - -// TracingExporterOptions is used to pass options to the configured tracer -type TracingExporterOptions struct { - Tags map[string]string - ServiceName string -} - -// TracingExporterInitFunc is the function that is called to initialize an exporter. -// This is used when registering an exporter and called when a user specifed they want to use the exporter. -type TracingExporterInitFunc func(TracingExporterOptions) (trace.Exporter, error) - -// RegisterTracingExporter registers a tracing exporter. -// For a user to select an exporter, it must be registered here. -func RegisterTracingExporter(name string, f TracingExporterInitFunc) { - tracingExporters[name] = f -} - -// GetTracingExporter gets the specified tracing exporter passing in the options to the exporter init function. -// For an exporter to be availbale here it must be registered with `RegisterTracingExporter`. -func GetTracingExporter(name string, opts TracingExporterOptions) (trace.Exporter, error) { - f, ok := tracingExporters[name] - if !ok { - return nil, strongerrors.NotFound(errors.Errorf("tracing exporter %q not found", name)) - } - return f(opts) -} - -// AvailableTraceExporters gets the list of registered exporters -func AvailableTraceExporters() []string { - out := make([]string, 0, len(tracingExporters)) - for k := range tracingExporters { - out = append(out, k) - } - return out -} - -func setupZpages() { - ctx := context.TODO() - p := os.Getenv("ZPAGES_PORT") - if p == "" { - log.G(ctx).Error("Missing ZPAGES_PORT env var, cannot setup zpages endpoint") - } - mux := http.NewServeMux() - zpages.Handle(mux, "/debug") - http.ListenAndServe(p, mux) -} diff --git a/client.go b/client.go deleted file mode 100644 index d092157..0000000 --- a/client.go +++ /dev/null @@ -1,35 +0,0 @@ -package cmd - -import ( - "os" - - "github.com/pkg/errors" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" -) - -func newClient(configPath string) (*kubernetes.Clientset, error) { - var config *rest.Config - - // Check if the kubeConfig file exists. - if _, err := os.Stat(configPath); !os.IsNotExist(err) { - // Get the kubeconfig from the filepath. - config, err = clientcmd.BuildConfigFromFlags("", configPath) - if err != nil { - return nil, errors.Wrap(err, "error building client config") - } - } else { - // Set to in-cluster config. - config, err = rest.InClusterConfig() - if err != nil { - return nil, errors.Wrap(err, "error building in cluster config") - } - } - - if masterURI := os.Getenv("MASTER_URI"); masterURI != "" { - config.Host = masterURI - } - - return kubernetes.NewForConfig(config) -} diff --git a/providers.go b/providers.go deleted file mode 100644 index 29fed9a..0000000 --- a/providers.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright © 2017 NAME HERE -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cmd - -import ( - "fmt" - "os" - - "github.com/spf13/cobra" - "github.com/virtual-kubelet/virtual-kubelet/providers/register" -) - -// versionCmd represents the version command -var providersCmd = &cobra.Command{ - Use: "providers", - Short: "Show the list of supported providers", - Long: "Show the list of supported providers", - Args: cobra.MaximumNArgs(2), - Run: func(cmd *cobra.Command, args []string) { - switch len(args) { - case 0: - ls := register.List() - for _, p := range ls { - fmt.Fprintln(cmd.OutOrStdout(), p) - } - case 1: - if !register.Exists(args[0]) { - fmt.Fprintln(cmd.OutOrStderr(), "no such provider", args[0]) - - // TODO(@cpuuy83): would be nice to not short-circuit the exit here - // But at the momemt this seems to be the only way to exit non-zero and - // handle our own error output - os.Exit(1) - } - fmt.Fprintln(cmd.OutOrStdout(), args[0]) - } - return - }, -} - -func init() { - RootCmd.AddCommand(providersCmd) -} diff --git a/root.go b/root.go deleted file mode 100644 index cac569f..0000000 --- a/root.go +++ /dev/null @@ -1,404 +0,0 @@ -// Copyright © 2017 The virtual-kubelet authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cmd - -import ( - "context" - "fmt" - "os" - "os/signal" - "path/filepath" - "strconv" - "strings" - "syscall" - "time" - - "github.com/Sirupsen/logrus" - "github.com/mitchellh/go-homedir" - "github.com/pkg/errors" - "github.com/spf13/cobra" - "github.com/spf13/viper" - "github.com/virtual-kubelet/virtual-kubelet/log" - logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus" - "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers" - "github.com/virtual-kubelet/virtual-kubelet/providers/register" - "github.com/virtual-kubelet/virtual-kubelet/trace" - "github.com/virtual-kubelet/virtual-kubelet/trace/opencensus" - "github.com/virtual-kubelet/virtual-kubelet/vkubelet" - octrace "go.opencensus.io/trace" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - kubeinformers "k8s.io/client-go/informers" - corev1informers "k8s.io/client-go/informers/core/v1" - "k8s.io/client-go/kubernetes" -) - -const ( - defaultDaemonPort = "10250" - // kubeSharedInformerFactoryDefaultResync is the default resync period used by the shared informer factories for Kubernetes resources. - // It is set to the same value used by the Kubelet, and can be overridden via the "--full-resync-period" flag. - // https://github.com/kubernetes/kubernetes/blob/v1.12.2/pkg/kubelet/apis/config/v1beta1/defaults.go#L51 - kubeSharedInformerFactoryDefaultResync = 1 * time.Minute -) - -var kubeletConfig string -var kubeConfig string -var kubeNamespace string -var nodeName string -var operatingSystem string -var provider string -var providerConfig string -var taintKey string -var disableTaint bool -var logLevel string -var metricsAddr string -var taint *corev1.Taint -var k8sClient *kubernetes.Clientset -var p providers.Provider -var rm *manager.ResourceManager -var apiConfig *apiServerConfig -var podInformer corev1informers.PodInformer -var kubeSharedInformerFactoryResync time.Duration -var podSyncWorkers int -var enableNodeLease bool - -var userTraceExporters []string -var userTraceConfig = TracingExporterOptions{Tags: make(map[string]string)} -var traceSampler string - -// Create a root context to be used by the pod controller and by the shared informer factories. -var rootContext, rootContextCancel = context.WithCancel(context.Background()) - -// RootCmd represents the base command when called without any subcommands -var RootCmd = &cobra.Command{ - Use: "virtual-kubelet", - Short: "virtual-kubelet provides a virtual kubelet interface for your kubernetes cluster.", - Long: `virtual-kubelet implements the Kubelet interface with a pluggable -backend implementation allowing users to create kubernetes nodes without running the kubelet. -This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`, - Run: func(cmd *cobra.Command, args []string) { - initConfig() - - defer rootContextCancel() - - pNode := NodeFromProvider(rootContext, nodeName, taint, p) - node, err := vkubelet.NewNode( - vkubelet.NaiveNodeProvider{}, - pNode, - k8sClient.Coordination().Leases(corev1.NamespaceNodeLease), - k8sClient.CoreV1().Nodes(), - vkubelet.WithNodeDisableLease(!enableNodeLease), - ) - if err != nil { - log.G(rootContext).Fatal(err) - } - - vk := vkubelet.New(vkubelet.Config{ - Client: k8sClient, - Namespace: kubeNamespace, - NodeName: pNode.Name, - Provider: p, - ResourceManager: rm, - PodSyncWorkers: podSyncWorkers, - PodInformer: podInformer, - }) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-sig - rootContextCancel() - }() - - cancelHTTP, err := setupHTTPServer(rootContext, apiConfig) - if err != nil { - log.G(rootContext).Fatal(err) - } - defer cancelHTTP() - - go func() { - if err := vk.Run(rootContext); err != nil && errors.Cause(err) != context.Canceled { - log.G(rootContext).Fatal(err) - } - }() - - go func() { - if err := node.Run(rootContext); err != nil { - log.G(rootContext).Fatal(err) - } - }() - <-rootContext.Done() - }, -} - -// Execute adds all child commands to the root command and sets flags appropriately. -// This is called by main.main(). It only needs to happen once to the rootCmd. -func Execute() { - if err := RootCmd.Execute(); err != nil { - log.GetLogger(context.TODO()).WithError(err).Fatal("Error executing root command") - } -} - -type mapVar map[string]string - -func (mv mapVar) String() string { - var s string - for k, v := range mv { - if s == "" { - s = fmt.Sprintf("%s=%v", k, v) - } else { - s += fmt.Sprintf(", %s=%v", k, v) - } - } - return s -} - -func (mv mapVar) Set(s string) error { - split := strings.SplitN(s, "=", 2) - if len(split) != 2 { - return errors.Errorf("invalid format, must be `key=value`: %s", s) - } - - _, ok := mv[split[0]] - if ok { - return errors.Errorf("duplicate key: %s", split[0]) - } - mv[split[0]] = split[1] - return nil -} - -func (mv mapVar) Type() string { - return "map" -} - -func init() { - // make sure the default logger/tracer is initialized - log.L = logruslogger.FromLogrus(logrus.NewEntry(logrus.StandardLogger())) - trace.T = opencensus.Adapter{} - - // read default node name from environment variable. - // it can be overwritten by cli flags if specified. - defaultNodeName := os.Getenv("DEFAULT_NODE_NAME") - if defaultNodeName == "" { - defaultNodeName = "virtual-kubelet" - } - // Here you will define your flags and configuration settings. - // Cobra supports persistent flags, which, if defined here, - // will be global for your application. - //RootCmd.PersistentFlags().StringVar(&kubeletConfig, "config", "", "config file (default is $HOME/.virtual-kubelet.yaml)") - RootCmd.PersistentFlags().StringVar(&kubeConfig, "kubeconfig", "", "config file (default is $HOME/.kube/config)") - RootCmd.PersistentFlags().StringVar(&kubeNamespace, "namespace", "", "kubernetes namespace (default is 'all')") - RootCmd.PersistentFlags().StringVar(&nodeName, "nodename", defaultNodeName, "kubernetes node name") - RootCmd.PersistentFlags().StringVar(&operatingSystem, "os", "Linux", "Operating System (Linux/Windows)") - RootCmd.PersistentFlags().StringVar(&provider, "provider", "", "cloud provider") - RootCmd.PersistentFlags().BoolVar(&disableTaint, "disable-taint", false, "disable the virtual-kubelet node taint") - RootCmd.PersistentFlags().StringVar(&providerConfig, "provider-config", "", "cloud provider configuration file") - RootCmd.PersistentFlags().StringVar(&metricsAddr, "metrics-addr", ":10255", "address to listen for metrics/stats requests") - - RootCmd.PersistentFlags().StringVar(&taintKey, "taint", "", "Set node taint key") - RootCmd.PersistentFlags().MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable") - RootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", `set the log level, e.g. "trace", debug", "info", "warn", "error"`) - RootCmd.PersistentFlags().IntVar(&podSyncWorkers, "pod-sync-workers", 10, `set the number of pod synchronization workers`) - RootCmd.PersistentFlags().BoolVar(&enableNodeLease, "enable-node-lease", false, `use node leases (1.13) for node heartbeats`) - - RootCmd.PersistentFlags().StringSliceVar(&userTraceExporters, "trace-exporter", nil, fmt.Sprintf("sets the tracing exporter to use, available exporters: %s", AvailableTraceExporters())) - RootCmd.PersistentFlags().StringVar(&userTraceConfig.ServiceName, "trace-service-name", "virtual-kubelet", "sets the name of the service used to register with the trace exporter") - RootCmd.PersistentFlags().Var(mapVar(userTraceConfig.Tags), "trace-tag", "add tags to include with traces in key=value form") - RootCmd.PersistentFlags().StringVar(&traceSampler, "trace-sample-rate", "", "set probability of tracing samples") - - RootCmd.PersistentFlags().DurationVar(&kubeSharedInformerFactoryResync, "full-resync-period", kubeSharedInformerFactoryDefaultResync, "how often to perform a full resync of pods between kubernetes and the provider") - - // Cobra also supports local flags, which will only run - // when this action is called directly. - // RootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} - -// initConfig reads in config file and ENV variables if set. -func initConfig() { - if provider == "" { - log.G(context.TODO()).Fatal("You must supply a cloud provider option: use --provider") - } - - // Find home directory. - home, err := homedir.Dir() - if err != nil { - log.G(context.TODO()).WithError(err).Fatal("Error reading homedir") - } - - if kubeletConfig != "" { - // Use config file from the flag. - viper.SetConfigFile(kubeletConfig) - } else { - // Search config in home directory with name ".virtual-kubelet" (without extension). - viper.AddConfigPath(home) - viper.SetConfigName(".virtual-kubelet") - } - - viper.AutomaticEnv() // read in environment variables that match - - // If a config file is found, read it in. - if err := viper.ReadInConfig(); err == nil { - log.G(context.TODO()).Debugf("Using config file %s", viper.ConfigFileUsed()) - } - - if kubeConfig == "" { - kubeConfig = filepath.Join(home, ".kube", "config") - - } - - if kubeNamespace == "" { - kubeNamespace = corev1.NamespaceAll - } - - // Validate operating system. - ok, _ := providers.ValidOperatingSystems[operatingSystem] - if !ok { - log.G(context.TODO()).WithField("OperatingSystem", operatingSystem).Fatalf("Operating system not supported. Valid options are: %s", strings.Join(providers.ValidOperatingSystems.Names(), " | ")) - } - - level, err := logrus.ParseLevel(logLevel) - if err != nil { - log.G(context.TODO()).WithField("logLevel", logLevel).Fatal("log level is not supported") - } - - logrus.SetLevel(level) - logger := logruslogger.FromLogrus(logrus.WithFields(logrus.Fields{ - "provider": provider, - "operatingSystem": operatingSystem, - "node": nodeName, - "namespace": kubeNamespace, - })) - - rootContext = log.WithLogger(rootContext, logger) - - log.L = logger - - if !disableTaint { - taint, err = getTaint(taintKey, provider) - if err != nil { - logger.WithError(err).Fatal("Error setting up desired kubernetes node taint") - } - } - - k8sClient, err = newClient(kubeConfig) - if err != nil { - logger.WithError(err).Fatal("Error creating kubernetes client") - } - - // Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node. - podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(k8sClient, kubeSharedInformerFactoryResync, kubeinformers.WithNamespace(kubeNamespace), kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) { - options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String() - })) - // Create a pod informer so we can pass its lister to the resource manager. - podInformer = podInformerFactory.Core().V1().Pods() - - // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors). - scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(k8sClient, kubeSharedInformerFactoryResync) - // Create a secret informer and a config map informer so we can pass their listers to the resource manager. - secretInformer := scmInformerFactory.Core().V1().Secrets() - configMapInformer := scmInformerFactory.Core().V1().ConfigMaps() - - // Create a service informer so we can pass its lister to the resource manager. - serviceInformer := scmInformerFactory.Core().V1().Services() - - // Create a new instance of the resource manager that uses the listers above for pods, secrets and config maps. - rm, err = manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister()) - if err != nil { - logger.WithError(err).Fatal("Error initializing resource manager") - } - - // Start the shared informer factory for pods. - go podInformerFactory.Start(rootContext.Done()) - // Start the shared informer factory for secrets and configmaps. - go scmInformerFactory.Start(rootContext.Done()) - - daemonPortEnv := getEnv("KUBELET_PORT", defaultDaemonPort) - daemonPort, err := strconv.ParseInt(daemonPortEnv, 10, 32) - if err != nil { - logger.WithError(err).WithField("value", daemonPortEnv).Fatal("Invalid value from KUBELET_PORT in environment") - } - - initConfig := register.InitConfig{ - ConfigPath: providerConfig, - NodeName: nodeName, - OperatingSystem: operatingSystem, - ResourceManager: rm, - DaemonPort: int32(daemonPort), - InternalIP: os.Getenv("VKUBELET_POD_IP"), - } - - p, err = register.GetProvider(provider, initConfig) - if err != nil { - logger.WithError(err).Fatal("Error initializing provider") - } - - apiConfig, err = getAPIConfig(metricsAddr) - if err != nil { - logger.WithError(err).Fatal("Error reading API config") - } - - if podSyncWorkers <= 0 { - logger.Fatal("The number of pod synchronization workers should not be negative") - } - - for k := range userTraceConfig.Tags { - if reservedTagNames[k] { - logger.WithField("tag", k).Fatal("must not use a reserved tag key") - } - } - userTraceConfig.Tags["operatingSystem"] = operatingSystem - userTraceConfig.Tags["provider"] = provider - userTraceConfig.Tags["nodeName"] = nodeName - for _, e := range userTraceExporters { - if e == "zpages" { - go setupZpages() - continue - } - exporter, err := GetTracingExporter(e, userTraceConfig) - if err != nil { - log.L.WithError(err).WithField("exporter", e).Fatal("Cannot initialize exporter") - } - octrace.RegisterExporter(exporter) - } - if len(userTraceExporters) > 0 { - var s octrace.Sampler - switch strings.ToLower(traceSampler) { - case "": - case "always": - s = octrace.AlwaysSample() - case "never": - s = octrace.NeverSample() - default: - rate, err := strconv.Atoi(traceSampler) - if err != nil { - logger.WithError(err).WithField("rate", traceSampler).Fatal("unsupported trace sample rate, supported values: always, never, or number 0-100") - } - if rate < 0 || rate > 100 { - logger.WithField("rate", traceSampler).Fatal("trace sample rate must not be less than zero or greater than 100") - } - s = octrace.ProbabilitySampler(float64(rate) / 100) - } - - if s != nil { - octrace.ApplyConfig( - octrace.Config{ - DefaultSampler: s, - }, - ) - } - } -} diff --git a/taint.go b/taint.go deleted file mode 100644 index fd5a47d..0000000 --- a/taint.go +++ /dev/null @@ -1,56 +0,0 @@ -package cmd - -import ( - "os" - - "github.com/cpuguy83/strongerrors" - - "github.com/pkg/errors" - corev1 "k8s.io/api/core/v1" -) - -// Default taint values -const ( - DefaultTaintEffect = corev1.TaintEffectNoSchedule - DefaultTaintKey = "virtual-kubelet.io/provider" -) - -func getEnv(key, defaultValue string) string { - value, found := os.LookupEnv(key) - if found { - return value - } - return defaultValue -} - -// getTaint creates a taint using the provided key/value. -// Taint effect is read from the environment -// The taint key/value may be overwritten by the environment. -func getTaint(key, value string) (*corev1.Taint, error) { - if key == "" { - key = DefaultTaintKey - value = provider - } - - key = getEnv("VKUBELET_TAINT_KEY", key) - value = getEnv("VKUBELET_TAINT_VALUE", value) - effectEnv := getEnv("VKUBELET_TAINT_EFFECT", string(DefaultTaintEffect)) - - var effect corev1.TaintEffect - switch effectEnv { - case "NoSchedule": - effect = corev1.TaintEffectNoSchedule - case "NoExecute": - effect = corev1.TaintEffectNoExecute - case "PreferNoSchedule": - effect = corev1.TaintEffectPreferNoSchedule - default: - return nil, strongerrors.InvalidArgument(errors.Errorf("taint effect %q is not supported", effectEnv)) - } - - return &corev1.Taint{ - Key: key, - Value: value, - Effect: effect, - }, nil -} diff --git a/version.go b/version.go deleted file mode 100644 index 20545e1..0000000 --- a/version.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright © 2017 NAME HERE -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cmd - -import ( - "fmt" - - "github.com/spf13/cobra" - "github.com/virtual-kubelet/virtual-kubelet/version" -) - -// versionCmd represents the version command -var versionCmd = &cobra.Command{ - Use: "version", - Short: "Show the version of the program", - Long: `Show the version of the program`, - Run: func(cmd *cobra.Command, args []string) { - fmt.Printf("Version: %s, Built: %s\n", version.Version, version.BuildTime) - }, -} - -func init() { - RootCmd.AddCommand(versionCmd) - - // Here you will define your flags and configuration settings. - - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // versionCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // versionCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} diff --git a/virtual-kubelet/commands/providers/provider.go b/virtual-kubelet/commands/providers/provider.go new file mode 100644 index 0000000..8da5b67 --- /dev/null +++ b/virtual-kubelet/commands/providers/provider.go @@ -0,0 +1,54 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package providers + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + "github.com/virtual-kubelet/virtual-kubelet/providers/register" +) + +// NewCommand creates a new providers subcommand +// This subcommand is used to determine which providers are registered. +func NewCommand() *cobra.Command { + return &cobra.Command{ + Use: "providers", + Short: "Show the list of supported providers", + Long: "Show the list of supported providers", + Args: cobra.MaximumNArgs(2), + Run: func(cmd *cobra.Command, args []string) { + switch len(args) { + case 0: + ls := register.List() + for _, p := range ls { + fmt.Fprintln(cmd.OutOrStdout(), p) + } + case 1: + if !register.Exists(args[0]) { + fmt.Fprintln(cmd.OutOrStderr(), "no such provider", args[0]) + + // TODO(@cpuuy83): would be nice to not short-circuit the exit here + // But at the momemt this seems to be the only way to exit non-zero and + // handle our own error output + os.Exit(1) + } + fmt.Fprintln(cmd.OutOrStdout(), args[0]) + } + return + }, + } +} diff --git a/virtual-kubelet/commands/root/flag.go b/virtual-kubelet/commands/root/flag.go new file mode 100644 index 0000000..3a5bc74 --- /dev/null +++ b/virtual-kubelet/commands/root/flag.go @@ -0,0 +1,90 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "fmt" + "os" + "strings" + + "github.com/pkg/errors" + "github.com/spf13/pflag" + "github.com/virtual-kubelet/virtual-kubelet/trace/opencensus" +) + +type mapVar map[string]string + +func (mv mapVar) String() string { + var s string + for k, v := range mv { + if s == "" { + s = fmt.Sprintf("%s=%v", k, v) + } else { + s += fmt.Sprintf(", %s=%v", k, v) + } + } + return s +} + +func (mv mapVar) Set(s string) error { + split := strings.SplitN(s, "=", 2) + if len(split) != 2 { + return errors.Errorf("invalid format, must be `key=value`: %s", s) + } + + _, ok := mv[split[0]] + if ok { + return errors.Errorf("duplicate key: %s", split[0]) + } + mv[split[0]] = split[1] + return nil +} + +func (mv mapVar) Type() string { + return "map" +} + +func installFlags(flags *pflag.FlagSet, c *Opts) { + flags.StringVar(&c.KubeConfigPath, "kubeconfig", c.KubeConfigPath, "kube config file to use for connecting to the Kubernetes API server") + flags.StringVar(&c.KubeNamespace, "namespace", c.KubeNamespace, "kubernetes namespace (default is 'all')") + flags.StringVar(&c.NodeName, "nodename", c.NodeName, "kubernetes node name") + flags.StringVar(&c.OperatingSystem, "os", c.OperatingSystem, "Operating System (Linux/Windows)") + flags.StringVar(&c.Provider, "provider", c.Provider, "cloud provider") + flags.StringVar(&c.ProviderConfigPath, "provider-config", c.ProviderConfigPath, "cloud provider configuration file") + flags.StringVar(&c.MetricsAddr, "metrics-addr", c.MetricsAddr, "address to listen for metrics/stats requests") + + flags.StringVar(&c.TaintKey, "taint", c.TaintKey, "Set node taint key") + flags.BoolVar(&c.DisableTaint, "disable-taint", c.DisableTaint, "disable the virtual-kubelet node taint") + flags.MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable") + + flags.IntVar(&c.PodSyncWorkers, "pod-sync-workers", c.PodSyncWorkers, `set the number of pod synchronization workers`) + flags.BoolVar(&c.EnableNodeLease, "enable-node-lease", c.EnableNodeLease, `use node leases (1.13) for node heartbeats`) + + flags.StringSliceVar(&c.TraceExporters, "trace-exporter", c.TraceExporters, fmt.Sprintf("sets the tracing exporter to use, available exporters: %s", opencensus.AvailableTraceExporters())) + flags.StringVar(&c.TraceConfig.ServiceName, "trace-service-name", c.TraceConfig.ServiceName, "sets the name of the service used to register with the trace exporter") + flags.Var(mapVar(c.TraceConfig.Tags), "trace-tag", "add tags to include with traces in key=value form") + flags.StringVar(&c.TraceSampleRate, "trace-sample-rate", c.TraceSampleRate, "set probability of tracing samples") + + flags.DurationVar(&c.InformerResyncPeriod, "full-resync-period", c.InformerResyncPeriod, "how often to perform a full resync of pods between kubernetes and the provider") + +} + +func getEnv(key, defaultValue string) string { + value, found := os.LookupEnv(key) + if found { + return value + } + return defaultValue +} diff --git a/http.go b/virtual-kubelet/commands/root/http.go similarity index 76% rename from http.go rename to virtual-kubelet/commands/root/http.go index 6c911da..c716373 100644 --- a/http.go +++ b/virtual-kubelet/commands/root/http.go @@ -1,4 +1,18 @@ -package cmd +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root import ( "context" @@ -8,11 +22,10 @@ import ( "net" "net/http" "os" - "strconv" - "github.com/cpuguy83/strongerrors" "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/vkubelet" ) @@ -44,7 +57,7 @@ func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) { }, nil } -func setupHTTPServer(ctx context.Context, cfg *apiServerConfig) (cancel func(), retErr error) { +func setupHTTPServer(ctx context.Context, p providers.Provider, cfg *apiServerConfig) (cancel func(), retErr error) { var closers []io.Closer cancel = func() { for _, c := range closers { @@ -121,18 +134,14 @@ type apiServerConfig struct { MetricsAddr string } -func getAPIConfig(metricsAddr string) (*apiServerConfig, error) { +func getAPIConfig(c Opts) (*apiServerConfig, error) { config := apiServerConfig{ CertPath: os.Getenv("APISERVER_CERT_LOCATION"), KeyPath: os.Getenv("APISERVER_KEY_LOCATION"), } - port, err := strconv.Atoi(os.Getenv("KUBELET_PORT")) - if err != nil { - return nil, strongerrors.InvalidArgument(errors.Wrap(err, "error parsing KUBELET_PORT variable")) - } - config.Addr = fmt.Sprintf(":%d", port) - config.MetricsAddr = metricsAddr + config.Addr = fmt.Sprintf(":%d", c.ListenPort) + config.MetricsAddr = c.MetricsAddr return &config, nil } diff --git a/node.go b/virtual-kubelet/commands/root/node.go similarity index 51% rename from node.go rename to virtual-kubelet/commands/root/node.go index 67611bd..c949abd 100644 --- a/node.go +++ b/virtual-kubelet/commands/root/node.go @@ -1,11 +1,28 @@ -package cmd +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root import ( "context" "strings" + "github.com/cpuguy83/strongerrors" + "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/version" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -54,3 +71,41 @@ func NodeFromProvider(ctx context.Context, name string, taint *v1.Taint, p provi } return node } + +// getTaint creates a taint using the provided key/value. +// Taint effect is read from the environment +// The taint key/value may be overwritten by the environment. +func getTaint(c Opts) (*corev1.Taint, error) { + value := c.Provider + + key := c.TaintKey + if key == "" { + key = DefaultTaintKey + } + + if c.TaintEffect == "" { + c.TaintEffect = DefaultTaintEffect + } + + key = getEnv("VKUBELET_TAINT_KEY", key) + value = getEnv("VKUBELET_TAINT_VALUE", value) + effectEnv := getEnv("VKUBELET_TAINT_EFFECT", string(c.TaintEffect)) + + var effect corev1.TaintEffect + switch effectEnv { + case "NoSchedule": + effect = corev1.TaintEffectNoSchedule + case "NoExecute": + effect = corev1.TaintEffectNoExecute + case "PreferNoSchedule": + effect = corev1.TaintEffectPreferNoSchedule + default: + return nil, strongerrors.InvalidArgument(errors.Errorf("taint effect %q is not supported", effectEnv)) + } + + return &corev1.Taint{ + Key: key, + Value: value, + Effect: effect, + }, nil +} diff --git a/virtual-kubelet/commands/root/opts.go b/virtual-kubelet/commands/root/opts.go new file mode 100644 index 0000000..41038fa --- /dev/null +++ b/virtual-kubelet/commands/root/opts.go @@ -0,0 +1,143 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "os" + "path/filepath" + "strconv" + "time" + + "github.com/mitchellh/go-homedir" + "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/trace/opencensus" + corev1 "k8s.io/api/core/v1" +) + +// Defaults for root command options +const ( + DefaultNodeName = "virtual-kubelet" + DefaultOperatingSystem = "Linux" + DefaultInformerResyncPeriod = 1 * time.Minute + DefaultMetricsAddr = ":10255" + DefaultListenPort = 10250 // TODO(cpuguy83)(VK1.0): Change this to an addr instead of just a port.. we should not be listening on all interfaces. + DefaultPodSyncWorkers = 10 + DefaultKubeNamespace = corev1.NamespaceAll + + DefaultTaintEffect = string(corev1.TaintEffectNoSchedule) + DefaultTaintKey = "virtual-kubelet.io/provider" +) + +// Opts stores all the options for configuring the root virtual-kubelet command. +// It is used for setting flag values. +// +// You can set the default options by creating a new `Opts` struct and passing +// it into `SetDefaultOpts` +type Opts struct { + // Path to the kubeconfig to use to connect to the Kubernetes API server. + KubeConfigPath string + // Namespace to watch for pods and other resources + KubeNamespace string + // Sets the port to listen for requests from the Kubernetes API server + ListenPort int32 + + // Node name to use when creating a node in Kubernetes + NodeName string + + // Operating system to run pods for + OperatingSystem string + + Provider string + ProviderConfigPath string + + TaintKey string + TaintEffect string + DisableTaint bool + + MetricsAddr string + + // Number of workers to use to handle pod notifications + PodSyncWorkers int + InformerResyncPeriod time.Duration + + // Use node leases when supported by Kubernetes (instead of node status updates) + EnableNodeLease bool + + TraceExporters []string + TraceSampleRate string + TraceConfig opencensus.TracingExporterOptions +} + +// SetDefaultOpts sets default options for unset values on the passed in option struct. +// Fields tht are already set will not be modified. +func SetDefaultOpts(c *Opts) error { + if c.OperatingSystem == "" { + c.OperatingSystem = DefaultOperatingSystem + } + + if c.NodeName == "" { + c.NodeName = getEnv("DEFAULT_NODE_NAME", DefaultNodeName) + } + + if c.InformerResyncPeriod == 0 { + c.InformerResyncPeriod = DefaultInformerResyncPeriod + } + + if c.MetricsAddr == "" { + c.MetricsAddr = DefaultMetricsAddr + } + + if c.PodSyncWorkers == 0 { + c.PodSyncWorkers = DefaultPodSyncWorkers + } + + if c.TraceConfig.ServiceName == "" { + c.TraceConfig.ServiceName = DefaultNodeName + } + + if c.ListenPort == 0 { + if kp := os.Getenv("KUBELET_PORT"); kp != "" { + p, err := strconv.Atoi(kp) + if err != nil { + return errors.Wrap(err, "error parsing KUBELET_PORT environment variable") + } + c.ListenPort = int32(p) + } + c.ListenPort = DefaultListenPort + } + + if c.KubeNamespace == "" { + c.KubeNamespace = DefaultKubeNamespace + } + + if c.TaintKey == "" { + c.TaintKey = DefaultTaintKey + } + if c.TaintEffect == "" { + c.TaintEffect = DefaultTaintEffect + } + + if c.KubeConfigPath == "" { + c.KubeConfigPath = os.Getenv("KUBECONFIG") + if c.KubeConfigPath == "" { + home, _ := homedir.Dir() + if home != "" { + c.KubeConfigPath = filepath.Join(home, ".kube", "config") + } + } + } + + return nil +} diff --git a/virtual-kubelet/commands/root/root.go b/virtual-kubelet/commands/root/root.go new file mode 100644 index 0000000..0e20781 --- /dev/null +++ b/virtual-kubelet/commands/root/root.go @@ -0,0 +1,204 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "context" + "os" + + "github.com/cpuguy83/strongerrors" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/register" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +// NewCommand creates a new top-level command. +// This command is used to start the virtual-kubelet daemon +func NewCommand(ctx context.Context, name string, c Opts) *cobra.Command { + cmd := &cobra.Command{ + Use: name, + Short: name + " provides a virtual kubelet interface for your kubernetes cluster.", + Long: name + ` implements the Kubelet interface with a pluggable +backend implementation allowing users to create kubernetes nodes without running the kubelet. +This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`, + RunE: func(cmd *cobra.Command, args []string) error { + return runRootCommand(ctx, c) + }, + } + + installFlags(cmd.Flags(), &c) + return cmd +} + +func runRootCommand(ctx context.Context, c Opts) error { + if ok := providers.ValidOperatingSystems[c.OperatingSystem]; !ok { + return strongerrors.InvalidArgument(errors.Errorf("operating system %q is not supported", c.OperatingSystem)) + } + + if c.PodSyncWorkers == 0 { + return strongerrors.InvalidArgument(errors.New("pod sync workers must be greater than 0")) + } + + var taint *corev1.Taint + if !c.DisableTaint { + var err error + taint, err = getTaint(c) + if err != nil { + return err + } + } + + client, err := newClient(c.KubeConfigPath) + if err != nil { + return err + } + + // Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node. + podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions( + client, + c.InformerResyncPeriod, + kubeinformers.WithNamespace(c.KubeNamespace), + kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String() + })) + // Create a pod informer so we can pass its lister to the resource manager. + podInformer := podInformerFactory.Core().V1().Pods() + + // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors). + scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, c.InformerResyncPeriod) + // Create a secret informer and a config map informer so we can pass their listers to the resource manager. + secretInformer := scmInformerFactory.Core().V1().Secrets() + configMapInformer := scmInformerFactory.Core().V1().ConfigMaps() + serviceInformer := scmInformerFactory.Core().V1().Services() + + go podInformerFactory.Start(ctx.Done()) + go scmInformerFactory.Start(ctx.Done()) + + rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister()) + if err != nil { + return errors.Wrap(err, "could not create resource manager") + } + + apiConfig, err := getAPIConfig(c) + if err != nil { + return err + } + + if err := setupTracing(ctx, c); err != nil { + return err + } + + initConfig := register.InitConfig{ + ConfigPath: c.ProviderConfigPath, + NodeName: c.NodeName, + OperatingSystem: c.OperatingSystem, + ResourceManager: rm, + DaemonPort: int32(c.ListenPort), + InternalIP: os.Getenv("VKUBELET_POD_IP"), + } + + p, err := register.GetProvider(c.Provider, initConfig) + if err != nil { + return err + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithFields(log.Fields{ + "provider": c.Provider, + "operatingSystem": c.OperatingSystem, + "node": c.NodeName, + "watchedNamespace": c.KubeNamespace, + })) + + pNode := NodeFromProvider(ctx, c.NodeName, taint, p) + node, err := vkubelet.NewNode( + vkubelet.NaiveNodeProvider{}, + pNode, + client.Coordination().Leases(corev1.NamespaceNodeLease), + client.CoreV1().Nodes(), + vkubelet.WithNodeDisableLease(!c.EnableNodeLease), + ) + if err != nil { + log.G(ctx).Fatal(err) + } + + vk := vkubelet.New(vkubelet.Config{ + Client: client, + Namespace: c.KubeNamespace, + NodeName: pNode.Name, + Provider: p, + ResourceManager: rm, + PodSyncWorkers: c.PodSyncWorkers, + PodInformer: podInformer, + }) + + cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig) + if err != nil { + return err + } + defer cancelHTTP() + + go func() { + if err := vk.Run(ctx); err != nil && errors.Cause(err) != context.Canceled { + log.G(ctx).Fatal(err) + } + }() + + go func() { + if err := node.Run(ctx); err != nil { + log.G(ctx).Fatal(err) + } + }() + + log.G(ctx).Info("Initialized") + + <-ctx.Done() + return nil +} + +func newClient(configPath string) (*kubernetes.Clientset, error) { + var config *rest.Config + + // Check if the kubeConfig file exists. + if _, err := os.Stat(configPath); !os.IsNotExist(err) { + // Get the kubeconfig from the filepath. + config, err = clientcmd.BuildConfigFromFlags("", configPath) + if err != nil { + return nil, errors.Wrap(err, "error building client config") + } + } else { + // Set to in-cluster config. + config, err = rest.InClusterConfig() + if err != nil { + return nil, errors.Wrap(err, "error building in cluster config") + } + } + + if masterURI := os.Getenv("MASTER_URI"); masterURI != "" { + config.Host = masterURI + } + + return kubernetes.NewForConfig(config) +} diff --git a/virtual-kubelet/commands/root/tracing.go b/virtual-kubelet/commands/root/tracing.go new file mode 100644 index 0000000..bad60bc --- /dev/null +++ b/virtual-kubelet/commands/root/tracing.go @@ -0,0 +1,102 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "context" + "net/http" + "os" + "strconv" + "strings" + + "github.com/cpuguy83/strongerrors" + "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/trace/opencensus" + octrace "go.opencensus.io/trace" + "go.opencensus.io/zpages" +) + +var ( + reservedTagNames = map[string]bool{ + "operatingSystem": true, + "provider": true, + "nodeName": true, + } +) + +func setupTracing(ctx context.Context, c Opts) error { + for k := range c.TraceConfig.Tags { + if reservedTagNames[k] { + return strongerrors.InvalidArgument(errors.Errorf("invalid trace tag %q, must not use a reserved tag key")) + } + } + if c.TraceConfig.Tags == nil { + c.TraceConfig.Tags = make(map[string]string, 3) + } + c.TraceConfig.Tags["operatingSystem"] = c.OperatingSystem + c.TraceConfig.Tags["provider"] = c.Provider + c.TraceConfig.Tags["nodeName"] = c.NodeName + for _, e := range c.TraceExporters { + if e == "zpages" { + go setupZpages(ctx) + continue + } + exporter, err := opencensus.GetTracingExporter(e, c.TraceConfig) + if err != nil { + return err + } + octrace.RegisterExporter(exporter) + } + if len(c.TraceExporters) > 0 { + var s octrace.Sampler + switch strings.ToLower(c.TraceSampleRate) { + case "": + case "always": + s = octrace.AlwaysSample() + case "never": + s = octrace.NeverSample() + default: + rate, err := strconv.Atoi(c.TraceSampleRate) + if err != nil { + return strongerrors.InvalidArgument(errors.Wrap(err, "unsupported trace sample rate")) + } + if rate < 0 || rate > 100 { + return strongerrors.InvalidArgument(errors.Wrap(err, "trace sample rate must be between 0 and 100")) + } + s = octrace.ProbabilitySampler(float64(rate) / 100) + } + + if s != nil { + octrace.ApplyConfig( + octrace.Config{ + DefaultSampler: s, + }, + ) + } + } + + return nil +} + +func setupZpages(ctx context.Context) { + p := os.Getenv("ZPAGES_PORT") + if p == "" { + log.G(ctx).Error("Missing ZPAGES_PORT env var, cannot setup zpages endpoint") + } + mux := http.NewServeMux() + zpages.Handle(mux, "/debug") + http.ListenAndServe(p, mux) +} diff --git a/virtual-kubelet/commands/version/version.go b/virtual-kubelet/commands/version/version.go new file mode 100644 index 0000000..73f3e39 --- /dev/null +++ b/virtual-kubelet/commands/version/version.go @@ -0,0 +1,34 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import ( + "fmt" + + "github.com/spf13/cobra" + "github.com/virtual-kubelet/virtual-kubelet/version" +) + +// NewCommand creates a new version subcommand command +func NewCommand() *cobra.Command { + return &cobra.Command{ + Use: "version", + Short: "Show the version of the program", + Long: `Show the version of the program`, + Run: func(cmd *cobra.Command, args []string) { + fmt.Printf("Version: %s, Built: %s\n", version.Version, version.BuildTime) + }, + } +} diff --git a/virtual-kubelet/main.go b/virtual-kubelet/main.go new file mode 100644 index 0000000..cdac764 --- /dev/null +++ b/virtual-kubelet/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "os" + "os/signal" + "path/filepath" + "syscall" + + "github.com/Sirupsen/logrus" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/commands/providers" + "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/commands/root" + "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/commands/version" + "github.com/virtual-kubelet/virtual-kubelet/log" + logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus" + "github.com/virtual-kubelet/virtual-kubelet/trace" + "github.com/virtual-kubelet/virtual-kubelet/trace/opencensus" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sig + cancel() + }() + + log.L = logruslogger.FromLogrus(logrus.NewEntry(logrus.StandardLogger())) + trace.T = opencensus.Adapter{} + + var opts root.Opts + optsErr := root.SetDefaultOpts(&opts) + + rootCmd := root.NewCommand(ctx, filepath.Base(os.Args[0]), opts) + rootCmd.AddCommand(version.NewCommand(), providers.NewCommand()) + preRun := rootCmd.PreRunE + + var logLevel string + rootCmd.PreRunE = func(cmd *cobra.Command, args []string) error { + if optsErr != nil { + return optsErr + } + if preRun != nil { + return preRun(cmd, args) + } + return nil + } + + rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", `set the log level, e.g. "debug", "info", "warn", "error"`) + + rootCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error { + if logLevel != "" { + lvl, err := logrus.ParseLevel(logLevel) + if err != nil { + return errors.Wrap(err, "could not parse log level") + } + logrus.SetLevel(lvl) + } + return nil + } + + if err := rootCmd.Execute(); err != nil && errors.Cause(err) != context.Canceled { + log.G(ctx).Fatal(err) + } +}