diff --git a/vkubelet/api/exec.go b/vkubelet/api/exec.go new file mode 100644 index 000000000..c52816443 --- /dev/null +++ b/vkubelet/api/exec.go @@ -0,0 +1,42 @@ +package api + +import ( + "fmt" + "net/http" + "strings" + "time" + + "github.com/gorilla/mux" + "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" +) + +// PodExecHandlerFunc makes an http handler func from a Provider which execs a command in a pod's container +// Note that this handler currently depends on gorrilla/mux to get url parts as variables. +// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function +func PodExecHandlerFunc(backend remotecommand.Executor) http.HandlerFunc { + return func(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + + namespace := vars["namespace"] + pod := vars["pod"] + container := vars["container"] + + supportedStreamProtocols := strings.Split(req.Header.Get("X-Stream-Protocol-Version"), ",") + + q := req.URL.Query() + command := q["command"] + + // TODO: tty flag causes remotecommand.createStreams to wait for the wrong number of streams + streamOpts := &remotecommand.Options{ + Stdin: true, + Stdout: true, + Stderr: true, + TTY: false, + } + + idleTimeout := time.Second * 30 + streamCreationTimeout := time.Second * 30 + + remotecommand.ServeExec(w, req, backend, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols) + } +} diff --git a/vkubelet/api/helpers.go b/vkubelet/api/helpers.go new file mode 100644 index 000000000..95e651b56 --- /dev/null +++ b/vkubelet/api/helpers.go @@ -0,0 +1,31 @@ +package api + +import ( + "io" + "net/http" + + "github.com/cpuguy83/strongerrors/status" + "github.com/virtual-kubelet/virtual-kubelet/log" +) + +type handlerFunc func(http.ResponseWriter, *http.Request) error + +func handleError(f handlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, req *http.Request) { + err := f(w, req) + if err == nil { + return + } + + code, _ := status.HTTPCode(err) + w.WriteHeader(code) + io.WriteString(w, err.Error()) + logger := log.G(req.Context()).WithError(err).WithField("httpStatusCode", code) + + if code >= 500 { + logger.Error("Internal server error on request") + } else { + log.Trace(logger, "Error on request") + } + } +} diff --git a/vkubelet/api/logs.go b/vkubelet/api/logs.go new file mode 100644 index 000000000..d7eec9607 --- /dev/null +++ b/vkubelet/api/logs.go @@ -0,0 +1,53 @@ +package api + +import ( + "context" + "io" + "net/http" + "strconv" + + "github.com/cpuguy83/strongerrors" + "github.com/gorilla/mux" + "github.com/pkg/errors" +) + +// ContainerLogsBackend is used in place of backend implementations for getting container logs +type ContainerLogsBackend interface { + GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) +} + +// PodLogsHandlerFunc creates an http handler function from a provider to serve logs from a pod +func PodLogsHandlerFunc(p ContainerLogsBackend) http.HandlerFunc { + return handleError(func(w http.ResponseWriter, req *http.Request) error { + vars := mux.Vars(req) + if len(vars) != 3 { + return strongerrors.NotFound(errors.New("not found")) + } + + ctx := req.Context() + + namespace := vars["namespace"] + pod := vars["pod"] + container := vars["container"] + tail := 10 + q := req.URL.Query() + + if queryTail := q.Get("tailLines"); queryTail != "" { + t, err := strconv.Atoi(queryTail) + if err != nil { + return strongerrors.InvalidArgument(errors.Wrap(err, "could not parse \"tailLines\"")) + } + tail = t + } + + podsLogs, err := p.GetContainerLogs(ctx, namespace, pod, container, tail) + if err != nil { + return errors.Wrap(err, "error getting container logs?)") + } + + if _, err := io.WriteString(w, podsLogs); err != nil { + return strongerrors.Unknown(errors.Wrap(err, "error writing response to client")) + } + return nil + }) +} diff --git a/vkubelet/api/stats.go b/vkubelet/api/stats.go new file mode 100644 index 000000000..afa7b23a7 --- /dev/null +++ b/vkubelet/api/stats.go @@ -0,0 +1,39 @@ +package api + +import ( + "context" + "encoding/json" + "net/http" + + "github.com/cpuguy83/strongerrors" + "github.com/pkg/errors" + stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" +) + +// PodMetricsBackend is used in place of backend implementations to get k8s pod metrics. +type PodMetricsBackend interface { + GetStatsSummary(context.Context) (*stats.Summary, error) +} + +// PodMetricsHandlerFunc makes an HTTP handler for implementing the kubelet summary stats endpoint +func PodMetricsHandlerFunc(b PodMetricsBackend) http.HandlerFunc { + return handleError(func(w http.ResponseWriter, req *http.Request) error { + stats, err := b.GetStatsSummary(req.Context()) + if err != nil { + if errors.Cause(err) == context.Canceled { + return strongerrors.Cancelled(err) + } + return errors.Wrap(err, "error getting status from provider") + } + + b, err := json.Marshal(stats) + if err != nil { + return strongerrors.Unknown(errors.Wrap(err, "error marshalling stats")) + } + + if _, err := w.Write(b); err != nil { + return strongerrors.Unknown(errors.Wrap(err, "could not write to client")) + } + return nil + }) +} diff --git a/vkubelet/apiserver.go b/vkubelet/apiserver.go index 59bc280c8..539f837d2 100644 --- a/vkubelet/apiserver.go +++ b/vkubelet/apiserver.go @@ -2,78 +2,16 @@ package vkubelet import ( "context" - "encoding/json" "fmt" - "io" "net/http" "os" - "strconv" - "strings" - "time" "github.com/Sirupsen/logrus" - "github.com/cpuguy83/strongerrors" - "github.com/cpuguy83/strongerrors/status" "github.com/gorilla/mux" - "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" - "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" ) -func instrumentContext(r *http.Request) context.Context { - ctx := r.Context() - logger := log.G(ctx).WithFields(logrus.Fields{ - "uri": r.RequestURI, - "vars": mux.Vars(r), - }) - return log.WithLogger(ctx, logger) -} - -// NotFound provides a handler for cases where the requested endpoint doesn't exist -func NotFound(w http.ResponseWriter, r *http.Request) { - logger := log.G(instrumentContext(r)) - log.Trace(logger, "404 request not found") - http.Error(w, "404 request not found", http.StatusNotFound) -} - -// NotImplemented provides a handler for cases where a provider does not implement a given API -func NotImplemented(w http.ResponseWriter, r *http.Request) { - logger := log.G(instrumentContext(r)) - log.Trace(logger, "501 not implemented") - http.Error(w, "501 not implemented", http.StatusNotImplemented) -} - -type handlerFunc func(http.ResponseWriter, *http.Request) error - -// InstrumentHandler wraps an http.Handler and injects instrumentation into the request context. -func InstrumentHandler(h http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - ctx := instrumentContext(req) - req = req.WithContext(ctx) - h.ServeHTTP(w, req) - }) -} - -func handleError(f handlerFunc) http.HandlerFunc { - return func(w http.ResponseWriter, req *http.Request) { - err := f(w, req) - if err == nil { - return - } - - code, _ := status.HTTPCode(err) - w.WriteHeader(code) - io.WriteString(w, err.Error()) - logger := log.G(req.Context()).WithError(err).WithField("httpStatusCode", code) - - if code >= 500 { - logger.Error("Internal server error on request") - } else { - log.Trace(logger, "Error on request") - } - } -} - // KubeletServerStart starts the virtual kubelet HTTP server. func KubeletServerStart(p Provider) { certFilePath := os.Getenv("APISERVER_CERT_LOCATION") @@ -82,8 +20,8 @@ func KubeletServerStart(p Provider) { addr := fmt.Sprintf(":%s", port) r := mux.NewRouter() - r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", PodLogsHandlerFunc(p)).Methods("GET") - r.HandleFunc("/exec/{namespace}/{pod}/{container}", PodExecHandlerFunc(p)).Methods("POST") + r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", api.PodLogsHandlerFunc(p)).Methods("GET") + r.HandleFunc("/exec/{namespace}/{pod}/{container}", api.PodExecHandlerFunc(p)).Methods("POST") r.NotFoundHandler = http.HandlerFunc(NotFound) if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, InstrumentHandler(r)); err != nil { @@ -96,13 +34,13 @@ func KubeletServerStart(p Provider) { func MetricsServerStart(p Provider, addr string) { r := mux.NewRouter() - mp, ok := p.(MetricsProvider) + mp, ok := p.(PodMetricsProvider) if !ok { r.HandleFunc("/stats/summary", NotImplemented).Methods("GET") r.HandleFunc("/stats/summary/", NotImplemented).Methods("GET") } else { - r.HandleFunc("/stats/summary", PodMetricsHandlerFunc(mp)).Methods("GET") - r.HandleFunc("/stats/summary/", PodMetricsHandlerFunc(mp)).Methods("GET") + r.HandleFunc("/stats/summary", api.PodMetricsHandlerFunc(mp)).Methods("GET") + r.HandleFunc("/stats/summary/", api.PodMetricsHandlerFunc(mp)).Methods("GET") } r.NotFoundHandler = http.HandlerFunc(NotFound) if err := http.ListenAndServe(addr, InstrumentHandler(r)); err != nil { @@ -110,90 +48,34 @@ func MetricsServerStart(p Provider, addr string) { } } -// PodMetricsHandlerFunc makes an HTTP handler for implementing the kubelet summary stats endpoint -func PodMetricsHandlerFunc(mp MetricsProvider) http.HandlerFunc { - return handleError(func(w http.ResponseWriter, req *http.Request) error { - stats, err := mp.GetStatsSummary(req.Context()) - if err != nil { - if errors.Cause(err) == context.Canceled { - return strongerrors.Cancelled(err) - } - return strongerrors.Unknown(errors.Wrap(err, "error getting status from provider")) - } +func instrumentRequest(r *http.Request) context.Context { + ctx := r.Context() + logger := log.G(ctx).WithFields(logrus.Fields{ + "uri": r.RequestURI, + "vars": mux.Vars(r), + }) + return log.WithLogger(ctx, logger) +} - b, err := json.Marshal(stats) - if err != nil { - return strongerrors.Unknown(errors.Wrap(err, "error marshalling stats")) - } - - if _, err := w.Write(b); err != nil { - return strongerrors.Unknown(errors.Wrap(err, "could not write to client")) - } - return nil +// InstrumentHandler wraps an http.Handler and injects instrumentation into the request context. +func InstrumentHandler(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := instrumentRequest(req) + req = req.WithContext(ctx) + h.ServeHTTP(w, req) }) } -// PodLogsHandlerFunc creates an http handler function from a provider to serve logs from a pod -func PodLogsHandlerFunc(p Provider) http.HandlerFunc { - return handleError(func(w http.ResponseWriter, req *http.Request) error { - vars := mux.Vars(req) - if len(vars) != 3 { - return strongerrors.NotFound(errors.New("not found")) - } - - ctx := req.Context() - - namespace := vars["namespace"] - pod := vars["pod"] - container := vars["container"] - tail := 10 - q := req.URL.Query() - - if queryTail := q.Get("tailLines"); queryTail != "" { - t, err := strconv.Atoi(queryTail) - if err != nil { - return strongerrors.InvalidArgument(errors.Wrap(err, "could not parse \"tailLines\"")) - } - tail = t - } - - podsLogs, err := p.GetContainerLogs(ctx, namespace, pod, container, tail) - if err != nil { - return errors.Wrap(err, "error getting container logs?)") - } - - if _, err := io.WriteString(w, podsLogs); err != nil { - return strongerrors.Unknown(errors.Wrap(err, "error writing response to client")) - } - return nil - }) +// NotFound provides a handler for cases where the requested endpoint doesn't exist +func NotFound(w http.ResponseWriter, r *http.Request) { + logger := log.G(r.Context()) + log.Trace(logger, "404 request not found") + http.Error(w, "404 request not found", http.StatusNotFound) } -// PodExecHandlerFunc makes an http handler func from a Provider which execs a command in a pod's container -func PodExecHandlerFunc(p Provider) http.HandlerFunc { - return func(w http.ResponseWriter, req *http.Request) { - vars := mux.Vars(req) - - namespace := vars["namespace"] - pod := vars["pod"] - container := vars["container"] - - supportedStreamProtocols := strings.Split(req.Header.Get("X-Stream-Protocol-Version"), ",") - - q := req.URL.Query() - command := q["command"] - - // TODO: tty flag causes remotecommand.createStreams to wait for the wrong number of streams - streamOpts := &remotecommand.Options{ - Stdin: true, - Stdout: true, - Stderr: true, - TTY: false, - } - - idleTimeout := time.Second * 30 - streamCreationTimeout := time.Second * 30 - - remotecommand.ServeExec(w, req, p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols) - } +// NotImplemented provides a handler for cases where a provider does not implement a given API +func NotImplemented(w http.ResponseWriter, r *http.Request) { + logger := log.G(r.Context()) + log.Trace(logger, "501 not implemented") + http.Error(w, "501 not implemented", http.StatusNotImplemented) } diff --git a/vkubelet/provider.go b/vkubelet/provider.go index 6ace5e839..873ba84a1 100644 --- a/vkubelet/provider.go +++ b/vkubelet/provider.go @@ -57,7 +57,7 @@ type Provider interface { OperatingSystem() string } -// MetricsProvider is an optional interface that providers can implement to expose pod stats -type MetricsProvider interface { +// PodMetricsProvider is an optional interface that providers can implement to expose pod stats +type PodMetricsProvider interface { GetStatsSummary(context.Context) (*stats.Summary, error) }