diff --git a/vkubelet/apiserver.go b/vkubelet/apiserver.go index 0b971bae9..f69c18933 100644 --- a/vkubelet/apiserver.go +++ b/vkubelet/apiserver.go @@ -34,9 +34,11 @@ func NotFound(w http.ResponseWriter, r *http.Request) { http.Error(w, "404 request not found", http.StatusNotFound) } -// KubeletServer implements HTTP endpoints for serving kubelet API's -type KubeletServer struct { - p Provider +// NotImplemented provides a handler for cases where a provider does not implement a given API +func NotImplemented(w http.ResponseWriter, r *http.Request) { + logger := log.G(loggingContext(r)) + log.Trace(logger, "501 not implemented") + http.Error(w, "501 not implemented", http.StatusNotImplemented) } // KubeletServertStart starts the virtual kubelet HTTP server. @@ -47,9 +49,8 @@ func KubeletServerStart(p Provider) { addr := fmt.Sprintf(":%s", port) r := mux.NewRouter() - s := &KubeletServer{p: p} - r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", s.ApiServerHandler).Methods("GET") - r.HandleFunc("/exec/{namespace}/{pod}/{container}", s.ApiServerHandlerExec).Methods("POST") + r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", PodLogsHandlerFunc(p)).Methods("GET") + r.HandleFunc("/exec/{namespace}/{pod}/{container}", PodExecHandlerFunc(p)).Methods("POST") r.NotFoundHandler = http.HandlerFunc(NotFound) if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil { @@ -61,120 +62,115 @@ func KubeletServerStart(p Provider) { // TLS is never enabled on this endpoint. func MetricsServerStart(p Provider, addr string) { r := mux.NewRouter() - s := &MetricsServer{p: p} - r.HandleFunc("/stats/summary", s.MetricsSummaryHandler).Methods("GET") - r.HandleFunc("/stats/summary/", s.MetricsSummaryHandler).Methods("GET") + + mp, ok := p.(MetricsProvider) + if !ok { + r.HandleFunc("/stats/summary", NotImplemented).Methods("GET") + r.HandleFunc("/stats/summary/", NotImplemented).Methods("GET") + } else { + r.HandleFunc("/stats/summary", PodMetricsHandlerFunc(mp)).Methods("GET") + r.HandleFunc("/stats/summary/", PodMetricsHandlerFunc(mp)).Methods("GET") + } r.NotFoundHandler = http.HandlerFunc(NotFound) if err := http.ListenAndServe(addr, r); err != nil { log.G(context.TODO()).WithError(err).Error("Error starting http server") } } -// MetricsServer provides an HTTP endpopint for accessing pod metrics -type MetricsServer struct { - p Provider -} +// PodMetricsHandlerFunc makes an HTTP handler for implementing the kubelet summary stats endpoint +func PodMetricsHandlerFunc(mp MetricsProvider) http.HandlerFunc { + return func(w http.ResponseWriter, req *http.Request) { + ctx := loggingContext(req) -// MetricsSummaryHandler is an HTTP handler for implementing the kubelet summary stats endpoint -func (s *MetricsServer) MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) { - ctx := loggingContext(req) - - mp, ok := s.p.(MetricsProvider) - if !ok { - log.G(ctx).Debug("stats not implemented for provider") - http.Error(w, "not implememnted", http.StatusNotImplemented) - return - } - - stats, err := mp.GetStatsSummary(req.Context()) - if err != nil { - if errors.Cause(err) == context.Canceled { - return - } - log.G(ctx).Error("Error getting stats from provider:", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - b, err := json.Marshal(stats) - if err != nil { - log.G(ctx).WithError(err).Error("Could not marshal stats") - http.Error(w, "could not marshal stats: "+err.Error(), http.StatusInternalServerError) - return - } - - if _, err := w.Write(b); err != nil { - log.G(ctx).WithError(err).Debug("Could not write to client") - } -} - -func (s *KubeletServer) ApiServerHandler(w http.ResponseWriter, req *http.Request) { - vars := mux.Vars(req) - if len(vars) != 3 { - NotFound(w, req) - return - } - - ctx := loggingContext(req) - - namespace := vars["namespace"] - pod := vars["pod"] - container := vars["container"] - tail := 10 - q := req.URL.Query() - - if queryTail := q.Get("tailLines"); queryTail != "" { - t, err := strconv.Atoi(queryTail) + stats, err := mp.GetStatsSummary(req.Context()) if err != nil { - logger := log.G(context.TODO()).WithError(err) - log.Trace(logger, "could not parse tailLines") - http.Error(w, fmt.Sprintf("could not parse \"tailLines\": %v", err), http.StatusBadRequest) + if errors.Cause(err) == context.Canceled { + return + } + log.G(ctx).Error("Error getting stats from provider:", err) + http.Error(w, err.Error(), http.StatusInternalServerError) return } - tail = t - } - podsLogs, err := s.p.GetContainerLogs(ctx, namespace, pod, container, tail) - if err != nil { - log.G(ctx).WithError(err).Error("error getting container logs") - http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError) - return - } + b, err := json.Marshal(stats) + if err != nil { + log.G(ctx).WithError(err).Error("Could not marshal stats") + http.Error(w, "could not marshal stats: "+err.Error(), http.StatusInternalServerError) + return + } - if _, err := io.WriteString(w, podsLogs); err != nil { - log.G(ctx).WithError(err).Warn("error writing response to client") + if _, err := w.Write(b); err != nil { + log.G(ctx).WithError(err).Debug("Could not write to client") + } } } -func (s *KubeletServer) ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) { - vars := mux.Vars(req) +// PodLogsHandlerFunc creates an http handler function from a provider to serve logs from a pod +func PodLogsHandlerFunc(p Provider) http.HandlerFunc { + return func(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + if len(vars) != 3 { + NotFound(w, req) + return + } - namespace := vars["namespace"] - pod := vars["pod"] - container := vars["container"] + ctx := loggingContext(req) - supportedStreamProtocols := strings.Split(req.Header.Get("X-Stream-Protocol-Version"), ",") + namespace := vars["namespace"] + pod := vars["pod"] + container := vars["container"] + tail := 10 + q := req.URL.Query() - q := req.URL.Query() - command := q["command"] + if queryTail := q.Get("tailLines"); queryTail != "" { + t, err := strconv.Atoi(queryTail) + if err != nil { + logger := log.G(context.TODO()).WithError(err) + log.Trace(logger, "could not parse tailLines") + http.Error(w, fmt.Sprintf("could not parse \"tailLines\": %v", err), http.StatusBadRequest) + return + } + tail = t + } - // streamOpts := &remotecommand.Options{ - // Stdin: (q.Get("input") == "1"), - // Stdout: (q.Get("output") == "1"), - // Stderr: (q.Get("error") == "1"), - // TTY: (q.Get("tty") == "1"), - // } + podsLogs, err := p.GetContainerLogs(ctx, namespace, pod, container, tail) + if err != nil { + log.G(ctx).WithError(err).Error("error getting container logs") + http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError) + return + } - // TODO: tty flag causes remotecommand.createStreams to wait for the wrong number of streams - streamOpts := &remotecommand.Options{ - Stdin: true, - Stdout: true, - Stderr: true, - TTY: false, + if _, err := io.WriteString(w, podsLogs); err != nil { + log.G(ctx).WithError(err).Warn("error writing response to client") + } + } +} + +// PodExecHandlerFunc makes an http handler func from a Provider which execs a command in a pod's container +func PodExecHandlerFunc(p Provider) http.HandlerFunc { + return func(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + + namespace := vars["namespace"] + pod := vars["pod"] + container := vars["container"] + + supportedStreamProtocols := strings.Split(req.Header.Get("X-Stream-Protocol-Version"), ",") + + q := req.URL.Query() + command := q["command"] + + // TODO: tty flag causes remotecommand.createStreams to wait for the wrong number of streams + streamOpts := &remotecommand.Options{ + Stdin: true, + Stdout: true, + Stderr: true, + TTY: false, + } + + idleTimeout := time.Second * 30 + streamCreationTimeout := time.Second * 30 + + remotecommand.ServeExec(w, req, p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols) } - - idleTimeout := time.Second * 30 - streamCreationTimeout := time.Second * 30 - - remotecommand.ServeExec(w, req, s.p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols) }