From fe0170fc5cf6f0e5eac2c27d792540c5d1bdb95f Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Fri, 21 Dec 2018 11:45:07 -0800 Subject: [PATCH] Refactor http server stuff (#466) * Don't start things in New * Move http server handling up to daemon. This removes the burdern of dealing with listeners, http servers, etc in the core framework. Instead provide helpers to attach the appropriate routes to the caller's serve mux. With this change, the vkubelet package only helps callers setup HTTP rather than forcing a specific HTTP config on them. --- http.go | 131 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ root.go | 39 ++++++----------- 2 files changed, 144 insertions(+), 26 deletions(-) create mode 100644 http.go diff --git a/http.go b/http.go new file mode 100644 index 0000000..f6685f8 --- /dev/null +++ b/http.go @@ -0,0 +1,131 @@ +package cmd + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net" + "net/http" + "os" + "strconv" + + "github.com/cpuguy83/strongerrors" + "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet" +) + +// AcceptedCiphers is the list of accepted TLS ciphers, with known weak ciphers elided +// Note this list should be a moving target. +var AcceptedCiphers = []uint16{ + tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, + tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, + tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, + + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, +} + +func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) { + cert, err := tls.LoadX509KeyPair(certPath, keyPath) + if err != nil { + return nil, errors.Wrap(err, "error loading tls certs") + } + + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + MinVersion: tls.VersionTLS12, + PreferServerCipherSuites: true, + CipherSuites: AcceptedCiphers, + }, nil +} + +func setupHTTPServer(ctx context.Context, cfg *apiServerConfig) (io.Closer, io.Closer, error) { + var ( + podS *http.Server + metricsS *http.Server + ) + if cfg.CertPath == "" || cfg.KeyPath == "" { + log.G(ctx). + WithField("certPath", cfg.CertPath). + WithField("keyPath", cfg.KeyPath). + Error("TLS certificates not provided, not setting up pod http server") + } else { + tlsCfg, err := loadTLSConfig(cfg.CertPath, cfg.KeyPath) + if err != nil { + return nil, nil, err + } + l, err := tls.Listen("tcp", cfg.Addr, tlsCfg) + if err != nil { + return nil, nil, errors.Wrap(err, "error setting up listener for pod http server") + } + + mux := http.NewServeMux() + vkubelet.AttachPodRoutes(p, mux) + + podS = &http.Server{ + Handler: mux, + TLSConfig: tlsCfg, + } + go serveHTTP(ctx, podS, l, "pods") + } + + if cfg.MetricsAddr == "" { + log.G(ctx).Info("Pod metrics server not setup due to empty metrics address") + } else { + l, err := net.Listen("tcp", cfg.MetricsAddr) + if err != nil { + if l != nil { + podS.Close() + } + return nil, nil, errors.Wrap(err, "could not setup listenr for pod metrics http server") + } + + mux := http.NewServeMux() + vkubelet.AttachMetricsRoutes(p, mux) + metricsS = &http.Server{ + Handler: mux, + } + go serveHTTP(ctx, metricsS, l, "pod metrics") + } + + return podS, metricsS, nil +} + +func serveHTTP(ctx context.Context, s *http.Server, l net.Listener, name string) { + if err := s.Serve(l); err != nil { + select { + case <-ctx.Done(): + default: + log.G(ctx).WithError(err).Errorf("Error setting up %s http server", name) + } + } + l.Close() +} + +type apiServerConfig struct { + CertPath string + KeyPath string + Addr string + MetricsAddr string +} + +func getAPIConfig(metricsAddr string) (*apiServerConfig, error) { + config := apiServerConfig{ + CertPath: os.Getenv("APISERVER_CERT_LOCATION"), + KeyPath: os.Getenv("APISERVER_KEY_LOCATION"), + } + + port, err := strconv.Atoi(os.Getenv("KUBELET_PORT")) + if err != nil { + return nil, strongerrors.InvalidArgument(errors.Wrap(err, "error parsing KUBELET_PORT variable")) + } + config.Addr = fmt.Sprintf(":%d", port) + config.MetricsAddr = metricsAddr + + return &config, nil +} diff --git a/root.go b/root.go index 22be92e..9817be2 100644 --- a/root.go +++ b/root.go @@ -26,7 +26,6 @@ import ( "time" "github.com/Sirupsen/logrus" - "github.com/cpuguy83/strongerrors" "github.com/mitchellh/go-homedir" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -69,7 +68,7 @@ var taint *corev1.Taint var k8sClient *kubernetes.Clientset var p providers.Provider var rm *manager.ResourceManager -var apiConfig vkubelet.APIConfig +var apiConfig *apiServerConfig var podInformer corev1informers.PodInformer var kubeSharedInformerFactoryResync time.Duration var podSyncWorkers int @@ -91,21 +90,16 @@ This allows users to schedule kubernetes workloads on nodes that aren't running Run: func(cmd *cobra.Command, args []string) { defer rootContextCancel() - f, err := vkubelet.New(rootContext, vkubelet.Config{ + vk := vkubelet.New(vkubelet.Config{ Client: k8sClient, Namespace: kubeNamespace, NodeName: nodeName, Taint: taint, - MetricsAddr: metricsAddr, Provider: p, ResourceManager: rm, - APIConfig: apiConfig, PodSyncWorkers: podSyncWorkers, PodInformer: podInformer, }) - if err != nil { - log.L.WithError(err).Fatal("Error initializing virtual kubelet") - } sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) @@ -114,8 +108,16 @@ This allows users to schedule kubernetes workloads on nodes that aren't running rootContextCancel() }() - if err := f.Run(rootContext); err != nil && errors.Cause(err) != context.Canceled { - log.L.Fatal(err) + c1, c2, err := setupHTTPServer(rootContext, apiConfig) + if err != nil { + log.G(rootContext).Fatal(err) + } + + defer c1.Close() + defer c2.Close() + + if err := vk.Run(rootContext); err != nil && errors.Cause(err) != context.Canceled { + log.G(rootContext).Fatal(err) } }, } @@ -313,7 +315,7 @@ func initConfig() { logger.WithError(err).Fatal("Error initializing provider") } - apiConfig, err = getAPIConfig() + apiConfig, err = getAPIConfig(metricsAddr) if err != nil { logger.WithError(err).Fatal("Error reading API config") } @@ -369,18 +371,3 @@ func initConfig() { } } } - -func getAPIConfig() (vkubelet.APIConfig, error) { - config := vkubelet.APIConfig{ - CertPath: os.Getenv("APISERVER_CERT_LOCATION"), - KeyPath: os.Getenv("APISERVER_KEY_LOCATION"), - } - - port, err := strconv.Atoi(os.Getenv("KUBELET_PORT")) - if err != nil { - return vkubelet.APIConfig{}, strongerrors.InvalidArgument(errors.Wrap(err, "error parsing KUBELET_PORT variable")) - } - config.Addr = fmt.Sprintf(":%d", port) - - return config, nil -}