Add webhook and anon auth support
Auth is not enabled automatically because it requires some bootstrapping to work; that is left for future work. In the meantime, callers can inject their own auth through the new handler support, much as they did with the node-cli code.
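For illustration only (not part of the commit): a minimal sketch of the kind of auth injection this enables. The withAuth helper and the bearer-token check are hypothetical stand-ins for a real authenticator (e.g. a webhook-backed one).

    package main

    import "net/http"

    // withAuth wraps the kubelet API handler with a trivial bearer-token
    // check. Anything satisfying http.Handler can be plugged into the new
    // NodeConfig.Handler field introduced in the diff below.
    func withAuth(token string, next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            if r.Header.Get("Authorization") != "Bearer "+token {
                http.Error(w, "unauthorized", http.StatusUnauthorized)
                return
            }
            next.ServeHTTP(w, r)
        })
    }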
@@ -8,19 +8,19 @@ import (
    "net/http"
    "os"
    "path"
    "runtime"
    "time"

    "github.com/pkg/errors"
    "github.com/virtual-kubelet/virtual-kubelet/log"
    "github.com/virtual-kubelet/virtual-kubelet/node"
    "github.com/virtual-kubelet/virtual-kubelet/node/api"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    corev1listers "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/record"
)

@@ -42,9 +42,11 @@ type Node struct {
    scmInformerFactory informers.SharedInformerFactory
    client             kubernetes.Interface

    listenAddr  string
    httpHandler HTTPHandler
    tlsConfig   *tls.Config
    listenAddr string
    h          http.Handler
    tlsConfig  *tls.Config

    workers int

    eb record.EventBroadcaster
}

@@ -59,8 +61,35 @@ func (n *Node) PodController() *node.PodController {
    return n.pc
}

func (n *Node) runHTTP(ctx context.Context) (func(), error) {
    if n.tlsConfig == nil {
        log.G(ctx).Warn("TLS config not provided, not starting up http service")
        return func() {}, nil
    }
    if n.h == nil {
        log.G(ctx).Debug("No http handler, not starting up http service")
        return func() {}, nil
    }

    l, err := tls.Listen("tcp", n.listenAddr, n.tlsConfig)
    if err != nil {
        return nil, errors.Wrap(err, "error starting http listener")
    }

    log.G(ctx).Debug("Started TLS listener")

    srv := &http.Server{Handler: n.h, TLSConfig: n.tlsConfig}
    go srv.Serve(l) //nolint:errcheck
    log.G(ctx).Debug("HTTP server running")

    return func() {
        srv.Close()
        l.Close()
    }, nil
}

// Run starts all the underlying controllers
func (n *Node) Run(ctx context.Context, workers int) (retErr error) {
func (n *Node) Run(ctx context.Context) (retErr error) {
    ctx, cancel := context.WithCancel(ctx)
    defer func() {
        cancel()

@@ -69,31 +98,22 @@ func (n *Node) Run(ctx context.Context, workers int) (retErr error) {
        close(n.done)
    }()

    if n.podInformerFactory != nil {
        go n.podInformerFactory.Start(ctx.Done())
    }
    if n.scmInformerFactory != nil {
        go n.scmInformerFactory.Start(ctx.Done())
    }

    if n.eb != nil {
        n.eb.StartLogging(log.G(ctx).Infof)
        n.eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: n.client.CoreV1().Events(v1.NamespaceAll)})
        defer n.eb.Shutdown()
        log.G(ctx).Debug("Started event broadcaster")
    }

    l, err := tls.Listen("tcp", n.listenAddr, n.tlsConfig)
    cancelHTTP, err := n.runHTTP(ctx)
    if err != nil {
        return errors.Wrap(err, "error starting http listener")
        return err
    }
    log.G(ctx).Debug("Started TLS listener")
    defer l.Close()
    defer cancelHTTP()

    srv := &http.Server{Handler: n.httpHandler, TLSConfig: n.tlsConfig}
    go srv.Serve(l)
    defer srv.Close()

    go n.pc.Run(ctx, workers) //nolint:errcheck
    log.G(ctx).Debug("HTTP server running")
    go n.podInformerFactory.Start(ctx.Done())
    go n.scmInformerFactory.Start(ctx.Done())
    go n.pc.Run(ctx, n.workers) //nolint:errcheck

    defer func() {
        cancel()

@@ -110,7 +130,7 @@ func (n *Node) Run(ctx context.Context, workers int) (retErr error) {

    log.G(ctx).Debug("pod controller ready")

    go n.nc.Run(ctx) // nolint:errcheck
    go n.nc.Run(ctx) //nolint:errcheck

    defer func() {
        cancel()
@@ -193,6 +213,9 @@ type NodeOpt func(c *NodeConfig) error
// NodeConfig is used to hold configuration items for a Node.
// It gets used in conjunction with NodeOpt in NewNodeFromClient
type NodeConfig struct {
    // Set the client to use, otherwise a client will be created from ClientsetFromEnv
    Client kubernetes.Interface

    // Set the node spec to register with Kubernetes
    NodeSpec v1.Node
    // Set the path to read a kubeconfig from for creating a client.
@@ -206,8 +229,9 @@ type NodeConfig struct {
    // Set a custom API handler to use.
    // You can use this to set up, for example, authentication middleware.
    // If one is not provided a default one will be created.
    // Pod routes will be attached to this handler when creating the node
    HTTPHandler HTTPHandler
    //
    // Note: If you provide your own handler, you'll need to handle all auth, routes, etc.
    Handler http.Handler
    // Set the timeout for idle http streams
    StreamIdleTimeout time.Duration
    // Set the timeout for creating http streams

@@ -220,11 +244,12 @@ type NodeConfig struct {
    // Specify the event recorder to use
    // If this is not provided, a default one will be used.
    EventRecorder record.EventRecorder
}

type HTTPHandler interface {
    api.ServeMux
    http.Handler
    // Set the number of workers to reconcile pods
    // The default value is derived from the number of cores available.
    NumWorkers int

    routeAttacher func(Provider, NodeConfig, corev1listers.PodLister)
}

// WithNodeConfig returns a NodeOpt which replaces the NodeConfig with the passed in value.
@@ -235,9 +260,12 @@ func WithNodeConfig(c NodeConfig) NodeOpt {
    }
}

// NewNode calls NewNodeFromClient with a nil client
func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node, error) {
    return NewNodeFromClient(nil, name, newProvider, opts...)
// WithClient returns a NodeOpt that sets the client that will be used to create/manage the node.
func WithClient(c kubernetes.Interface) NodeOpt {
    return func(cfg *NodeConfig) error {
        cfg.Client = c
        return nil
    }
}

// NewNodeFromClient creates a new node using the provided client and name.
@@ -247,9 +275,11 @@ func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node,
// Some basic values are set for node status, you'll almost certainly want to modify it.
//
// If client is nil, this will construct a client using ClientsetFromEnv
func NewNodeFromClient(client kubernetes.Interface, name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node, error) {
//
// It is up to the caller to configure auth on the HTTP handler.
func NewNodeFromClient(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node, error) {
    cfg := NodeConfig{
        // TODO: this is what was set in the cli code... it's not clear what a good value actually is.
        NumWorkers:           runtime.NumCPU(),
        InformerResyncPeriod: time.Minute,
        KubeconfigPath:       os.Getenv("KUBECONFIG"),
        HTTPListenAddr:       ":10250",
@@ -285,10 +315,7 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New
        return nil, errors.Wrap(err, "error parsing http listen address")
    }

    if cfg.HTTPHandler == nil {
        cfg.HTTPHandler = http.NewServeMux()
    }

    client := cfg.Client
    if client == nil {
        var err error
        client, err = ClientsetFromEnv(cfg.KubeconfigPath)

@@ -324,17 +351,9 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New
        return nil, errors.Wrap(err, "error creating provider")
    }

    api.AttachPodRoutes(api.PodHandlerConfig{
        RunInContainer:   p.RunInContainer,
        GetContainerLogs: p.GetContainerLogs,
        GetPods:          p.GetPods,
        GetPodsFromKubernetes: func(context.Context) ([]*v1.Pod, error) {
            return podInformer.Lister().List(labels.Everything())
        },
        GetStatsSummary:       p.GetStatsSummary,
        StreamIdleTimeout:     cfg.StreamIdleTimeout,
        StreamCreationTimeout: cfg.StreamCreationTimeout,
    }, cfg.HTTPHandler, cfg.DebugHTTP)
    if cfg.routeAttacher != nil {
        cfg.routeAttacher(p, cfg, podInformer.Lister())
    }

    var readyCb func(context.Context) error
    if np == nil {

@@ -360,7 +379,7 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New

    var eb record.EventBroadcaster
    if cfg.EventRecorder == nil {
        eb := record.NewBroadcaster()
        eb = record.NewBroadcaster()
        cfg.EventRecorder = eb.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(name, "pod-controller")})
    }

@@ -388,8 +407,9 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New
        scmInformerFactory: scmInformerFactory,
        client:             client,
        tlsConfig:          cfg.TLSConfig,
        httpHandler:        cfg.HTTPHandler,
        h:                  cfg.Handler,
        listenAddr:         cfg.HTTPListenAddr,
        workers:            cfg.NumWorkers,
    }, nil
}
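Again for illustration only (not part of the commit): wiring the new pieces together might look like the sketch below. The nodeutil package name and import path are assumptions based on the identifiers above, newMyProvider stands in for a real NewProviderFunc, and withAuth is the hypothetical middleware sketched under the commit message.

    package main

    import (
        "context"
        "crypto/tls"
        "net/http"

        // Assumed import path; the diff does not name the package.
        "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
        "k8s.io/client-go/kubernetes"
    )

    func runNode(ctx context.Context, client kubernetes.Interface, tlsCfg *tls.Config) error {
        n, err := nodeutil.NewNodeFromClient("my-node", newMyProvider,
            // The client is now supplied via an option rather than a positional argument.
            nodeutil.WithClient(client),
            // NodeOpt is just func(*NodeConfig) error, so fields can be set directly.
            func(cfg *nodeutil.NodeConfig) error {
                mux := http.NewServeMux()
                cfg.Handler = withAuth("some-token", mux) // caller owns all auth and routes
                cfg.TLSConfig = tlsCfg                    // without a TLS config, runHTTP skips serving
                return nil
            },
        )
        if err != nil {
            return err
        }
        // Run no longer takes a worker count; it comes from cfg.NumWorkers.
        return n.Run(ctx)
    }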