diff --git a/cmd/virtual-kubelet/internal/commands/root/flag.go b/cmd/virtual-kubelet/internal/commands/root/flag.go index 1ef7946e1..c851c38a2 100644 --- a/cmd/virtual-kubelet/internal/commands/root/flag.go +++ b/cmd/virtual-kubelet/internal/commands/root/flag.go @@ -59,7 +59,11 @@ func (mv mapVar) Type() string { func installFlags(flags *pflag.FlagSet, c *Opts) { flags.StringVar(&c.KubeConfigPath, "kubeconfig", c.KubeConfigPath, "kube config file to use for connecting to the Kubernetes API server") + flags.StringVar(&c.KubeNamespace, "namespace", c.KubeNamespace, "kubernetes namespace (default is 'all')") + flags.MarkDeprecated("namespace", "Nodes must watch for pods in all namespaces. This option is now ignored.") //nolint:errcheck + flags.MarkHidden("namespace") //nolint:errcheck + flags.StringVar(&c.KubeClusterDomain, "cluster-domain", c.KubeClusterDomain, "kubernetes cluster-domain (default is 'cluster.local')") flags.StringVar(&c.NodeName, "nodename", c.NodeName, "kubernetes node name") flags.StringVar(&c.OperatingSystem, "os", c.OperatingSystem, "Operating System (Linux/Windows)") @@ -68,11 +72,15 @@ func installFlags(flags *pflag.FlagSet, c *Opts) { flags.StringVar(&c.MetricsAddr, "metrics-addr", c.MetricsAddr, "address to listen for metrics/stats requests") flags.StringVar(&c.TaintKey, "taint", c.TaintKey, "Set node taint key") + flags.BoolVar(&c.DisableTaint, "disable-taint", c.DisableTaint, "disable the virtual-kubelet node taint") flags.MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable") //nolint:errcheck flags.IntVar(&c.PodSyncWorkers, "pod-sync-workers", c.PodSyncWorkers, `set the number of pod synchronization workers`) + flags.BoolVar(&c.EnableNodeLease, "enable-node-lease", c.EnableNodeLease, `use node leases (1.13) for node heartbeats`) + flags.MarkDeprecated("enable-node-lease", "leases are always enabled") //nolint:errcheck + flags.MarkHidden("enable-node-lease") //nolint:errcheck flags.StringSliceVar(&c.TraceExporters, "trace-exporter", c.TraceExporters, fmt.Sprintf("sets the tracing exporter to use, available exporters: %s", AvailableTraceExporters())) flags.StringVar(&c.TraceConfig.ServiceName, "trace-service-name", c.TraceConfig.ServiceName, "sets the name of the service used to register with the trace exporter") diff --git a/cmd/virtual-kubelet/internal/commands/root/http.go b/cmd/virtual-kubelet/internal/commands/root/http.go index b20a4ff7d..a14b2c9df 100644 --- a/cmd/virtual-kubelet/internal/commands/root/http.go +++ b/cmd/virtual-kubelet/internal/commands/root/http.go @@ -15,140 +15,15 @@ package root import ( - "context" - "crypto/tls" "fmt" - "io" - "net" - "net/http" "os" "time" - - "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider" - "github.com/virtual-kubelet/virtual-kubelet/log" - "github.com/virtual-kubelet/virtual-kubelet/node/api" ) -// AcceptedCiphers is the list of accepted TLS ciphers, with known weak ciphers elided -// Note this list should be a moving target. 
-var AcceptedCiphers = []uint16{ - tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, - tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, - tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, - - tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, -} - -func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) { - cert, err := tls.LoadX509KeyPair(certPath, keyPath) - if err != nil { - return nil, errors.Wrap(err, "error loading tls certs") - } - - return &tls.Config{ - Certificates: []tls.Certificate{cert}, - MinVersion: tls.VersionTLS12, - PreferServerCipherSuites: true, - CipherSuites: AcceptedCiphers, - }, nil -} - -func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerConfig, getPodsFromKubernetes api.PodListerFunc) (_ func(), retErr error) { - var closers []io.Closer - cancel := func() { - for _, c := range closers { - c.Close() - } - } - defer func() { - if retErr != nil { - cancel() - } - }() - - if cfg.CertPath == "" || cfg.KeyPath == "" { - log.G(ctx). - WithField("certPath", cfg.CertPath). - WithField("keyPath", cfg.KeyPath). - Error("TLS certificates not provided, not setting up pod http server") - } else { - tlsCfg, err := loadTLSConfig(cfg.CertPath, cfg.KeyPath) - if err != nil { - return nil, err - } - l, err := tls.Listen("tcp", cfg.Addr, tlsCfg) - if err != nil { - return nil, errors.Wrap(err, "error setting up listener for pod http server") - } - - mux := http.NewServeMux() - - podRoutes := api.PodHandlerConfig{ - RunInContainer: p.RunInContainer, - GetContainerLogs: p.GetContainerLogs, - GetPodsFromKubernetes: getPodsFromKubernetes, - GetPods: p.GetPods, - StreamIdleTimeout: cfg.StreamIdleTimeout, - StreamCreationTimeout: cfg.StreamCreationTimeout, - } - - api.AttachPodRoutes(podRoutes, mux, true) - - s := &http.Server{ - Handler: mux, - TLSConfig: tlsCfg, - } - go serveHTTP(ctx, s, l, "pods") - closers = append(closers, s) - } - - if cfg.MetricsAddr == "" { - log.G(ctx).Info("Pod metrics server not setup due to empty metrics address") - } else { - l, err := net.Listen("tcp", cfg.MetricsAddr) - if err != nil { - return nil, errors.Wrap(err, "could not setup listener for pod metrics http server") - } - - mux := http.NewServeMux() - - var summaryHandlerFunc api.PodStatsSummaryHandlerFunc - if mp, ok := p.(provider.PodMetricsProvider); ok { - summaryHandlerFunc = mp.GetStatsSummary - } - podMetricsRoutes := api.PodMetricsConfig{ - GetStatsSummary: summaryHandlerFunc, - } - api.AttachPodMetricsRoutes(podMetricsRoutes, mux) - s := &http.Server{ - Handler: mux, - } - go serveHTTP(ctx, s, l, "pod metrics") - closers = append(closers, s) - } - - return cancel, nil -} - -func serveHTTP(ctx context.Context, s *http.Server, l net.Listener, name string) { - if err := s.Serve(l); err != nil { - select { - case <-ctx.Done(): - default: - log.G(ctx).WithError(err).Errorf("Error setting up %s http server", name) - } - } - l.Close() -} - type apiServerConfig struct { CertPath string KeyPath string + CACertPath string Addr string MetricsAddr string StreamIdleTimeout time.Duration @@ -157,8 +32,9 @@ type apiServerConfig struct { func getAPIConfig(c Opts) (*apiServerConfig, error) { config := apiServerConfig{ - CertPath: os.Getenv("APISERVER_CERT_LOCATION"), - KeyPath: os.Getenv("APISERVER_KEY_LOCATION"), + CertPath: os.Getenv("APISERVER_CERT_LOCATION"), + KeyPath: os.Getenv("APISERVER_KEY_LOCATION"), + CACertPath: 
os.Getenv("APISERVER_CA_CERT_LOCATION"), } config.Addr = fmt.Sprintf(":%d", c.ListenPort) diff --git a/cmd/virtual-kubelet/internal/commands/root/node.go b/cmd/virtual-kubelet/internal/commands/root/node.go index 876bcd7d5..a1b35900b 100644 --- a/cmd/virtual-kubelet/internal/commands/root/node.go +++ b/cmd/virtual-kubelet/internal/commands/root/node.go @@ -15,54 +15,10 @@ package root import ( - "context" - "strings" - - "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider" "github.com/virtual-kubelet/virtual-kubelet/errdefs" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const osLabel = "beta.kubernetes.io/os" - -// NodeFromProvider builds a kubernetes node object from a provider -// This is a temporary solution until node stuff actually split off from the provider interface itself. -func NodeFromProvider(ctx context.Context, name string, taint *v1.Taint, p provider.Provider, version string) *v1.Node { - taints := make([]v1.Taint, 0) - - if taint != nil { - taints = append(taints, *taint) - } - - node := &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{ - "type": "virtual-kubelet", - "kubernetes.io/role": "agent", - "kubernetes.io/hostname": name, - }, - }, - Spec: v1.NodeSpec{ - Taints: taints, - }, - Status: v1.NodeStatus{ - NodeInfo: v1.NodeSystemInfo{ - Architecture: "amd64", - KubeletVersion: version, - }, - }, - } - - p.ConfigureNode(ctx, node) - if _, ok := node.ObjectMeta.Labels[osLabel]; !ok { - node.ObjectMeta.Labels[osLabel] = strings.ToLower(node.Status.NodeInfo.OperatingSystem) - } - return node -} - // getTaint creates a taint using the provided key/value. // Taint effect is read from the environment // The taint key/value may be overwritten by the environment. diff --git a/cmd/virtual-kubelet/internal/commands/root/opts.go b/cmd/virtual-kubelet/internal/commands/root/opts.go index 3b70ae1e6..0af8ee72c 100644 --- a/cmd/virtual-kubelet/internal/commands/root/opts.go +++ b/cmd/virtual-kubelet/internal/commands/root/opts.go @@ -28,7 +28,7 @@ import ( // Defaults for root command options const ( DefaultNodeName = "virtual-kubelet" - DefaultOperatingSystem = "Linux" + DefaultOperatingSystem = "linux" DefaultInformerResyncPeriod = 1 * time.Minute DefaultMetricsAddr = ":10255" DefaultListenPort = 10250 // TODO(cpuguy83)(VK1.0): Change this to an addr instead of just a port.. we should not be listening on all interfaces. 
diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go index 6d21accf7..88dd709b1 100644 --- a/cmd/virtual-kubelet/internal/commands/root/root.go +++ b/cmd/virtual-kubelet/internal/commands/root/root.go @@ -16,8 +16,10 @@ package root import ( "context" + "crypto/tls" + "net/http" "os" - "path" + "runtime" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -26,14 +28,10 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/internal/manager" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/node" + "github.com/virtual-kubelet/virtual-kubelet/node/api" "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil" corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes/scheme" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/record" + "k8s.io/apiserver/pkg/server/dynamiccertificates" ) // NewCommand creates a new top-level command. @@ -75,30 +73,33 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { } } - client, err := nodeutil.ClientsetFromEnv(c.KubeConfigPath) - if err != nil { - return err - } + mux := http.NewServeMux() + newProvider := func(cfg nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) { + rm, err := manager.NewResourceManager(cfg.Pods, cfg.Secrets, cfg.ConfigMaps, cfg.Services) + if err != nil { + return nil, nil, errors.Wrap(err, "could not create resource manager") + } + initConfig := provider.InitConfig{ + ConfigPath: c.ProviderConfigPath, + NodeName: c.NodeName, + OperatingSystem: c.OperatingSystem, + ResourceManager: rm, + DaemonPort: c.ListenPort, + InternalIP: os.Getenv("VKUBELET_POD_IP"), + KubeClusterDomain: c.KubeClusterDomain, + } + pInit := s.Get(c.Provider) + if pInit == nil { + return nil, nil, errors.Errorf("provider %q not found", c.Provider) + } - // Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node. - podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions( - client, - c.InformerResyncPeriod, - kubeinformers.WithNamespace(c.KubeNamespace), - nodeutil.PodInformerFilter(c.NodeName), - ) - podInformer := podInformerFactory.Core().V1().Pods() - - // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors). - scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, c.InformerResyncPeriod) - // Create a secret informer and a config map informer so we can pass their listers to the resource manager. 
- secretInformer := scmInformerFactory.Core().V1().Secrets() - configMapInformer := scmInformerFactory.Core().V1().ConfigMaps() - serviceInformer := scmInformerFactory.Core().V1().Services() - - rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister()) - if err != nil { - return errors.Wrap(err, "could not create resource manager") + p, err := pInit(initConfig) + if err != nil { + return nil, nil, errors.Wrapf(err, "error initializing provider %s", c.Provider) + } + p.ConfigureNode(ctx, cfg.Node) + cfg.Node.Status.NodeInfo.KubeletVersion = c.Version + return p, nil, nil } apiConfig, err := getAPIConfig(c) @@ -106,28 +107,39 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return err } - if err := setupTracing(ctx, c); err != nil { + cm, err := nodeutil.NewNode(c.NodeName, newProvider, func(cfg *nodeutil.NodeConfig) error { + cfg.KubeconfigPath = c.KubeConfigPath + cfg.Handler = mux + cfg.InformerResyncPeriod = c.InformerResyncPeriod + + if taint != nil { + cfg.NodeSpec.Spec.Taints = append(cfg.NodeSpec.Spec.Taints, *taint) + } + cfg.NodeSpec.Status.NodeInfo.Architecture = runtime.GOARCH + cfg.NodeSpec.Status.NodeInfo.OperatingSystem = c.OperatingSystem + + cfg.HTTPListenAddr = apiConfig.Addr + cfg.StreamCreationTimeout = apiConfig.StreamCreationTimeout + cfg.StreamIdleTimeout = apiConfig.StreamIdleTimeout + cfg.DebugHTTP = true + + cfg.NumWorkers = c.PodSyncWorkers + + return nil + }, + setAuth(c.NodeName, apiConfig), + nodeutil.WithTLSConfig( + nodeutil.WithKeyPairFromPath(apiConfig.CertPath, apiConfig.KeyPath), + maybeCA(apiConfig.CACertPath), + ), + nodeutil.AttachProviderRoutes(mux), + ) + if err != nil { return err } - initConfig := provider.InitConfig{ - ConfigPath: c.ProviderConfigPath, - NodeName: c.NodeName, - OperatingSystem: c.OperatingSystem, - ResourceManager: rm, - DaemonPort: c.ListenPort, - InternalIP: os.Getenv("VKUBELET_POD_IP"), - KubeClusterDomain: c.KubeClusterDomain, - } - - pInit := s.Get(c.Provider) - if pInit == nil { - return errors.Errorf("provider %q not found", c.Provider) - } - - p, err := pInit(initConfig) - if err != nil { - return errors.Wrapf(err, "error initializing provider %s", c.Provider) + if err := setupTracing(ctx, c); err != nil { + return err } ctx = log.WithLogger(ctx, log.G(ctx).WithFields(log.Fields{ @@ -137,70 +149,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { "watchedNamespace": c.KubeNamespace, })) - pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version) - np := node.NewNaiveNodeProvider() - additionalOptions := []node.NodeControllerOpt{ - node.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error { - if !k8serrors.IsNotFound(err) { - return err - } - - log.G(ctx).Debug("node not found") - newNode := pNode.DeepCopy() - newNode.ResourceVersion = "" - _, err = client.CoreV1().Nodes().Create(ctx, newNode, metav1.CreateOptions{}) - if err != nil { - return err - } - log.G(ctx).Debug("created new node") - return nil - }), - } - if c.EnableNodeLease { - leaseClient := nodeutil.NodeLeaseV1Client(client) - // 40 seconds is the default lease time in upstream kubelet - additionalOptions = append(additionalOptions, node.WithNodeEnableLeaseV1(leaseClient, 40)) - } - nodeRunner, err := node.NewNodeController( - np, - pNode, - client.CoreV1().Nodes(), - additionalOptions..., - ) - if err != nil { - log.G(ctx).Fatal(err) - } - - eb := record.NewBroadcaster() - 
eb.StartLogging(log.G(ctx).Infof) - eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(c.KubeNamespace)}) - - pc, err := node.NewPodController(node.PodControllerConfig{ - PodClient: client.CoreV1(), - PodInformer: podInformer, - EventRecorder: eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(pNode.Name, "pod-controller")}), - Provider: p, - SecretInformer: secretInformer, - ConfigMapInformer: configMapInformer, - ServiceInformer: serviceInformer, - }) - if err != nil { - return errors.Wrap(err, "error setting up pod controller") - } - - go podInformerFactory.Start(ctx.Done()) - go scmInformerFactory.Start(ctx.Done()) - - cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig, func(context.Context) ([]*corev1.Pod, error) { - return rm.GetPods(), nil - }) - if err != nil { - return err - } - defer cancelHTTP() - - cm := nodeutil.NewControllerManager(nodeRunner, pc) - go cm.Run(ctx, c.PodSyncWorkers) // nolint:errcheck + go cm.Run(ctx) //nolint:errcheck defer func() { log.G(ctx).Debug("Waiting for controllers to be done") @@ -213,11 +162,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return err } - setNodeReady(pNode) - if err := np.UpdateStatus(ctx, pNode); err != nil { - return errors.Wrap(err, "error marking the node as ready") - } - log.G(ctx).Info("Initialized") + log.G(ctx).Info("Ready") select { case <-ctx.Done(): @@ -227,18 +172,31 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return nil } -func setNodeReady(n *corev1.Node) { - for i, c := range n.Status.Conditions { - if c.Type != "Ready" { - continue +func setAuth(node string, apiCfg *apiServerConfig) nodeutil.NodeOpt { + if apiCfg.CACertPath == "" { + return func(cfg *nodeutil.NodeConfig) error { + cfg.Handler = api.InstrumentHandler(nodeutil.WithAuth(nodeutil.NoAuth(), cfg.Handler)) + return nil } + } - c.Message = "Kubelet is ready" - c.Reason = "KubeletReady" - c.Status = corev1.ConditionTrue - c.LastHeartbeatTime = metav1.Now() - c.LastTransitionTime = metav1.Now() - n.Status.Conditions[i] = c - return + return func(cfg *nodeutil.NodeConfig) error { + auth, err := nodeutil.WebhookAuth(cfg.Client, node, func(cfg *nodeutil.WebhookAuthConfig) error { + var err error + cfg.AuthnConfig.ClientCertificateCAContentProvider, err = dynamiccertificates.NewDynamicCAContentFromFile("ca-cert-bundle", apiCfg.CACertPath) + return err + }) + if err != nil { + return err + } + cfg.Handler = api.InstrumentHandler(nodeutil.WithAuth(auth, cfg.Handler)) + return nil } } + +func maybeCA(p string) func(*tls.Config) error { + if p == "" { + return func(*tls.Config) error { return nil } + } + return nodeutil.WithCAFromPath(p) +} diff --git a/cmd/virtual-kubelet/internal/provider/mock/mock.go b/cmd/virtual-kubelet/internal/provider/mock/mock.go index 8a3b33293..4284060f6 100644 --- a/cmd/virtual-kubelet/internal/provider/mock/mock.go +++ b/cmd/virtual-kubelet/internal/provider/mock/mock.go @@ -339,7 +339,7 @@ func (p *MockProvider) ConfigureNode(ctx context.Context, n *v1.Node) { // nolin n.Status.DaemonEndpoints = p.nodeDaemonEndpoints() os := p.operatingSystem if os == "" { - os = "Linux" + os = "linux" } n.Status.NodeInfo.OperatingSystem = os n.Status.NodeInfo.Architecture = "amd64" diff --git a/cmd/virtual-kubelet/internal/provider/provider.go b/cmd/virtual-kubelet/internal/provider/provider.go index 1af5e58c8..a48630eb5 100644 --- a/cmd/virtual-kubelet/internal/provider/provider.go +++ 
b/cmd/virtual-kubelet/internal/provider/provider.go @@ -2,35 +2,15 @@ package provider import ( "context" - "io" - "github.com/virtual-kubelet/virtual-kubelet/node" - "github.com/virtual-kubelet/virtual-kubelet/node/api" - "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" + "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil" v1 "k8s.io/api/core/v1" ) -// Provider contains the methods required to implement a virtual-kubelet provider. -// -// Errors produced by these methods should implement an interface from -// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the -// core logic to be able to understand the type of failure. +// Provider wraps the core provider type with an extra function needed to bootstrap the node type Provider interface { - node.PodLifecycleHandler - - // GetContainerLogs retrieves the logs of a container by name from the provider. - GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) - - // RunInContainer executes a command in a container in the pod, copying data - // between in/out/err and the container's stdin/stdout/stderr. - RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error - + nodeutil.Provider // ConfigureNode enables a provider to configure the node object that // will be used for Kubernetes. ConfigureNode(context.Context, *v1.Node) } - -// PodMetricsProvider is an optional interface that providers can implement to expose pod stats -type PodMetricsProvider interface { - GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error) -} diff --git a/cmd/virtual-kubelet/internal/provider/types.go b/cmd/virtual-kubelet/internal/provider/types.go index f99e3bc1e..5ad323410 100644 --- a/cmd/virtual-kubelet/internal/provider/types.go +++ b/cmd/virtual-kubelet/internal/provider/types.go @@ -2,9 +2,9 @@ package provider const ( // OperatingSystemLinux is the configuration value for defining Linux. - OperatingSystemLinux = "Linux" + OperatingSystemLinux = "linux" // OperatingSystemWindows is the configuration value for defining Windows. 
- OperatingSystemWindows = "Windows" + OperatingSystemWindows = "windows" ) type OperatingSystems map[string]bool // nolint:golint diff --git a/go.sum b/go.sum index 0c24d7c00..5910cae94 100644 --- a/go.sum +++ b/go.sum @@ -34,8 +34,10 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= +github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= @@ -55,6 +57,7 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/blang/semver v3.5.0+incompatible h1:CGxCgetQ64DKk7rdZ++Vfnb1+ogGNnB17OJKJXD2Cfs= github.com/blang/semver v3.5.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/bombsimon/logrusr v1.0.0 h1:CTCkURYAt5nhCCnKH9eLShYayj2/8Kn/4Qg3QfiU+Ro= github.com/bombsimon/logrusr v1.0.0/go.mod h1:Jq0nHtvxabKE5EMwAAdgTaz7dfWE8C4i11NOltxGQpc= @@ -140,11 +143,13 @@ github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+ github.com/go-openapi/jsonpointer v0.17.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M= github.com/go-openapi/jsonpointer v0.18.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M= github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg= +github.com/go-openapi/jsonpointer v0.19.3 h1:gihV7YNZK1iK6Tgwwsxo2rJbD1GTbdm72325Bq8FI3w= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg= github.com/go-openapi/jsonreference v0.17.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I= github.com/go-openapi/jsonreference v0.18.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I= github.com/go-openapi/jsonreference v0.19.2/go.mod h1:jMjeRr2HHw6nAVajTXJ4eiUwohSTlpa0o73RUL1owJc= +github.com/go-openapi/jsonreference v0.19.3 h1:5cxNfTy0UVC3X8JL5ymxzyoUZmo8iZb+jeTWn7tUa8o= github.com/go-openapi/jsonreference v0.19.3/go.mod h1:rjx6GuL8TTa9VaixXglHmQmIL98+wF9xc8zWvFonSJ8= github.com/go-openapi/loads v0.17.0/go.mod h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU= github.com/go-openapi/loads v0.18.0/go.mod h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU= @@ -158,6 +163,7 @@ github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nA github.com/go-openapi/spec 
v0.17.0/go.mod h1:XkF/MOi14NmjsfZ8VtAKf8pIlbZzyoTvZsdfssdxcBI= github.com/go-openapi/spec v0.18.0/go.mod h1:XkF/MOi14NmjsfZ8VtAKf8pIlbZzyoTvZsdfssdxcBI= github.com/go-openapi/spec v0.19.2/go.mod h1:sCxk3jxKgioEJikev4fgkNmwS+3kuYdJtcsZsD5zxMY= +github.com/go-openapi/spec v0.19.3 h1:0XRyw8kguri6Yw4SxhsQA/atC88yqrk0+G4YhI2wabc= github.com/go-openapi/spec v0.19.3/go.mod h1:FpwSN1ksY1eteniUU7X0N/BgJ7a4WvBFVA8Lj9mJglo= github.com/go-openapi/strfmt v0.17.0/go.mod h1:P82hnJI0CXkErkXi8IKjPbNBM6lV6+5pLP5l494TcyU= github.com/go-openapi/strfmt v0.18.0/go.mod h1:P82hnJI0CXkErkXi8IKjPbNBM6lV6+5pLP5l494TcyU= @@ -167,6 +173,7 @@ github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dp github.com/go-openapi/swag v0.17.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/kXLo40Tg= github.com/go-openapi/swag v0.18.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/kXLo40Tg= github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= +github.com/go-openapi/swag v0.19.5 h1:lTz6Ys4CmqqCQmZPBlbQENR1/GucA2bzYTE12Pw4tFY= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4= github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA= @@ -281,6 +288,7 @@ github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= +github.com/mailru/easyjson v0.7.0 h1:aizVhC/NAAcKWb+5QsU1iNOZb4Yws5UO2I+aIprQITM= github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= @@ -371,6 +379,7 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= @@ -704,6 +713,7 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800 h1:9ZNvfPvVIEsp/T1ez4GQuzCcCTEQW k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.9/go.mod h1:dzAXnQbTRyDlZPJX2SUPEqvnB+j7AJjtlox7PEwigU0= +sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.15 h1:4uqm9Mv+w2MmBYD+F4qf/v6tDFUdPOk29C095RbU5mY= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.15/go.mod h1:LEScyzhFmoF5pso/YSeBstl57mOzx9xlU9n85RGrDQg= sigs.k8s.io/controller-runtime v0.7.1 
h1:nqVwzVzdenfd9xIbB35pC7JJH2IXVL4hDo3MNzkyCh4= sigs.k8s.io/controller-runtime v0.7.1/go.mod h1:pJ3YBrJiAqMAZKi6UVGuE98ZrroV1p+pIhoHsMm9wdU= diff --git a/hack/skaffold/virtual-kubelet/base.yml b/hack/skaffold/virtual-kubelet/base.yml index 0cbfc14c2..f13ad0426 100644 --- a/hack/skaffold/virtual-kubelet/base.yml +++ b/hack/skaffold/virtual-kubelet/base.yml @@ -56,6 +56,14 @@ rules: verbs: - create - patch +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - get + - create + - update --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/hack/skaffold/virtual-kubelet/pod.yml b/hack/skaffold/virtual-kubelet/pod.yml index 48e2c66a5..bdc4ff5f5 100644 --- a/hack/skaffold/virtual-kubelet/pod.yml +++ b/hack/skaffold/virtual-kubelet/pod.yml @@ -4,6 +4,8 @@ metadata: name: vkubelet-mock-0 spec: containers: + - name: jaeger-tracing + image: jaegertracing/all-in-one:1.22 - name: vkubelet-mock-0 image: virtual-kubelet # "IfNotPresent" is used to prevent Minikube from trying to pull from the registry (and failing) in the first place. @@ -23,18 +25,16 @@ spec: - --klog.logtostderr - --log-level - debug + - --trace-exporter + - jaeger + - --trace-sample-rate=always env: + - name: JAEGER_AGENT_ENDPOINT + value: localhost:6831 - name: KUBELET_PORT value: "10250" - name: VKUBELET_POD_IP valueFrom: fieldRef: fieldPath: status.podIP - ports: - - name: metrics - containerPort: 10255 - readinessProbe: - httpGet: - path: /stats/summary - port: metrics serviceAccountName: virtual-kubelet diff --git a/internal/test/e2e/framework/stats.go b/internal/test/e2e/framework/stats.go index 884c8ad0d..5e39d688a 100644 --- a/internal/test/e2e/framework/stats.go +++ b/internal/test/e2e/framework/stats.go @@ -3,7 +3,6 @@ package framework import ( "context" "encoding/json" - "strconv" stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" "k8s.io/apimachinery/pkg/util/net" @@ -18,7 +17,7 @@ func (f *Framework) GetStatsSummary(ctx context.Context) (*stats.Summary, error) Namespace(f.Namespace). Resource("pods"). SubResource("proxy"). - Name(net.JoinSchemeNamePort("http", f.NodeName, strconv.Itoa(10255))). + Name(net.JoinSchemeNamePort("https", f.NodeName, "10250")). Suffix("/stats/summary").DoRaw(ctx) if err != nil { return nil, err diff --git a/node/nodeutil/auth.go b/node/nodeutil/auth.go new file mode 100644 index 000000000..ef5e52cac --- /dev/null +++ b/node/nodeutil/auth.go @@ -0,0 +1,220 @@ +package nodeutil + +import ( + "context" + "net/http" + "strings" + "time" + + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/trace" + "k8s.io/apiserver/pkg/authentication/authenticator" + "k8s.io/apiserver/pkg/authentication/authenticatorfactory" + "k8s.io/apiserver/pkg/authentication/request/anonymous" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/authorization/authorizer" + "k8s.io/apiserver/pkg/authorization/authorizerfactory" + "k8s.io/client-go/kubernetes" +) + +// Auth is the interface used to implement authn/authz for http requests +type Auth interface { + authenticator.Request + authorizer.RequestAttributesGetter + authorizer.Authorizer +} + +type authWrapper struct { + authenticator.Request + authorizer.RequestAttributesGetter + authorizer.Authorizer +} + +// InstrumentAuth wraps the provided Auth in a new instrumented Auth +// +// Note: You would only need this if you rolled your own auth. +// The Auth implementations defined in this package are already instrumented. 
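+//
+// A minimal usage sketch (assuming a custom Auth implementation named myAuth
+// and an http.Handler named mux):
+//
+//	auth := nodeutil.InstrumentAuth(myAuth)
+//	handler := nodeutil.WithAuth(auth, mux)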
+func InstrumentAuth(auth Auth) Auth {
+	if _, ok := auth.(*authWrapper); ok {
+		// This is already instrumented
+		return auth
+	}
+	return &authWrapper{
+		Request:                 auth,
+		RequestAttributesGetter: auth,
+		Authorizer:              auth,
+	}
+}
+
+// NoAuth creates an Auth which allows anonymous access to all resources
+func NoAuth() Auth {
+	return &authWrapper{
+		Request:                 anonymous.NewAuthenticator(),
+		RequestAttributesGetter: &NodeRequestAttr{},
+		Authorizer:              authorizerfactory.NewAlwaysAllowAuthorizer(),
+	}
+}
+
+// WithAuth makes a new http handler which wraps the provided handler with authn/authz.
+func WithAuth(auth Auth, h http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		handleAuth(auth, w, r, h)
+	})
+}
+
+func handleAuth(auth Auth, w http.ResponseWriter, r *http.Request, next http.Handler) {
+	ctx := r.Context()
+	ctx, span := trace.StartSpan(ctx, "vk.handleAuth")
+	defer span.End()
+	r = r.WithContext(ctx)
+
+	info, ok, err := auth.AuthenticateRequest(r)
+	if err != nil || !ok {
+		log.G(r.Context()).WithError(err).Error("Authentication error")
+		http.Error(w, "Unauthorized", http.StatusUnauthorized)
+		return
+	}
+
+	logger := log.G(ctx).WithFields(log.Fields{
+		"user-name": info.User.GetName(),
+		"user-id":   info.User.GetUID(),
+	})
+
+	ctx = log.WithLogger(ctx, logger)
+	r = r.WithContext(ctx)
+
+	attrs := auth.GetRequestAttributes(info.User, r)
+
+	decision, _, err := auth.Authorize(ctx, attrs)
+	if err != nil {
+		log.G(r.Context()).WithError(err).Error("Authorization error")
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	if decision != authorizer.DecisionAllow {
+		http.Error(w, "Forbidden", http.StatusForbidden)
+		return
+	}
+
+	next.ServeHTTP(w, r)
+}
+
+// WebhookAuthOption is used as a functional argument to configure webhook auth.
+type WebhookAuthOption func(*WebhookAuthConfig) error
+
+// WebhookAuthConfig stores the authn/authz configuration and is exposed to callers through WebhookAuthOption.
+type WebhookAuthConfig struct {
+	AuthnConfig authenticatorfactory.DelegatingAuthenticatorConfig
+	AuthzConfig authorizerfactory.DelegatingAuthorizerConfig
+}
+
+// WebhookAuth creates an Auth suitable to use with kubelet webhook auth.
+// You must provide a CA provider to the authentication config, otherwise mTLS is disabled.
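+//
+// A usage sketch mirroring the root command's setup in this repo (the client,
+// node name, and caPath values are assumptions):
+//
+//	auth, err := nodeutil.WebhookAuth(client, "my-node", func(cfg *nodeutil.WebhookAuthConfig) error {
+//		var err error
+//		cfg.AuthnConfig.ClientCertificateCAContentProvider, err = dynamiccertificates.NewDynamicCAContentFromFile("ca-cert-bundle", caPath)
+//		return err
+//	})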
+func WebhookAuth(client kubernetes.Interface, nodeName string, opts ...WebhookAuthOption) (Auth, error) {
+	cfg := WebhookAuthConfig{
+		AuthnConfig: authenticatorfactory.DelegatingAuthenticatorConfig{
+			CacheTTL: 2 * time.Minute, // default taken from k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1
+			// TODO: After upgrading k8s libs, we need to add the retry backoff option
+		},
+		AuthzConfig: authorizerfactory.DelegatingAuthorizerConfig{
+			AllowCacheTTL: 5 * time.Minute,  // default taken from k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1
+			DenyCacheTTL:  30 * time.Second, // default taken from k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1
+			// TODO: After upgrading k8s libs, we need to add the retry backoff option
+		},
+	}
+
+	for _, o := range opts {
+		if err := o(&cfg); err != nil {
+			return nil, err
+		}
+	}
+
+	cfg.AuthnConfig.TokenAccessReviewClient = client.AuthenticationV1().TokenReviews()
+	cfg.AuthzConfig.SubjectAccessReviewClient = client.AuthorizationV1().SubjectAccessReviews()
+
+	authn, _, err := cfg.AuthnConfig.New()
+	if err != nil {
+		return nil, err
+	}
+
+	authz, err := cfg.AuthzConfig.New()
+	if err != nil {
+		return nil, err
+	}
+	return &authWrapper{
+		Request:                 authn,
+		RequestAttributesGetter: NodeRequestAttr{nodeName},
+		Authorizer:              authz,
+	}, nil
+}
+
+func (w *authWrapper) AuthenticateRequest(r *http.Request) (*authenticator.Response, bool, error) {
+	ctx, span := trace.StartSpan(r.Context(), "AuthenticateRequest")
+	defer span.End()
+	return w.Request.AuthenticateRequest(r.WithContext(ctx))
+}
+
+func (w *authWrapper) Authorize(ctx context.Context, a authorizer.Attributes) (authorizer.Decision, string, error) {
+	ctx, span := trace.StartSpan(ctx, "Authorize")
+	defer span.End()
+	return w.Authorizer.Authorize(ctx, a)
+}
+
+// NodeRequestAttr is an authorizer.RequestAttributesGetter which can be used in the Auth interface.
+type NodeRequestAttr struct {
+	NodeName string
+}
+
+// GetRequestAttributes satisfies the authorizer.RequestAttributesGetter interface for use with an `Auth`.
+func (a NodeRequestAttr) GetRequestAttributes(u user.Info, r *http.Request) authorizer.Attributes { + return authorizer.AttributesRecord{ + User: u, + Verb: getAPIVerb(r), + Namespace: "", + APIGroup: "", + APIVersion: "v1", + Resource: "nodes", + Name: a.NodeName, + ResourceRequest: true, + Path: r.URL.Path, + Subresource: getSubresource(r), + } +} + +func getAPIVerb(r *http.Request) string { + switch r.Method { + case http.MethodPost: + return "create" + case http.MethodGet: + return "get" + case http.MethodPut: + return "update" + case http.MethodPatch: + return "patch" + case http.MethodDelete: + return "delete" + } + return "" +} + +func isSubpath(subpath, path string) bool { + // Taken from k8s.io/kubernetes/pkg/kubelet/server/auth.go + return subpath == path || (strings.HasPrefix(subpath, path) && subpath[len(path)] == '/') +} + +func getSubresource(r *http.Request) string { + if isSubpath(r.URL.Path, "/stats") { + return "stats" + } + if isSubpath(r.URL.Path, "/metrics") { + return "metrics" + } + if isSubpath(r.URL.Path, "/logs") { + // yes, "log", not "logs" + // per kubelet code: "log" to match other log subresources (pods/log, etc) + return "log" + } + + return "proxy" +} diff --git a/node/nodeutil/controller.go b/node/nodeutil/controller.go index 5521f7935..edb8c3c72 100644 --- a/node/nodeutil/controller.go +++ b/node/nodeutil/controller.go @@ -2,101 +2,173 @@ package nodeutil import ( "context" + "crypto/tls" "fmt" + "net" + "net/http" + "os" + "path" + "runtime" "time" + "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/node" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/record" ) -// ControllerManager helps manage the startup/shutdown procedure for other controllers. +// Node helps manage the startup/shutdown procedure for other controllers. // It is intended as a convenience to reduce boiler plate code for starting up controllers. // -// Must be created with constructor `NewControllerManager`. -type ControllerManager struct { +// Must be created with constructor `NewNode`. +type Node struct { nc *node.NodeController pc *node.PodController + readyCb func(context.Context) error + ready chan struct{} done chan struct{} err error -} -// NewControllerManager creates a new ControllerManager. -func NewControllerManager(nc *node.NodeController, pc *node.PodController) *ControllerManager { - return &ControllerManager{ - nc: nc, - pc: pc, - ready: make(chan struct{}), - done: make(chan struct{}), - } + podInformerFactory informers.SharedInformerFactory + scmInformerFactory informers.SharedInformerFactory + client kubernetes.Interface + + listenAddr string + h http.Handler + tlsConfig *tls.Config + + workers int + + eb record.EventBroadcaster } // NodeController returns the configured node controller. -func (c *ControllerManager) NodeController() *node.NodeController { - return c.nc +func (n *Node) NodeController() *node.NodeController { + return n.nc } // PodController returns the configured pod controller. 
-func (c *ControllerManager) PodController() *node.PodController { - return c.pc +func (n *Node) PodController() *node.PodController { + return n.pc +} + +func (n *Node) runHTTP(ctx context.Context) (func(), error) { + if n.tlsConfig == nil { + log.G(ctx).Warn("TLS config not provided, not starting up http service") + return func() {}, nil + } + if n.h == nil { + log.G(ctx).Debug("No http handler, not starting up http service") + return func() {}, nil + } + + l, err := tls.Listen("tcp", n.listenAddr, n.tlsConfig) + if err != nil { + return nil, errors.Wrap(err, "error starting http listener") + } + + log.G(ctx).Debug("Started TLS listener") + + srv := &http.Server{Handler: n.h, TLSConfig: n.tlsConfig} + go srv.Serve(l) //nolint:errcheck + log.G(ctx).Debug("HTTP server running") + + return func() { + srv.Close() + l.Close() + }, nil } // Run starts all the underlying controllers -func (c *ControllerManager) Run(ctx context.Context, workers int) (retErr error) { +func (n *Node) Run(ctx context.Context) (retErr error) { ctx, cancel := context.WithCancel(ctx) - defer cancel() - - go c.pc.Run(ctx, workers) // nolint:errcheck - defer func() { cancel() - <-c.pc.Done() + n.err = retErr + close(n.done) + }() - c.err = retErr - close(c.done) + if n.eb != nil { + n.eb.StartLogging(log.G(ctx).Infof) + n.eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: n.client.CoreV1().Events(v1.NamespaceAll)}) + defer n.eb.Shutdown() + log.G(ctx).Debug("Started event broadcaster") + } + + cancelHTTP, err := n.runHTTP(ctx) + if err != nil { + return err + } + defer cancelHTTP() + + go n.podInformerFactory.Start(ctx.Done()) + go n.scmInformerFactory.Start(ctx.Done()) + go n.pc.Run(ctx, n.workers) //nolint:errcheck + + defer func() { + cancel() + <-n.pc.Done() }() select { case <-ctx.Done(): - return c.err - case <-c.pc.Ready(): - case <-c.pc.Done(): - return c.pc.Err() + return n.err + case <-n.pc.Ready(): + case <-n.pc.Done(): + return n.pc.Err() } - go c.nc.Run(ctx) // nolint:errcheck + log.G(ctx).Debug("pod controller ready") + + go n.nc.Run(ctx) //nolint:errcheck defer func() { cancel() - <-c.nc.Done() + <-n.nc.Done() }() select { case <-ctx.Done(): - c.err = ctx.Err() - return c.err - case <-c.nc.Ready(): - case <-c.nc.Done(): - return c.nc.Err() + n.err = ctx.Err() + return n.err + case <-n.nc.Ready(): + case <-n.nc.Done(): + return n.nc.Err() } - close(c.ready) + log.G(ctx).Debug("node controller ready") + + if n.readyCb != nil { + if err := n.readyCb(ctx); err != nil { + return err + } + } + close(n.ready) select { - case <-c.nc.Done(): + case <-n.nc.Done(): cancel() - return c.nc.Err() - case <-c.pc.Done(): + return n.nc.Err() + case <-n.pc.Done(): cancel() - return c.pc.Err() + return n.pc.Err() } } // WaitReady waits for the specified timeout for the controller to be ready. // // The timeout is for convenience so the caller doesn't have to juggle an extra context. 
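+//
+// A usage sketch (the one-minute timeout is an arbitrary choice):
+//
+//	go n.Run(ctx) //nolint:errcheck
+//	if err := n.WaitReady(ctx, time.Minute); err != nil {
+//		return err
+//	}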
-func (c *ControllerManager) WaitReady(ctx context.Context, timeout time.Duration) error {
+func (n *Node) WaitReady(ctx context.Context, timeout time.Duration) error {
 	if timeout > 0 {
 		var cancel func()
 		ctx, cancel = context.WithTimeout(ctx, timeout)
@@ -104,33 +176,255 @@
 	}
 
 	select {
-	case <-c.ready:
+	case <-n.ready:
 		return nil
-	case <-c.done:
-		return fmt.Errorf("controller exited before ready: %w", c.err)
+	case <-n.done:
+		return fmt.Errorf("controller exited before ready: %w", n.err)
 	case <-ctx.Done():
 		return ctx.Err()
 	}
 }
 
 // Ready returns a channel that will be closed after the controller is ready.
-func (c *ControllerManager) Ready() <-chan struct{} {
-	return c.ready
+func (n *Node) Ready() <-chan struct{} {
+	return n.ready
 }
 
 // Done returns a channel that will be closed when the controller has exited.
-func (c *ControllerManager) Done() <-chan struct{} {
-	return c.done
+func (n *Node) Done() <-chan struct{} {
+	return n.done
 }
 
 // Err returns any error that occurred with the controller.
 //
 // This always return nil before `<-Done()`.
-func (c *ControllerManager) Err() error {
+func (n *Node) Err() error {
 	select {
-	case <-c.Done():
-		return c.err
+	case <-n.Done():
+		return n.err
 	default:
 		return nil
 	}
 }
+
+// NodeOpt is a functional option used when configuring a new node in NewNode
+type NodeOpt func(c *NodeConfig) error
+
+// NodeConfig is used to hold configuration items for a Node.
+// It is used in conjunction with NodeOpt in NewNode
+type NodeConfig struct {
+	// Set the client to use, otherwise a client will be created from ClientsetFromEnv
+	Client kubernetes.Interface
+
+	// Set the node spec to register with Kubernetes
+	NodeSpec v1.Node
+	// Set the path to read a kubeconfig from for creating a client.
+	// This is ignored when a client is already set (e.g. via WithClient)
+	KubeconfigPath string
+	// Set the period for a full resync for generated client-go informers
+	InformerResyncPeriod time.Duration
+
+	// Set the address to listen on for the http API
+	HTTPListenAddr string
+	// Set a custom API handler to use.
+	// You can use this to setup, for example, authentication middleware.
+	// If one is not provided, no HTTP server will be started.
+	//
+	// Note: If you provide your own handler, you'll need to handle all auth, routes, etc.
+	Handler http.Handler
+	// Set the timeout for idle http streams
+	StreamIdleTimeout time.Duration
+	// Set the timeout for creating http streams
+	StreamCreationTimeout time.Duration
+	// Enable http debugging routes
+	DebugHTTP bool
+	// Set the tls config to use for the http server
+	TLSConfig *tls.Config
+
+	// Specify the event recorder to use
+	// If this is not provided, a default one will be used.
+	EventRecorder record.EventRecorder
+
+	// Set the number of workers to reconcile pods
+	// The default value is derived from the number of cores available.
+	NumWorkers int
+
+	routeAttacher func(Provider, NodeConfig, corev1listers.PodLister)
+}
+
+// WithNodeConfig returns a NodeOpt which replaces the NodeConfig with the passed in value.
+func WithNodeConfig(c NodeConfig) NodeOpt {
+	return func(orig *NodeConfig) error {
+		*orig = c
+		return nil
+	}
+}
+
+// WithClient returns a NodeOpt that sets the client that will be used to create/manage the node.
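+//
+// For example (a sketch, assuming an existing clientset named client):
+//
+//	n, err := nodeutil.NewNode("my-node", newProvider, nodeutil.WithClient(client))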
+func WithClient(c kubernetes.Interface) NodeOpt {
+	return func(cfg *NodeConfig) error {
+		cfg.Client = c
+		return nil
+	}
+}
+
+// NewNode creates a new node using the provided client and name.
+// This is intended for high-level, low-boilerplate usage.
+// Use the constructors in the `node` package for lower level configuration.
+//
+// Some basic values are set for the node status; you'll almost certainly want to modify them.
+//
+// If no client is configured, one will be constructed using ClientsetFromEnv.
+//
+// It is up to the caller to configure auth on the HTTP handler.
+func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node, error) {
+	cfg := NodeConfig{
+		NumWorkers:           runtime.NumCPU(),
+		InformerResyncPeriod: time.Minute,
+		KubeconfigPath:       os.Getenv("KUBECONFIG"),
+		HTTPListenAddr:       ":10250",
+		NodeSpec: v1.Node{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: name,
+				Labels: map[string]string{
+					"type":                   "virtual-kubelet",
+					"kubernetes.io/role":     "agent",
+					"kubernetes.io/hostname": name,
+				},
+			},
+			Status: v1.NodeStatus{
+				Phase: v1.NodePending,
+				Conditions: []v1.NodeCondition{
+					{Type: v1.NodeReady},
+					{Type: v1.NodeDiskPressure},
+					{Type: v1.NodeMemoryPressure},
+					{Type: v1.NodePIDPressure},
+					{Type: v1.NodeNetworkUnavailable},
+				},
+			},
+		},
+	}
+
+	for _, o := range opts {
+		if err := o(&cfg); err != nil {
+			return nil, err
+		}
+	}
+
+	if _, _, err := net.SplitHostPort(cfg.HTTPListenAddr); err != nil {
+		return nil, errors.Wrap(err, "error parsing http listen address")
+	}
+
+	if cfg.Client == nil {
+		var err error
+		cfg.Client, err = ClientsetFromEnv(cfg.KubeconfigPath)
+		if err != nil {
+			return nil, errors.Wrap(err, "error creating clientset from env")
+		}
+	}
+
+	podInformerFactory := informers.NewSharedInformerFactoryWithOptions(
+		cfg.Client,
+		cfg.InformerResyncPeriod,
+		PodInformerFilter(name),
+	)
+
+	scmInformerFactory := informers.NewSharedInformerFactoryWithOptions(
+		cfg.Client,
+		cfg.InformerResyncPeriod,
+	)
+
+	podInformer := podInformerFactory.Core().V1().Pods()
+	secretInformer := scmInformerFactory.Core().V1().Secrets()
+	configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
+	serviceInformer := scmInformerFactory.Core().V1().Services()
+
+	p, np, err := newProvider(ProviderConfig{
+		Pods:       podInformer.Lister(),
+		ConfigMaps: configMapInformer.Lister(),
+		Secrets:    secretInformer.Lister(),
+		Services:   serviceInformer.Lister(),
+		Node:       &cfg.NodeSpec,
+	})
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating provider")
+	}
+
+	if cfg.routeAttacher != nil {
+		cfg.routeAttacher(p, cfg, podInformer.Lister())
+	}
+
+	var readyCb func(context.Context) error
+	if np == nil {
+		nnp := node.NewNaiveNodeProvider()
+		np = nnp
+
+		readyCb = func(ctx context.Context) error {
+			setNodeReady(&cfg.NodeSpec)
+			err := nnp.UpdateStatus(ctx, &cfg.NodeSpec)
+			return errors.Wrap(err, "error marking node as ready")
+		}
+	}
+
+	nc, err := node.NewNodeController(
+		np,
+		&cfg.NodeSpec,
+		cfg.Client.CoreV1().Nodes(),
+		node.WithNodeEnableLeaseV1(NodeLeaseV1Client(cfg.Client), node.DefaultLeaseDuration),
+	)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating node controller")
+	}
+
+	var eb record.EventBroadcaster
+	if cfg.EventRecorder == nil {
+		eb = record.NewBroadcaster()
+		cfg.EventRecorder = eb.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(name, "pod-controller")})
+	}
+
+	pc, err := node.NewPodController(node.PodControllerConfig{
+		PodClient:         cfg.Client.CoreV1(),
+		EventRecorder:     cfg.EventRecorder,
+		Provider:          p,
+		PodInformer:       podInformer,
+		SecretInformer:    secretInformer,
+		ConfigMapInformer: configMapInformer,
+		ServiceInformer:   serviceInformer,
+	})
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating pod controller")
+	}
+
+	return &Node{
+		nc:                 nc,
+		pc:                 pc,
+		readyCb:            readyCb,
+		ready:              make(chan struct{}),
+		done:               make(chan struct{}),
+		eb:                 eb,
+		podInformerFactory: podInformerFactory,
+		scmInformerFactory: scmInformerFactory,
+		client:             cfg.Client,
+		tlsConfig:          cfg.TLSConfig,
+		h:                  cfg.Handler,
+		listenAddr:         cfg.HTTPListenAddr,
+		workers:            cfg.NumWorkers,
+	}, nil
+}
+
+func setNodeReady(n *v1.Node) {
+	n.Status.Phase = v1.NodeRunning
+	for i, c := range n.Status.Conditions {
+		if c.Type != "Ready" {
+			continue
+		}
+
+		c.Message = "Kubelet is ready"
+		c.Reason = "KubeletReady"
+		c.Status = v1.ConditionTrue
+		c.LastHeartbeatTime = metav1.Now()
+		c.LastTransitionTime = metav1.Now()
+		n.Status.Conditions[i] = c
+		return
+	}
+}
diff --git a/node/nodeutil/provider.go b/node/nodeutil/provider.go
new file mode 100644
index 000000000..0f6a72f03
--- /dev/null
+++ b/node/nodeutil/provider.go
@@ -0,0 +1,70 @@
+package nodeutil
+
+import (
+	"context"
+	"io"
+
+	"github.com/virtual-kubelet/virtual-kubelet/node"
+	"github.com/virtual-kubelet/virtual-kubelet/node/api"
+	"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	corev1listers "k8s.io/client-go/listers/core/v1"
+)
+
+// Provider contains the methods required to implement a virtual-kubelet provider.
+//
+// Errors produced by these methods should implement an interface from
+// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the
+// core logic to be able to understand the type of failure.
+type Provider interface {
+	node.PodLifecycleHandler
+
+	// GetContainerLogs retrieves the logs of a container by name from the provider.
+	GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error)
+
+	// RunInContainer executes a command in a container in the pod, copying data
+	// between in/out/err and the container's stdin/stdout/stderr.
+	RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error
+
+	// GetStatsSummary gets the stats for the node, including running pods
+	GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error)
+}
+
+// ProviderConfig holds objects created by NewNode that a provider may need to bootstrap itself.
+type ProviderConfig struct {
+	Pods       corev1listers.PodLister
+	ConfigMaps corev1listers.ConfigMapLister
+	Secrets    corev1listers.SecretLister
+	Services   corev1listers.ServiceLister
+	// Node is a hack to allow the provider to set things on the node object,
+	// since the provider is bootstrapped after the node object is configured.
+	// Primarily this is carry-over from the pre-1.0 interfaces, which expect the provider rather than the direct *caller* to configure the node.
+	Node *v1.Node
+}
+
+// NewProviderFunc is used from NewNode to bootstrap a provider using the client/listers/etc created there.
+// If a nil node provider is returned, a default one will be used.
+type NewProviderFunc func(ProviderConfig) (Provider, node.NodeProvider, error)
+
+// AttachProviderRoutes returns a NodeOpt which uses api.PodHandler to attach the routes to the provider functions.
+//
+// Note this only attaches routes; you'll still need to set the handler in the node config.
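+//
+// A sketch of wiring both pieces together, mirroring the root command in this
+// repo (name and newProvider are assumptions):
+//
+//	mux := http.NewServeMux()
+//	n, err := nodeutil.NewNode(name, newProvider,
+//		func(cfg *nodeutil.NodeConfig) error {
+//			cfg.Handler = mux
+//			return nil
+//		},
+//		nodeutil.AttachProviderRoutes(mux),
+//	)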
+func AttachProviderRoutes(mux api.ServeMux) NodeOpt {
+	return func(cfg *NodeConfig) error {
+		cfg.routeAttacher = func(p Provider, cfg NodeConfig, pods corev1listers.PodLister) {
+			mux.Handle("/", api.PodHandler(api.PodHandlerConfig{
+				RunInContainer:   p.RunInContainer,
+				GetContainerLogs: p.GetContainerLogs,
+				GetPods:          p.GetPods,
+				GetPodsFromKubernetes: func(context.Context) ([]*v1.Pod, error) {
+					return pods.List(labels.Everything())
+				},
+				GetStatsSummary:       p.GetStatsSummary,
+				StreamIdleTimeout:     cfg.StreamIdleTimeout,
+				StreamCreationTimeout: cfg.StreamCreationTimeout,
+			}, true))
+		}
+		return nil
+	}
+}
diff --git a/node/nodeutil/tls.go b/node/nodeutil/tls.go
new file mode 100644
index 000000000..ac29e3f1b
--- /dev/null
+++ b/node/nodeutil/tls.go
@@ -0,0 +1,83 @@
+package nodeutil
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"io/ioutil"
+)
+
+// WithTLSConfig returns a NodeOpt which creates a base TLSConfig with the default cipher suites and TLS minimum version.
+// The tls config can be modified through functional options.
+func WithTLSConfig(opts ...func(*tls.Config) error) NodeOpt {
+	return func(cfg *NodeConfig) error {
+		tlsCfg := &tls.Config{
+			MinVersion:               tls.VersionTLS12,
+			PreferServerCipherSuites: true,
+			CipherSuites:             DefaultServerCiphers(),
+			ClientAuth:               tls.RequestClientCert,
+		}
+		for _, o := range opts {
+			if err := o(tlsCfg); err != nil {
+				return err
+			}
+		}
+
+		cfg.TLSConfig = tlsCfg
+		return nil
+	}
+}
+
+// WithCAFromPath makes a TLS config option to set up client auth using the path to a PEM encoded CA cert.
+func WithCAFromPath(p string) func(*tls.Config) error {
+	return func(cfg *tls.Config) error {
+		pem, err := ioutil.ReadFile(p)
+		if err != nil {
+			return fmt.Errorf("error reading ca cert pem: %w", err)
+		}
+		cfg.ClientAuth = tls.RequireAndVerifyClientCert
+		return WithCACert(pem)(cfg)
+	}
+}
+
+// WithKeyPairFromPath makes a TLS config option which loads the key pair from the given paths and appends it to the tls config.
+func WithKeyPairFromPath(cert, key string) func(*tls.Config) error {
+	return func(cfg *tls.Config) error {
+		cert, err := tls.LoadX509KeyPair(cert, key)
+		if err != nil {
+			return err
+		}
+		cfg.Certificates = append(cfg.Certificates, cert)
+		return nil
+	}
+}
+
+// WithCACert makes a TLS config option which appends the provided PEM encoded bytes to the tls config's client CA pool.
+// If a cert pool is not defined on the tls config, an empty one will be created.
+func WithCACert(pem []byte) func(*tls.Config) error {
+	return func(cfg *tls.Config) error {
+		if cfg.ClientCAs == nil {
+			cfg.ClientCAs = x509.NewCertPool()
+		}
+		if !cfg.ClientCAs.AppendCertsFromPEM(pem) {
+			return fmt.Errorf("could not parse ca cert pem")
+		}
+		return nil
+	}
+}
+
+// DefaultServerCiphers returns the list of accepted TLS ciphers, with known weak ciphers elided.
+// Note this list should be a moving target.
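+//
+// For example, to build a server TLS config by hand with these ciphers (a
+// sketch; WithTLSConfig already applies them by default):
+//
+//	cfg := &tls.Config{
+//		MinVersion:   tls.VersionTLS12,
+//		CipherSuites: nodeutil.DefaultServerCiphers(),
+//	}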
+func DefaultServerCiphers() []uint16 {
+	return []uint16{
+		tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
+		tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
+		tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
+		tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
+
+		tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
+		tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
+		tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
+		tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
+	}
+}
diff --git a/test/e2e/node.go b/test/e2e/node.go
deleted file mode 100644
index 61bd3aa84..000000000
--- a/test/e2e/node.go
+++ /dev/null
@@ -1,61 +0,0 @@
-package e2e
-
-import (
-	"context"
-	"testing"
-	"time"
-
-	"gotest.tools/assert"
-	is "gotest.tools/assert/cmp"
-	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/fields"
-	watchapi "k8s.io/apimachinery/pkg/watch"
-)
-
-// TestNodeCreateAfterDelete makes sure that a node is automatically recreated
-// if it is deleted while VK is running.
-func (ts *EndToEndTestSuite) TestNodeCreateAfterDelete(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	podList, err := f.KubeClient.CoreV1().Pods(f.Namespace).List(ctx, metav1.ListOptions{
-		FieldSelector: fields.OneTermEqualSelector("spec.nodeName", f.NodeName).String(),
-	})
-
-	assert.NilError(t, err)
-	assert.Assert(t, is.Len(podList.Items, 0), "Kubernetes does not allow node deletion with dependent objects (pods) in existence: %v")
-
-	chErr := make(chan error, 1)
-
-	originalNode, err := f.GetNode(ctx)
-	assert.NilError(t, err)
-
-	ctx, cancel = context.WithTimeout(ctx, time.Minute)
-	defer cancel()
-
-	go func() {
-		wait := func(e watchapi.Event) (bool, error) {
-			err = ctx.Err()
-			// Our timeout has expired
-			if err != nil {
-				return true, err
-			}
-			if e.Type == watchapi.Deleted || e.Type == watchapi.Error {
-				return false, nil
-			}
-
-			return originalNode.ObjectMeta.UID != e.Object.(*v1.Node).ObjectMeta.UID, nil
-		}
-		chErr <- f.WaitUntilNodeCondition(wait)
-	}()
-
-	assert.NilError(t, f.DeleteNode(ctx))
-
-	select {
-	case result := <-chErr:
-		assert.NilError(t, result, "Did not observe new node object created after deletion")
-	case <-ctx.Done():
-		t.Fatal("Test timed out while waiting for node object to be deleted / recreated")
-	}
-}
diff --git a/website/data/cli.yaml b/website/data/cli.yaml
index 0cd709df3..e0b795f44 100644
--- a/website/data/cli.yaml
+++ b/website/data/cli.yaml
@@ -38,7 +38,7 @@ flags:
     default: virtual-kubelet
 - name: --os
   arg: string
-  description: The operating system (must be `Linux` or `Windows`)
-  default: Linux
+  description: The operating system (must be `linux` or `windows`)
+  default: linux
 - name: --pod-sync-workers
   arg: int