// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package root

import (
	"context"
	"os"
	"path"
	"time"

	"github.com/pkg/errors"
	"github.com/spf13/cobra"
	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
	"github.com/virtual-kubelet/virtual-kubelet/log"
	"github.com/virtual-kubelet/virtual-kubelet/manager"
	"github.com/virtual-kubelet/virtual-kubelet/node"
	"github.com/virtual-kubelet/virtual-kubelet/providers"
	"github.com/virtual-kubelet/virtual-kubelet/providers/register"
	corev1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/record"
)

// NewCommand creates a new top-level command.
// This command is used to start the virtual-kubelet daemon.
func NewCommand(ctx context.Context, name string, c Opts) *cobra.Command {
	cmd := &cobra.Command{
		Use:   name,
		Short: name + " provides a virtual kubelet interface for your kubernetes cluster.",
		Long: name + ` implements the Kubelet interface with a pluggable
backend implementation allowing users to create kubernetes nodes without running the kubelet.
This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`,
		RunE: func(cmd *cobra.Command, args []string) error {
			return runRootCommand(ctx, c)
		},
	}

	installFlags(cmd.Flags(), &c)
	return cmd
}

func runRootCommand(ctx context.Context, c Opts) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if ok := providers.ValidOperatingSystems[c.OperatingSystem]; !ok {
		return errdefs.InvalidInputf("operating system %q is not supported", c.OperatingSystem)
	}

	if c.PodSyncWorkers == 0 {
		return errdefs.InvalidInput("pod sync workers must be greater than 0")
	}

	var taint *corev1.Taint
	if !c.DisableTaint {
		var err error
		taint, err = getTaint(c)
		if err != nil {
			return err
		}
	}

	client, err := newClient(c.KubeConfigPath)
	if err != nil {
		return err
	}

	// Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node.
	podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
		client,
		c.InformerResyncPeriod,
		kubeinformers.WithNamespace(c.KubeNamespace),
		kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String()
		}))
	podInformer := podInformerFactory.Core().V1().Pods()

	// Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors).
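	// Secrets, config maps, and services referenced by pods on this node are
	// not themselves bound to spec.nodeName, so they cannot reuse the
	// field-selector-filtered factory above; they get a second, unfiltered
	// factory instead.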
	scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, c.InformerResyncPeriod)
	// Create a secret informer and a config map informer so we can pass their listers to the resource manager.
	secretInformer := scmInformerFactory.Core().V1().Secrets()
	configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
	serviceInformer := scmInformerFactory.Core().V1().Services()

	go podInformerFactory.Start(ctx.Done())
	go scmInformerFactory.Start(ctx.Done())

	rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister())
	if err != nil {
		return errors.Wrap(err, "could not create resource manager")
	}

	apiConfig, err := getAPIConfig(c)
	if err != nil {
		return err
	}

	if err := setupTracing(ctx, c); err != nil {
		return err
	}

	initConfig := register.InitConfig{
		ConfigPath:      c.ProviderConfigPath,
		NodeName:        c.NodeName,
		OperatingSystem: c.OperatingSystem,
		ResourceManager: rm,
		DaemonPort:      int32(c.ListenPort),
		InternalIP:      os.Getenv("VKUBELET_POD_IP"),
	}

	p, err := register.GetProvider(c.Provider, initConfig)
	if err != nil {
		return err
	}

	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(log.Fields{
		"provider":         c.Provider,
		"operatingSystem":  c.OperatingSystem,
		"node":             c.NodeName,
		"watchedNamespace": c.KubeNamespace,
	}))

	var leaseClient v1beta1.LeaseInterface
	if c.EnableNodeLease {
		leaseClient = client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
	}

	pNode := NodeFromProvider(ctx, c.NodeName, taint, p)
	nodeRunner, err := node.NewNodeController(
		node.NaiveNodeProvider{},
		pNode,
		client.CoreV1().Nodes(),
		node.WithNodeEnableLeaseV1Beta1(leaseClient, nil),
		node.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error {
			if !k8serrors.IsNotFound(err) {
				return err
			}

			log.G(ctx).Debug("node not found")
			newNode := pNode.DeepCopy()
			newNode.ResourceVersion = ""
			_, err = client.CoreV1().Nodes().Create(newNode)
			if err != nil {
				return err
			}
			log.G(ctx).Debug("created new node")
			return nil
		}),
	)
	if err != nil {
		log.G(ctx).Fatal(err)
	}

	eb := record.NewBroadcaster()
	eb.StartLogging(log.G(ctx).Infof)
	eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(c.KubeNamespace)})

	pc, err := node.NewPodController(node.PodControllerConfig{
		PodClient:       client.CoreV1(),
		PodInformer:     podInformer,
		EventRecorder:   eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(pNode.Name, "pod-controller")}),
		Provider:        p,
		SecretLister:    secretInformer.Lister(),
		ConfigMapLister: configMapInformer.Lister(),
		ServiceLister:   serviceInformer.Lister(),
	})
	if err != nil {
		return errors.Wrap(err, "error setting up pod controller")
	}

	cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig)
	if err != nil {
		return err
	}
	defer cancelHTTP()

	go func() {
		if err := pc.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled {
			log.G(ctx).Fatal(err)
		}
	}()

	if c.StartupTimeout > 0 {
		// If there is a startup timeout, it does two things:
		// 1. It causes the VK to shutdown if we haven't gotten into an operational state in a time period
		// 2. It prevents node advertisement from happening until we're in an operational state
		err = waitFor(ctx, c.StartupTimeout, pc.Ready())
		if err != nil {
			return err
		}
	}

	go func() {
		if err := nodeRunner.Run(ctx); err != nil {
			log.G(ctx).Fatal(err)
		}
	}()

	log.G(ctx).Info("Initialized")

	<-ctx.Done()
	return nil
}

func waitFor(ctx context.Context, timeout time.Duration, ready <-chan struct{}) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Wait for the VK / PC to close the ready channel, or time out and return.
	log.G(ctx).Info("Waiting for pod controller / VK to be ready")

	select {
	case <-ready:
		return nil
	case <-ctx.Done():
		return errors.Wrap(ctx.Err(), "error while starting up VK")
	}
}

func newClient(configPath string) (*kubernetes.Clientset, error) {
	var config *rest.Config

	// Check if the kubeConfig file exists.
	if _, err := os.Stat(configPath); !os.IsNotExist(err) {
		// Get the kubeconfig from the filepath.
		config, err = clientcmd.BuildConfigFromFlags("", configPath)
		if err != nil {
			return nil, errors.Wrap(err, "error building client config")
		}
	} else {
		// Set to in-cluster config.
		config, err = rest.InClusterConfig()
		if err != nil {
			return nil, errors.Wrap(err, "error building in cluster config")
		}
	}

	if masterURI := os.Getenv("MASTER_URI"); masterURI != "" {
		config.Host = masterURI
	}

	return kubernetes.NewForConfig(config)
}
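
// Example usage (a minimal sketch, not part of this package): a daemon
// entrypoint would typically build Opts from flags and defaults and hand them
// to NewCommand. The import path below and the way Opts is populated are
// assumptions for illustration only.
//
//	package main
//
//	import (
//		"context"
//		"os"
//		"path/filepath"
//
//		"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/root" // assumed location of this package
//	)
//
//	func main() {
//		ctx, cancel := context.WithCancel(context.Background())
//		defer cancel()
//
//		var opts root.Opts // illustrative zero value; real code sets KubeConfigPath, Provider, etc.
//		cmd := root.NewCommand(ctx, filepath.Base(os.Args[0]), opts)
//		if err := cmd.Execute(); err != nil {
//			cancel()
//			os.Exit(1)
//		}
//	}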