Refactor http server stuff (#466)
* Don't start things in New * Move http server handling up to daemon. This removes the burdern of dealing with listeners, http servers, etc in the core framework. Instead provide helpers to attach the appropriate routes to the caller's serve mux. With this change, the vkubelet package only helps callers setup HTTP rather than forcing a specific HTTP config on them.
This commit is contained in:
131
cmd/http.go
Normal file
131
cmd/http.go
Normal file
@@ -0,0 +1,131 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"github.com/cpuguy83/strongerrors"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/vkubelet"
|
||||
)
|
||||
|
||||
// AcceptedCiphers is the list of accepted TLS ciphers, with known weak ciphers elided
|
||||
// Note this list should be a moving target.
|
||||
var AcceptedCiphers = []uint16{
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
|
||||
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
|
||||
}
|
||||
|
||||
func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) {
|
||||
cert, err := tls.LoadX509KeyPair(certPath, keyPath)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error loading tls certs")
|
||||
}
|
||||
|
||||
return &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
MinVersion: tls.VersionTLS12,
|
||||
PreferServerCipherSuites: true,
|
||||
CipherSuites: AcceptedCiphers,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func setupHTTPServer(ctx context.Context, cfg *apiServerConfig) (io.Closer, io.Closer, error) {
|
||||
var (
|
||||
podS *http.Server
|
||||
metricsS *http.Server
|
||||
)
|
||||
if cfg.CertPath == "" || cfg.KeyPath == "" {
|
||||
log.G(ctx).
|
||||
WithField("certPath", cfg.CertPath).
|
||||
WithField("keyPath", cfg.KeyPath).
|
||||
Error("TLS certificates not provided, not setting up pod http server")
|
||||
} else {
|
||||
tlsCfg, err := loadTLSConfig(cfg.CertPath, cfg.KeyPath)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
l, err := tls.Listen("tcp", cfg.Addr, tlsCfg)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "error setting up listener for pod http server")
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
vkubelet.AttachPodRoutes(p, mux)
|
||||
|
||||
podS = &http.Server{
|
||||
Handler: mux,
|
||||
TLSConfig: tlsCfg,
|
||||
}
|
||||
go serveHTTP(ctx, podS, l, "pods")
|
||||
}
|
||||
|
||||
if cfg.MetricsAddr == "" {
|
||||
log.G(ctx).Info("Pod metrics server not setup due to empty metrics address")
|
||||
} else {
|
||||
l, err := net.Listen("tcp", cfg.MetricsAddr)
|
||||
if err != nil {
|
||||
if l != nil {
|
||||
podS.Close()
|
||||
}
|
||||
return nil, nil, errors.Wrap(err, "could not setup listenr for pod metrics http server")
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
vkubelet.AttachMetricsRoutes(p, mux)
|
||||
metricsS = &http.Server{
|
||||
Handler: mux,
|
||||
}
|
||||
go serveHTTP(ctx, metricsS, l, "pod metrics")
|
||||
}
|
||||
|
||||
return podS, metricsS, nil
|
||||
}
|
||||
|
||||
func serveHTTP(ctx context.Context, s *http.Server, l net.Listener, name string) {
|
||||
if err := s.Serve(l); err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
default:
|
||||
log.G(ctx).WithError(err).Errorf("Error setting up %s http server", name)
|
||||
}
|
||||
}
|
||||
l.Close()
|
||||
}
|
||||
|
||||
type apiServerConfig struct {
|
||||
CertPath string
|
||||
KeyPath string
|
||||
Addr string
|
||||
MetricsAddr string
|
||||
}
|
||||
|
||||
func getAPIConfig(metricsAddr string) (*apiServerConfig, error) {
|
||||
config := apiServerConfig{
|
||||
CertPath: os.Getenv("APISERVER_CERT_LOCATION"),
|
||||
KeyPath: os.Getenv("APISERVER_KEY_LOCATION"),
|
||||
}
|
||||
|
||||
port, err := strconv.Atoi(os.Getenv("KUBELET_PORT"))
|
||||
if err != nil {
|
||||
return nil, strongerrors.InvalidArgument(errors.Wrap(err, "error parsing KUBELET_PORT variable"))
|
||||
}
|
||||
config.Addr = fmt.Sprintf(":%d", port)
|
||||
config.MetricsAddr = metricsAddr
|
||||
|
||||
return &config, nil
|
||||
}
|
||||
39
cmd/root.go
39
cmd/root.go
@@ -26,7 +26,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/cpuguy83/strongerrors"
|
||||
"github.com/mitchellh/go-homedir"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/spf13/cobra"
|
||||
@@ -69,7 +68,7 @@ var taint *corev1.Taint
|
||||
var k8sClient *kubernetes.Clientset
|
||||
var p providers.Provider
|
||||
var rm *manager.ResourceManager
|
||||
var apiConfig vkubelet.APIConfig
|
||||
var apiConfig *apiServerConfig
|
||||
var podInformer corev1informers.PodInformer
|
||||
var kubeSharedInformerFactoryResync time.Duration
|
||||
var podSyncWorkers int
|
||||
@@ -91,21 +90,16 @@ This allows users to schedule kubernetes workloads on nodes that aren't running
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
defer rootContextCancel()
|
||||
|
||||
f, err := vkubelet.New(rootContext, vkubelet.Config{
|
||||
vk := vkubelet.New(vkubelet.Config{
|
||||
Client: k8sClient,
|
||||
Namespace: kubeNamespace,
|
||||
NodeName: nodeName,
|
||||
Taint: taint,
|
||||
MetricsAddr: metricsAddr,
|
||||
Provider: p,
|
||||
ResourceManager: rm,
|
||||
APIConfig: apiConfig,
|
||||
PodSyncWorkers: podSyncWorkers,
|
||||
PodInformer: podInformer,
|
||||
})
|
||||
if err != nil {
|
||||
log.L.WithError(err).Fatal("Error initializing virtual kubelet")
|
||||
}
|
||||
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
|
||||
@@ -114,8 +108,16 @@ This allows users to schedule kubernetes workloads on nodes that aren't running
|
||||
rootContextCancel()
|
||||
}()
|
||||
|
||||
if err := f.Run(rootContext); err != nil && errors.Cause(err) != context.Canceled {
|
||||
log.L.Fatal(err)
|
||||
c1, c2, err := setupHTTPServer(rootContext, apiConfig)
|
||||
if err != nil {
|
||||
log.G(rootContext).Fatal(err)
|
||||
}
|
||||
|
||||
defer c1.Close()
|
||||
defer c2.Close()
|
||||
|
||||
if err := vk.Run(rootContext); err != nil && errors.Cause(err) != context.Canceled {
|
||||
log.G(rootContext).Fatal(err)
|
||||
}
|
||||
},
|
||||
}
|
||||
@@ -313,7 +315,7 @@ func initConfig() {
|
||||
logger.WithError(err).Fatal("Error initializing provider")
|
||||
}
|
||||
|
||||
apiConfig, err = getAPIConfig()
|
||||
apiConfig, err = getAPIConfig(metricsAddr)
|
||||
if err != nil {
|
||||
logger.WithError(err).Fatal("Error reading API config")
|
||||
}
|
||||
@@ -369,18 +371,3 @@ func initConfig() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getAPIConfig() (vkubelet.APIConfig, error) {
|
||||
config := vkubelet.APIConfig{
|
||||
CertPath: os.Getenv("APISERVER_CERT_LOCATION"),
|
||||
KeyPath: os.Getenv("APISERVER_KEY_LOCATION"),
|
||||
}
|
||||
|
||||
port, err := strconv.Atoi(os.Getenv("KUBELET_PORT"))
|
||||
if err != nil {
|
||||
return vkubelet.APIConfig{}, strongerrors.InvalidArgument(errors.Wrap(err, "error parsing KUBELET_PORT variable"))
|
||||
}
|
||||
config.Addr = fmt.Sprintf(":%d", port)
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package vkubelet
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"net/http"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
@@ -14,9 +12,19 @@ import (
|
||||
"go.opencensus.io/plugin/ochttp/propagation/b3"
|
||||
)
|
||||
|
||||
// ServeMux defines an interface used to attach routes to an existing http
|
||||
// serve mux.
|
||||
// It is used to enable callers creating a new server to completely manage
|
||||
// their own HTTP server while allowing us to attach the required routes to
|
||||
// satisfy the Kubelet HTTP interfaces.
|
||||
type ServeMux interface {
|
||||
Handle(path string, h http.Handler)
|
||||
}
|
||||
|
||||
// PodHandler creates an http handler for interacting with pods/containers.
|
||||
func PodHandler(p providers.Provider) http.Handler {
|
||||
r := mux.NewRouter()
|
||||
|
||||
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", api.PodLogsHandlerFunc(p)).Methods("GET")
|
||||
r.HandleFunc("/exec/{namespace}/{pod}/{container}", api.PodExecHandlerFunc(p)).Methods("POST")
|
||||
r.NotFoundHandler = http.HandlerFunc(NotFound)
|
||||
@@ -47,18 +55,20 @@ func MetricsSummaryHandler(p providers.Provider) http.Handler {
|
||||
return r
|
||||
}
|
||||
|
||||
// KubeletServerStart starts the virtual kubelet HTTP server.
|
||||
func KubeletServerStart(p providers.Provider, l net.Listener, cert, key string) {
|
||||
if err := http.ServeTLS(l, InstrumentHandler(PodHandler(p)), cert, key); err != nil {
|
||||
log.G(context.TODO()).WithError(err).Error("error setting up http server")
|
||||
}
|
||||
// AttachPodRoutes adds the http routes for pod stuff to the passed in serve mux.
|
||||
//
|
||||
// Callers should take care to namespace the serve mux as they see fit, however
|
||||
// these routes get called by the Kubernetes API server.
|
||||
func AttachPodRoutes(p providers.Provider, mux ServeMux) {
|
||||
mux.Handle("/", InstrumentHandler(PodHandler(p)))
|
||||
}
|
||||
|
||||
// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelset summary stats API.
|
||||
func MetricsServerStart(p providers.Provider, l net.Listener) {
|
||||
if err := http.Serve(l, InstrumentHandler(MetricsSummaryHandler(p))); err != nil {
|
||||
log.G(context.TODO()).WithError(err).Error("Error starting http server")
|
||||
}
|
||||
// AttachMetricsRoutes adds the http routes for pod/node metrics to the passed in serve mux.
|
||||
//
|
||||
// Callers should take care to namespace the serve mux as they see fit, however
|
||||
// these routes get called by the Kubernetes API server.
|
||||
func AttachMetricsRoutes(p providers.Provider, mux ServeMux) {
|
||||
mux.Handle("/", InstrumentHandler(MetricsSummaryHandler(p)))
|
||||
}
|
||||
|
||||
func instrumentRequest(r *http.Request) *http.Request {
|
||||
|
||||
@@ -2,16 +2,13 @@ package vkubelet
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
pkgerrors "github.com/pkg/errors"
|
||||
"go.opencensus.io/trace"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
corev1informers "k8s.io/client-go/informers/core/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/manager"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers"
|
||||
)
|
||||
@@ -34,9 +31,7 @@ type Server struct {
|
||||
|
||||
// Config is used to configure a new server.
|
||||
type Config struct {
|
||||
APIConfig APIConfig
|
||||
Client *kubernetes.Clientset
|
||||
MetricsAddr string
|
||||
Namespace string
|
||||
NodeName string
|
||||
Provider providers.Provider
|
||||
@@ -46,16 +41,13 @@ type Config struct {
|
||||
PodInformer corev1informers.PodInformer
|
||||
}
|
||||
|
||||
// APIConfig is used to configure the API server of the virtual kubelet.
|
||||
type APIConfig struct {
|
||||
CertPath string
|
||||
KeyPath string
|
||||
Addr string
|
||||
}
|
||||
|
||||
// New creates a new virtual-kubelet server.
|
||||
func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
|
||||
s = &Server{
|
||||
// This is the entrypoint to this package.
|
||||
//
|
||||
// This creates but does not start the server.
|
||||
// You must call `Run` on the returned object to start the server.
|
||||
func New(cfg Config) *Server {
|
||||
return &Server{
|
||||
namespace: cfg.Namespace,
|
||||
nodeName: cfg.NodeName,
|
||||
taint: cfg.Taint,
|
||||
@@ -65,61 +57,43 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
|
||||
podSyncWorkers: cfg.PodSyncWorkers,
|
||||
podInformer: cfg.PodInformer,
|
||||
}
|
||||
|
||||
ctx = log.WithLogger(ctx, log.G(ctx))
|
||||
|
||||
apiL, err := net.Listen("tcp", cfg.APIConfig.Addr)
|
||||
if err != nil {
|
||||
return nil, pkgerrors.Wrap(err, "error setting up API listener")
|
||||
}
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
apiL.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
go KubeletServerStart(cfg.Provider, apiL, cfg.APIConfig.CertPath, cfg.APIConfig.KeyPath)
|
||||
|
||||
if cfg.MetricsAddr != "" {
|
||||
metricsL, err := net.Listen("tcp", cfg.MetricsAddr)
|
||||
if err != nil {
|
||||
return nil, pkgerrors.Wrap(err, "error setting up metrics listener")
|
||||
}
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
metricsL.Close()
|
||||
}
|
||||
}()
|
||||
go MetricsServerStart(cfg.Provider, metricsL)
|
||||
} else {
|
||||
log.G(ctx).Info("Skipping metrics server startup since no address was provided")
|
||||
}
|
||||
|
||||
if err := s.registerNode(ctx); err != nil {
|
||||
return s, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
tick := time.NewTicker(5 * time.Second)
|
||||
defer tick.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-tick.C:
|
||||
ctx, span := trace.StartSpan(ctx, "syncActualState")
|
||||
s.updateNode(ctx)
|
||||
s.updatePodStatuses(ctx)
|
||||
span.End()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Run creates and starts an instance of the pod controller, blocking until it stops.
|
||||
//
|
||||
// Note that this does not setup the HTTP routes that are used to expose pod
|
||||
// info to the Kubernetes API Server, such as logs, metrics, exec, etc.
|
||||
// See `AttachPodRoutes` and `AttachMetricsRoutes` to set these up.
|
||||
func (s *Server) Run(ctx context.Context) error {
|
||||
if err := s.registerNode(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go s.providerSyncLoop(ctx)
|
||||
|
||||
return NewPodController(s).Run(ctx, s.podSyncWorkers)
|
||||
}
|
||||
|
||||
// providerSyncLoop syncronizes pod states from the provider back to kubernetes
|
||||
func (s *Server) providerSyncLoop(ctx context.Context) {
|
||||
// TODO(@cpuguy83): Ticker does not seem like the right thing to use here. A
|
||||
// ticker keeps ticking while we are updating state, which can be a long
|
||||
// operation. This would lead to just a constant re-sync rather than sleeping
|
||||
// for 5 seconds between each loop.
|
||||
//
|
||||
// Leaving this note here as fixing is out of scope for my current changeset.
|
||||
tick := time.NewTicker(5 * time.Second)
|
||||
defer tick.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-tick.C:
|
||||
ctx, span := trace.StartSpan(ctx, "syncActualState")
|
||||
s.updateNode(ctx)
|
||||
s.updatePodStatuses(ctx)
|
||||
span.End()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user