Remove intermediate API server objects

Instead just generate HTTP handler functions directly.
This commit is contained in:
Brian Goff
2018-09-17 14:47:26 -07:00
parent 1aa2f0ccce
commit 8eb6ab4bcd

View File

@@ -34,9 +34,11 @@ func NotFound(w http.ResponseWriter, r *http.Request) {
http.Error(w, "404 request not found", http.StatusNotFound)
}
// KubeletServer implements HTTP endpoints for serving kubelet API's
type KubeletServer struct {
p Provider
// NotImplemented provides a handler for cases where a provider does not implement a given API
func NotImplemented(w http.ResponseWriter, r *http.Request) {
logger := log.G(loggingContext(r))
log.Trace(logger, "501 not implemented")
http.Error(w, "501 not implemented", http.StatusNotImplemented)
}
// KubeletServertStart starts the virtual kubelet HTTP server.
@@ -47,9 +49,8 @@ func KubeletServerStart(p Provider) {
addr := fmt.Sprintf(":%s", port)
r := mux.NewRouter()
s := &KubeletServer{p: p}
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", s.ApiServerHandler).Methods("GET")
r.HandleFunc("/exec/{namespace}/{pod}/{container}", s.ApiServerHandlerExec).Methods("POST")
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", PodLogsHandlerFunc(p)).Methods("GET")
r.HandleFunc("/exec/{namespace}/{pod}/{container}", PodExecHandlerFunc(p)).Methods("POST")
r.NotFoundHandler = http.HandlerFunc(NotFound)
if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil {
@@ -61,120 +62,115 @@ func KubeletServerStart(p Provider) {
// TLS is never enabled on this endpoint.
func MetricsServerStart(p Provider, addr string) {
r := mux.NewRouter()
s := &MetricsServer{p: p}
r.HandleFunc("/stats/summary", s.MetricsSummaryHandler).Methods("GET")
r.HandleFunc("/stats/summary/", s.MetricsSummaryHandler).Methods("GET")
mp, ok := p.(MetricsProvider)
if !ok {
r.HandleFunc("/stats/summary", NotImplemented).Methods("GET")
r.HandleFunc("/stats/summary/", NotImplemented).Methods("GET")
} else {
r.HandleFunc("/stats/summary", PodMetricsHandlerFunc(mp)).Methods("GET")
r.HandleFunc("/stats/summary/", PodMetricsHandlerFunc(mp)).Methods("GET")
}
r.NotFoundHandler = http.HandlerFunc(NotFound)
if err := http.ListenAndServe(addr, r); err != nil {
log.G(context.TODO()).WithError(err).Error("Error starting http server")
}
}
// MetricsServer provides an HTTP endpopint for accessing pod metrics
type MetricsServer struct {
p Provider
}
// PodMetricsHandlerFunc makes an HTTP handler for implementing the kubelet summary stats endpoint
func PodMetricsHandlerFunc(mp MetricsProvider) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
ctx := loggingContext(req)
// MetricsSummaryHandler is an HTTP handler for implementing the kubelet summary stats endpoint
func (s *MetricsServer) MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) {
ctx := loggingContext(req)
mp, ok := s.p.(MetricsProvider)
if !ok {
log.G(ctx).Debug("stats not implemented for provider")
http.Error(w, "not implememnted", http.StatusNotImplemented)
return
}
stats, err := mp.GetStatsSummary(req.Context())
if err != nil {
if errors.Cause(err) == context.Canceled {
return
}
log.G(ctx).Error("Error getting stats from provider:", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
b, err := json.Marshal(stats)
if err != nil {
log.G(ctx).WithError(err).Error("Could not marshal stats")
http.Error(w, "could not marshal stats: "+err.Error(), http.StatusInternalServerError)
return
}
if _, err := w.Write(b); err != nil {
log.G(ctx).WithError(err).Debug("Could not write to client")
}
}
func (s *KubeletServer) ApiServerHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
if len(vars) != 3 {
NotFound(w, req)
return
}
ctx := loggingContext(req)
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
tail := 10
q := req.URL.Query()
if queryTail := q.Get("tailLines"); queryTail != "" {
t, err := strconv.Atoi(queryTail)
stats, err := mp.GetStatsSummary(req.Context())
if err != nil {
logger := log.G(context.TODO()).WithError(err)
log.Trace(logger, "could not parse tailLines")
http.Error(w, fmt.Sprintf("could not parse \"tailLines\": %v", err), http.StatusBadRequest)
if errors.Cause(err) == context.Canceled {
return
}
log.G(ctx).Error("Error getting stats from provider:", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
tail = t
}
podsLogs, err := s.p.GetContainerLogs(ctx, namespace, pod, container, tail)
if err != nil {
log.G(ctx).WithError(err).Error("error getting container logs")
http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError)
return
}
b, err := json.Marshal(stats)
if err != nil {
log.G(ctx).WithError(err).Error("Could not marshal stats")
http.Error(w, "could not marshal stats: "+err.Error(), http.StatusInternalServerError)
return
}
if _, err := io.WriteString(w, podsLogs); err != nil {
log.G(ctx).WithError(err).Warn("error writing response to client")
if _, err := w.Write(b); err != nil {
log.G(ctx).WithError(err).Debug("Could not write to client")
}
}
}
func (s *KubeletServer) ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
// PodLogsHandlerFunc creates an http handler function from a provider to serve logs from a pod
func PodLogsHandlerFunc(p Provider) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
if len(vars) != 3 {
NotFound(w, req)
return
}
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
ctx := loggingContext(req)
supportedStreamProtocols := strings.Split(req.Header.Get("X-Stream-Protocol-Version"), ",")
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
tail := 10
q := req.URL.Query()
q := req.URL.Query()
command := q["command"]
if queryTail := q.Get("tailLines"); queryTail != "" {
t, err := strconv.Atoi(queryTail)
if err != nil {
logger := log.G(context.TODO()).WithError(err)
log.Trace(logger, "could not parse tailLines")
http.Error(w, fmt.Sprintf("could not parse \"tailLines\": %v", err), http.StatusBadRequest)
return
}
tail = t
}
// streamOpts := &remotecommand.Options{
// Stdin: (q.Get("input") == "1"),
// Stdout: (q.Get("output") == "1"),
// Stderr: (q.Get("error") == "1"),
// TTY: (q.Get("tty") == "1"),
// }
podsLogs, err := p.GetContainerLogs(ctx, namespace, pod, container, tail)
if err != nil {
log.G(ctx).WithError(err).Error("error getting container logs")
http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError)
return
}
// TODO: tty flag causes remotecommand.createStreams to wait for the wrong number of streams
streamOpts := &remotecommand.Options{
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
if _, err := io.WriteString(w, podsLogs); err != nil {
log.G(ctx).WithError(err).Warn("error writing response to client")
}
}
}
// PodExecHandlerFunc makes an http handler func from a Provider which execs a command in a pod's container
func PodExecHandlerFunc(p Provider) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
supportedStreamProtocols := strings.Split(req.Header.Get("X-Stream-Protocol-Version"), ",")
q := req.URL.Query()
command := q["command"]
// TODO: tty flag causes remotecommand.createStreams to wait for the wrong number of streams
streamOpts := &remotecommand.Options{
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}
idleTimeout := time.Second * 30
streamCreationTimeout := time.Second * 30
remotecommand.ServeExec(w, req, p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
}
idleTimeout := time.Second * 30
streamCreationTimeout := time.Second * 30
remotecommand.ServeExec(w, req, s.p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
}