From 8de66934609aaa0cf5d47de0f60f49e63951727b Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Mon, 20 Aug 2018 11:52:54 -0700 Subject: [PATCH] Don't use globals for API server Refactors how HTTP servers are started and binds them to objects that can store the provider rather than relying on a global. --- vkubelet/apiserver.go | 48 +++++++++++++++++++++++-------------------- vkubelet/vkubelet.go | 10 +++++++-- 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/vkubelet/apiserver.go b/vkubelet/apiserver.go index e65c18a35..b415742b1 100644 --- a/vkubelet/apiserver.go +++ b/vkubelet/apiserver.go @@ -18,9 +18,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" ) -var p Provider -var r mux.Router - func loggingContext(r *http.Request) context.Context { ctx := r.Context() logger := log.G(ctx).WithFields(logrus.Fields{ @@ -30,30 +27,31 @@ func loggingContext(r *http.Request) context.Context { return log.WithLogger(ctx, logger) } +// NotFound provides a handler for cases where the requested endpoint doesn't exist func NotFound(w http.ResponseWriter, r *http.Request) { logger := log.G(loggingContext(r)) log.Trace(logger, "404 request not found") http.Error(w, "404 request not found", http.StatusNotFound) } -func ApiserverStart(provider Provider, metricsAddr string) { - p = provider +// KubeletServer implements HTTP endpoints for serving kubelet API's +type KubeletServer struct { + p Provider +} + +// KubeletServertStart starts the virtual kubelet HTTP server. +func KubeletServerStart(p Provider) { certFilePath := os.Getenv("APISERVER_CERT_LOCATION") keyFilePath := os.Getenv("APISERVER_KEY_LOCATION") port := os.Getenv("KUBELET_PORT") addr := fmt.Sprintf(":%s", port) r := mux.NewRouter() - r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", ApiServerHandler).Methods("GET") - r.HandleFunc("/exec/{namespace}/{pod}/{container}", ApiServerHandlerExec).Methods("POST") + s := &KubeletServer{p: p} + r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", s.ApiServerHandler).Methods("GET") + r.HandleFunc("/exec/{namespace}/{pod}/{container}", s.ApiServerHandlerExec).Methods("POST") r.NotFoundHandler = http.HandlerFunc(NotFound) - if metricsAddr != "" { - go MetricsServerStart(metricsAddr) - } else { - log.G(context.TODO()).Info("Skipping metrics server startup since no address was provided") - } - if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil { log.G(context.TODO()).WithError(err).Error("error setting up http server") } @@ -61,21 +59,27 @@ func ApiserverStart(provider Provider, metricsAddr string) { // MetricsServerStart starts an HTTP server on the provided addr for serving the kubelset summary stats API. // TLS is never enabled on this endpoint. -func MetricsServerStart(addr string) { +func MetricsServerStart(p Provider, addr string) { r := mux.NewRouter() - r.HandleFunc("/stats/summary", MetricsSummaryHandler).Methods("GET") - r.HandleFunc("/stats/summary/", MetricsSummaryHandler).Methods("GET") + s := &MetricsServer{p: p} + r.HandleFunc("/stats/summary", s.MetricsSummaryHandler).Methods("GET") + r.HandleFunc("/stats/summary/", s.MetricsSummaryHandler).Methods("GET") r.NotFoundHandler = http.HandlerFunc(NotFound) if err := http.ListenAndServe(addr, r); err != nil { log.G(context.TODO()).WithError(err).Error("Error starting http server") } } +// MetricsServer provides an HTTP endpopint for accessing pod metrics +type MetricsServer struct { + p Provider +} + // MetricsSummaryHandler is an HTTP handler for implementing the kubelet summary stats endpoint -func MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) { +func (s *MetricsServer) MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) { ctx := loggingContext(req) - mp, ok := p.(MetricsProvider) + mp, ok := s.p.(MetricsProvider) if !ok { log.G(ctx).Debug("stats not implemented for provider") http.Error(w, "not implememnted", http.StatusNotImplemented) @@ -104,7 +108,7 @@ func MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) { } } -func ApiServerHandler(w http.ResponseWriter, req *http.Request) { +func (s *KubeletServer) ApiServerHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) if len(vars) != 3 { NotFound(w, req) @@ -130,7 +134,7 @@ func ApiServerHandler(w http.ResponseWriter, req *http.Request) { tail = t } - podsLogs, err := p.GetContainerLogs(namespace, pod, container, tail) + podsLogs, err := s.p.GetContainerLogs(namespace, pod, container, tail) if err != nil { log.G(ctx).WithError(err).Error("error getting container logs") http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError) @@ -142,7 +146,7 @@ func ApiServerHandler(w http.ResponseWriter, req *http.Request) { } } -func ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) { +func (s *KubeletServer) ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) namespace := vars["namespace"] @@ -172,5 +176,5 @@ func ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) { idleTimeout := time.Second * 30 streamCreationTimeout := time.Second * 30 - remotecommand.ServeExec(w, req, p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols) + remotecommand.ServeExec(w, req, s.p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols) } diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index 543d7594c..8f5702dd9 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -115,7 +115,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon Effect: vkTaintEffect, } - p, err = lookupProvider(provider, providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) + p, err := lookupProvider(provider, providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) if err != nil { return nil, err } @@ -137,7 +137,13 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon return s, err } - go ApiserverStart(p, metricsAddr) + go KubeletServerStart(p) + + if metricsAddr != "" { + go MetricsServerStart(p, metricsAddr) + } else { + log.G(ctx).Info("Skipping metrics server startup since no address was provided") + } tick := time.Tick(5 * time.Second)