diff --git a/cmd/virtual-kubelet/internal/commands/root/http.go b/cmd/virtual-kubelet/internal/commands/root/http.go index b20a4ff7d..2b9633edf 100644 --- a/cmd/virtual-kubelet/internal/commands/root/http.go +++ b/cmd/virtual-kubelet/internal/commands/root/http.go @@ -15,19 +15,12 @@ package root import ( - "context" "crypto/tls" "fmt" - "io" - "net" - "net/http" "os" "time" "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider" - "github.com/virtual-kubelet/virtual-kubelet/log" - "github.com/virtual-kubelet/virtual-kubelet/node/api" ) // AcceptedCiphers is the list of accepted TLS ciphers, with known weak ciphers elided @@ -58,94 +51,6 @@ func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) { }, nil } -func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerConfig, getPodsFromKubernetes api.PodListerFunc) (_ func(), retErr error) { - var closers []io.Closer - cancel := func() { - for _, c := range closers { - c.Close() - } - } - defer func() { - if retErr != nil { - cancel() - } - }() - - if cfg.CertPath == "" || cfg.KeyPath == "" { - log.G(ctx). - WithField("certPath", cfg.CertPath). - WithField("keyPath", cfg.KeyPath). - Error("TLS certificates not provided, not setting up pod http server") - } else { - tlsCfg, err := loadTLSConfig(cfg.CertPath, cfg.KeyPath) - if err != nil { - return nil, err - } - l, err := tls.Listen("tcp", cfg.Addr, tlsCfg) - if err != nil { - return nil, errors.Wrap(err, "error setting up listener for pod http server") - } - - mux := http.NewServeMux() - - podRoutes := api.PodHandlerConfig{ - RunInContainer: p.RunInContainer, - GetContainerLogs: p.GetContainerLogs, - GetPodsFromKubernetes: getPodsFromKubernetes, - GetPods: p.GetPods, - StreamIdleTimeout: cfg.StreamIdleTimeout, - StreamCreationTimeout: cfg.StreamCreationTimeout, - } - - api.AttachPodRoutes(podRoutes, mux, true) - - s := &http.Server{ - Handler: mux, - TLSConfig: tlsCfg, - } - go serveHTTP(ctx, s, l, "pods") - closers = append(closers, s) - } - - if cfg.MetricsAddr == "" { - log.G(ctx).Info("Pod metrics server not setup due to empty metrics address") - } else { - l, err := net.Listen("tcp", cfg.MetricsAddr) - if err != nil { - return nil, errors.Wrap(err, "could not setup listener for pod metrics http server") - } - - mux := http.NewServeMux() - - var summaryHandlerFunc api.PodStatsSummaryHandlerFunc - if mp, ok := p.(provider.PodMetricsProvider); ok { - summaryHandlerFunc = mp.GetStatsSummary - } - podMetricsRoutes := api.PodMetricsConfig{ - GetStatsSummary: summaryHandlerFunc, - } - api.AttachPodMetricsRoutes(podMetricsRoutes, mux) - s := &http.Server{ - Handler: mux, - } - go serveHTTP(ctx, s, l, "pod metrics") - closers = append(closers, s) - } - - return cancel, nil -} - -func serveHTTP(ctx context.Context, s *http.Server, l net.Listener, name string) { - if err := s.Serve(l); err != nil { - select { - case <-ctx.Done(): - default: - log.G(ctx).WithError(err).Errorf("Error setting up %s http server", name) - } - } - l.Close() -} - type apiServerConfig struct { CertPath string KeyPath string diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go index 7373bc862..c4777023d 100644 --- a/cmd/virtual-kubelet/internal/commands/root/root.go +++ b/cmd/virtual-kubelet/internal/commands/root/root.go @@ -74,14 +74,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return err } - cancelHTTP := func() {} - defer func() { - // note: this is purposefully using a closure so that when this is actually set the correct function will be called - if cancelHTTP != nil { - cancelHTTP() - } - }() - newProvider := func(cfg nodeutil.ProviderConfig) (node.PodLifecycleHandler, node.NodeProvider, error) { + newProvider := func(cfg nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) { var err error rm, err := manager.NewResourceManager(cfg.Pods, cfg.Secrets, cfg.ConfigMaps, cfg.Services) if err != nil { @@ -107,18 +100,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { } p.ConfigureNode(ctx, cfg.Node) - apiConfig, err := getAPIConfig(c) - if err != nil { - return nil, nil, err - } - - cancelHTTP, err = setupHTTPServer(ctx, p, apiConfig, func(context.Context) ([]*corev1.Pod, error) { - return rm.GetPods(), nil - }) - if err != nil { - return nil, nil, err - } - return p, nil, nil } @@ -131,6 +112,21 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { cfg.NodeSpec.Status.NodeInfo.Architecture = runtime.GOARCH cfg.NodeSpec.Status.NodeInfo.OperatingSystem = c.OperatingSystem + apiConfig, err := getAPIConfig(c) + if err != nil { + return err + } + + cfg.HTTPListenAddr = apiConfig.Addr + cfg.StreamCreationTimeout = apiConfig.StreamCreationTimeout + cfg.StreamIdleTimeout = apiConfig.StreamIdleTimeout + + cfg.TLSConfig, err = loadTLSConfig(apiConfig.CertPath, apiConfig.KeyPath) + if err != nil { + return errors.Wrap(err, "error loading tls config") + } + + cfg.DebugHTTP = true return nil }) if err != nil { @@ -148,8 +144,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { "watchedNamespace": c.KubeNamespace, })) - defer cancelHTTP() - go cm.Run(ctx, c.PodSyncWorkers) // nolint:errcheck defer func() { @@ -163,6 +157,8 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return err } + log.G(ctx).Info("Ready") + select { case <-ctx.Done(): case <-cm.Done(): diff --git a/cmd/virtual-kubelet/internal/provider/provider.go b/cmd/virtual-kubelet/internal/provider/provider.go index 1af5e58c8..a48630eb5 100644 --- a/cmd/virtual-kubelet/internal/provider/provider.go +++ b/cmd/virtual-kubelet/internal/provider/provider.go @@ -2,35 +2,15 @@ package provider import ( "context" - "io" - "github.com/virtual-kubelet/virtual-kubelet/node" - "github.com/virtual-kubelet/virtual-kubelet/node/api" - "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" + "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil" v1 "k8s.io/api/core/v1" ) -// Provider contains the methods required to implement a virtual-kubelet provider. -// -// Errors produced by these methods should implement an interface from -// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the -// core logic to be able to understand the type of failure. +// Provider wraps the core provider type with an extra function needed to bootstrap the node type Provider interface { - node.PodLifecycleHandler - - // GetContainerLogs retrieves the logs of a container by name from the provider. - GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) - - // RunInContainer executes a command in a container in the pod, copying data - // between in/out/err and the container's stdin/stdout/stderr. - RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error - + nodeutil.Provider // ConfigureNode enables a provider to configure the node object that // will be used for Kubernetes. ConfigureNode(context.Context, *v1.Node) } - -// PodMetricsProvider is an optional interface that providers can implement to expose pod stats -type PodMetricsProvider interface { - GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error) -} diff --git a/hack/skaffold/virtual-kubelet/pod.yml b/hack/skaffold/virtual-kubelet/pod.yml index 48e2c66a5..e95455b00 100644 --- a/hack/skaffold/virtual-kubelet/pod.yml +++ b/hack/skaffold/virtual-kubelet/pod.yml @@ -30,11 +30,4 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP - ports: - - name: metrics - containerPort: 10255 - readinessProbe: - httpGet: - path: /stats/summary - port: metrics serviceAccountName: virtual-kubelet diff --git a/internal/test/e2e/framework/stats.go b/internal/test/e2e/framework/stats.go index 884c8ad0d..5e39d688a 100644 --- a/internal/test/e2e/framework/stats.go +++ b/internal/test/e2e/framework/stats.go @@ -3,7 +3,6 @@ package framework import ( "context" "encoding/json" - "strconv" stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" "k8s.io/apimachinery/pkg/util/net" @@ -18,7 +17,7 @@ func (f *Framework) GetStatsSummary(ctx context.Context) (*stats.Summary, error) Namespace(f.Namespace). Resource("pods"). SubResource("proxy"). - Name(net.JoinSchemeNamePort("http", f.NodeName, strconv.Itoa(10255))). + Name(net.JoinSchemeNamePort("https", f.NodeName, "10250")). Suffix("/stats/summary").DoRaw(ctx) if err != nil { return nil, err diff --git a/node/nodeutil/controller.go b/node/nodeutil/controller.go index 7f145c061..0500f9a6f 100644 --- a/node/nodeutil/controller.go +++ b/node/nodeutil/controller.go @@ -2,7 +2,10 @@ package nodeutil import ( "context" + "crypto/tls" "fmt" + "net" + "net/http" "os" "path" "time" @@ -10,13 +13,14 @@ import ( "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/node" + "github.com/virtual-kubelet/virtual-kubelet/node/api" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" ) @@ -38,6 +42,10 @@ type Node struct { scmInformerFactory informers.SharedInformerFactory client kubernetes.Interface + listenAddr string + httpHandler HTTPHandler + tlsConfig *tls.Config + eb record.EventBroadcaster } @@ -54,7 +62,12 @@ func (n *Node) PodController() *node.PodController { // Run starts all the underlying controllers func (n *Node) Run(ctx context.Context, workers int) (retErr error) { ctx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + cancel() + + n.err = retErr + close(n.done) + }() if n.podInformerFactory != nil { go n.podInformerFactory.Start(ctx.Done()) @@ -68,15 +81,23 @@ func (n *Node) Run(ctx context.Context, workers int) (retErr error) { n.eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: n.client.CoreV1().Events(v1.NamespaceAll)}) } - go n.pc.Run(ctx, workers) // nolint:errcheck + l, err := tls.Listen("tcp", n.listenAddr, n.tlsConfig) + if err != nil { + return errors.Wrap(err, "error starting http listener") + } + log.G(ctx).Debug("Started TLS listener") + defer l.Close() + + srv := &http.Server{Handler: n.httpHandler, TLSConfig: n.tlsConfig} + go srv.Serve(l) + defer srv.Close() + + go n.pc.Run(ctx, workers) //nolint:errcheck + log.G(ctx).Debug("HTTP server running") defer func() { cancel() - <-n.pc.Done() - - n.err = retErr - close(n.done) }() select { @@ -87,6 +108,8 @@ func (n *Node) Run(ctx context.Context, workers int) (retErr error) { return n.pc.Err() } + log.G(ctx).Debug("pod controller ready") + go n.nc.Run(ctx) // nolint:errcheck defer func() { @@ -103,6 +126,8 @@ func (n *Node) Run(ctx context.Context, workers int) (retErr error) { return n.nc.Err() } + log.G(ctx).Debug("node controller ready") + if n.readyCb != nil { if err := n.readyCb(ctx); err != nil { return err @@ -162,18 +187,6 @@ func (n *Node) Err() error { } } -// ProviderConfig holds objects created by NewNodeFromClient that a provider may need to bootstrap itself. -type ProviderConfig struct { - Pods corev1listers.PodLister - ConfigMaps corev1listers.ConfigMapLister - Secrets corev1listers.SecretLister - Services corev1listers.ServiceLister - // Hack to allow the provider to set things on the node - // Since the provider is bootstrapped after the node object is configured - // Primarily this is due to carry-over from the pre-1.0 interfaces that expect the provider to configure the node. - Node *v1.Node -} - // NodeOpt is used as functional options when configuring a new node in NewNodeFromClient type NodeOpt func(c *NodeConfig) error @@ -187,11 +200,32 @@ type NodeConfig struct { KubeconfigPath string // Set the period for a full resync for generated client-go informers InformerResyncPeriod time.Duration + + // Set the address to listen on for the http API + HTTPListenAddr string + // Set a custom API handler to use. + // You can use this to setup, for example, authentication middleware. + // If one is not provided a default one will be created. + // Pod routes will be attached to this handler when creating the node + HTTPHandler HTTPHandler + // Set the timeout for idle http streams + StreamIdleTimeout time.Duration + // Set the timeout for creating http streams + StreamCreationTimeout time.Duration + // Enable http debugging routes + DebugHTTP bool + // Set the tls config to use for the http server + TLSConfig *tls.Config + + // Specify the event recorder to use + // If this is not provided, a default one will be used. + EventRecorder record.EventRecorder } -// NewProviderFunc is used from NewNodeFromClient to bootstrap a provider using the client/listers/etc created there. -// If a nil node provider is returned a default one will be used. -type NewProviderFunc func(ProviderConfig) (node.PodLifecycleHandler, node.NodeProvider, error) +type HTTPHandler interface { + api.ServeMux + http.Handler +} // WithNodeConfig returns a NodeOpt which replaces the NodeConfig with the passed in value. func WithNodeConfig(c NodeConfig) NodeOpt { @@ -218,6 +252,7 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New // TODO: this is what was set in the cli code... its not clear what a good value actually is. InformerResyncPeriod: time.Minute, KubeconfigPath: os.Getenv("KUBECONFIG"), + HTTPListenAddr: ":10250", NodeSpec: v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -246,6 +281,14 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New } } + if _, _, err := net.SplitHostPort(cfg.HTTPListenAddr); err != nil { + return nil, errors.Wrap(err, "error parsing http listen address") + } + + if cfg.HTTPHandler == nil { + cfg.HTTPHandler = http.NewServeMux() + } + if client == nil { var err error client, err = ClientsetFromEnv(cfg.KubeconfigPath) @@ -281,6 +324,18 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New return nil, errors.Wrap(err, "error creating provider") } + api.AttachPodRoutes(api.PodHandlerConfig{ + RunInContainer: p.RunInContainer, + GetContainerLogs: p.GetContainerLogs, + GetPods: p.GetPods, + GetPodsFromKubernetes: func(context.Context) ([]*v1.Pod, error) { + return podInformer.Lister().List(labels.Everything()) + }, + GetStatsSummary: p.GetStatsSummary, + StreamIdleTimeout: cfg.StreamIdleTimeout, + StreamCreationTimeout: cfg.StreamCreationTimeout, + }, cfg.HTTPHandler, cfg.DebugHTTP) + var readyCb func(context.Context) error if np == nil { nnp := node.NewNaiveNodeProvider() @@ -303,11 +358,15 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New return nil, errors.Wrap(err, "error creating node controller") } - eb := record.NewBroadcaster() + var eb record.EventBroadcaster + if cfg.EventRecorder == nil { + eb := record.NewBroadcaster() + cfg.EventRecorder = eb.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(name, "pod-controller")}) + } pc, err := node.NewPodController(node.PodControllerConfig{ PodClient: client.CoreV1(), - EventRecorder: eb.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(name, "pod-controller")}), + EventRecorder: cfg.EventRecorder, Provider: p, PodInformer: podInformer, SecretInformer: secretInformer, @@ -317,6 +376,7 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New if err != nil { return nil, errors.Wrap(err, "error creating pod controller") } + return &Node{ nc: nc, pc: pc, @@ -327,6 +387,9 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New podInformerFactory: podInformerFactory, scmInformerFactory: scmInformerFactory, client: client, + tlsConfig: cfg.TLSConfig, + httpHandler: cfg.HTTPHandler, + listenAddr: cfg.HTTPListenAddr, }, nil } diff --git a/node/nodeutil/provider.go b/node/nodeutil/provider.go new file mode 100644 index 000000000..b85b454cb --- /dev/null +++ b/node/nodeutil/provider.go @@ -0,0 +1,47 @@ +package nodeutil + +import ( + "context" + "io" + + "github.com/virtual-kubelet/virtual-kubelet/node" + "github.com/virtual-kubelet/virtual-kubelet/node/api" + "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" + v1 "k8s.io/api/core/v1" + corev1listers "k8s.io/client-go/listers/core/v1" +) + +// Provider contains the methods required to implement a virtual-kubelet provider. +// +// Errors produced by these methods should implement an interface from +// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the +// core logic to be able to understand the type of failure +type Provider interface { + node.PodLifecycleHandler + + // GetContainerLogs retrieves the logs of a container by name from the provider. + GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) + + // RunInContainer executes a command in a container in the pod, copying data + // between in/out/err and the container's stdin/stdout/stderr. + RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error + + // GetStatsSummary gets the stats for the node, including running pods + GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error) +} + +// ProviderConfig holds objects created by NewNodeFromClient that a provider may need to bootstrap itself. +type ProviderConfig struct { + Pods corev1listers.PodLister + ConfigMaps corev1listers.ConfigMapLister + Secrets corev1listers.SecretLister + Services corev1listers.ServiceLister + // Hack to allow the provider to set things on the node + // Since the provider is bootstrapped after the node object is configured + // Primarily this is due to carry-over from the pre-1.0 interfaces that expect the provider instead of the direct *caller* to configure the node. + Node *v1.Node +} + +// NewProviderFunc is used from NewNodeFromClient to bootstrap a provider using the client/listers/etc created there. +// If a nil node provider is returned a default one will be used. +type NewProviderFunc func(ProviderConfig) (Provider, node.NodeProvider, error)