Move API handlers to separate package

This makes the package split a little cleaner and easier to import the
HTTP handlers for other consumers.
This commit is contained in:
Brian Goff
2018-09-18 10:05:54 -07:00
parent 74f76c75d5
commit da5e24ef4d
6 changed files with 197 additions and 150 deletions

42
vkubelet/api/exec.go Normal file
View File

@@ -0,0 +1,42 @@
package api
import (
"fmt"
"net/http"
"strings"
"time"
"github.com/gorilla/mux"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
)
// PodExecHandlerFunc makes an http handler func from a Provider which execs a command in a pod's container
// Note that this handler currently depends on gorrilla/mux to get url parts as variables.
// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function
func PodExecHandlerFunc(backend remotecommand.Executor) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
supportedStreamProtocols := strings.Split(req.Header.Get("X-Stream-Protocol-Version"), ",")
q := req.URL.Query()
command := q["command"]
// TODO: tty flag causes remotecommand.createStreams to wait for the wrong number of streams
streamOpts := &remotecommand.Options{
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}
idleTimeout := time.Second * 30
streamCreationTimeout := time.Second * 30
remotecommand.ServeExec(w, req, backend, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
}
}

31
vkubelet/api/helpers.go Normal file
View File

@@ -0,0 +1,31 @@
package api
import (
"io"
"net/http"
"github.com/cpuguy83/strongerrors/status"
"github.com/virtual-kubelet/virtual-kubelet/log"
)
type handlerFunc func(http.ResponseWriter, *http.Request) error
func handleError(f handlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
err := f(w, req)
if err == nil {
return
}
code, _ := status.HTTPCode(err)
w.WriteHeader(code)
io.WriteString(w, err.Error())
logger := log.G(req.Context()).WithError(err).WithField("httpStatusCode", code)
if code >= 500 {
logger.Error("Internal server error on request")
} else {
log.Trace(logger, "Error on request")
}
}
}

53
vkubelet/api/logs.go Normal file
View File

@@ -0,0 +1,53 @@
package api
import (
"context"
"io"
"net/http"
"strconv"
"github.com/cpuguy83/strongerrors"
"github.com/gorilla/mux"
"github.com/pkg/errors"
)
// ContainerLogsBackend is used in place of backend implementations for getting container logs
type ContainerLogsBackend interface {
GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error)
}
// PodLogsHandlerFunc creates an http handler function from a provider to serve logs from a pod
func PodLogsHandlerFunc(p ContainerLogsBackend) http.HandlerFunc {
return handleError(func(w http.ResponseWriter, req *http.Request) error {
vars := mux.Vars(req)
if len(vars) != 3 {
return strongerrors.NotFound(errors.New("not found"))
}
ctx := req.Context()
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
tail := 10
q := req.URL.Query()
if queryTail := q.Get("tailLines"); queryTail != "" {
t, err := strconv.Atoi(queryTail)
if err != nil {
return strongerrors.InvalidArgument(errors.Wrap(err, "could not parse \"tailLines\""))
}
tail = t
}
podsLogs, err := p.GetContainerLogs(ctx, namespace, pod, container, tail)
if err != nil {
return errors.Wrap(err, "error getting container logs?)")
}
if _, err := io.WriteString(w, podsLogs); err != nil {
return strongerrors.Unknown(errors.Wrap(err, "error writing response to client"))
}
return nil
})
}

39
vkubelet/api/stats.go Normal file
View File

@@ -0,0 +1,39 @@
package api
import (
"context"
"encoding/json"
"net/http"
"github.com/cpuguy83/strongerrors"
"github.com/pkg/errors"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
// PodMetricsBackend is used in place of backend implementations to get k8s pod metrics.
type PodMetricsBackend interface {
GetStatsSummary(context.Context) (*stats.Summary, error)
}
// PodMetricsHandlerFunc makes an HTTP handler for implementing the kubelet summary stats endpoint
func PodMetricsHandlerFunc(b PodMetricsBackend) http.HandlerFunc {
return handleError(func(w http.ResponseWriter, req *http.Request) error {
stats, err := b.GetStatsSummary(req.Context())
if err != nil {
if errors.Cause(err) == context.Canceled {
return strongerrors.Cancelled(err)
}
return errors.Wrap(err, "error getting status from provider")
}
b, err := json.Marshal(stats)
if err != nil {
return strongerrors.Unknown(errors.Wrap(err, "error marshalling stats"))
}
if _, err := w.Write(b); err != nil {
return strongerrors.Unknown(errors.Wrap(err, "could not write to client"))
}
return nil
})
}

View File

@@ -2,78 +2,16 @@ package vkubelet
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"io"
"net/http" "net/http"
"os" "os"
"strconv"
"strings"
"time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/cpuguy83/strongerrors"
"github.com/cpuguy83/strongerrors/status"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/log"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api"
) )
func instrumentContext(r *http.Request) context.Context {
ctx := r.Context()
logger := log.G(ctx).WithFields(logrus.Fields{
"uri": r.RequestURI,
"vars": mux.Vars(r),
})
return log.WithLogger(ctx, logger)
}
// NotFound provides a handler for cases where the requested endpoint doesn't exist
func NotFound(w http.ResponseWriter, r *http.Request) {
logger := log.G(instrumentContext(r))
log.Trace(logger, "404 request not found")
http.Error(w, "404 request not found", http.StatusNotFound)
}
// NotImplemented provides a handler for cases where a provider does not implement a given API
func NotImplemented(w http.ResponseWriter, r *http.Request) {
logger := log.G(instrumentContext(r))
log.Trace(logger, "501 not implemented")
http.Error(w, "501 not implemented", http.StatusNotImplemented)
}
type handlerFunc func(http.ResponseWriter, *http.Request) error
// InstrumentHandler wraps an http.Handler and injects instrumentation into the request context.
func InstrumentHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := instrumentContext(req)
req = req.WithContext(ctx)
h.ServeHTTP(w, req)
})
}
func handleError(f handlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
err := f(w, req)
if err == nil {
return
}
code, _ := status.HTTPCode(err)
w.WriteHeader(code)
io.WriteString(w, err.Error())
logger := log.G(req.Context()).WithError(err).WithField("httpStatusCode", code)
if code >= 500 {
logger.Error("Internal server error on request")
} else {
log.Trace(logger, "Error on request")
}
}
}
// KubeletServerStart starts the virtual kubelet HTTP server. // KubeletServerStart starts the virtual kubelet HTTP server.
func KubeletServerStart(p Provider) { func KubeletServerStart(p Provider) {
certFilePath := os.Getenv("APISERVER_CERT_LOCATION") certFilePath := os.Getenv("APISERVER_CERT_LOCATION")
@@ -82,8 +20,8 @@ func KubeletServerStart(p Provider) {
addr := fmt.Sprintf(":%s", port) addr := fmt.Sprintf(":%s", port)
r := mux.NewRouter() r := mux.NewRouter()
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", PodLogsHandlerFunc(p)).Methods("GET") r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", api.PodLogsHandlerFunc(p)).Methods("GET")
r.HandleFunc("/exec/{namespace}/{pod}/{container}", PodExecHandlerFunc(p)).Methods("POST") r.HandleFunc("/exec/{namespace}/{pod}/{container}", api.PodExecHandlerFunc(p)).Methods("POST")
r.NotFoundHandler = http.HandlerFunc(NotFound) r.NotFoundHandler = http.HandlerFunc(NotFound)
if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, InstrumentHandler(r)); err != nil { if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, InstrumentHandler(r)); err != nil {
@@ -96,13 +34,13 @@ func KubeletServerStart(p Provider) {
func MetricsServerStart(p Provider, addr string) { func MetricsServerStart(p Provider, addr string) {
r := mux.NewRouter() r := mux.NewRouter()
mp, ok := p.(MetricsProvider) mp, ok := p.(PodMetricsProvider)
if !ok { if !ok {
r.HandleFunc("/stats/summary", NotImplemented).Methods("GET") r.HandleFunc("/stats/summary", NotImplemented).Methods("GET")
r.HandleFunc("/stats/summary/", NotImplemented).Methods("GET") r.HandleFunc("/stats/summary/", NotImplemented).Methods("GET")
} else { } else {
r.HandleFunc("/stats/summary", PodMetricsHandlerFunc(mp)).Methods("GET") r.HandleFunc("/stats/summary", api.PodMetricsHandlerFunc(mp)).Methods("GET")
r.HandleFunc("/stats/summary/", PodMetricsHandlerFunc(mp)).Methods("GET") r.HandleFunc("/stats/summary/", api.PodMetricsHandlerFunc(mp)).Methods("GET")
} }
r.NotFoundHandler = http.HandlerFunc(NotFound) r.NotFoundHandler = http.HandlerFunc(NotFound)
if err := http.ListenAndServe(addr, InstrumentHandler(r)); err != nil { if err := http.ListenAndServe(addr, InstrumentHandler(r)); err != nil {
@@ -110,90 +48,34 @@ func MetricsServerStart(p Provider, addr string) {
} }
} }
// PodMetricsHandlerFunc makes an HTTP handler for implementing the kubelet summary stats endpoint func instrumentRequest(r *http.Request) context.Context {
func PodMetricsHandlerFunc(mp MetricsProvider) http.HandlerFunc { ctx := r.Context()
return handleError(func(w http.ResponseWriter, req *http.Request) error { logger := log.G(ctx).WithFields(logrus.Fields{
stats, err := mp.GetStatsSummary(req.Context()) "uri": r.RequestURI,
if err != nil { "vars": mux.Vars(r),
if errors.Cause(err) == context.Canceled { })
return strongerrors.Cancelled(err) return log.WithLogger(ctx, logger)
} }
return strongerrors.Unknown(errors.Wrap(err, "error getting status from provider"))
}
b, err := json.Marshal(stats) // InstrumentHandler wraps an http.Handler and injects instrumentation into the request context.
if err != nil { func InstrumentHandler(h http.Handler) http.Handler {
return strongerrors.Unknown(errors.Wrap(err, "error marshalling stats")) return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
} ctx := instrumentRequest(req)
req = req.WithContext(ctx)
if _, err := w.Write(b); err != nil { h.ServeHTTP(w, req)
return strongerrors.Unknown(errors.Wrap(err, "could not write to client"))
}
return nil
}) })
} }
// PodLogsHandlerFunc creates an http handler function from a provider to serve logs from a pod // NotFound provides a handler for cases where the requested endpoint doesn't exist
func PodLogsHandlerFunc(p Provider) http.HandlerFunc { func NotFound(w http.ResponseWriter, r *http.Request) {
return handleError(func(w http.ResponseWriter, req *http.Request) error { logger := log.G(r.Context())
vars := mux.Vars(req) log.Trace(logger, "404 request not found")
if len(vars) != 3 { http.Error(w, "404 request not found", http.StatusNotFound)
return strongerrors.NotFound(errors.New("not found"))
}
ctx := req.Context()
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
tail := 10
q := req.URL.Query()
if queryTail := q.Get("tailLines"); queryTail != "" {
t, err := strconv.Atoi(queryTail)
if err != nil {
return strongerrors.InvalidArgument(errors.Wrap(err, "could not parse \"tailLines\""))
}
tail = t
}
podsLogs, err := p.GetContainerLogs(ctx, namespace, pod, container, tail)
if err != nil {
return errors.Wrap(err, "error getting container logs?)")
}
if _, err := io.WriteString(w, podsLogs); err != nil {
return strongerrors.Unknown(errors.Wrap(err, "error writing response to client"))
}
return nil
})
} }
// PodExecHandlerFunc makes an http handler func from a Provider which execs a command in a pod's container // NotImplemented provides a handler for cases where a provider does not implement a given API
func PodExecHandlerFunc(p Provider) http.HandlerFunc { func NotImplemented(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, req *http.Request) { logger := log.G(r.Context())
vars := mux.Vars(req) log.Trace(logger, "501 not implemented")
http.Error(w, "501 not implemented", http.StatusNotImplemented)
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
supportedStreamProtocols := strings.Split(req.Header.Get("X-Stream-Protocol-Version"), ",")
q := req.URL.Query()
command := q["command"]
// TODO: tty flag causes remotecommand.createStreams to wait for the wrong number of streams
streamOpts := &remotecommand.Options{
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}
idleTimeout := time.Second * 30
streamCreationTimeout := time.Second * 30
remotecommand.ServeExec(w, req, p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
}
} }

View File

@@ -57,7 +57,7 @@ type Provider interface {
OperatingSystem() string OperatingSystem() string
} }
// MetricsProvider is an optional interface that providers can implement to expose pod stats // PodMetricsProvider is an optional interface that providers can implement to expose pod stats
type MetricsProvider interface { type PodMetricsProvider interface {
GetStatsSummary(context.Context) (*stats.Summary, error) GetStatsSummary(context.Context) (*stats.Summary, error)
} }