diff --git a/cmd/http.go b/cmd/http.go new file mode 100644 index 000000000..f6685f859 --- /dev/null +++ b/cmd/http.go @@ -0,0 +1,131 @@ +package cmd + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net" + "net/http" + "os" + "strconv" + + "github.com/cpuguy83/strongerrors" + "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet" +) + +// AcceptedCiphers is the list of accepted TLS ciphers, with known weak ciphers elided +// Note this list should be a moving target. +var AcceptedCiphers = []uint16{ + tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, + tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, + tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, + + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, +} + +func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) { + cert, err := tls.LoadX509KeyPair(certPath, keyPath) + if err != nil { + return nil, errors.Wrap(err, "error loading tls certs") + } + + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + MinVersion: tls.VersionTLS12, + PreferServerCipherSuites: true, + CipherSuites: AcceptedCiphers, + }, nil +} + +func setupHTTPServer(ctx context.Context, cfg *apiServerConfig) (io.Closer, io.Closer, error) { + var ( + podS *http.Server + metricsS *http.Server + ) + if cfg.CertPath == "" || cfg.KeyPath == "" { + log.G(ctx). + WithField("certPath", cfg.CertPath). + WithField("keyPath", cfg.KeyPath). + Error("TLS certificates not provided, not setting up pod http server") + } else { + tlsCfg, err := loadTLSConfig(cfg.CertPath, cfg.KeyPath) + if err != nil { + return nil, nil, err + } + l, err := tls.Listen("tcp", cfg.Addr, tlsCfg) + if err != nil { + return nil, nil, errors.Wrap(err, "error setting up listener for pod http server") + } + + mux := http.NewServeMux() + vkubelet.AttachPodRoutes(p, mux) + + podS = &http.Server{ + Handler: mux, + TLSConfig: tlsCfg, + } + go serveHTTP(ctx, podS, l, "pods") + } + + if cfg.MetricsAddr == "" { + log.G(ctx).Info("Pod metrics server not setup due to empty metrics address") + } else { + l, err := net.Listen("tcp", cfg.MetricsAddr) + if err != nil { + if l != nil { + podS.Close() + } + return nil, nil, errors.Wrap(err, "could not setup listenr for pod metrics http server") + } + + mux := http.NewServeMux() + vkubelet.AttachMetricsRoutes(p, mux) + metricsS = &http.Server{ + Handler: mux, + } + go serveHTTP(ctx, metricsS, l, "pod metrics") + } + + return podS, metricsS, nil +} + +func serveHTTP(ctx context.Context, s *http.Server, l net.Listener, name string) { + if err := s.Serve(l); err != nil { + select { + case <-ctx.Done(): + default: + log.G(ctx).WithError(err).Errorf("Error setting up %s http server", name) + } + } + l.Close() +} + +type apiServerConfig struct { + CertPath string + KeyPath string + Addr string + MetricsAddr string +} + +func getAPIConfig(metricsAddr string) (*apiServerConfig, error) { + config := apiServerConfig{ + CertPath: os.Getenv("APISERVER_CERT_LOCATION"), + KeyPath: os.Getenv("APISERVER_KEY_LOCATION"), + } + + port, err := strconv.Atoi(os.Getenv("KUBELET_PORT")) + if err != nil { + return nil, strongerrors.InvalidArgument(errors.Wrap(err, "error parsing KUBELET_PORT variable")) + } + config.Addr = fmt.Sprintf(":%d", port) + config.MetricsAddr = metricsAddr + + return &config, nil +} diff --git a/cmd/root.go b/cmd/root.go index 22be92ed0..9817be20c 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -26,7 +26,6 @@ import ( "time" "github.com/Sirupsen/logrus" - "github.com/cpuguy83/strongerrors" "github.com/mitchellh/go-homedir" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -69,7 +68,7 @@ var taint *corev1.Taint var k8sClient *kubernetes.Clientset var p providers.Provider var rm *manager.ResourceManager -var apiConfig vkubelet.APIConfig +var apiConfig *apiServerConfig var podInformer corev1informers.PodInformer var kubeSharedInformerFactoryResync time.Duration var podSyncWorkers int @@ -91,21 +90,16 @@ This allows users to schedule kubernetes workloads on nodes that aren't running Run: func(cmd *cobra.Command, args []string) { defer rootContextCancel() - f, err := vkubelet.New(rootContext, vkubelet.Config{ + vk := vkubelet.New(vkubelet.Config{ Client: k8sClient, Namespace: kubeNamespace, NodeName: nodeName, Taint: taint, - MetricsAddr: metricsAddr, Provider: p, ResourceManager: rm, - APIConfig: apiConfig, PodSyncWorkers: podSyncWorkers, PodInformer: podInformer, }) - if err != nil { - log.L.WithError(err).Fatal("Error initializing virtual kubelet") - } sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) @@ -114,8 +108,16 @@ This allows users to schedule kubernetes workloads on nodes that aren't running rootContextCancel() }() - if err := f.Run(rootContext); err != nil && errors.Cause(err) != context.Canceled { - log.L.Fatal(err) + c1, c2, err := setupHTTPServer(rootContext, apiConfig) + if err != nil { + log.G(rootContext).Fatal(err) + } + + defer c1.Close() + defer c2.Close() + + if err := vk.Run(rootContext); err != nil && errors.Cause(err) != context.Canceled { + log.G(rootContext).Fatal(err) } }, } @@ -313,7 +315,7 @@ func initConfig() { logger.WithError(err).Fatal("Error initializing provider") } - apiConfig, err = getAPIConfig() + apiConfig, err = getAPIConfig(metricsAddr) if err != nil { logger.WithError(err).Fatal("Error reading API config") } @@ -369,18 +371,3 @@ func initConfig() { } } } - -func getAPIConfig() (vkubelet.APIConfig, error) { - config := vkubelet.APIConfig{ - CertPath: os.Getenv("APISERVER_CERT_LOCATION"), - KeyPath: os.Getenv("APISERVER_KEY_LOCATION"), - } - - port, err := strconv.Atoi(os.Getenv("KUBELET_PORT")) - if err != nil { - return vkubelet.APIConfig{}, strongerrors.InvalidArgument(errors.Wrap(err, "error parsing KUBELET_PORT variable")) - } - config.Addr = fmt.Sprintf(":%d", port) - - return config, nil -} diff --git a/vkubelet/apiserver.go b/vkubelet/apiserver.go index dc35e038b..39ca4d86c 100644 --- a/vkubelet/apiserver.go +++ b/vkubelet/apiserver.go @@ -1,8 +1,6 @@ package vkubelet import ( - "context" - "net" "net/http" "github.com/Sirupsen/logrus" @@ -14,9 +12,19 @@ import ( "go.opencensus.io/plugin/ochttp/propagation/b3" ) +// ServeMux defines an interface used to attach routes to an existing http +// serve mux. +// It is used to enable callers creating a new server to completely manage +// their own HTTP server while allowing us to attach the required routes to +// satisfy the Kubelet HTTP interfaces. +type ServeMux interface { + Handle(path string, h http.Handler) +} + // PodHandler creates an http handler for interacting with pods/containers. func PodHandler(p providers.Provider) http.Handler { r := mux.NewRouter() + r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", api.PodLogsHandlerFunc(p)).Methods("GET") r.HandleFunc("/exec/{namespace}/{pod}/{container}", api.PodExecHandlerFunc(p)).Methods("POST") r.NotFoundHandler = http.HandlerFunc(NotFound) @@ -47,18 +55,20 @@ func MetricsSummaryHandler(p providers.Provider) http.Handler { return r } -// KubeletServerStart starts the virtual kubelet HTTP server. -func KubeletServerStart(p providers.Provider, l net.Listener, cert, key string) { - if err := http.ServeTLS(l, InstrumentHandler(PodHandler(p)), cert, key); err != nil { - log.G(context.TODO()).WithError(err).Error("error setting up http server") - } +// AttachPodRoutes adds the http routes for pod stuff to the passed in serve mux. +// +// Callers should take care to namespace the serve mux as they see fit, however +// these routes get called by the Kubernetes API server. +func AttachPodRoutes(p providers.Provider, mux ServeMux) { + mux.Handle("/", InstrumentHandler(PodHandler(p))) } -// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelset summary stats API. -func MetricsServerStart(p providers.Provider, l net.Listener) { - if err := http.Serve(l, InstrumentHandler(MetricsSummaryHandler(p))); err != nil { - log.G(context.TODO()).WithError(err).Error("Error starting http server") - } +// AttachMetricsRoutes adds the http routes for pod/node metrics to the passed in serve mux. +// +// Callers should take care to namespace the serve mux as they see fit, however +// these routes get called by the Kubernetes API server. +func AttachMetricsRoutes(p providers.Provider, mux ServeMux) { + mux.Handle("/", InstrumentHandler(MetricsSummaryHandler(p))) } func instrumentRequest(r *http.Request) *http.Request { diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index 549351f01..68a7ac9e1 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -2,16 +2,13 @@ package vkubelet import ( "context" - "net" "time" - pkgerrors "github.com/pkg/errors" "go.opencensus.io/trace" corev1 "k8s.io/api/core/v1" corev1informers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" - "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" ) @@ -34,9 +31,7 @@ type Server struct { // Config is used to configure a new server. type Config struct { - APIConfig APIConfig Client *kubernetes.Clientset - MetricsAddr string Namespace string NodeName string Provider providers.Provider @@ -46,16 +41,13 @@ type Config struct { PodInformer corev1informers.PodInformer } -// APIConfig is used to configure the API server of the virtual kubelet. -type APIConfig struct { - CertPath string - KeyPath string - Addr string -} - // New creates a new virtual-kubelet server. -func New(ctx context.Context, cfg Config) (s *Server, retErr error) { - s = &Server{ +// This is the entrypoint to this package. +// +// This creates but does not start the server. +// You must call `Run` on the returned object to start the server. +func New(cfg Config) *Server { + return &Server{ namespace: cfg.Namespace, nodeName: cfg.NodeName, taint: cfg.Taint, @@ -65,61 +57,43 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) { podSyncWorkers: cfg.PodSyncWorkers, podInformer: cfg.PodInformer, } - - ctx = log.WithLogger(ctx, log.G(ctx)) - - apiL, err := net.Listen("tcp", cfg.APIConfig.Addr) - if err != nil { - return nil, pkgerrors.Wrap(err, "error setting up API listener") - } - defer func() { - if retErr != nil { - apiL.Close() - } - }() - - go KubeletServerStart(cfg.Provider, apiL, cfg.APIConfig.CertPath, cfg.APIConfig.KeyPath) - - if cfg.MetricsAddr != "" { - metricsL, err := net.Listen("tcp", cfg.MetricsAddr) - if err != nil { - return nil, pkgerrors.Wrap(err, "error setting up metrics listener") - } - defer func() { - if retErr != nil { - metricsL.Close() - } - }() - go MetricsServerStart(cfg.Provider, metricsL) - } else { - log.G(ctx).Info("Skipping metrics server startup since no address was provided") - } - - if err := s.registerNode(ctx); err != nil { - return s, err - } - - go func() { - tick := time.NewTicker(5 * time.Second) - defer tick.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-tick.C: - ctx, span := trace.StartSpan(ctx, "syncActualState") - s.updateNode(ctx) - s.updatePodStatuses(ctx) - span.End() - } - } - }() - - return s, nil } // Run creates and starts an instance of the pod controller, blocking until it stops. +// +// Note that this does not setup the HTTP routes that are used to expose pod +// info to the Kubernetes API Server, such as logs, metrics, exec, etc. +// See `AttachPodRoutes` and `AttachMetricsRoutes` to set these up. func (s *Server) Run(ctx context.Context) error { + if err := s.registerNode(ctx); err != nil { + return err + } + + go s.providerSyncLoop(ctx) + return NewPodController(s).Run(ctx, s.podSyncWorkers) } + +// providerSyncLoop syncronizes pod states from the provider back to kubernetes +func (s *Server) providerSyncLoop(ctx context.Context) { + // TODO(@cpuguy83): Ticker does not seem like the right thing to use here. A + // ticker keeps ticking while we are updating state, which can be a long + // operation. This would lead to just a constant re-sync rather than sleeping + // for 5 seconds between each loop. + // + // Leaving this note here as fixing is out of scope for my current changeset. + tick := time.NewTicker(5 * time.Second) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + ctx, span := trace.StartSpan(ctx, "syncActualState") + s.updateNode(ctx) + s.updatePodStatuses(ctx) + span.End() + } + } +}