diff --git a/cmd/virtual-kubelet/commands/providers/provider.go b/cmd/virtual-kubelet/commands/providers/provider.go new file mode 100644 index 0000000..8da5b67 --- /dev/null +++ b/cmd/virtual-kubelet/commands/providers/provider.go @@ -0,0 +1,54 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package providers + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + "github.com/virtual-kubelet/virtual-kubelet/providers/register" +) + +// NewCommand creates a new providers subcommand +// This subcommand is used to determine which providers are registered. +func NewCommand() *cobra.Command { + return &cobra.Command{ + Use: "providers", + Short: "Show the list of supported providers", + Long: "Show the list of supported providers", + Args: cobra.MaximumNArgs(2), + Run: func(cmd *cobra.Command, args []string) { + switch len(args) { + case 0: + ls := register.List() + for _, p := range ls { + fmt.Fprintln(cmd.OutOrStdout(), p) + } + case 1: + if !register.Exists(args[0]) { + fmt.Fprintln(cmd.OutOrStderr(), "no such provider", args[0]) + + // TODO(@cpuuy83): would be nice to not short-circuit the exit here + // But at the momemt this seems to be the only way to exit non-zero and + // handle our own error output + os.Exit(1) + } + fmt.Fprintln(cmd.OutOrStdout(), args[0]) + } + return + }, + } +} diff --git a/cmd/virtual-kubelet/commands/root/flag.go b/cmd/virtual-kubelet/commands/root/flag.go new file mode 100644 index 0000000..85e04ae --- /dev/null +++ b/cmd/virtual-kubelet/commands/root/flag.go @@ -0,0 +1,98 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "flag" + "fmt" + "os" + "strings" + + "github.com/pkg/errors" + "github.com/spf13/pflag" + "k8s.io/klog" +) + +type mapVar map[string]string + +func (mv mapVar) String() string { + var s string + for k, v := range mv { + if s == "" { + s = fmt.Sprintf("%s=%v", k, v) + } else { + s += fmt.Sprintf(", %s=%v", k, v) + } + } + return s +} + +func (mv mapVar) Set(s string) error { + split := strings.SplitN(s, "=", 2) + if len(split) != 2 { + return errors.Errorf("invalid format, must be `key=value`: %s", s) + } + + _, ok := mv[split[0]] + if ok { + return errors.Errorf("duplicate key: %s", split[0]) + } + mv[split[0]] = split[1] + return nil +} + +func (mv mapVar) Type() string { + return "map" +} + +func installFlags(flags *pflag.FlagSet, c *Opts) { + flags.StringVar(&c.KubeConfigPath, "kubeconfig", c.KubeConfigPath, "kube config file to use for connecting to the Kubernetes API server") + flags.StringVar(&c.KubeNamespace, "namespace", c.KubeNamespace, "kubernetes namespace (default is 'all')") + flags.StringVar(&c.NodeName, "nodename", c.NodeName, "kubernetes node name") + flags.StringVar(&c.OperatingSystem, "os", c.OperatingSystem, "Operating System (Linux/Windows)") + flags.StringVar(&c.Provider, "provider", c.Provider, "cloud provider") + flags.StringVar(&c.ProviderConfigPath, "provider-config", c.ProviderConfigPath, "cloud provider configuration file") + flags.StringVar(&c.MetricsAddr, "metrics-addr", c.MetricsAddr, "address to listen for metrics/stats requests") + + flags.StringVar(&c.TaintKey, "taint", c.TaintKey, "Set node taint key") + flags.BoolVar(&c.DisableTaint, "disable-taint", c.DisableTaint, "disable the virtual-kubelet node taint") + flags.MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable") + + flags.IntVar(&c.PodSyncWorkers, "pod-sync-workers", c.PodSyncWorkers, `set the number of pod synchronization workers`) + flags.BoolVar(&c.EnableNodeLease, "enable-node-lease", c.EnableNodeLease, `use node leases (1.13) for node heartbeats`) + + flags.StringSliceVar(&c.TraceExporters, "trace-exporter", c.TraceExporters, fmt.Sprintf("sets the tracing exporter to use, available exporters: %s", AvailableTraceExporters())) + flags.StringVar(&c.TraceConfig.ServiceName, "trace-service-name", c.TraceConfig.ServiceName, "sets the name of the service used to register with the trace exporter") + flags.Var(mapVar(c.TraceConfig.Tags), "trace-tag", "add tags to include with traces in key=value form") + flags.StringVar(&c.TraceSampleRate, "trace-sample-rate", c.TraceSampleRate, "set probability of tracing samples") + + flags.DurationVar(&c.InformerResyncPeriod, "full-resync-period", c.InformerResyncPeriod, "how often to perform a full resync of pods between kubernetes and the provider") + flags.DurationVar(&c.StartupTimeout, "startup-timeout", c.StartupTimeout, "How long to wait for the virtual-kubelet to start") + + flagset := flag.NewFlagSet("klog", flag.PanicOnError) + klog.InitFlags(flagset) + flagset.VisitAll(func(f *flag.Flag) { + f.Name = "klog." + f.Name + flags.AddGoFlag(f) + }) +} + +func getEnv(key, defaultValue string) string { + value, found := os.LookupEnv(key) + if found { + return value + } + return defaultValue +} diff --git a/cmd/virtual-kubelet/commands/root/http.go b/cmd/virtual-kubelet/commands/root/http.go new file mode 100644 index 0000000..8ffba86 --- /dev/null +++ b/cmd/virtual-kubelet/commands/root/http.go @@ -0,0 +1,161 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net" + "net/http" + "os" + + "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/node/api" + "github.com/virtual-kubelet/virtual-kubelet/providers" +) + +// AcceptedCiphers is the list of accepted TLS ciphers, with known weak ciphers elided +// Note this list should be a moving target. +var AcceptedCiphers = []uint16{ + tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, + tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, + tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, + + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, +} + +func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) { + cert, err := tls.LoadX509KeyPair(certPath, keyPath) + if err != nil { + return nil, errors.Wrap(err, "error loading tls certs") + } + + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + MinVersion: tls.VersionTLS12, + PreferServerCipherSuites: true, + CipherSuites: AcceptedCiphers, + }, nil +} + +func setupHTTPServer(ctx context.Context, p providers.Provider, cfg *apiServerConfig) (_ func(), retErr error) { + var closers []io.Closer + cancel := func() { + for _, c := range closers { + c.Close() + } + } + defer func() { + if retErr != nil { + cancel() + } + }() + + if cfg.CertPath == "" || cfg.KeyPath == "" { + log.G(ctx). + WithField("certPath", cfg.CertPath). + WithField("keyPath", cfg.KeyPath). + Error("TLS certificates not provided, not setting up pod http server") + } else { + tlsCfg, err := loadTLSConfig(cfg.CertPath, cfg.KeyPath) + if err != nil { + return nil, err + } + l, err := tls.Listen("tcp", cfg.Addr, tlsCfg) + if err != nil { + return nil, errors.Wrap(err, "error setting up listener for pod http server") + } + + mux := http.NewServeMux() + + podRoutes := api.PodHandlerConfig{ + RunInContainer: p.RunInContainer, + GetContainerLogs: p.GetContainerLogs, + GetPods: p.GetPods, + } + api.AttachPodRoutes(podRoutes, mux, true) + + s := &http.Server{ + Handler: mux, + TLSConfig: tlsCfg, + } + go serveHTTP(ctx, s, l, "pods") + closers = append(closers, s) + } + + if cfg.MetricsAddr == "" { + log.G(ctx).Info("Pod metrics server not setup due to empty metrics address") + } else { + l, err := net.Listen("tcp", cfg.MetricsAddr) + if err != nil { + return nil, errors.Wrap(err, "could not setup listener for pod metrics http server") + } + + mux := http.NewServeMux() + + var summaryHandlerFunc api.PodStatsSummaryHandlerFunc + if mp, ok := p.(providers.PodMetricsProvider); ok { + summaryHandlerFunc = mp.GetStatsSummary + } + podMetricsRoutes := api.PodMetricsConfig{ + GetStatsSummary: summaryHandlerFunc, + } + api.AttachPodMetricsRoutes(podMetricsRoutes, mux) + s := &http.Server{ + Handler: mux, + } + go serveHTTP(ctx, s, l, "pod metrics") + closers = append(closers, s) + } + + return cancel, nil +} + +func serveHTTP(ctx context.Context, s *http.Server, l net.Listener, name string) { + if err := s.Serve(l); err != nil { + select { + case <-ctx.Done(): + default: + log.G(ctx).WithError(err).Errorf("Error setting up %s http server", name) + } + } + l.Close() +} + +type apiServerConfig struct { + CertPath string + KeyPath string + Addr string + MetricsAddr string +} + +func getAPIConfig(c Opts) (*apiServerConfig, error) { + config := apiServerConfig{ + CertPath: os.Getenv("APISERVER_CERT_LOCATION"), + KeyPath: os.Getenv("APISERVER_KEY_LOCATION"), + } + + config.Addr = fmt.Sprintf(":%d", c.ListenPort) + config.MetricsAddr = c.MetricsAddr + + return &config, nil +} diff --git a/cmd/virtual-kubelet/commands/root/node.go b/cmd/virtual-kubelet/commands/root/node.go new file mode 100644 index 0000000..7285756 --- /dev/null +++ b/cmd/virtual-kubelet/commands/root/node.go @@ -0,0 +1,110 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "context" + "strings" + + "github.com/virtual-kubelet/virtual-kubelet/errdefs" + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/version" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + // vkVersion is a concatenation of the Kubernetes version the VK is built against, the string "vk" and the VK release version. + // TODO @pires revisit after VK 1.0 is released as agreed in https://github.com/virtual-kubelet/virtual-kubelet/pull/446#issuecomment-448423176. + vkVersion = strings.Join([]string{"v1.13.1", "vk", version.Version}, "-") +) + +// NodeFromProvider builds a kubernetes node object from a provider +// This is a temporary solution until node stuff actually split off from the provider interface itself. +func NodeFromProvider(ctx context.Context, name string, taint *v1.Taint, p providers.Provider) *v1.Node { + taints := make([]v1.Taint, 0) + + if taint != nil { + taints = append(taints, *taint) + } + + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "type": "virtual-kubelet", + "kubernetes.io/role": "agent", + "beta.kubernetes.io/os": strings.ToLower(p.OperatingSystem()), + "kubernetes.io/hostname": name, + "alpha.service-controller.kubernetes.io/exclude-balancer": "true", + }, + }, + Spec: v1.NodeSpec{ + Taints: taints, + }, + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + OperatingSystem: p.OperatingSystem(), + Architecture: "amd64", + KubeletVersion: vkVersion, + }, + Capacity: p.Capacity(ctx), + Allocatable: p.Capacity(ctx), + Conditions: p.NodeConditions(ctx), + Addresses: p.NodeAddresses(ctx), + DaemonEndpoints: *p.NodeDaemonEndpoints(ctx), + }, + } + return node +} + +// getTaint creates a taint using the provided key/value. +// Taint effect is read from the environment +// The taint key/value may be overwritten by the environment. +func getTaint(c Opts) (*corev1.Taint, error) { + value := c.Provider + + key := c.TaintKey + if key == "" { + key = DefaultTaintKey + } + + if c.TaintEffect == "" { + c.TaintEffect = DefaultTaintEffect + } + + key = getEnv("VKUBELET_TAINT_KEY", key) + value = getEnv("VKUBELET_TAINT_VALUE", value) + effectEnv := getEnv("VKUBELET_TAINT_EFFECT", string(c.TaintEffect)) + + var effect corev1.TaintEffect + switch effectEnv { + case "NoSchedule": + effect = corev1.TaintEffectNoSchedule + case "NoExecute": + effect = corev1.TaintEffectNoExecute + case "PreferNoSchedule": + effect = corev1.TaintEffectPreferNoSchedule + default: + return nil, errdefs.InvalidInputf("taint effect %q is not supported", effectEnv) + } + + return &corev1.Taint{ + Key: key, + Value: value, + Effect: effect, + }, nil +} diff --git a/cmd/virtual-kubelet/commands/root/opts.go b/cmd/virtual-kubelet/commands/root/opts.go new file mode 100644 index 0000000..7b3c46d --- /dev/null +++ b/cmd/virtual-kubelet/commands/root/opts.go @@ -0,0 +1,146 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "os" + "path/filepath" + "strconv" + "time" + + "github.com/mitchellh/go-homedir" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" +) + +// Defaults for root command options +const ( + DefaultNodeName = "virtual-kubelet" + DefaultOperatingSystem = "Linux" + DefaultInformerResyncPeriod = 1 * time.Minute + DefaultMetricsAddr = ":10255" + DefaultListenPort = 10250 // TODO(cpuguy83)(VK1.0): Change this to an addr instead of just a port.. we should not be listening on all interfaces. + DefaultPodSyncWorkers = 10 + DefaultKubeNamespace = corev1.NamespaceAll + + DefaultTaintEffect = string(corev1.TaintEffectNoSchedule) + DefaultTaintKey = "virtual-kubelet.io/provider" +) + +// Opts stores all the options for configuring the root virtual-kubelet command. +// It is used for setting flag values. +// +// You can set the default options by creating a new `Opts` struct and passing +// it into `SetDefaultOpts` +type Opts struct { + // Path to the kubeconfig to use to connect to the Kubernetes API server. + KubeConfigPath string + // Namespace to watch for pods and other resources + KubeNamespace string + // Sets the port to listen for requests from the Kubernetes API server + ListenPort int32 + + // Node name to use when creating a node in Kubernetes + NodeName string + + // Operating system to run pods for + OperatingSystem string + + Provider string + ProviderConfigPath string + + TaintKey string + TaintEffect string + DisableTaint bool + + MetricsAddr string + + // Number of workers to use to handle pod notifications + PodSyncWorkers int + InformerResyncPeriod time.Duration + + // Use node leases when supported by Kubernetes (instead of node status updates) + EnableNodeLease bool + + TraceExporters []string + TraceSampleRate string + TraceConfig TracingExporterOptions + + // Startup Timeout is how long to wait for the kubelet to start + StartupTimeout time.Duration +} + +// SetDefaultOpts sets default options for unset values on the passed in option struct. +// Fields tht are already set will not be modified. +func SetDefaultOpts(c *Opts) error { + if c.OperatingSystem == "" { + c.OperatingSystem = DefaultOperatingSystem + } + + if c.NodeName == "" { + c.NodeName = getEnv("DEFAULT_NODE_NAME", DefaultNodeName) + } + + if c.InformerResyncPeriod == 0 { + c.InformerResyncPeriod = DefaultInformerResyncPeriod + } + + if c.MetricsAddr == "" { + c.MetricsAddr = DefaultMetricsAddr + } + + if c.PodSyncWorkers == 0 { + c.PodSyncWorkers = DefaultPodSyncWorkers + } + + if c.TraceConfig.ServiceName == "" { + c.TraceConfig.ServiceName = DefaultNodeName + } + + if c.ListenPort == 0 { + if kp := os.Getenv("KUBELET_PORT"); kp != "" { + p, err := strconv.Atoi(kp) + if err != nil { + return errors.Wrap(err, "error parsing KUBELET_PORT environment variable") + } + c.ListenPort = int32(p) + } else { + c.ListenPort = DefaultListenPort + } + } + + if c.KubeNamespace == "" { + c.KubeNamespace = DefaultKubeNamespace + } + + if c.TaintKey == "" { + c.TaintKey = DefaultTaintKey + } + if c.TaintEffect == "" { + c.TaintEffect = DefaultTaintEffect + } + + if c.KubeConfigPath == "" { + c.KubeConfigPath = os.Getenv("KUBECONFIG") + if c.KubeConfigPath == "" { + home, _ := homedir.Dir() + if home != "" { + c.KubeConfigPath = filepath.Join(home, ".kube", "config") + } + } + } + + return nil +} diff --git a/cmd/virtual-kubelet/commands/root/root.go b/cmd/virtual-kubelet/commands/root/root.go new file mode 100644 index 0000000..037cb34 --- /dev/null +++ b/cmd/virtual-kubelet/commands/root/root.go @@ -0,0 +1,264 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "context" + "os" + "path" + "time" + + "github.com/pkg/errors" + "github.com/spf13/cobra" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/node" + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/register" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/kubernetes/typed/coordination/v1beta1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/record" +) + +// NewCommand creates a new top-level command. +// This command is used to start the virtual-kubelet daemon +func NewCommand(ctx context.Context, name string, c Opts) *cobra.Command { + cmd := &cobra.Command{ + Use: name, + Short: name + " provides a virtual kubelet interface for your kubernetes cluster.", + Long: name + ` implements the Kubelet interface with a pluggable +backend implementation allowing users to create kubernetes nodes without running the kubelet. +This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`, + RunE: func(cmd *cobra.Command, args []string) error { + return runRootCommand(ctx, c) + }, + } + + installFlags(cmd.Flags(), &c) + return cmd +} + +func runRootCommand(ctx context.Context, c Opts) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if ok := providers.ValidOperatingSystems[c.OperatingSystem]; !ok { + return errdefs.InvalidInputf("operating system %q is not supported", c.OperatingSystem) + } + + if c.PodSyncWorkers == 0 { + return errdefs.InvalidInput("pod sync workers must be greater than 0") + } + + var taint *corev1.Taint + if !c.DisableTaint { + var err error + taint, err = getTaint(c) + if err != nil { + return err + } + } + + client, err := newClient(c.KubeConfigPath) + if err != nil { + return err + } + + // Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node. + podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions( + client, + c.InformerResyncPeriod, + kubeinformers.WithNamespace(c.KubeNamespace), + kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String() + })) + podInformer := podInformerFactory.Core().V1().Pods() + + // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors). + scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, c.InformerResyncPeriod) + // Create a secret informer and a config map informer so we can pass their listers to the resource manager. + secretInformer := scmInformerFactory.Core().V1().Secrets() + configMapInformer := scmInformerFactory.Core().V1().ConfigMaps() + serviceInformer := scmInformerFactory.Core().V1().Services() + + go podInformerFactory.Start(ctx.Done()) + go scmInformerFactory.Start(ctx.Done()) + + rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister()) + if err != nil { + return errors.Wrap(err, "could not create resource manager") + } + + apiConfig, err := getAPIConfig(c) + if err != nil { + return err + } + + if err := setupTracing(ctx, c); err != nil { + return err + } + + initConfig := register.InitConfig{ + ConfigPath: c.ProviderConfigPath, + NodeName: c.NodeName, + OperatingSystem: c.OperatingSystem, + ResourceManager: rm, + DaemonPort: int32(c.ListenPort), + InternalIP: os.Getenv("VKUBELET_POD_IP"), + } + + p, err := register.GetProvider(c.Provider, initConfig) + if err != nil { + return err + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithFields(log.Fields{ + "provider": c.Provider, + "operatingSystem": c.OperatingSystem, + "node": c.NodeName, + "watchedNamespace": c.KubeNamespace, + })) + + var leaseClient v1beta1.LeaseInterface + if c.EnableNodeLease { + leaseClient = client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease) + } + + pNode := NodeFromProvider(ctx, c.NodeName, taint, p) + nodeRunner, err := node.NewNodeController( + node.NaiveNodeProvider{}, + pNode, + client.CoreV1().Nodes(), + node.WithNodeEnableLeaseV1Beta1(leaseClient, nil), + node.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error { + if !k8serrors.IsNotFound(err) { + return err + } + + log.G(ctx).Debug("node not found") + newNode := pNode.DeepCopy() + newNode.ResourceVersion = "" + _, err = client.CoreV1().Nodes().Create(newNode) + if err != nil { + return err + } + log.G(ctx).Debug("created new node") + return nil + }), + ) + if err != nil { + log.G(ctx).Fatal(err) + } + + eb := record.NewBroadcaster() + eb.StartLogging(log.G(ctx).Infof) + eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(c.KubeNamespace)}) + + pc, err := node.NewPodController(node.PodControllerConfig{ + PodClient: client.CoreV1(), + PodInformer: podInformer, + EventRecorder: eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(pNode.Name, "pod-controller")}), + Provider: p, + SecretLister: secretInformer.Lister(), + ConfigMapLister: configMapInformer.Lister(), + ServiceLister: serviceInformer.Lister(), + }) + if err != nil { + return errors.Wrap(err, "error setting up pod controller") + } + + cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig) + if err != nil { + return err + } + defer cancelHTTP() + + go func() { + if err := pc.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled { + log.G(ctx).Fatal(err) + } + }() + + if c.StartupTimeout > 0 { + // If there is a startup timeout, it does two things: + // 1. It causes the VK to shutdown if we haven't gotten into an operational state in a time period + // 2. It prevents node advertisement from happening until we're in an operational state + err = waitFor(ctx, c.StartupTimeout, pc.Ready()) + if err != nil { + return err + } + } + + go func() { + if err := nodeRunner.Run(ctx); err != nil { + log.G(ctx).Fatal(err) + } + }() + + log.G(ctx).Info("Initialized") + + <-ctx.Done() + return nil +} + +func waitFor(ctx context.Context, time time.Duration, ready <-chan struct{}) error { + ctx, cancel := context.WithTimeout(ctx, time) + defer cancel() + + // Wait for the VK / PC close the the ready channel, or time out and return + log.G(ctx).Info("Waiting for pod controller / VK to be ready") + + select { + case <-ready: + return nil + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "Error while starting up VK") + } +} + +func newClient(configPath string) (*kubernetes.Clientset, error) { + var config *rest.Config + + // Check if the kubeConfig file exists. + if _, err := os.Stat(configPath); !os.IsNotExist(err) { + // Get the kubeconfig from the filepath. + config, err = clientcmd.BuildConfigFromFlags("", configPath) + if err != nil { + return nil, errors.Wrap(err, "error building client config") + } + } else { + // Set to in-cluster config. + config, err = rest.InClusterConfig() + if err != nil { + return nil, errors.Wrap(err, "error building in cluster config") + } + } + + if masterURI := os.Getenv("MASTER_URI"); masterURI != "" { + config.Host = masterURI + } + + return kubernetes.NewForConfig(config) +} diff --git a/cmd/virtual-kubelet/commands/root/tracing.go b/cmd/virtual-kubelet/commands/root/tracing.go new file mode 100644 index 0000000..e3dc619 --- /dev/null +++ b/cmd/virtual-kubelet/commands/root/tracing.go @@ -0,0 +1,114 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "context" + "net" + "net/http" + "os" + "strconv" + "strings" + + "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" + "github.com/virtual-kubelet/virtual-kubelet/log" + octrace "go.opencensus.io/trace" + "go.opencensus.io/zpages" +) + +var ( + reservedTagNames = map[string]bool{ + "operatingSystem": true, + "provider": true, + "nodeName": true, + } +) + +func setupTracing(ctx context.Context, c Opts) error { + for k := range c.TraceConfig.Tags { + if reservedTagNames[k] { + return errdefs.InvalidInputf("invalid trace tag %q, must not use a reserved tag key", k) + } + } + if c.TraceConfig.Tags == nil { + c.TraceConfig.Tags = make(map[string]string, 3) + } + c.TraceConfig.Tags["operatingSystem"] = c.OperatingSystem + c.TraceConfig.Tags["provider"] = c.Provider + c.TraceConfig.Tags["nodeName"] = c.NodeName + for _, e := range c.TraceExporters { + if e == "zpages" { + setupZpages(ctx) + continue + } + exporter, err := GetTracingExporter(e, c.TraceConfig) + if err != nil { + return err + } + octrace.RegisterExporter(exporter) + } + if len(c.TraceExporters) > 0 { + var s octrace.Sampler + switch strings.ToLower(c.TraceSampleRate) { + case "": + case "always": + s = octrace.AlwaysSample() + case "never": + s = octrace.NeverSample() + default: + rate, err := strconv.Atoi(c.TraceSampleRate) + if err != nil { + return errdefs.AsInvalidInput(errors.Wrap(err, "unsupported trace sample rate")) + } + if rate < 0 || rate > 100 { + return errdefs.AsInvalidInput(errors.Wrap(err, "trace sample rate must be between 0 and 100")) + } + s = octrace.ProbabilitySampler(float64(rate) / 100) + } + + if s != nil { + octrace.ApplyConfig( + octrace.Config{ + DefaultSampler: s, + }, + ) + } + } + + return nil +} + +func setupZpages(ctx context.Context) { + p := os.Getenv("ZPAGES_PORT") + if p == "" { + log.G(ctx).Error("Missing ZPAGES_PORT env var, cannot setup zpages endpoint") + } + listener, err := net.Listen("tcp", p) + if err != nil { + log.G(ctx).WithError(err).Error("Cannot bind to ZPAGES PORT, cannot setup listener") + return + } + mux := http.NewServeMux() + zpages.Handle(mux, "/debug") + go func() { + // This should never terminate, if it does, it will always terminate with an error + e := http.Serve(listener, mux) + if e == http.ErrServerClosed { + return + } + log.G(ctx).WithError(e).Error("Zpages server exited") + }() +} diff --git a/cmd/virtual-kubelet/commands/root/tracing_register.go b/cmd/virtual-kubelet/commands/root/tracing_register.go new file mode 100644 index 0000000..70242a7 --- /dev/null +++ b/cmd/virtual-kubelet/commands/root/tracing_register.go @@ -0,0 +1,58 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "github.com/virtual-kubelet/virtual-kubelet/errdefs" + "go.opencensus.io/trace" +) + +type TracingExporterOptions struct { + Tags map[string]string + ServiceName string +} + +var ( + tracingExporters = make(map[string]TracingExporterInitFunc) +) + +// TracingExporterInitFunc is the function that is called to initialize an exporter. +// This is used when registering an exporter and called when a user specifed they want to use the exporter. +type TracingExporterInitFunc func(TracingExporterOptions) (trace.Exporter, error) + +// RegisterTracingExporter registers a tracing exporter. +// For a user to select an exporter, it must be registered here. +func RegisterTracingExporter(name string, f TracingExporterInitFunc) { + tracingExporters[name] = f +} + +// GetTracingExporter gets the specified tracing exporter passing in the options to the exporter init function. +// For an exporter to be availbale here it must be registered with `RegisterTracingExporter`. +func GetTracingExporter(name string, opts TracingExporterOptions) (trace.Exporter, error) { + f, ok := tracingExporters[name] + if !ok { + return nil, errdefs.NotFoundf("tracing exporter %q not found", name) + } + return f(opts) +} + +// AvailableTraceExporters gets the list of registered exporters +func AvailableTraceExporters() []string { + out := make([]string, 0, len(tracingExporters)) + for k := range tracingExporters { + out = append(out, k) + } + return out +} diff --git a/cmd/virtual-kubelet/commands/root/tracing_register_jaeger.go b/cmd/virtual-kubelet/commands/root/tracing_register_jaeger.go new file mode 100644 index 0000000..ba5eb24 --- /dev/null +++ b/cmd/virtual-kubelet/commands/root/tracing_register_jaeger.go @@ -0,0 +1,51 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !no_jaeger_exporter + +package root + +import ( + "errors" + "os" + + "go.opencensus.io/exporter/jaeger" + "go.opencensus.io/trace" +) + +func init() { + RegisterTracingExporter("jaeger", NewJaegerExporter) +} + +// NewJaegerExporter creates a new opencensus tracing exporter. +func NewJaegerExporter(opts TracingExporterOptions) (trace.Exporter, error) { + jOpts := jaeger.Options{ + Endpoint: os.Getenv("JAEGER_ENDPOINT"), + AgentEndpoint: os.Getenv("JAEGER_AGENT_ENDPOINT"), + Username: os.Getenv("JAEGER_USER"), + Password: os.Getenv("JAEGER_PASSWORD"), + Process: jaeger.Process{ + ServiceName: opts.ServiceName, + }, + } + + if jOpts.Endpoint == "" && jOpts.AgentEndpoint == "" { + return nil, errors.New("Must specify either JAEGER_ENDPOINT or JAEGER_AGENT_ENDPOINT") + } + + for k, v := range opts.Tags { + jOpts.Process.Tags = append(jOpts.Process.Tags, jaeger.StringTag(k, v)) + } + return jaeger.NewExporter(jOpts) +} diff --git a/cmd/virtual-kubelet/commands/root/tracing_register_ocagent.go b/cmd/virtual-kubelet/commands/root/tracing_register_ocagent.go new file mode 100644 index 0000000..551bea0 --- /dev/null +++ b/cmd/virtual-kubelet/commands/root/tracing_register_ocagent.go @@ -0,0 +1,50 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !no_ocagent_exporter + +package root + +import ( + "os" + + "contrib.go.opencensus.io/exporter/ocagent" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" + "go.opencensus.io/trace" +) + +func init() { + RegisterTracingExporter("ocagent", NewOCAgentExporter) +} + +// NewOCAgentExporter creates a new opencensus tracing exporter using the opencensus agent forwarder. +func NewOCAgentExporter(opts TracingExporterOptions) (trace.Exporter, error) { + agentOpts := append([]ocagent.ExporterOption{}, ocagent.WithServiceName(opts.ServiceName)) + + if endpoint := os.Getenv("OCAGENT_ENDPOINT"); endpoint != "" { + agentOpts = append(agentOpts, ocagent.WithAddress(endpoint)) + } else { + return nil, errdefs.InvalidInput("must set endpoint address in OCAGENT_ENDPOINT") + } + + switch os.Getenv("OCAGENT_INSECURE") { + case "0", "no", "n", "off", "": + case "1", "yes", "y", "on": + agentOpts = append(agentOpts, ocagent.WithInsecure()) + default: + return nil, errdefs.InvalidInput("invalid value for OCAGENT_INSECURE") + } + + return ocagent.NewExporter(agentOpts...) +} diff --git a/cmd/virtual-kubelet/commands/root/tracing_register_test.go b/cmd/virtual-kubelet/commands/root/tracing_register_test.go new file mode 100644 index 0000000..7a6b36f --- /dev/null +++ b/cmd/virtual-kubelet/commands/root/tracing_register_test.go @@ -0,0 +1,58 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "testing" + + "github.com/virtual-kubelet/virtual-kubelet/errdefs" + "go.opencensus.io/trace" +) + +func TestGetTracingExporter(t *testing.T) { + defer delete(tracingExporters, "mock") + + mockExporterFn := func(_ TracingExporterOptions) (trace.Exporter, error) { + return nil, nil + } + + _, err := GetTracingExporter("notexist", TracingExporterOptions{}) + if !errdefs.IsNotFound(err) { + t.Fatalf("expected not found error, got: %v", err) + } + + RegisterTracingExporter("mock", mockExporterFn) + + if _, err := GetTracingExporter("mock", TracingExporterOptions{}); err != nil { + t.Fatal(err) + } +} + +func TestAvailableExporters(t *testing.T) { + defer delete(tracingExporters, "mock") + + mockExporterFn := func(_ TracingExporterOptions) (trace.Exporter, error) { + return nil, nil + } + RegisterTracingExporter("mock", mockExporterFn) + + for _, e := range AvailableTraceExporters() { + if e == "mock" { + return + } + } + + t.Fatal("could not find mock exporter in list of registered exporters") +} diff --git a/cmd/virtual-kubelet/commands/version/version.go b/cmd/virtual-kubelet/commands/version/version.go new file mode 100644 index 0000000..73f3e39 --- /dev/null +++ b/cmd/virtual-kubelet/commands/version/version.go @@ -0,0 +1,34 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import ( + "fmt" + + "github.com/spf13/cobra" + "github.com/virtual-kubelet/virtual-kubelet/version" +) + +// NewCommand creates a new version subcommand command +func NewCommand() *cobra.Command { + return &cobra.Command{ + Use: "version", + Short: "Show the version of the program", + Long: `Show the version of the program`, + Run: func(cmd *cobra.Command, args []string) { + fmt.Printf("Version: %s, Built: %s\n", version.Version, version.BuildTime) + }, + } +} diff --git a/cmd/virtual-kubelet/main.go b/cmd/virtual-kubelet/main.go new file mode 100644 index 0000000..e3f462c --- /dev/null +++ b/cmd/virtual-kubelet/main.go @@ -0,0 +1,82 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "os" + "os/signal" + "path/filepath" + "syscall" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/commands/providers" + "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/commands/root" + "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/commands/version" + "github.com/virtual-kubelet/virtual-kubelet/log" + logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus" + "github.com/virtual-kubelet/virtual-kubelet/trace" + "github.com/virtual-kubelet/virtual-kubelet/trace/opencensus" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sig + cancel() + }() + + log.L = logruslogger.FromLogrus(logrus.NewEntry(logrus.StandardLogger())) + trace.T = opencensus.Adapter{} + + var opts root.Opts + optsErr := root.SetDefaultOpts(&opts) + + rootCmd := root.NewCommand(ctx, filepath.Base(os.Args[0]), opts) + rootCmd.AddCommand(version.NewCommand(), providers.NewCommand()) + preRun := rootCmd.PreRunE + + var logLevel string + rootCmd.PreRunE = func(cmd *cobra.Command, args []string) error { + if optsErr != nil { + return optsErr + } + if preRun != nil { + return preRun(cmd, args) + } + return nil + } + + rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", `set the log level, e.g. "debug", "info", "warn", "error"`) + + rootCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error { + if logLevel != "" { + lvl, err := logrus.ParseLevel(logLevel) + if err != nil { + return errors.Wrap(err, "could not parse log level") + } + logrus.SetLevel(lvl) + } + return nil + } + + if err := rootCmd.Execute(); err != nil && errors.Cause(err) != context.Canceled { + log.G(ctx).Fatal(err) + } +}