Don't use globals for API server
Refactors how HTTP servers are started and binds them to objects that can store the provider rather than relying on a global.
This commit is contained in:
@@ -18,9 +18,6 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
|
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
|
||||||
)
|
)
|
||||||
|
|
||||||
var p Provider
|
|
||||||
var r mux.Router
|
|
||||||
|
|
||||||
func loggingContext(r *http.Request) context.Context {
|
func loggingContext(r *http.Request) context.Context {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
logger := log.G(ctx).WithFields(logrus.Fields{
|
logger := log.G(ctx).WithFields(logrus.Fields{
|
||||||
@@ -30,30 +27,31 @@ func loggingContext(r *http.Request) context.Context {
|
|||||||
return log.WithLogger(ctx, logger)
|
return log.WithLogger(ctx, logger)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NotFound provides a handler for cases where the requested endpoint doesn't exist
|
||||||
func NotFound(w http.ResponseWriter, r *http.Request) {
|
func NotFound(w http.ResponseWriter, r *http.Request) {
|
||||||
logger := log.G(loggingContext(r))
|
logger := log.G(loggingContext(r))
|
||||||
log.Trace(logger, "404 request not found")
|
log.Trace(logger, "404 request not found")
|
||||||
http.Error(w, "404 request not found", http.StatusNotFound)
|
http.Error(w, "404 request not found", http.StatusNotFound)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ApiserverStart(provider Provider, metricsAddr string) {
|
// KubeletServer implements HTTP endpoints for serving kubelet API's
|
||||||
p = provider
|
type KubeletServer struct {
|
||||||
|
p Provider
|
||||||
|
}
|
||||||
|
|
||||||
|
// KubeletServertStart starts the virtual kubelet HTTP server.
|
||||||
|
func KubeletServerStart(p Provider) {
|
||||||
certFilePath := os.Getenv("APISERVER_CERT_LOCATION")
|
certFilePath := os.Getenv("APISERVER_CERT_LOCATION")
|
||||||
keyFilePath := os.Getenv("APISERVER_KEY_LOCATION")
|
keyFilePath := os.Getenv("APISERVER_KEY_LOCATION")
|
||||||
port := os.Getenv("KUBELET_PORT")
|
port := os.Getenv("KUBELET_PORT")
|
||||||
addr := fmt.Sprintf(":%s", port)
|
addr := fmt.Sprintf(":%s", port)
|
||||||
|
|
||||||
r := mux.NewRouter()
|
r := mux.NewRouter()
|
||||||
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", ApiServerHandler).Methods("GET")
|
s := &KubeletServer{p: p}
|
||||||
r.HandleFunc("/exec/{namespace}/{pod}/{container}", ApiServerHandlerExec).Methods("POST")
|
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", s.ApiServerHandler).Methods("GET")
|
||||||
|
r.HandleFunc("/exec/{namespace}/{pod}/{container}", s.ApiServerHandlerExec).Methods("POST")
|
||||||
r.NotFoundHandler = http.HandlerFunc(NotFound)
|
r.NotFoundHandler = http.HandlerFunc(NotFound)
|
||||||
|
|
||||||
if metricsAddr != "" {
|
|
||||||
go MetricsServerStart(metricsAddr)
|
|
||||||
} else {
|
|
||||||
log.G(context.TODO()).Info("Skipping metrics server startup since no address was provided")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil {
|
if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil {
|
||||||
log.G(context.TODO()).WithError(err).Error("error setting up http server")
|
log.G(context.TODO()).WithError(err).Error("error setting up http server")
|
||||||
}
|
}
|
||||||
@@ -61,21 +59,27 @@ func ApiserverStart(provider Provider, metricsAddr string) {
|
|||||||
|
|
||||||
// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelset summary stats API.
|
// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelset summary stats API.
|
||||||
// TLS is never enabled on this endpoint.
|
// TLS is never enabled on this endpoint.
|
||||||
func MetricsServerStart(addr string) {
|
func MetricsServerStart(p Provider, addr string) {
|
||||||
r := mux.NewRouter()
|
r := mux.NewRouter()
|
||||||
r.HandleFunc("/stats/summary", MetricsSummaryHandler).Methods("GET")
|
s := &MetricsServer{p: p}
|
||||||
r.HandleFunc("/stats/summary/", MetricsSummaryHandler).Methods("GET")
|
r.HandleFunc("/stats/summary", s.MetricsSummaryHandler).Methods("GET")
|
||||||
|
r.HandleFunc("/stats/summary/", s.MetricsSummaryHandler).Methods("GET")
|
||||||
r.NotFoundHandler = http.HandlerFunc(NotFound)
|
r.NotFoundHandler = http.HandlerFunc(NotFound)
|
||||||
if err := http.ListenAndServe(addr, r); err != nil {
|
if err := http.ListenAndServe(addr, r); err != nil {
|
||||||
log.G(context.TODO()).WithError(err).Error("Error starting http server")
|
log.G(context.TODO()).WithError(err).Error("Error starting http server")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MetricsServer provides an HTTP endpopint for accessing pod metrics
|
||||||
|
type MetricsServer struct {
|
||||||
|
p Provider
|
||||||
|
}
|
||||||
|
|
||||||
// MetricsSummaryHandler is an HTTP handler for implementing the kubelet summary stats endpoint
|
// MetricsSummaryHandler is an HTTP handler for implementing the kubelet summary stats endpoint
|
||||||
func MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) {
|
func (s *MetricsServer) MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) {
|
||||||
ctx := loggingContext(req)
|
ctx := loggingContext(req)
|
||||||
|
|
||||||
mp, ok := p.(MetricsProvider)
|
mp, ok := s.p.(MetricsProvider)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.G(ctx).Debug("stats not implemented for provider")
|
log.G(ctx).Debug("stats not implemented for provider")
|
||||||
http.Error(w, "not implememnted", http.StatusNotImplemented)
|
http.Error(w, "not implememnted", http.StatusNotImplemented)
|
||||||
@@ -104,7 +108,7 @@ func MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ApiServerHandler(w http.ResponseWriter, req *http.Request) {
|
func (s *KubeletServer) ApiServerHandler(w http.ResponseWriter, req *http.Request) {
|
||||||
vars := mux.Vars(req)
|
vars := mux.Vars(req)
|
||||||
if len(vars) != 3 {
|
if len(vars) != 3 {
|
||||||
NotFound(w, req)
|
NotFound(w, req)
|
||||||
@@ -130,7 +134,7 @@ func ApiServerHandler(w http.ResponseWriter, req *http.Request) {
|
|||||||
tail = t
|
tail = t
|
||||||
}
|
}
|
||||||
|
|
||||||
podsLogs, err := p.GetContainerLogs(namespace, pod, container, tail)
|
podsLogs, err := s.p.GetContainerLogs(namespace, pod, container, tail)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.G(ctx).WithError(err).Error("error getting container logs")
|
log.G(ctx).WithError(err).Error("error getting container logs")
|
||||||
http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError)
|
http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError)
|
||||||
@@ -142,7 +146,7 @@ func ApiServerHandler(w http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
|
func (s *KubeletServer) ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
|
||||||
vars := mux.Vars(req)
|
vars := mux.Vars(req)
|
||||||
|
|
||||||
namespace := vars["namespace"]
|
namespace := vars["namespace"]
|
||||||
@@ -172,5 +176,5 @@ func ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
|
|||||||
idleTimeout := time.Second * 30
|
idleTimeout := time.Second * 30
|
||||||
streamCreationTimeout := time.Second * 30
|
streamCreationTimeout := time.Second * 30
|
||||||
|
|
||||||
remotecommand.ServeExec(w, req, p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
|
remotecommand.ServeExec(w, req, s.p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -115,7 +115,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
|
|||||||
Effect: vkTaintEffect,
|
Effect: vkTaintEffect,
|
||||||
}
|
}
|
||||||
|
|
||||||
p, err = lookupProvider(provider, providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
|
p, err := lookupProvider(provider, providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -137,7 +137,13 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
|
|||||||
return s, err
|
return s, err
|
||||||
}
|
}
|
||||||
|
|
||||||
go ApiserverStart(p, metricsAddr)
|
go KubeletServerStart(p)
|
||||||
|
|
||||||
|
if metricsAddr != "" {
|
||||||
|
go MetricsServerStart(p, metricsAddr)
|
||||||
|
} else {
|
||||||
|
log.G(ctx).Info("Skipping metrics server startup since no address was provided")
|
||||||
|
}
|
||||||
|
|
||||||
tick := time.Tick(5 * time.Second)
|
tick := time.Tick(5 * time.Second)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user