From e1342777d6dc0e72ee3ce13d5ae91600af00890a Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Thu, 3 Jun 2021 23:55:09 +0000 Subject: [PATCH] Add API config to node set This moves API handling into the node object so now everything can be done in one place. TLS is required. In the current form, auth must be setup by the caller. --- .../internal/commands/root/http.go | 95 --------------- .../internal/commands/root/root.go | 40 +++---- .../internal/provider/provider.go | 26 +--- hack/skaffold/virtual-kubelet/pod.yml | 7 -- internal/test/e2e/framework/stats.go | 3 +- node/nodeutil/controller.go | 111 ++++++++++++++---- node/nodeutil/provider.go | 47 ++++++++ 7 files changed, 156 insertions(+), 173 deletions(-) create mode 100644 node/nodeutil/provider.go diff --git a/cmd/virtual-kubelet/internal/commands/root/http.go b/cmd/virtual-kubelet/internal/commands/root/http.go index b20a4ff7d..2b9633edf 100644 --- a/cmd/virtual-kubelet/internal/commands/root/http.go +++ b/cmd/virtual-kubelet/internal/commands/root/http.go @@ -15,19 +15,12 @@ package root import ( - "context" "crypto/tls" "fmt" - "io" - "net" - "net/http" "os" "time" "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider" - "github.com/virtual-kubelet/virtual-kubelet/log" - "github.com/virtual-kubelet/virtual-kubelet/node/api" ) // AcceptedCiphers is the list of accepted TLS ciphers, with known weak ciphers elided @@ -58,94 +51,6 @@ func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) { }, nil } -func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerConfig, getPodsFromKubernetes api.PodListerFunc) (_ func(), retErr error) { - var closers []io.Closer - cancel := func() { - for _, c := range closers { - c.Close() - } - } - defer func() { - if retErr != nil { - cancel() - } - }() - - if cfg.CertPath == "" || cfg.KeyPath == "" { - log.G(ctx). - WithField("certPath", cfg.CertPath). - WithField("keyPath", cfg.KeyPath). - Error("TLS certificates not provided, not setting up pod http server") - } else { - tlsCfg, err := loadTLSConfig(cfg.CertPath, cfg.KeyPath) - if err != nil { - return nil, err - } - l, err := tls.Listen("tcp", cfg.Addr, tlsCfg) - if err != nil { - return nil, errors.Wrap(err, "error setting up listener for pod http server") - } - - mux := http.NewServeMux() - - podRoutes := api.PodHandlerConfig{ - RunInContainer: p.RunInContainer, - GetContainerLogs: p.GetContainerLogs, - GetPodsFromKubernetes: getPodsFromKubernetes, - GetPods: p.GetPods, - StreamIdleTimeout: cfg.StreamIdleTimeout, - StreamCreationTimeout: cfg.StreamCreationTimeout, - } - - api.AttachPodRoutes(podRoutes, mux, true) - - s := &http.Server{ - Handler: mux, - TLSConfig: tlsCfg, - } - go serveHTTP(ctx, s, l, "pods") - closers = append(closers, s) - } - - if cfg.MetricsAddr == "" { - log.G(ctx).Info("Pod metrics server not setup due to empty metrics address") - } else { - l, err := net.Listen("tcp", cfg.MetricsAddr) - if err != nil { - return nil, errors.Wrap(err, "could not setup listener for pod metrics http server") - } - - mux := http.NewServeMux() - - var summaryHandlerFunc api.PodStatsSummaryHandlerFunc - if mp, ok := p.(provider.PodMetricsProvider); ok { - summaryHandlerFunc = mp.GetStatsSummary - } - podMetricsRoutes := api.PodMetricsConfig{ - GetStatsSummary: summaryHandlerFunc, - } - api.AttachPodMetricsRoutes(podMetricsRoutes, mux) - s := &http.Server{ - Handler: mux, - } - go serveHTTP(ctx, s, l, "pod metrics") - closers = append(closers, s) - } - - return cancel, nil -} - -func serveHTTP(ctx context.Context, s *http.Server, l net.Listener, name string) { - if err := s.Serve(l); err != nil { - select { - case <-ctx.Done(): - default: - log.G(ctx).WithError(err).Errorf("Error setting up %s http server", name) - } - } - l.Close() -} - type apiServerConfig struct { CertPath string KeyPath string diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go index 7373bc862..c4777023d 100644 --- a/cmd/virtual-kubelet/internal/commands/root/root.go +++ b/cmd/virtual-kubelet/internal/commands/root/root.go @@ -74,14 +74,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return err } - cancelHTTP := func() {} - defer func() { - // note: this is purposefully using a closure so that when this is actually set the correct function will be called - if cancelHTTP != nil { - cancelHTTP() - } - }() - newProvider := func(cfg nodeutil.ProviderConfig) (node.PodLifecycleHandler, node.NodeProvider, error) { + newProvider := func(cfg nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) { var err error rm, err := manager.NewResourceManager(cfg.Pods, cfg.Secrets, cfg.ConfigMaps, cfg.Services) if err != nil { @@ -107,18 +100,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { } p.ConfigureNode(ctx, cfg.Node) - apiConfig, err := getAPIConfig(c) - if err != nil { - return nil, nil, err - } - - cancelHTTP, err = setupHTTPServer(ctx, p, apiConfig, func(context.Context) ([]*corev1.Pod, error) { - return rm.GetPods(), nil - }) - if err != nil { - return nil, nil, err - } - return p, nil, nil } @@ -131,6 +112,21 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { cfg.NodeSpec.Status.NodeInfo.Architecture = runtime.GOARCH cfg.NodeSpec.Status.NodeInfo.OperatingSystem = c.OperatingSystem + apiConfig, err := getAPIConfig(c) + if err != nil { + return err + } + + cfg.HTTPListenAddr = apiConfig.Addr + cfg.StreamCreationTimeout = apiConfig.StreamCreationTimeout + cfg.StreamIdleTimeout = apiConfig.StreamIdleTimeout + + cfg.TLSConfig, err = loadTLSConfig(apiConfig.CertPath, apiConfig.KeyPath) + if err != nil { + return errors.Wrap(err, "error loading tls config") + } + + cfg.DebugHTTP = true return nil }) if err != nil { @@ -148,8 +144,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { "watchedNamespace": c.KubeNamespace, })) - defer cancelHTTP() - go cm.Run(ctx, c.PodSyncWorkers) // nolint:errcheck defer func() { @@ -163,6 +157,8 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return err } + log.G(ctx).Info("Ready") + select { case <-ctx.Done(): case <-cm.Done(): diff --git a/cmd/virtual-kubelet/internal/provider/provider.go b/cmd/virtual-kubelet/internal/provider/provider.go index 1af5e58c8..a48630eb5 100644 --- a/cmd/virtual-kubelet/internal/provider/provider.go +++ b/cmd/virtual-kubelet/internal/provider/provider.go @@ -2,35 +2,15 @@ package provider import ( "context" - "io" - "github.com/virtual-kubelet/virtual-kubelet/node" - "github.com/virtual-kubelet/virtual-kubelet/node/api" - "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" + "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil" v1 "k8s.io/api/core/v1" ) -// Provider contains the methods required to implement a virtual-kubelet provider. -// -// Errors produced by these methods should implement an interface from -// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the -// core logic to be able to understand the type of failure. +// Provider wraps the core provider type with an extra function needed to bootstrap the node type Provider interface { - node.PodLifecycleHandler - - // GetContainerLogs retrieves the logs of a container by name from the provider. - GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) - - // RunInContainer executes a command in a container in the pod, copying data - // between in/out/err and the container's stdin/stdout/stderr. - RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error - + nodeutil.Provider // ConfigureNode enables a provider to configure the node object that // will be used for Kubernetes. ConfigureNode(context.Context, *v1.Node) } - -// PodMetricsProvider is an optional interface that providers can implement to expose pod stats -type PodMetricsProvider interface { - GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error) -} diff --git a/hack/skaffold/virtual-kubelet/pod.yml b/hack/skaffold/virtual-kubelet/pod.yml index 48e2c66a5..e95455b00 100644 --- a/hack/skaffold/virtual-kubelet/pod.yml +++ b/hack/skaffold/virtual-kubelet/pod.yml @@ -30,11 +30,4 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP - ports: - - name: metrics - containerPort: 10255 - readinessProbe: - httpGet: - path: /stats/summary - port: metrics serviceAccountName: virtual-kubelet diff --git a/internal/test/e2e/framework/stats.go b/internal/test/e2e/framework/stats.go index 884c8ad0d..5e39d688a 100644 --- a/internal/test/e2e/framework/stats.go +++ b/internal/test/e2e/framework/stats.go @@ -3,7 +3,6 @@ package framework import ( "context" "encoding/json" - "strconv" stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" "k8s.io/apimachinery/pkg/util/net" @@ -18,7 +17,7 @@ func (f *Framework) GetStatsSummary(ctx context.Context) (*stats.Summary, error) Namespace(f.Namespace). Resource("pods"). SubResource("proxy"). - Name(net.JoinSchemeNamePort("http", f.NodeName, strconv.Itoa(10255))). + Name(net.JoinSchemeNamePort("https", f.NodeName, "10250")). Suffix("/stats/summary").DoRaw(ctx) if err != nil { return nil, err diff --git a/node/nodeutil/controller.go b/node/nodeutil/controller.go index 7f145c061..0500f9a6f 100644 --- a/node/nodeutil/controller.go +++ b/node/nodeutil/controller.go @@ -2,7 +2,10 @@ package nodeutil import ( "context" + "crypto/tls" "fmt" + "net" + "net/http" "os" "path" "time" @@ -10,13 +13,14 @@ import ( "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/node" + "github.com/virtual-kubelet/virtual-kubelet/node/api" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" ) @@ -38,6 +42,10 @@ type Node struct { scmInformerFactory informers.SharedInformerFactory client kubernetes.Interface + listenAddr string + httpHandler HTTPHandler + tlsConfig *tls.Config + eb record.EventBroadcaster } @@ -54,7 +62,12 @@ func (n *Node) PodController() *node.PodController { // Run starts all the underlying controllers func (n *Node) Run(ctx context.Context, workers int) (retErr error) { ctx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + cancel() + + n.err = retErr + close(n.done) + }() if n.podInformerFactory != nil { go n.podInformerFactory.Start(ctx.Done()) @@ -68,15 +81,23 @@ func (n *Node) Run(ctx context.Context, workers int) (retErr error) { n.eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: n.client.CoreV1().Events(v1.NamespaceAll)}) } - go n.pc.Run(ctx, workers) // nolint:errcheck + l, err := tls.Listen("tcp", n.listenAddr, n.tlsConfig) + if err != nil { + return errors.Wrap(err, "error starting http listener") + } + log.G(ctx).Debug("Started TLS listener") + defer l.Close() + + srv := &http.Server{Handler: n.httpHandler, TLSConfig: n.tlsConfig} + go srv.Serve(l) + defer srv.Close() + + go n.pc.Run(ctx, workers) //nolint:errcheck + log.G(ctx).Debug("HTTP server running") defer func() { cancel() - <-n.pc.Done() - - n.err = retErr - close(n.done) }() select { @@ -87,6 +108,8 @@ func (n *Node) Run(ctx context.Context, workers int) (retErr error) { return n.pc.Err() } + log.G(ctx).Debug("pod controller ready") + go n.nc.Run(ctx) // nolint:errcheck defer func() { @@ -103,6 +126,8 @@ func (n *Node) Run(ctx context.Context, workers int) (retErr error) { return n.nc.Err() } + log.G(ctx).Debug("node controller ready") + if n.readyCb != nil { if err := n.readyCb(ctx); err != nil { return err @@ -162,18 +187,6 @@ func (n *Node) Err() error { } } -// ProviderConfig holds objects created by NewNodeFromClient that a provider may need to bootstrap itself. -type ProviderConfig struct { - Pods corev1listers.PodLister - ConfigMaps corev1listers.ConfigMapLister - Secrets corev1listers.SecretLister - Services corev1listers.ServiceLister - // Hack to allow the provider to set things on the node - // Since the provider is bootstrapped after the node object is configured - // Primarily this is due to carry-over from the pre-1.0 interfaces that expect the provider to configure the node. - Node *v1.Node -} - // NodeOpt is used as functional options when configuring a new node in NewNodeFromClient type NodeOpt func(c *NodeConfig) error @@ -187,11 +200,32 @@ type NodeConfig struct { KubeconfigPath string // Set the period for a full resync for generated client-go informers InformerResyncPeriod time.Duration + + // Set the address to listen on for the http API + HTTPListenAddr string + // Set a custom API handler to use. + // You can use this to setup, for example, authentication middleware. + // If one is not provided a default one will be created. + // Pod routes will be attached to this handler when creating the node + HTTPHandler HTTPHandler + // Set the timeout for idle http streams + StreamIdleTimeout time.Duration + // Set the timeout for creating http streams + StreamCreationTimeout time.Duration + // Enable http debugging routes + DebugHTTP bool + // Set the tls config to use for the http server + TLSConfig *tls.Config + + // Specify the event recorder to use + // If this is not provided, a default one will be used. + EventRecorder record.EventRecorder } -// NewProviderFunc is used from NewNodeFromClient to bootstrap a provider using the client/listers/etc created there. -// If a nil node provider is returned a default one will be used. -type NewProviderFunc func(ProviderConfig) (node.PodLifecycleHandler, node.NodeProvider, error) +type HTTPHandler interface { + api.ServeMux + http.Handler +} // WithNodeConfig returns a NodeOpt which replaces the NodeConfig with the passed in value. func WithNodeConfig(c NodeConfig) NodeOpt { @@ -218,6 +252,7 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New // TODO: this is what was set in the cli code... its not clear what a good value actually is. InformerResyncPeriod: time.Minute, KubeconfigPath: os.Getenv("KUBECONFIG"), + HTTPListenAddr: ":10250", NodeSpec: v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -246,6 +281,14 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New } } + if _, _, err := net.SplitHostPort(cfg.HTTPListenAddr); err != nil { + return nil, errors.Wrap(err, "error parsing http listen address") + } + + if cfg.HTTPHandler == nil { + cfg.HTTPHandler = http.NewServeMux() + } + if client == nil { var err error client, err = ClientsetFromEnv(cfg.KubeconfigPath) @@ -281,6 +324,18 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New return nil, errors.Wrap(err, "error creating provider") } + api.AttachPodRoutes(api.PodHandlerConfig{ + RunInContainer: p.RunInContainer, + GetContainerLogs: p.GetContainerLogs, + GetPods: p.GetPods, + GetPodsFromKubernetes: func(context.Context) ([]*v1.Pod, error) { + return podInformer.Lister().List(labels.Everything()) + }, + GetStatsSummary: p.GetStatsSummary, + StreamIdleTimeout: cfg.StreamIdleTimeout, + StreamCreationTimeout: cfg.StreamCreationTimeout, + }, cfg.HTTPHandler, cfg.DebugHTTP) + var readyCb func(context.Context) error if np == nil { nnp := node.NewNaiveNodeProvider() @@ -303,11 +358,15 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New return nil, errors.Wrap(err, "error creating node controller") } - eb := record.NewBroadcaster() + var eb record.EventBroadcaster + if cfg.EventRecorder == nil { + eb := record.NewBroadcaster() + cfg.EventRecorder = eb.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(name, "pod-controller")}) + } pc, err := node.NewPodController(node.PodControllerConfig{ PodClient: client.CoreV1(), - EventRecorder: eb.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(name, "pod-controller")}), + EventRecorder: cfg.EventRecorder, Provider: p, PodInformer: podInformer, SecretInformer: secretInformer, @@ -317,6 +376,7 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New if err != nil { return nil, errors.Wrap(err, "error creating pod controller") } + return &Node{ nc: nc, pc: pc, @@ -327,6 +387,9 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New podInformerFactory: podInformerFactory, scmInformerFactory: scmInformerFactory, client: client, + tlsConfig: cfg.TLSConfig, + httpHandler: cfg.HTTPHandler, + listenAddr: cfg.HTTPListenAddr, }, nil } diff --git a/node/nodeutil/provider.go b/node/nodeutil/provider.go new file mode 100644 index 000000000..b85b454cb --- /dev/null +++ b/node/nodeutil/provider.go @@ -0,0 +1,47 @@ +package nodeutil + +import ( + "context" + "io" + + "github.com/virtual-kubelet/virtual-kubelet/node" + "github.com/virtual-kubelet/virtual-kubelet/node/api" + "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" + v1 "k8s.io/api/core/v1" + corev1listers "k8s.io/client-go/listers/core/v1" +) + +// Provider contains the methods required to implement a virtual-kubelet provider. +// +// Errors produced by these methods should implement an interface from +// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the +// core logic to be able to understand the type of failure +type Provider interface { + node.PodLifecycleHandler + + // GetContainerLogs retrieves the logs of a container by name from the provider. + GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) + + // RunInContainer executes a command in a container in the pod, copying data + // between in/out/err and the container's stdin/stdout/stderr. + RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error + + // GetStatsSummary gets the stats for the node, including running pods + GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error) +} + +// ProviderConfig holds objects created by NewNodeFromClient that a provider may need to bootstrap itself. +type ProviderConfig struct { + Pods corev1listers.PodLister + ConfigMaps corev1listers.ConfigMapLister + Secrets corev1listers.SecretLister + Services corev1listers.ServiceLister + // Hack to allow the provider to set things on the node + // Since the provider is bootstrapped after the node object is configured + // Primarily this is due to carry-over from the pre-1.0 interfaces that expect the provider instead of the direct *caller* to configure the node. + Node *v1.Node +} + +// NewProviderFunc is used from NewNodeFromClient to bootstrap a provider using the client/listers/etc created there. +// If a nil node provider is returned a default one will be used. +type NewProviderFunc func(ProviderConfig) (Provider, node.NodeProvider, error)