Merge pull request #325 from cpuguy83/refactor_api_init
Don't use globals for API server
This commit is contained in:
@@ -18,9 +18,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
|
||||
)
|
||||
|
||||
var p Provider
|
||||
var r mux.Router
|
||||
|
||||
func loggingContext(r *http.Request) context.Context {
|
||||
ctx := r.Context()
|
||||
logger := log.G(ctx).WithFields(logrus.Fields{
|
||||
@@ -30,30 +27,31 @@ func loggingContext(r *http.Request) context.Context {
|
||||
return log.WithLogger(ctx, logger)
|
||||
}
|
||||
|
||||
// NotFound provides a handler for cases where the requested endpoint doesn't exist
|
||||
func NotFound(w http.ResponseWriter, r *http.Request) {
|
||||
logger := log.G(loggingContext(r))
|
||||
log.Trace(logger, "404 request not found")
|
||||
http.Error(w, "404 request not found", http.StatusNotFound)
|
||||
}
|
||||
|
||||
func ApiserverStart(provider Provider, metricsAddr string) {
|
||||
p = provider
|
||||
// KubeletServer implements HTTP endpoints for serving kubelet API's
|
||||
type KubeletServer struct {
|
||||
p Provider
|
||||
}
|
||||
|
||||
// KubeletServertStart starts the virtual kubelet HTTP server.
|
||||
func KubeletServerStart(p Provider) {
|
||||
certFilePath := os.Getenv("APISERVER_CERT_LOCATION")
|
||||
keyFilePath := os.Getenv("APISERVER_KEY_LOCATION")
|
||||
port := os.Getenv("KUBELET_PORT")
|
||||
addr := fmt.Sprintf(":%s", port)
|
||||
|
||||
r := mux.NewRouter()
|
||||
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", ApiServerHandler).Methods("GET")
|
||||
r.HandleFunc("/exec/{namespace}/{pod}/{container}", ApiServerHandlerExec).Methods("POST")
|
||||
s := &KubeletServer{p: p}
|
||||
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", s.ApiServerHandler).Methods("GET")
|
||||
r.HandleFunc("/exec/{namespace}/{pod}/{container}", s.ApiServerHandlerExec).Methods("POST")
|
||||
r.NotFoundHandler = http.HandlerFunc(NotFound)
|
||||
|
||||
if metricsAddr != "" {
|
||||
go MetricsServerStart(metricsAddr)
|
||||
} else {
|
||||
log.G(context.TODO()).Info("Skipping metrics server startup since no address was provided")
|
||||
}
|
||||
|
||||
if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil {
|
||||
log.G(context.TODO()).WithError(err).Error("error setting up http server")
|
||||
}
|
||||
@@ -61,21 +59,27 @@ func ApiserverStart(provider Provider, metricsAddr string) {
|
||||
|
||||
// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelset summary stats API.
|
||||
// TLS is never enabled on this endpoint.
|
||||
func MetricsServerStart(addr string) {
|
||||
func MetricsServerStart(p Provider, addr string) {
|
||||
r := mux.NewRouter()
|
||||
r.HandleFunc("/stats/summary", MetricsSummaryHandler).Methods("GET")
|
||||
r.HandleFunc("/stats/summary/", MetricsSummaryHandler).Methods("GET")
|
||||
s := &MetricsServer{p: p}
|
||||
r.HandleFunc("/stats/summary", s.MetricsSummaryHandler).Methods("GET")
|
||||
r.HandleFunc("/stats/summary/", s.MetricsSummaryHandler).Methods("GET")
|
||||
r.NotFoundHandler = http.HandlerFunc(NotFound)
|
||||
if err := http.ListenAndServe(addr, r); err != nil {
|
||||
log.G(context.TODO()).WithError(err).Error("Error starting http server")
|
||||
}
|
||||
}
|
||||
|
||||
// MetricsServer provides an HTTP endpopint for accessing pod metrics
|
||||
type MetricsServer struct {
|
||||
p Provider
|
||||
}
|
||||
|
||||
// MetricsSummaryHandler is an HTTP handler for implementing the kubelet summary stats endpoint
|
||||
func MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) {
|
||||
func (s *MetricsServer) MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) {
|
||||
ctx := loggingContext(req)
|
||||
|
||||
mp, ok := p.(MetricsProvider)
|
||||
mp, ok := s.p.(MetricsProvider)
|
||||
if !ok {
|
||||
log.G(ctx).Debug("stats not implemented for provider")
|
||||
http.Error(w, "not implememnted", http.StatusNotImplemented)
|
||||
@@ -104,7 +108,7 @@ func MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func ApiServerHandler(w http.ResponseWriter, req *http.Request) {
|
||||
func (s *KubeletServer) ApiServerHandler(w http.ResponseWriter, req *http.Request) {
|
||||
vars := mux.Vars(req)
|
||||
if len(vars) != 3 {
|
||||
NotFound(w, req)
|
||||
@@ -130,7 +134,7 @@ func ApiServerHandler(w http.ResponseWriter, req *http.Request) {
|
||||
tail = t
|
||||
}
|
||||
|
||||
podsLogs, err := p.GetContainerLogs(namespace, pod, container, tail)
|
||||
podsLogs, err := s.p.GetContainerLogs(namespace, pod, container, tail)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("error getting container logs")
|
||||
http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError)
|
||||
@@ -142,7 +146,7 @@ func ApiServerHandler(w http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
|
||||
func (s *KubeletServer) ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
|
||||
vars := mux.Vars(req)
|
||||
|
||||
namespace := vars["namespace"]
|
||||
@@ -172,5 +176,5 @@ func ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
|
||||
idleTimeout := time.Second * 30
|
||||
streamCreationTimeout := time.Second * 30
|
||||
|
||||
remotecommand.ServeExec(w, req, p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
|
||||
remotecommand.ServeExec(w, req, s.p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
|
||||
}
|
||||
|
||||
@@ -115,7 +115,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
|
||||
Effect: vkTaintEffect,
|
||||
}
|
||||
|
||||
p, err = lookupProvider(provider, providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
|
||||
p, err := lookupProvider(provider, providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -137,7 +137,13 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
|
||||
return s, err
|
||||
}
|
||||
|
||||
go ApiserverStart(p, metricsAddr)
|
||||
go KubeletServerStart(p)
|
||||
|
||||
if metricsAddr != "" {
|
||||
go MetricsServerStart(p, metricsAddr)
|
||||
} else {
|
||||
log.G(ctx).Info("Skipping metrics server startup since no address was provided")
|
||||
}
|
||||
|
||||
tick := time.Tick(5 * time.Second)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user