From a9a0ee50cf7eebb8ce2d1a9e1f42f164500f5da9 Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Thu, 3 Jun 2021 16:26:36 +0000
Subject: [PATCH 1/5] Remove create-after-delete node e2e test

This test is only testing the specific implementation details of the
mock CLI provided in this repo. The behavior is not inherent in the VK
lib.
---
 test/e2e/node.go | 61 ------------------------------------------------
 1 file changed, 61 deletions(-)
 delete mode 100644 test/e2e/node.go

diff --git a/test/e2e/node.go b/test/e2e/node.go
deleted file mode 100644
index 61bd3aa84..000000000
--- a/test/e2e/node.go
+++ /dev/null
@@ -1,61 +0,0 @@
-package e2e
-
-import (
-	"context"
-	"testing"
-	"time"
-
-	"gotest.tools/assert"
-	is "gotest.tools/assert/cmp"
-	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/fields"
-	watchapi "k8s.io/apimachinery/pkg/watch"
-)
-
-// TestNodeCreateAfterDelete makes sure that a node is automatically recreated
-// if it is deleted while VK is running.
-func (ts *EndToEndTestSuite) TestNodeCreateAfterDelete(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	podList, err := f.KubeClient.CoreV1().Pods(f.Namespace).List(ctx, metav1.ListOptions{
-		FieldSelector: fields.OneTermEqualSelector("spec.nodeName", f.NodeName).String(),
-	})
-
-	assert.NilError(t, err)
-	assert.Assert(t, is.Len(podList.Items, 0), "Kubernetes does not allow node deletion with dependent objects (pods) in existence: %v")
-
-	chErr := make(chan error, 1)
-
-	originalNode, err := f.GetNode(ctx)
-	assert.NilError(t, err)
-
-	ctx, cancel = context.WithTimeout(ctx, time.Minute)
-	defer cancel()
-
-	go func() {
-		wait := func(e watchapi.Event) (bool, error) {
-			err = ctx.Err()
-			// Our timeout has expired
-			if err != nil {
-				return true, err
-			}
-			if e.Type == watchapi.Deleted || e.Type == watchapi.Error {
-				return false, nil
-			}
-
-			return originalNode.ObjectMeta.UID != e.Object.(*v1.Node).ObjectMeta.UID, nil
-		}
-		chErr <- f.WaitUntilNodeCondition(wait)
-	}()
-
-	assert.NilError(t, f.DeleteNode(ctx))
-
-	select {
-	case result := <-chErr:
-		assert.NilError(t, result, "Did not observe new node object created after deletion")
-	case <-ctx.Done():
-		t.Fatal("Test timed out while waiting for node object to be deleted / recreated")
-	}
-}

From 597e7dc2810a8854531a5fb0ef1d793e5cc29837 Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Wed, 2 Jun 2021 23:58:25 +0000
Subject: [PATCH 2/5] Make ControllerManager more useful

This changes `ControllerManager` to `Node`. A `Node` is created from a
client, and the VK lib is responsible for creating everything except
that client (unless the client is nil, in which case we use the env
client).

This should be a good replacement for node-cli, and it offers a simpler
API. *It only works with leases enabled*, since this seems to always be
desired; however, an option could be added to disable them if needed.

The intent of this is to provide a simpler way to get a VK node up and
running while also being extensible. We can slowly add options, but
they should be focused on a use-case rather than trying to support
every possible scenario... in which case the user can just use the
controllers directly.
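For illustration, a minimal sketch of the intended usage (a hedged
example, not part of the diff: `buildPodHandler` is a hypothetical
stand-in for whatever constructs a concrete `node.PodLifecycleHandler`):

    func run(ctx context.Context) error {
        newProvider := func(cfg nodeutil.ProviderConfig) (node.PodLifecycleHandler, node.NodeProvider, error) {
            // cfg carries listers for pods/secrets/configmaps/services
            // plus the node object that may be mutated before registration.
            p := buildPodHandler(cfg.Pods, cfg.Secrets, cfg.ConfigMaps, cfg.Services)
            // A nil NodeProvider selects the built-in naive provider,
            // which also marks the node ready once the controllers are up.
            return p, nil, nil
        }

        // NewNode passes a nil client, so the clientset is built from the
        // environment (ClientsetFromEnv).
        n, err := nodeutil.NewNode("my-node", newProvider)
        if err != nil {
            return err
        }
        go n.Run(ctx, 10) // 10 pod sync workers
        if err := n.WaitReady(ctx, time.Minute); err != nil {
            return err
        }
        <-n.Done()
        return n.Err()
    }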
--- .../internal/commands/root/flag.go | 8 + .../internal/commands/root/node.go | 44 --- .../internal/commands/root/opts.go | 2 +- .../internal/commands/root/root.go | 184 ++++------- .../internal/provider/mock/mock.go | 2 +- .../internal/provider/types.go | 4 +- node/nodeutil/controller.go | 308 +++++++++++++++--- website/data/cli.yaml | 2 +- 8 files changed, 329 insertions(+), 225 deletions(-) diff --git a/cmd/virtual-kubelet/internal/commands/root/flag.go b/cmd/virtual-kubelet/internal/commands/root/flag.go index 1ef7946e1..c851c38a2 100644 --- a/cmd/virtual-kubelet/internal/commands/root/flag.go +++ b/cmd/virtual-kubelet/internal/commands/root/flag.go @@ -59,7 +59,11 @@ func (mv mapVar) Type() string { func installFlags(flags *pflag.FlagSet, c *Opts) { flags.StringVar(&c.KubeConfigPath, "kubeconfig", c.KubeConfigPath, "kube config file to use for connecting to the Kubernetes API server") + flags.StringVar(&c.KubeNamespace, "namespace", c.KubeNamespace, "kubernetes namespace (default is 'all')") + flags.MarkDeprecated("namespace", "Nodes must watch for pods in all namespaces. This option is now ignored.") //nolint:errcheck + flags.MarkHidden("namespace") //nolint:errcheck + flags.StringVar(&c.KubeClusterDomain, "cluster-domain", c.KubeClusterDomain, "kubernetes cluster-domain (default is 'cluster.local')") flags.StringVar(&c.NodeName, "nodename", c.NodeName, "kubernetes node name") flags.StringVar(&c.OperatingSystem, "os", c.OperatingSystem, "Operating System (Linux/Windows)") @@ -68,11 +72,15 @@ func installFlags(flags *pflag.FlagSet, c *Opts) { flags.StringVar(&c.MetricsAddr, "metrics-addr", c.MetricsAddr, "address to listen for metrics/stats requests") flags.StringVar(&c.TaintKey, "taint", c.TaintKey, "Set node taint key") + flags.BoolVar(&c.DisableTaint, "disable-taint", c.DisableTaint, "disable the virtual-kubelet node taint") flags.MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable") //nolint:errcheck flags.IntVar(&c.PodSyncWorkers, "pod-sync-workers", c.PodSyncWorkers, `set the number of pod synchronization workers`) + flags.BoolVar(&c.EnableNodeLease, "enable-node-lease", c.EnableNodeLease, `use node leases (1.13) for node heartbeats`) + flags.MarkDeprecated("enable-node-lease", "leases are always enabled") //nolint:errcheck + flags.MarkHidden("enable-node-lease") //nolint:errcheck flags.StringSliceVar(&c.TraceExporters, "trace-exporter", c.TraceExporters, fmt.Sprintf("sets the tracing exporter to use, available exporters: %s", AvailableTraceExporters())) flags.StringVar(&c.TraceConfig.ServiceName, "trace-service-name", c.TraceConfig.ServiceName, "sets the name of the service used to register with the trace exporter") diff --git a/cmd/virtual-kubelet/internal/commands/root/node.go b/cmd/virtual-kubelet/internal/commands/root/node.go index 876bcd7d5..a1b35900b 100644 --- a/cmd/virtual-kubelet/internal/commands/root/node.go +++ b/cmd/virtual-kubelet/internal/commands/root/node.go @@ -15,54 +15,10 @@ package root import ( - "context" - "strings" - - "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider" "github.com/virtual-kubelet/virtual-kubelet/errdefs" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const osLabel = "beta.kubernetes.io/os" - -// NodeFromProvider builds a kubernetes node object from a provider -// This is a temporary solution until node stuff actually split off from the provider interface itself. 
-func NodeFromProvider(ctx context.Context, name string, taint *v1.Taint, p provider.Provider, version string) *v1.Node { - taints := make([]v1.Taint, 0) - - if taint != nil { - taints = append(taints, *taint) - } - - node := &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{ - "type": "virtual-kubelet", - "kubernetes.io/role": "agent", - "kubernetes.io/hostname": name, - }, - }, - Spec: v1.NodeSpec{ - Taints: taints, - }, - Status: v1.NodeStatus{ - NodeInfo: v1.NodeSystemInfo{ - Architecture: "amd64", - KubeletVersion: version, - }, - }, - } - - p.ConfigureNode(ctx, node) - if _, ok := node.ObjectMeta.Labels[osLabel]; !ok { - node.ObjectMeta.Labels[osLabel] = strings.ToLower(node.Status.NodeInfo.OperatingSystem) - } - return node -} - // getTaint creates a taint using the provided key/value. // Taint effect is read from the environment // The taint key/value may be overwritten by the environment. diff --git a/cmd/virtual-kubelet/internal/commands/root/opts.go b/cmd/virtual-kubelet/internal/commands/root/opts.go index 3b70ae1e6..0af8ee72c 100644 --- a/cmd/virtual-kubelet/internal/commands/root/opts.go +++ b/cmd/virtual-kubelet/internal/commands/root/opts.go @@ -28,7 +28,7 @@ import ( // Defaults for root command options const ( DefaultNodeName = "virtual-kubelet" - DefaultOperatingSystem = "Linux" + DefaultOperatingSystem = "linux" DefaultInformerResyncPeriod = 1 * time.Minute DefaultMetricsAddr = ":10255" DefaultListenPort = 10250 // TODO(cpuguy83)(VK1.0): Change this to an addr instead of just a port.. we should not be listening on all interfaces. diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go index 6d21accf7..7373bc862 100644 --- a/cmd/virtual-kubelet/internal/commands/root/root.go +++ b/cmd/virtual-kubelet/internal/commands/root/root.go @@ -17,7 +17,7 @@ package root import ( "context" "os" - "path" + "runtime" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -28,12 +28,6 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/node" "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil" corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes/scheme" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/record" ) // NewCommand creates a new top-level command. @@ -80,28 +74,65 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return err } - // Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node. 
- podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions( - client, - c.InformerResyncPeriod, - kubeinformers.WithNamespace(c.KubeNamespace), - nodeutil.PodInformerFilter(c.NodeName), - ) - podInformer := podInformerFactory.Core().V1().Pods() + cancelHTTP := func() {} + defer func() { + // note: this is purposefully using a closure so that when this is actually set the correct function will be called + if cancelHTTP != nil { + cancelHTTP() + } + }() + newProvider := func(cfg nodeutil.ProviderConfig) (node.PodLifecycleHandler, node.NodeProvider, error) { + var err error + rm, err := manager.NewResourceManager(cfg.Pods, cfg.Secrets, cfg.ConfigMaps, cfg.Services) + if err != nil { + return nil, nil, errors.Wrap(err, "could not create resource manager") + } + initConfig := provider.InitConfig{ + ConfigPath: c.ProviderConfigPath, + NodeName: c.NodeName, + OperatingSystem: c.OperatingSystem, + ResourceManager: rm, + DaemonPort: c.ListenPort, + InternalIP: os.Getenv("VKUBELET_POD_IP"), + KubeClusterDomain: c.KubeClusterDomain, + } + pInit := s.Get(c.Provider) + if pInit == nil { + return nil, nil, errors.Errorf("provider %q not found", c.Provider) + } - // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors). - scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, c.InformerResyncPeriod) - // Create a secret informer and a config map informer so we can pass their listers to the resource manager. - secretInformer := scmInformerFactory.Core().V1().Secrets() - configMapInformer := scmInformerFactory.Core().V1().ConfigMaps() - serviceInformer := scmInformerFactory.Core().V1().Services() + p, err := pInit(initConfig) + if err != nil { + return nil, nil, errors.Wrapf(err, "error initializing provider %s", c.Provider) + } + p.ConfigureNode(ctx, cfg.Node) - rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister()) - if err != nil { - return errors.Wrap(err, "could not create resource manager") + apiConfig, err := getAPIConfig(c) + if err != nil { + return nil, nil, err + } + + cancelHTTP, err = setupHTTPServer(ctx, p, apiConfig, func(context.Context) ([]*corev1.Pod, error) { + return rm.GetPods(), nil + }) + if err != nil { + return nil, nil, err + } + + return p, nil, nil } - apiConfig, err := getAPIConfig(c) + cm, err := nodeutil.NewNodeFromClient(client, c.NodeName, newProvider, func(cfg *nodeutil.NodeConfig) error { + cfg.InformerResyncPeriod = c.InformerResyncPeriod + + if taint != nil { + cfg.NodeSpec.Spec.Taints = append(cfg.NodeSpec.Spec.Taints, *taint) + } + cfg.NodeSpec.Status.NodeInfo.Architecture = runtime.GOARCH + cfg.NodeSpec.Status.NodeInfo.OperatingSystem = c.OperatingSystem + + return nil + }) if err != nil { return err } @@ -110,26 +141,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return err } - initConfig := provider.InitConfig{ - ConfigPath: c.ProviderConfigPath, - NodeName: c.NodeName, - OperatingSystem: c.OperatingSystem, - ResourceManager: rm, - DaemonPort: c.ListenPort, - InternalIP: os.Getenv("VKUBELET_POD_IP"), - KubeClusterDomain: c.KubeClusterDomain, - } - - pInit := s.Get(c.Provider) - if pInit == nil { - return errors.Errorf("provider %q not found", c.Provider) - } - - p, err := pInit(initConfig) - if err != nil { - return errors.Wrapf(err, "error initializing provider %s", c.Provider) - } - ctx = log.WithLogger(ctx, log.G(ctx).WithFields(log.Fields{ "provider": 
c.Provider, "operatingSystem": c.OperatingSystem, @@ -137,69 +148,8 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { "watchedNamespace": c.KubeNamespace, })) - pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version) - np := node.NewNaiveNodeProvider() - additionalOptions := []node.NodeControllerOpt{ - node.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error { - if !k8serrors.IsNotFound(err) { - return err - } - - log.G(ctx).Debug("node not found") - newNode := pNode.DeepCopy() - newNode.ResourceVersion = "" - _, err = client.CoreV1().Nodes().Create(ctx, newNode, metav1.CreateOptions{}) - if err != nil { - return err - } - log.G(ctx).Debug("created new node") - return nil - }), - } - if c.EnableNodeLease { - leaseClient := nodeutil.NodeLeaseV1Client(client) - // 40 seconds is the default lease time in upstream kubelet - additionalOptions = append(additionalOptions, node.WithNodeEnableLeaseV1(leaseClient, 40)) - } - nodeRunner, err := node.NewNodeController( - np, - pNode, - client.CoreV1().Nodes(), - additionalOptions..., - ) - if err != nil { - log.G(ctx).Fatal(err) - } - - eb := record.NewBroadcaster() - eb.StartLogging(log.G(ctx).Infof) - eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(c.KubeNamespace)}) - - pc, err := node.NewPodController(node.PodControllerConfig{ - PodClient: client.CoreV1(), - PodInformer: podInformer, - EventRecorder: eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(pNode.Name, "pod-controller")}), - Provider: p, - SecretInformer: secretInformer, - ConfigMapInformer: configMapInformer, - ServiceInformer: serviceInformer, - }) - if err != nil { - return errors.Wrap(err, "error setting up pod controller") - } - - go podInformerFactory.Start(ctx.Done()) - go scmInformerFactory.Start(ctx.Done()) - - cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig, func(context.Context) ([]*corev1.Pod, error) { - return rm.GetPods(), nil - }) - if err != nil { - return err - } defer cancelHTTP() - cm := nodeutil.NewControllerManager(nodeRunner, pc) go cm.Run(ctx, c.PodSyncWorkers) // nolint:errcheck defer func() { @@ -213,12 +163,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return err } - setNodeReady(pNode) - if err := np.UpdateStatus(ctx, pNode); err != nil { - return errors.Wrap(err, "error marking the node as ready") - } - log.G(ctx).Info("Initialized") - select { case <-ctx.Done(): case <-cm.Done(): @@ -226,19 +170,3 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { } return nil } - -func setNodeReady(n *corev1.Node) { - for i, c := range n.Status.Conditions { - if c.Type != "Ready" { - continue - } - - c.Message = "Kubelet is ready" - c.Reason = "KubeletReady" - c.Status = corev1.ConditionTrue - c.LastHeartbeatTime = metav1.Now() - c.LastTransitionTime = metav1.Now() - n.Status.Conditions[i] = c - return - } -} diff --git a/cmd/virtual-kubelet/internal/provider/mock/mock.go b/cmd/virtual-kubelet/internal/provider/mock/mock.go index 8a3b33293..4284060f6 100644 --- a/cmd/virtual-kubelet/internal/provider/mock/mock.go +++ b/cmd/virtual-kubelet/internal/provider/mock/mock.go @@ -339,7 +339,7 @@ func (p *MockProvider) ConfigureNode(ctx context.Context, n *v1.Node) { // nolin n.Status.DaemonEndpoints = p.nodeDaemonEndpoints() os := p.operatingSystem if os == "" { - os = "Linux" + os = "linux" } n.Status.NodeInfo.OperatingSystem = os n.Status.NodeInfo.Architecture = "amd64" diff --git 
a/cmd/virtual-kubelet/internal/provider/types.go b/cmd/virtual-kubelet/internal/provider/types.go index f99e3bc1e..5ad323410 100644 --- a/cmd/virtual-kubelet/internal/provider/types.go +++ b/cmd/virtual-kubelet/internal/provider/types.go @@ -2,9 +2,9 @@ package provider const ( // OperatingSystemLinux is the configuration value for defining Linux. - OperatingSystemLinux = "Linux" + OperatingSystemLinux = "linux" // OperatingSystemWindows is the configuration value for defining Windows. - OperatingSystemWindows = "Windows" + OperatingSystemWindows = "windows" ) type OperatingSystems map[string]bool // nolint:golint diff --git a/node/nodeutil/controller.go b/node/nodeutil/controller.go index 5521f7935..7f145c061 100644 --- a/node/nodeutil/controller.go +++ b/node/nodeutil/controller.go @@ -3,100 +3,127 @@ package nodeutil import ( "context" "fmt" + "os" + "path" "time" + "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/node" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/record" ) -// ControllerManager helps manage the startup/shutdown procedure for other controllers. +// Node helps manage the startup/shutdown procedure for other controllers. // It is intended as a convenience to reduce boiler plate code for starting up controllers. // -// Must be created with constructor `NewControllerManager`. -type ControllerManager struct { +// Must be created with constructor `NewNode`. +type Node struct { nc *node.NodeController pc *node.PodController + readyCb func(context.Context) error + ready chan struct{} done chan struct{} err error -} -// NewControllerManager creates a new ControllerManager. -func NewControllerManager(nc *node.NodeController, pc *node.PodController) *ControllerManager { - return &ControllerManager{ - nc: nc, - pc: pc, - ready: make(chan struct{}), - done: make(chan struct{}), - } + podInformerFactory informers.SharedInformerFactory + scmInformerFactory informers.SharedInformerFactory + client kubernetes.Interface + + eb record.EventBroadcaster } // NodeController returns the configured node controller. -func (c *ControllerManager) NodeController() *node.NodeController { - return c.nc +func (n *Node) NodeController() *node.NodeController { + return n.nc } // PodController returns the configured pod controller. 
-func (c *ControllerManager) PodController() *node.PodController { - return c.pc +func (n *Node) PodController() *node.PodController { + return n.pc } // Run starts all the underlying controllers -func (c *ControllerManager) Run(ctx context.Context, workers int) (retErr error) { +func (n *Node) Run(ctx context.Context, workers int) (retErr error) { ctx, cancel := context.WithCancel(ctx) defer cancel() - go c.pc.Run(ctx, workers) // nolint:errcheck + if n.podInformerFactory != nil { + go n.podInformerFactory.Start(ctx.Done()) + } + if n.scmInformerFactory != nil { + go n.scmInformerFactory.Start(ctx.Done()) + } + + if n.eb != nil { + n.eb.StartLogging(log.G(ctx).Infof) + n.eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: n.client.CoreV1().Events(v1.NamespaceAll)}) + } + + go n.pc.Run(ctx, workers) // nolint:errcheck defer func() { cancel() - <-c.pc.Done() + <-n.pc.Done() - c.err = retErr - close(c.done) + n.err = retErr + close(n.done) }() select { case <-ctx.Done(): - return c.err - case <-c.pc.Ready(): - case <-c.pc.Done(): - return c.pc.Err() + return n.err + case <-n.pc.Ready(): + case <-n.pc.Done(): + return n.pc.Err() } - go c.nc.Run(ctx) // nolint:errcheck + go n.nc.Run(ctx) // nolint:errcheck defer func() { cancel() - <-c.nc.Done() + <-n.nc.Done() }() select { case <-ctx.Done(): - c.err = ctx.Err() - return c.err - case <-c.nc.Ready(): - case <-c.nc.Done(): - return c.nc.Err() + n.err = ctx.Err() + return n.err + case <-n.nc.Ready(): + case <-n.nc.Done(): + return n.nc.Err() } - close(c.ready) + if n.readyCb != nil { + if err := n.readyCb(ctx); err != nil { + return err + } + } + close(n.ready) select { - case <-c.nc.Done(): + case <-n.nc.Done(): cancel() - return c.nc.Err() - case <-c.pc.Done(): + return n.nc.Err() + case <-n.pc.Done(): cancel() - return c.pc.Err() + return n.pc.Err() } } // WaitReady waits for the specified timeout for the controller to be ready. // // The timeout is for convenience so the caller doesn't have to juggle an extra context. -func (c *ControllerManager) WaitReady(ctx context.Context, timeout time.Duration) error { +func (n *Node) WaitReady(ctx context.Context, timeout time.Duration) error { if timeout > 0 { var cancel func() ctx, cancel = context.WithTimeout(ctx, timeout) @@ -104,33 +131,218 @@ func (c *ControllerManager) WaitReady(ctx context.Context, timeout time.Duration } select { - case <-c.ready: + case <-n.ready: return nil - case <-c.done: - return fmt.Errorf("controller exited before ready: %w", c.err) + case <-n.done: + return fmt.Errorf("controller exited before ready: %w", n.err) case <-ctx.Done(): return ctx.Err() } } // Ready returns a channel that will be closed after the controller is ready. -func (c *ControllerManager) Ready() <-chan struct{} { - return c.ready +func (n *Node) Ready() <-chan struct{} { + return n.ready } // Done returns a channel that will be closed when the controller has exited. -func (c *ControllerManager) Done() <-chan struct{} { - return c.done +func (n *Node) Done() <-chan struct{} { + return n.done } // Err returns any error that occurred with the controller. // // This always return nil before `<-Done()`. -func (c *ControllerManager) Err() error { +func (n *Node) Err() error { select { - case <-c.Done(): - return c.err + case <-n.Done(): + return n.err default: return nil } } + +// ProviderConfig holds objects created by NewNodeFromClient that a provider may need to bootstrap itself. 
+type ProviderConfig struct {
+	Pods corev1listers.PodLister
+	ConfigMaps corev1listers.ConfigMapLister
+	Secrets corev1listers.SecretLister
+	Services corev1listers.ServiceLister
+	// Hack to allow the provider to set things on the node
+	// Since the provider is bootstrapped after the node object is configured
+	// Primarily this is due to carry-over from the pre-1.0 interfaces that expect the provider to configure the node.
+	Node *v1.Node
+}
+
+// NodeOpt is used as functional options when configuring a new node in NewNodeFromClient
+type NodeOpt func(c *NodeConfig) error
+
+// NodeConfig is used to hold configuration items for a Node.
+// It gets used in conjunction with NodeOpt in NewNodeFromClient
+type NodeConfig struct {
+	// Set the node spec to register with Kubernetes
+	NodeSpec v1.Node
+	// Set the path to read a kubeconfig from for creating a client.
+	// This is ignored when a client is provided to NewNodeFromClient
+	KubeconfigPath string
+	// Set the period for a full resync for generated client-go informers
+	InformerResyncPeriod time.Duration
+}
+
+// NewProviderFunc is used from NewNodeFromClient to bootstrap a provider using the client/listers/etc created there.
+// If a nil node provider is returned a default one will be used.
+type NewProviderFunc func(ProviderConfig) (node.PodLifecycleHandler, node.NodeProvider, error)
+
+// WithNodeConfig returns a NodeOpt which replaces the NodeConfig with the passed in value.
+func WithNodeConfig(c NodeConfig) NodeOpt {
+	return func(orig *NodeConfig) error {
+		*orig = c
+		return nil
+	}
+}
+
+// NewNode calls NewNodeFromClient with a nil client
+func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node, error) {
+	return NewNodeFromClient(nil, name, newProvider, opts...)
+}
+
+// NewNodeFromClient creates a new node using the provided client and name.
+// This is intended for high-level, low-boilerplate usage.
+// Use the constructors in the `node` package for lower level configuration.
+//
+// Some basic values are set for node status; you'll almost certainly want to modify them.
+//
+// If client is nil, this will construct a client using ClientsetFromEnv
+func NewNodeFromClient(client kubernetes.Interface, name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node, error) {
+	cfg := NodeConfig{
+		// TODO: this is what was set in the cli code... its not clear what a good value actually is.
+ InformerResyncPeriod: time.Minute, + KubeconfigPath: os.Getenv("KUBECONFIG"), + NodeSpec: v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "type": "virtual-kubelet", + "kubernetes.io/role": "agent", + "kubernetes.io/hostname": name, + }, + }, + Status: v1.NodeStatus{ + Phase: v1.NodePending, + Conditions: []v1.NodeCondition{ + {Type: v1.NodeReady}, + {Type: v1.NodeDiskPressure}, + {Type: v1.NodeMemoryPressure}, + {Type: v1.NodePIDPressure}, + {Type: v1.NodeNetworkUnavailable}, + }, + }, + }, + } + + for _, o := range opts { + if err := o(&cfg); err != nil { + return nil, err + } + } + + if client == nil { + var err error + client, err = ClientsetFromEnv(cfg.KubeconfigPath) + if err != nil { + return nil, errors.Wrap(err, "error creating clientset from env") + } + } + + podInformerFactory := informers.NewSharedInformerFactoryWithOptions( + client, + cfg.InformerResyncPeriod, + PodInformerFilter(name), + ) + + scmInformerFactory := informers.NewSharedInformerFactoryWithOptions( + client, + cfg.InformerResyncPeriod, + ) + + podInformer := podInformerFactory.Core().V1().Pods() + secretInformer := scmInformerFactory.Core().V1().Secrets() + configMapInformer := scmInformerFactory.Core().V1().ConfigMaps() + serviceInformer := scmInformerFactory.Core().V1().Services() + + p, np, err := newProvider(ProviderConfig{ + Pods: podInformer.Lister(), + ConfigMaps: configMapInformer.Lister(), + Secrets: secretInformer.Lister(), + Services: serviceInformer.Lister(), + Node: &cfg.NodeSpec, + }) + if err != nil { + return nil, errors.Wrap(err, "error creating provider") + } + + var readyCb func(context.Context) error + if np == nil { + nnp := node.NewNaiveNodeProvider() + np = nnp + + readyCb = func(ctx context.Context) error { + setNodeReady(&cfg.NodeSpec) + err := nnp.UpdateStatus(ctx, &cfg.NodeSpec) + return errors.Wrap(err, "error marking node as ready") + } + } + + nc, err := node.NewNodeController( + np, + &cfg.NodeSpec, + client.CoreV1().Nodes(), + node.WithNodeEnableLeaseV1(NodeLeaseV1Client(client), node.DefaultLeaseDuration), + ) + if err != nil { + return nil, errors.Wrap(err, "error creating node controller") + } + + eb := record.NewBroadcaster() + + pc, err := node.NewPodController(node.PodControllerConfig{ + PodClient: client.CoreV1(), + EventRecorder: eb.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(name, "pod-controller")}), + Provider: p, + PodInformer: podInformer, + SecretInformer: secretInformer, + ConfigMapInformer: configMapInformer, + ServiceInformer: serviceInformer, + }) + if err != nil { + return nil, errors.Wrap(err, "error creating pod controller") + } + return &Node{ + nc: nc, + pc: pc, + readyCb: readyCb, + ready: make(chan struct{}), + done: make(chan struct{}), + eb: eb, + podInformerFactory: podInformerFactory, + scmInformerFactory: scmInformerFactory, + client: client, + }, nil +} + +func setNodeReady(n *v1.Node) { + n.Status.Phase = v1.NodeRunning + for i, c := range n.Status.Conditions { + if c.Type != "Ready" { + continue + } + + c.Message = "Kubelet is ready" + c.Reason = "KubeletReady" + c.Status = v1.ConditionTrue + c.LastHeartbeatTime = metav1.Now() + c.LastTransitionTime = metav1.Now() + n.Status.Conditions[i] = c + return + } +} diff --git a/website/data/cli.yaml b/website/data/cli.yaml index 0cd709df3..e0b795f44 100644 --- a/website/data/cli.yaml +++ b/website/data/cli.yaml @@ -38,7 +38,7 @@ flags: default: virtual-kubelet - name: --os arg: string - description: The operating system (must be `Linux` or 
`Windows`) + description: The operating system (must be `linux` or `windows`) default: Linux - name: --pod-sync-workers arg: int From e1342777d6dc0e72ee3ce13d5ae91600af00890a Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Thu, 3 Jun 2021 23:55:09 +0000 Subject: [PATCH 3/5] Add API config to node set This moves API handling into the node object so now everything can be done in one place. TLS is required. In the current form, auth must be setup by the caller. --- .../internal/commands/root/http.go | 95 --------------- .../internal/commands/root/root.go | 40 +++---- .../internal/provider/provider.go | 26 +--- hack/skaffold/virtual-kubelet/pod.yml | 7 -- internal/test/e2e/framework/stats.go | 3 +- node/nodeutil/controller.go | 111 ++++++++++++++---- node/nodeutil/provider.go | 47 ++++++++ 7 files changed, 156 insertions(+), 173 deletions(-) create mode 100644 node/nodeutil/provider.go diff --git a/cmd/virtual-kubelet/internal/commands/root/http.go b/cmd/virtual-kubelet/internal/commands/root/http.go index b20a4ff7d..2b9633edf 100644 --- a/cmd/virtual-kubelet/internal/commands/root/http.go +++ b/cmd/virtual-kubelet/internal/commands/root/http.go @@ -15,19 +15,12 @@ package root import ( - "context" "crypto/tls" "fmt" - "io" - "net" - "net/http" "os" "time" "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider" - "github.com/virtual-kubelet/virtual-kubelet/log" - "github.com/virtual-kubelet/virtual-kubelet/node/api" ) // AcceptedCiphers is the list of accepted TLS ciphers, with known weak ciphers elided @@ -58,94 +51,6 @@ func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) { }, nil } -func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerConfig, getPodsFromKubernetes api.PodListerFunc) (_ func(), retErr error) { - var closers []io.Closer - cancel := func() { - for _, c := range closers { - c.Close() - } - } - defer func() { - if retErr != nil { - cancel() - } - }() - - if cfg.CertPath == "" || cfg.KeyPath == "" { - log.G(ctx). - WithField("certPath", cfg.CertPath). - WithField("keyPath", cfg.KeyPath). 
- Error("TLS certificates not provided, not setting up pod http server") - } else { - tlsCfg, err := loadTLSConfig(cfg.CertPath, cfg.KeyPath) - if err != nil { - return nil, err - } - l, err := tls.Listen("tcp", cfg.Addr, tlsCfg) - if err != nil { - return nil, errors.Wrap(err, "error setting up listener for pod http server") - } - - mux := http.NewServeMux() - - podRoutes := api.PodHandlerConfig{ - RunInContainer: p.RunInContainer, - GetContainerLogs: p.GetContainerLogs, - GetPodsFromKubernetes: getPodsFromKubernetes, - GetPods: p.GetPods, - StreamIdleTimeout: cfg.StreamIdleTimeout, - StreamCreationTimeout: cfg.StreamCreationTimeout, - } - - api.AttachPodRoutes(podRoutes, mux, true) - - s := &http.Server{ - Handler: mux, - TLSConfig: tlsCfg, - } - go serveHTTP(ctx, s, l, "pods") - closers = append(closers, s) - } - - if cfg.MetricsAddr == "" { - log.G(ctx).Info("Pod metrics server not setup due to empty metrics address") - } else { - l, err := net.Listen("tcp", cfg.MetricsAddr) - if err != nil { - return nil, errors.Wrap(err, "could not setup listener for pod metrics http server") - } - - mux := http.NewServeMux() - - var summaryHandlerFunc api.PodStatsSummaryHandlerFunc - if mp, ok := p.(provider.PodMetricsProvider); ok { - summaryHandlerFunc = mp.GetStatsSummary - } - podMetricsRoutes := api.PodMetricsConfig{ - GetStatsSummary: summaryHandlerFunc, - } - api.AttachPodMetricsRoutes(podMetricsRoutes, mux) - s := &http.Server{ - Handler: mux, - } - go serveHTTP(ctx, s, l, "pod metrics") - closers = append(closers, s) - } - - return cancel, nil -} - -func serveHTTP(ctx context.Context, s *http.Server, l net.Listener, name string) { - if err := s.Serve(l); err != nil { - select { - case <-ctx.Done(): - default: - log.G(ctx).WithError(err).Errorf("Error setting up %s http server", name) - } - } - l.Close() -} - type apiServerConfig struct { CertPath string KeyPath string diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go index 7373bc862..c4777023d 100644 --- a/cmd/virtual-kubelet/internal/commands/root/root.go +++ b/cmd/virtual-kubelet/internal/commands/root/root.go @@ -74,14 +74,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return err } - cancelHTTP := func() {} - defer func() { - // note: this is purposefully using a closure so that when this is actually set the correct function will be called - if cancelHTTP != nil { - cancelHTTP() - } - }() - newProvider := func(cfg nodeutil.ProviderConfig) (node.PodLifecycleHandler, node.NodeProvider, error) { + newProvider := func(cfg nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) { var err error rm, err := manager.NewResourceManager(cfg.Pods, cfg.Secrets, cfg.ConfigMaps, cfg.Services) if err != nil { @@ -107,18 +100,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { } p.ConfigureNode(ctx, cfg.Node) - apiConfig, err := getAPIConfig(c) - if err != nil { - return nil, nil, err - } - - cancelHTTP, err = setupHTTPServer(ctx, p, apiConfig, func(context.Context) ([]*corev1.Pod, error) { - return rm.GetPods(), nil - }) - if err != nil { - return nil, nil, err - } - return p, nil, nil } @@ -131,6 +112,21 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { cfg.NodeSpec.Status.NodeInfo.Architecture = runtime.GOARCH cfg.NodeSpec.Status.NodeInfo.OperatingSystem = c.OperatingSystem + apiConfig, err := getAPIConfig(c) + if err != nil { + return err + } + + cfg.HTTPListenAddr = 
apiConfig.Addr + cfg.StreamCreationTimeout = apiConfig.StreamCreationTimeout + cfg.StreamIdleTimeout = apiConfig.StreamIdleTimeout + + cfg.TLSConfig, err = loadTLSConfig(apiConfig.CertPath, apiConfig.KeyPath) + if err != nil { + return errors.Wrap(err, "error loading tls config") + } + + cfg.DebugHTTP = true return nil }) if err != nil { @@ -148,8 +144,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { "watchedNamespace": c.KubeNamespace, })) - defer cancelHTTP() - go cm.Run(ctx, c.PodSyncWorkers) // nolint:errcheck defer func() { @@ -163,6 +157,8 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return err } + log.G(ctx).Info("Ready") + select { case <-ctx.Done(): case <-cm.Done(): diff --git a/cmd/virtual-kubelet/internal/provider/provider.go b/cmd/virtual-kubelet/internal/provider/provider.go index 1af5e58c8..a48630eb5 100644 --- a/cmd/virtual-kubelet/internal/provider/provider.go +++ b/cmd/virtual-kubelet/internal/provider/provider.go @@ -2,35 +2,15 @@ package provider import ( "context" - "io" - "github.com/virtual-kubelet/virtual-kubelet/node" - "github.com/virtual-kubelet/virtual-kubelet/node/api" - "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" + "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil" v1 "k8s.io/api/core/v1" ) -// Provider contains the methods required to implement a virtual-kubelet provider. -// -// Errors produced by these methods should implement an interface from -// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the -// core logic to be able to understand the type of failure. +// Provider wraps the core provider type with an extra function needed to bootstrap the node type Provider interface { - node.PodLifecycleHandler - - // GetContainerLogs retrieves the logs of a container by name from the provider. - GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) - - // RunInContainer executes a command in a container in the pod, copying data - // between in/out/err and the container's stdin/stdout/stderr. - RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error - + nodeutil.Provider // ConfigureNode enables a provider to configure the node object that // will be used for Kubernetes. 
ConfigureNode(context.Context, *v1.Node) } - -// PodMetricsProvider is an optional interface that providers can implement to expose pod stats -type PodMetricsProvider interface { - GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error) -} diff --git a/hack/skaffold/virtual-kubelet/pod.yml b/hack/skaffold/virtual-kubelet/pod.yml index 48e2c66a5..e95455b00 100644 --- a/hack/skaffold/virtual-kubelet/pod.yml +++ b/hack/skaffold/virtual-kubelet/pod.yml @@ -30,11 +30,4 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP - ports: - - name: metrics - containerPort: 10255 - readinessProbe: - httpGet: - path: /stats/summary - port: metrics serviceAccountName: virtual-kubelet diff --git a/internal/test/e2e/framework/stats.go b/internal/test/e2e/framework/stats.go index 884c8ad0d..5e39d688a 100644 --- a/internal/test/e2e/framework/stats.go +++ b/internal/test/e2e/framework/stats.go @@ -3,7 +3,6 @@ package framework import ( "context" "encoding/json" - "strconv" stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" "k8s.io/apimachinery/pkg/util/net" @@ -18,7 +17,7 @@ func (f *Framework) GetStatsSummary(ctx context.Context) (*stats.Summary, error) Namespace(f.Namespace). Resource("pods"). SubResource("proxy"). - Name(net.JoinSchemeNamePort("http", f.NodeName, strconv.Itoa(10255))). + Name(net.JoinSchemeNamePort("https", f.NodeName, "10250")). Suffix("/stats/summary").DoRaw(ctx) if err != nil { return nil, err diff --git a/node/nodeutil/controller.go b/node/nodeutil/controller.go index 7f145c061..0500f9a6f 100644 --- a/node/nodeutil/controller.go +++ b/node/nodeutil/controller.go @@ -2,7 +2,10 @@ package nodeutil import ( "context" + "crypto/tls" "fmt" + "net" + "net/http" "os" "path" "time" @@ -10,13 +13,14 @@ import ( "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/node" + "github.com/virtual-kubelet/virtual-kubelet/node/api" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" ) @@ -38,6 +42,10 @@ type Node struct { scmInformerFactory informers.SharedInformerFactory client kubernetes.Interface + listenAddr string + httpHandler HTTPHandler + tlsConfig *tls.Config + eb record.EventBroadcaster } @@ -54,7 +62,12 @@ func (n *Node) PodController() *node.PodController { // Run starts all the underlying controllers func (n *Node) Run(ctx context.Context, workers int) (retErr error) { ctx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + cancel() + + n.err = retErr + close(n.done) + }() if n.podInformerFactory != nil { go n.podInformerFactory.Start(ctx.Done()) @@ -68,15 +81,23 @@ func (n *Node) Run(ctx context.Context, workers int) (retErr error) { n.eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: n.client.CoreV1().Events(v1.NamespaceAll)}) } - go n.pc.Run(ctx, workers) // nolint:errcheck + l, err := tls.Listen("tcp", n.listenAddr, n.tlsConfig) + if err != nil { + return errors.Wrap(err, "error starting http listener") + } + log.G(ctx).Debug("Started TLS listener") + defer l.Close() + + srv := &http.Server{Handler: n.httpHandler, TLSConfig: n.tlsConfig} + go srv.Serve(l) + defer srv.Close() + + go n.pc.Run(ctx, workers) //nolint:errcheck + log.G(ctx).Debug("HTTP server running") defer func() { 
cancel() - <-n.pc.Done() - - n.err = retErr - close(n.done) }() select { @@ -87,6 +108,8 @@ func (n *Node) Run(ctx context.Context, workers int) (retErr error) { return n.pc.Err() } + log.G(ctx).Debug("pod controller ready") + go n.nc.Run(ctx) // nolint:errcheck defer func() { @@ -103,6 +126,8 @@ func (n *Node) Run(ctx context.Context, workers int) (retErr error) { return n.nc.Err() } + log.G(ctx).Debug("node controller ready") + if n.readyCb != nil { if err := n.readyCb(ctx); err != nil { return err @@ -162,18 +187,6 @@ func (n *Node) Err() error { } } -// ProviderConfig holds objects created by NewNodeFromClient that a provider may need to bootstrap itself. -type ProviderConfig struct { - Pods corev1listers.PodLister - ConfigMaps corev1listers.ConfigMapLister - Secrets corev1listers.SecretLister - Services corev1listers.ServiceLister - // Hack to allow the provider to set things on the node - // Since the provider is bootstrapped after the node object is configured - // Primarily this is due to carry-over from the pre-1.0 interfaces that expect the provider to configure the node. - Node *v1.Node -} - // NodeOpt is used as functional options when configuring a new node in NewNodeFromClient type NodeOpt func(c *NodeConfig) error @@ -187,11 +200,32 @@ type NodeConfig struct { KubeconfigPath string // Set the period for a full resync for generated client-go informers InformerResyncPeriod time.Duration + + // Set the address to listen on for the http API + HTTPListenAddr string + // Set a custom API handler to use. + // You can use this to setup, for example, authentication middleware. + // If one is not provided a default one will be created. + // Pod routes will be attached to this handler when creating the node + HTTPHandler HTTPHandler + // Set the timeout for idle http streams + StreamIdleTimeout time.Duration + // Set the timeout for creating http streams + StreamCreationTimeout time.Duration + // Enable http debugging routes + DebugHTTP bool + // Set the tls config to use for the http server + TLSConfig *tls.Config + + // Specify the event recorder to use + // If this is not provided, a default one will be used. + EventRecorder record.EventRecorder } -// NewProviderFunc is used from NewNodeFromClient to bootstrap a provider using the client/listers/etc created there. -// If a nil node provider is returned a default one will be used. -type NewProviderFunc func(ProviderConfig) (node.PodLifecycleHandler, node.NodeProvider, error) +type HTTPHandler interface { + api.ServeMux + http.Handler +} // WithNodeConfig returns a NodeOpt which replaces the NodeConfig with the passed in value. func WithNodeConfig(c NodeConfig) NodeOpt { @@ -218,6 +252,7 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New // TODO: this is what was set in the cli code... its not clear what a good value actually is. 
InformerResyncPeriod: time.Minute, KubeconfigPath: os.Getenv("KUBECONFIG"), + HTTPListenAddr: ":10250", NodeSpec: v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -246,6 +281,14 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New } } + if _, _, err := net.SplitHostPort(cfg.HTTPListenAddr); err != nil { + return nil, errors.Wrap(err, "error parsing http listen address") + } + + if cfg.HTTPHandler == nil { + cfg.HTTPHandler = http.NewServeMux() + } + if client == nil { var err error client, err = ClientsetFromEnv(cfg.KubeconfigPath) @@ -281,6 +324,18 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New return nil, errors.Wrap(err, "error creating provider") } + api.AttachPodRoutes(api.PodHandlerConfig{ + RunInContainer: p.RunInContainer, + GetContainerLogs: p.GetContainerLogs, + GetPods: p.GetPods, + GetPodsFromKubernetes: func(context.Context) ([]*v1.Pod, error) { + return podInformer.Lister().List(labels.Everything()) + }, + GetStatsSummary: p.GetStatsSummary, + StreamIdleTimeout: cfg.StreamIdleTimeout, + StreamCreationTimeout: cfg.StreamCreationTimeout, + }, cfg.HTTPHandler, cfg.DebugHTTP) + var readyCb func(context.Context) error if np == nil { nnp := node.NewNaiveNodeProvider() @@ -303,11 +358,15 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New return nil, errors.Wrap(err, "error creating node controller") } - eb := record.NewBroadcaster() + var eb record.EventBroadcaster + if cfg.EventRecorder == nil { + eb := record.NewBroadcaster() + cfg.EventRecorder = eb.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(name, "pod-controller")}) + } pc, err := node.NewPodController(node.PodControllerConfig{ PodClient: client.CoreV1(), - EventRecorder: eb.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(name, "pod-controller")}), + EventRecorder: cfg.EventRecorder, Provider: p, PodInformer: podInformer, SecretInformer: secretInformer, @@ -317,6 +376,7 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New if err != nil { return nil, errors.Wrap(err, "error creating pod controller") } + return &Node{ nc: nc, pc: pc, @@ -327,6 +387,9 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New podInformerFactory: podInformerFactory, scmInformerFactory: scmInformerFactory, client: client, + tlsConfig: cfg.TLSConfig, + httpHandler: cfg.HTTPHandler, + listenAddr: cfg.HTTPListenAddr, }, nil } diff --git a/node/nodeutil/provider.go b/node/nodeutil/provider.go new file mode 100644 index 000000000..b85b454cb --- /dev/null +++ b/node/nodeutil/provider.go @@ -0,0 +1,47 @@ +package nodeutil + +import ( + "context" + "io" + + "github.com/virtual-kubelet/virtual-kubelet/node" + "github.com/virtual-kubelet/virtual-kubelet/node/api" + "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" + v1 "k8s.io/api/core/v1" + corev1listers "k8s.io/client-go/listers/core/v1" +) + +// Provider contains the methods required to implement a virtual-kubelet provider. +// +// Errors produced by these methods should implement an interface from +// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the +// core logic to be able to understand the type of failure +type Provider interface { + node.PodLifecycleHandler + + // GetContainerLogs retrieves the logs of a container by name from the provider. 
+	GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error)
+
+	// RunInContainer executes a command in a container in the pod, copying data
+	// between in/out/err and the container's stdin/stdout/stderr.
+	RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error
+
+	// GetStatsSummary gets the stats for the node, including running pods
+	GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error)
+}
+
+// ProviderConfig holds objects created by NewNodeFromClient that a provider may need to bootstrap itself.
+type ProviderConfig struct {
+	Pods corev1listers.PodLister
+	ConfigMaps corev1listers.ConfigMapLister
+	Secrets corev1listers.SecretLister
+	Services corev1listers.ServiceLister
+	// Hack to allow the provider to set things on the node
+	// Since the provider is bootstrapped after the node object is configured
+	// Primarily this is due to carry-over from the pre-1.0 interfaces that expect the provider instead of the direct *caller* to configure the node.
+	Node *v1.Node
+}
+
+// NewProviderFunc is used from NewNodeFromClient to bootstrap a provider using the client/listers/etc created there.
+// If a nil node provider is returned a default one will be used.
+type NewProviderFunc func(ProviderConfig) (Provider, node.NodeProvider, error)

From 4974e062d0e7a5ef9b1df9a1f5e7facb1bbd947a Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Tue, 8 Jun 2021 16:37:29 +0000
Subject: [PATCH 4/5] Add webhook and anon auth support

Auth is not automatically enabled because this requires some
bootstrapping to work. I'll leave this for some future work. In the
meantime, people can use the current code similar to how they used the
node-cli code to inject their own auth.
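A minimal sketch of injecting auth through a `NodeOpt`, mirroring the
`setAuth` helper in the root.go diff below (`client`, `nodeName`, and
`caCertPath` are assumptions supplied by the caller):

    withWebhookAuth := func(cfg *nodeutil.NodeConfig) error {
        auth, err := nodeutil.WebhookAuth(client, nodeName,
            func(c *nodeutil.WebhookAuthConfig) error {
                var err error
                // Trust client certificates signed by the provided CA bundle.
                c.AuthnConfig.ClientCertificateCAContentProvider, err =
                    dynamiccertificates.NewDynamicCAContentFromFile("ca-cert-bundle", caCertPath)
                return err
            })
        if err != nil {
            return err
        }
        cfg.Handler = api.InstrumentHandler(nodeutil.WithAuth(auth, cfg.Handler))
        return nil
    }

Without a CA bundle, `nodeutil.NoAuth()` can be wrapped the same way to
allow anonymous access.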
---
 .../internal/commands/root/http.go     |  37 +--
 .../internal/commands/root/root.go     |  72 ++++--
 go.sum                                 |  10 +
 hack/skaffold/virtual-kubelet/base.yml |   8 +
 hack/skaffold/virtual-kubelet/pod.yml  |   7 +
 node/nodeutil/auth.go                  | 220 ++++++++++++++++++
 node/nodeutil/controller.go            | 124 +++++-----
 node/nodeutil/provider.go              |  23 ++
 node/nodeutil/tls.go                   |  83 +++++++
 9 files changed, 483 insertions(+), 101 deletions(-)
 create mode 100644 node/nodeutil/auth.go
 create mode 100644 node/nodeutil/tls.go

diff --git a/cmd/virtual-kubelet/internal/commands/root/http.go b/cmd/virtual-kubelet/internal/commands/root/http.go
index 2b9633edf..a14b2c9df 100644
--- a/cmd/virtual-kubelet/internal/commands/root/http.go
+++ b/cmd/virtual-kubelet/internal/commands/root/http.go
@@ -15,45 +15,15 @@ package root
 
 import (
-	"crypto/tls"
 	"fmt"
 	"os"
 	"time"
-
-	"github.com/pkg/errors"
 )
 
-// AcceptedCiphers is the list of accepted TLS ciphers, with known weak ciphers elided
-// Note this list should be a moving target.
-var AcceptedCiphers = []uint16{
-	tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
-	tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
-	tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
-	tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
-
-	tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
-	tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
-	tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
-	tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
-}
-
-func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) {
-	cert, err := tls.LoadX509KeyPair(certPath, keyPath)
-	if err != nil {
-		return nil, errors.Wrap(err, "error loading tls certs")
-	}
-
-	return &tls.Config{
-		Certificates: []tls.Certificate{cert},
-		MinVersion: tls.VersionTLS12,
-		PreferServerCipherSuites: true,
-		CipherSuites: AcceptedCiphers,
-	}, nil
-}
-
 type apiServerConfig struct {
 	CertPath string
 	KeyPath string
+	CACertPath string
 	Addr string
 	MetricsAddr string
 	StreamIdleTimeout time.Duration
@@ -62,8 +32,9 @@ func getAPIConfig(c Opts) (*apiServerConfig, error) {
 	config := apiServerConfig{
-		CertPath: os.Getenv("APISERVER_CERT_LOCATION"),
-		KeyPath:  os.Getenv("APISERVER_KEY_LOCATION"),
+		CertPath:   os.Getenv("APISERVER_CERT_LOCATION"),
+		KeyPath:    os.Getenv("APISERVER_KEY_LOCATION"),
+		CACertPath: os.Getenv("APISERVER_CA_CERT_LOCATION"),
 	}
 
 	config.Addr = fmt.Sprintf(":%d", c.ListenPort)
diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go
index c4777023d..ade4daa61 100644
--- a/cmd/virtual-kubelet/internal/commands/root/root.go
+++ b/cmd/virtual-kubelet/internal/commands/root/root.go
@@ -16,6 +16,8 @@ package root
 
 import (
 	"context"
+	"crypto/tls"
+	"net/http"
 	"os"
 	"runtime"
 
@@ -26,8 +28,11 @@ import (
 	"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
 	"github.com/virtual-kubelet/virtual-kubelet/log"
 	"github.com/virtual-kubelet/virtual-kubelet/node"
+	"github.com/virtual-kubelet/virtual-kubelet/node/api"
 	"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apiserver/pkg/server/dynamiccertificates"
+	"k8s.io/client-go/kubernetes"
 )
 
 // NewCommand creates a new top-level command.
@@ -74,8 +79,8 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return err } + mux := http.NewServeMux() newProvider := func(cfg nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) { - var err error rm, err := manager.NewResourceManager(cfg.Pods, cfg.Secrets, cfg.ConfigMaps, cfg.Services) if err != nil { return nil, nil, errors.Wrap(err, "could not create resource manager") @@ -99,11 +104,17 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { return nil, nil, errors.Wrapf(err, "error initializing provider %s", c.Provider) } p.ConfigureNode(ctx, cfg.Node) - + cfg.Node.Status.NodeInfo.KubeletVersion = c.Version return p, nil, nil } - cm, err := nodeutil.NewNodeFromClient(client, c.NodeName, newProvider, func(cfg *nodeutil.NodeConfig) error { + apiConfig, err := getAPIConfig(c) + if err != nil { + return err + } + + cm, err := nodeutil.NewNodeFromClient(c.NodeName, newProvider, func(cfg *nodeutil.NodeConfig) error { + cfg.Handler = mux cfg.InformerResyncPeriod = c.InformerResyncPeriod if taint != nil { @@ -112,23 +123,23 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { cfg.NodeSpec.Status.NodeInfo.Architecture = runtime.GOARCH cfg.NodeSpec.Status.NodeInfo.OperatingSystem = c.OperatingSystem - apiConfig, err := getAPIConfig(c) - if err != nil { - return err - } - cfg.HTTPListenAddr = apiConfig.Addr cfg.StreamCreationTimeout = apiConfig.StreamCreationTimeout cfg.StreamIdleTimeout = apiConfig.StreamIdleTimeout - - cfg.TLSConfig, err = loadTLSConfig(apiConfig.CertPath, apiConfig.KeyPath) - if err != nil { - return errors.Wrap(err, "error loading tls config") - } - cfg.DebugHTTP = true + + cfg.NumWorkers = c.PodSyncWorkers + return nil - }) + }, + nodeutil.WithClient(client), + setAuth(client, c.NodeName, apiConfig), + nodeutil.WithTLSConfig( + nodeutil.WithKeyPairFromPath(apiConfig.CertPath, apiConfig.KeyPath), + maybeCA(apiConfig.CACertPath), + ), + nodeutil.AttachProviderRoutes(mux), + ) if err != nil { return err } @@ -144,7 +155,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { "watchedNamespace": c.KubeNamespace, })) - go cm.Run(ctx, c.PodSyncWorkers) // nolint:errcheck + go cm.Run(ctx) //nolint:errcheck defer func() { log.G(ctx).Debug("Waiting for controllers to be done") @@ -166,3 +177,32 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { } return nil } + +func setAuth(client kubernetes.Interface, node string, apiCfg *apiServerConfig) nodeutil.NodeOpt { + if apiCfg.CACertPath == "" { + return func(cfg *nodeutil.NodeConfig) error { + cfg.Handler = api.InstrumentHandler(nodeutil.WithAuth(nodeutil.NoAuth(), cfg.Handler)) + return nil + } + } + + return func(cfg *nodeutil.NodeConfig) error { + auth, err := nodeutil.WebhookAuth(client, node, func(cfg *nodeutil.WebhookAuthConfig) error { + var err error + cfg.AuthnConfig.ClientCertificateCAContentProvider, err = dynamiccertificates.NewDynamicCAContentFromFile("ca-cert-bundle", apiCfg.CACertPath) + return err + }) + if err != nil { + return err + } + cfg.Handler = api.InstrumentHandler(nodeutil.WithAuth(auth, cfg.Handler)) + return nil + } +} + +func maybeCA(p string) func(*tls.Config) error { + if p == "" { + return func(*tls.Config) error { return nil } + } + return nodeutil.WithCAFromPath(p) +} diff --git a/go.sum b/go.sum index 0c24d7c00..5910cae94 100644 --- a/go.sum +++ b/go.sum @@ -34,8 +34,10 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0 
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= +github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= @@ -55,6 +57,7 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/blang/semver v3.5.0+incompatible h1:CGxCgetQ64DKk7rdZ++Vfnb1+ogGNnB17OJKJXD2Cfs= github.com/blang/semver v3.5.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/bombsimon/logrusr v1.0.0 h1:CTCkURYAt5nhCCnKH9eLShYayj2/8Kn/4Qg3QfiU+Ro= github.com/bombsimon/logrusr v1.0.0/go.mod h1:Jq0nHtvxabKE5EMwAAdgTaz7dfWE8C4i11NOltxGQpc= @@ -140,11 +143,13 @@ github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+ github.com/go-openapi/jsonpointer v0.17.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M= github.com/go-openapi/jsonpointer v0.18.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M= github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg= +github.com/go-openapi/jsonpointer v0.19.3 h1:gihV7YNZK1iK6Tgwwsxo2rJbD1GTbdm72325Bq8FI3w= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg= github.com/go-openapi/jsonreference v0.17.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I= github.com/go-openapi/jsonreference v0.18.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I= github.com/go-openapi/jsonreference v0.19.2/go.mod h1:jMjeRr2HHw6nAVajTXJ4eiUwohSTlpa0o73RUL1owJc= +github.com/go-openapi/jsonreference v0.19.3 h1:5cxNfTy0UVC3X8JL5ymxzyoUZmo8iZb+jeTWn7tUa8o= github.com/go-openapi/jsonreference v0.19.3/go.mod h1:rjx6GuL8TTa9VaixXglHmQmIL98+wF9xc8zWvFonSJ8= github.com/go-openapi/loads v0.17.0/go.mod h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU= github.com/go-openapi/loads v0.18.0/go.mod h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU= @@ -158,6 +163,7 @@ github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nA github.com/go-openapi/spec v0.17.0/go.mod h1:XkF/MOi14NmjsfZ8VtAKf8pIlbZzyoTvZsdfssdxcBI= github.com/go-openapi/spec v0.18.0/go.mod h1:XkF/MOi14NmjsfZ8VtAKf8pIlbZzyoTvZsdfssdxcBI= github.com/go-openapi/spec v0.19.2/go.mod h1:sCxk3jxKgioEJikev4fgkNmwS+3kuYdJtcsZsD5zxMY= +github.com/go-openapi/spec v0.19.3 h1:0XRyw8kguri6Yw4SxhsQA/atC88yqrk0+G4YhI2wabc= 
 github.com/go-openapi/spec v0.19.3/go.mod h1:FpwSN1ksY1eteniUU7X0N/BgJ7a4WvBFVA8Lj9mJglo=
 github.com/go-openapi/strfmt v0.17.0/go.mod h1:P82hnJI0CXkErkXi8IKjPbNBM6lV6+5pLP5l494TcyU=
 github.com/go-openapi/strfmt v0.18.0/go.mod h1:P82hnJI0CXkErkXi8IKjPbNBM6lV6+5pLP5l494TcyU=
@@ -167,6 +173,7 @@ github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dp
 github.com/go-openapi/swag v0.17.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/kXLo40Tg=
 github.com/go-openapi/swag v0.18.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/kXLo40Tg=
 github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
+github.com/go-openapi/swag v0.19.5 h1:lTz6Ys4CmqqCQmZPBlbQENR1/GucA2bzYTE12Pw4tFY=
 github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
 github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4=
 github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA=
@@ -281,6 +288,7 @@ github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN
 github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
 github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
 github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
+github.com/mailru/easyjson v0.7.0 h1:aizVhC/NAAcKWb+5QsU1iNOZb4Yws5UO2I+aIprQITM=
 github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
 github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
 github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
@@ -371,6 +379,7 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf
 github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
+github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
 github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
 github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
 github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
@@ -704,6 +713,7 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800 h1:9ZNvfPvVIEsp/T1ez4GQuzCcCTEQW
 k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
 sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.9/go.mod h1:dzAXnQbTRyDlZPJX2SUPEqvnB+j7AJjtlox7PEwigU0=
+sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.15 h1:4uqm9Mv+w2MmBYD+F4qf/v6tDFUdPOk29C095RbU5mY=
 sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.15/go.mod h1:LEScyzhFmoF5pso/YSeBstl57mOzx9xlU9n85RGrDQg=
 sigs.k8s.io/controller-runtime v0.7.1 h1:nqVwzVzdenfd9xIbB35pC7JJH2IXVL4hDo3MNzkyCh4=
 sigs.k8s.io/controller-runtime v0.7.1/go.mod h1:pJ3YBrJiAqMAZKi6UVGuE98ZrroV1p+pIhoHsMm9wdU=
diff --git a/hack/skaffold/virtual-kubelet/base.yml b/hack/skaffold/virtual-kubelet/base.yml
index 0cbfc14c2..f13ad0426 100644
--- a/hack/skaffold/virtual-kubelet/base.yml
+++ b/hack/skaffold/virtual-kubelet/base.yml
@@ -56,6 +56,14 @@ rules:
   verbs:
   - create
   - patch
+- apiGroups:
+  - coordination.k8s.io
+  resources:
+  - leases
+  verbs:
+  - get
+  - create
+  - update
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: ClusterRoleBinding
diff --git a/hack/skaffold/virtual-kubelet/pod.yml b/hack/skaffold/virtual-kubelet/pod.yml
index e95455b00..bdc4ff5f5 100644
--- a/hack/skaffold/virtual-kubelet/pod.yml
+++ b/hack/skaffold/virtual-kubelet/pod.yml
@@ -4,6 +4,8 @@ metadata:
   name: vkubelet-mock-0
 spec:
   containers:
+  - name: jaeger-tracing
+    image: jaegertracing/all-in-one:1.22
   - name: vkubelet-mock-0
     image: virtual-kubelet
     # "IfNotPresent" is used to prevent Minikube from trying to pull from the registry (and failing) in the first place.
@@ -23,7 +25,12 @@ spec:
     - --klog.logtostderr
     - --log-level
    - debug
+    - --trace-exporter
+    - jaeger
+    - --trace-sample-rate=always
     env:
+    - name: JAEGER_AGENT_ENDPOINT
+      value: localhost:6831
    - name: KUBELET_PORT
       value: "10250"
    - name: VKUBELET_POD_IP
diff --git a/node/nodeutil/auth.go b/node/nodeutil/auth.go
new file mode 100644
index 000000000..ef5e52cac
--- /dev/null
+++ b/node/nodeutil/auth.go
@@ -0,0 +1,220 @@
+package nodeutil
+
+import (
+	"context"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/virtual-kubelet/virtual-kubelet/log"
+	"github.com/virtual-kubelet/virtual-kubelet/trace"
+	"k8s.io/apiserver/pkg/authentication/authenticator"
+	"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
+	"k8s.io/apiserver/pkg/authentication/request/anonymous"
+	"k8s.io/apiserver/pkg/authentication/user"
+	"k8s.io/apiserver/pkg/authorization/authorizer"
+	"k8s.io/apiserver/pkg/authorization/authorizerfactory"
+	"k8s.io/client-go/kubernetes"
+)
+
+// Auth is the interface used to implement authn/authz for http requests
+type Auth interface {
+	authenticator.Request
+	authorizer.RequestAttributesGetter
+	authorizer.Authorizer
+}
+
+type authWrapper struct {
+	authenticator.Request
+	authorizer.RequestAttributesGetter
+	authorizer.Authorizer
+}
+
+// InstrumentAuth wraps the provided Auth in a new instrumented Auth
+//
+// Note: You would only need this if you rolled your own auth.
+// The Auth implementations defined in this package are already instrumented.
+func InstrumentAuth(auth Auth) Auth {
+	if _, ok := auth.(*authWrapper); ok {
+		// This is already instrumented
+		return auth
+	}
+	return &authWrapper{
+		Request:                 auth,
+		RequestAttributesGetter: auth,
+		Authorizer:              auth,
+	}
+}
+
+// NoAuth creates an Auth which allows anonymous access to all resources
+func NoAuth() Auth {
+	return &authWrapper{
+		Request:                 anonymous.NewAuthenticator(),
+		RequestAttributesGetter: &NodeRequestAttr{},
+		Authorizer:              authorizerfactory.NewAlwaysAllowAuthorizer(),
+	}
+}
+
+// WithAuth makes a new http handler which wraps the provided handler with authn/authz.
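+//
+// A minimal wiring sketch (the names here are illustrative only, not part of this package):
+//
+//	mux := http.NewServeMux()
+//	// ... attach kubelet API routes to mux ...
+//	handler := WithAuth(NoAuth(), mux)
+//	srv := &http.Server{Handler: handler}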
+func WithAuth(auth Auth, h http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		handleAuth(auth, w, r, h)
+	})
+}
+
+func handleAuth(auth Auth, w http.ResponseWriter, r *http.Request, next http.Handler) {
+	ctx := r.Context()
+	ctx, span := trace.StartSpan(ctx, "vk.handleAuth")
+	defer span.End()
+	r = r.WithContext(ctx)
+
+	info, ok, err := auth.AuthenticateRequest(r)
+	if err != nil || !ok {
+		log.G(r.Context()).WithError(err).Error("Authentication error")
+		http.Error(w, "Unauthorized", http.StatusUnauthorized)
+		return
+	}
+
+	logger := log.G(ctx).WithFields(log.Fields{
+		"user-name": info.User.GetName(),
+		"user-id":   info.User.GetUID(),
+	})
+
+	ctx = log.WithLogger(ctx, logger)
+	r = r.WithContext(ctx)
+
+	attrs := auth.GetRequestAttributes(info.User, r)
+
+	decision, _, err := auth.Authorize(ctx, attrs)
+	if err != nil {
+		log.G(r.Context()).WithError(err).Error("Authorization error")
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	if decision != authorizer.DecisionAllow {
+		http.Error(w, "Forbidden", http.StatusForbidden)
+		return
+	}
+
+	next.ServeHTTP(w, r)
+}
+
+// WebhookAuthOption is used as a functional argument to configure webhook auth.
+type WebhookAuthOption func(*WebhookAuthConfig) error
+
+// WebhookAuthConfig stores the configuration for authn/authz, which WebhookAuthOption implementations may modify before it is applied.
+type WebhookAuthConfig struct {
+	AuthnConfig authenticatorfactory.DelegatingAuthenticatorConfig
+	AuthzConfig authorizerfactory.DelegatingAuthorizerConfig
+}
+
+// WebhookAuth creates an Auth suitable to use with kubelet webhook auth.
+// You must provide a CA provider to the authentication config, otherwise mTLS is disabled.
+func WebhookAuth(client kubernetes.Interface, nodeName string, opts ...WebhookAuthOption) (Auth, error) {
+	cfg := WebhookAuthConfig{
+		AuthnConfig: authenticatorfactory.DelegatingAuthenticatorConfig{
+			CacheTTL: 2 * time.Minute, // default taken from k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1
+			// TODO: After upgrading k8s libs, we need to add the retry backoff option
+		},
+		AuthzConfig: authorizerfactory.DelegatingAuthorizerConfig{
+			AllowCacheTTL: 5 * time.Minute,  // default taken from k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1
+			DenyCacheTTL:  30 * time.Second, // default taken from k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1
+			// TODO: After upgrading k8s libs, we need to add the retry backoff option
+		},
+	}
+
+	for _, o := range opts {
+		if err := o(&cfg); err != nil {
+			return nil, err
+		}
+	}
+
+	cfg.AuthnConfig.TokenAccessReviewClient = client.AuthenticationV1().TokenReviews()
+	cfg.AuthzConfig.SubjectAccessReviewClient = client.AuthorizationV1().SubjectAccessReviews()
+
+	authn, _, err := cfg.AuthnConfig.New()
+	if err != nil {
+		return nil, err
+	}
+
+	authz, err := cfg.AuthzConfig.New()
+	if err != nil {
+		return nil, err
+	}
+	return &authWrapper{
+		Request:                 authn,
+		RequestAttributesGetter: NodeRequestAttr{nodeName},
+		Authorizer:              authz,
+	}, nil
+}
+
+func (w *authWrapper) AuthenticateRequest(r *http.Request) (*authenticator.Response, bool, error) {
+	ctx, span := trace.StartSpan(r.Context(), "AuthenticateRequest")
+	defer span.End()
+	return w.Request.AuthenticateRequest(r.WithContext(ctx))
+}
+
+func (w *authWrapper) Authorize(ctx context.Context, a authorizer.Attributes) (authorizer.Decision, string, error) {
+	ctx, span := trace.StartSpan(ctx, "Authorize")
+	defer span.End()
+	return w.Authorizer.Authorize(ctx, a)
+}
+
+// NodeRequestAttr is an authorizer.RequestAttributesGetter which can be used in the Auth interface.
+type NodeRequestAttr struct {
+	NodeName string
+}
+
+// GetRequestAttributes satisfies the authorizer.RequestAttributesGetter interface for use with an `Auth`.
+func (a NodeRequestAttr) GetRequestAttributes(u user.Info, r *http.Request) authorizer.Attributes {
+	return authorizer.AttributesRecord{
+		User:            u,
+		Verb:            getAPIVerb(r),
+		Namespace:       "",
+		APIGroup:        "",
+		APIVersion:      "v1",
+		Resource:        "nodes",
+		Name:            a.NodeName,
+		ResourceRequest: true,
+		Path:            r.URL.Path,
+		Subresource:     getSubresource(r),
+	}
+}
+
+func getAPIVerb(r *http.Request) string {
+	switch r.Method {
+	case http.MethodPost:
+		return "create"
+	case http.MethodGet:
+		return "get"
+	case http.MethodPut:
+		return "update"
+	case http.MethodPatch:
+		return "patch"
+	case http.MethodDelete:
+		return "delete"
+	}
+	return ""
+}
+
+func isSubpath(subpath, path string) bool {
+	// Taken from k8s.io/kubernetes/pkg/kubelet/server/auth.go
+	return subpath == path || (strings.HasPrefix(subpath, path) && subpath[len(path)] == '/')
+}
+
+func getSubresource(r *http.Request) string {
+	if isSubpath(r.URL.Path, "/stats") {
+		return "stats"
+	}
+	if isSubpath(r.URL.Path, "/metrics") {
+		return "metrics"
+	}
+	if isSubpath(r.URL.Path, "/logs") {
+		// yes, "log", not "logs"
+		// per kubelet code: "log" to match other log subresources (pods/log, etc)
+		return "log"
+	}
+
+	return "proxy"
+}
diff --git a/node/nodeutil/controller.go b/node/nodeutil/controller.go
index 0500f9a6f..1edae926b 100644
--- a/node/nodeutil/controller.go
+++ b/node/nodeutil/controller.go
@@ -8,19 +8,19 @@ import (
 	"net/http"
 	"os"
 	"path"
+	"runtime"
 	"time"
 
 	"github.com/pkg/errors"
 	"github.com/virtual-kubelet/virtual-kubelet/log"
 	"github.com/virtual-kubelet/virtual-kubelet/node"
-	"github.com/virtual-kubelet/virtual-kubelet/node/api"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
+	corev1listers "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/record"
 )
 
@@ -42,9 +42,11 @@ type Node struct {
 	scmInformerFactory informers.SharedInformerFactory
 	client             kubernetes.Interface
 
-	listenAddr  string
-	httpHandler HTTPHandler
-	tlsConfig   *tls.Config
+	listenAddr string
+	h          http.Handler
+	tlsConfig  *tls.Config
+
+	workers int
 
 	eb record.EventBroadcaster
 }
@@ -59,8 +61,35 @@ func (n *Node) PodController() *node.PodController {
 	return n.pc
 }
 
+func (n *Node) runHTTP(ctx context.Context) (func(), error) {
+	if n.tlsConfig == nil {
+		log.G(ctx).Warn("TLS config not provided, not starting up http service")
+		return func() {}, nil
+	}
+	if n.h == nil {
+		log.G(ctx).Debug("No http handler, not starting up http service")
+		return func() {}, nil
+	}
+
+	l, err := tls.Listen("tcp", n.listenAddr, n.tlsConfig)
+	if err != nil {
+		return nil, errors.Wrap(err, "error starting http listener")
+	}
+
+	log.G(ctx).Debug("Started TLS listener")
+
+	srv := &http.Server{Handler: n.h, TLSConfig: n.tlsConfig}
+	go srv.Serve(l) //nolint:errcheck
+	log.G(ctx).Debug("HTTP server running")
+
+	return func() {
+		srv.Close()
+		l.Close()
+	}, nil
+}
+
 // Run starts all the underlying controllers
-func (n *Node) Run(ctx context.Context, workers int) (retErr error) {
+func (n *Node) Run(ctx context.Context) (retErr error) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer func() {
 		cancel()
@@ -69,31 +98,22 @@ func (n *Node) Run(ctx context.Context, workers int) (retErr error) {
 		close(n.done)
 	}()
 
-	if n.podInformerFactory != nil {
-		go n.podInformerFactory.Start(ctx.Done())
-	}
-	if n.scmInformerFactory != nil {
-		go n.scmInformerFactory.Start(ctx.Done())
-	}
-
 	if n.eb != nil {
 		n.eb.StartLogging(log.G(ctx).Infof)
 		n.eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: n.client.CoreV1().Events(v1.NamespaceAll)})
+		defer n.eb.Shutdown()
+		log.G(ctx).Debug("Started event broadcaster")
 	}
 
-	l, err := tls.Listen("tcp", n.listenAddr, n.tlsConfig)
+	cancelHTTP, err := n.runHTTP(ctx)
 	if err != nil {
-		return errors.Wrap(err, "error starting http listener")
+		return err
 	}
-	log.G(ctx).Debug("Started TLS listener")
-	defer l.Close()
+	defer cancelHTTP()
 
-	srv := &http.Server{Handler: n.httpHandler, TLSConfig: n.tlsConfig}
-	go srv.Serve(l)
-	defer srv.Close()
-
-	go n.pc.Run(ctx, workers) //nolint:errcheck
-	log.G(ctx).Debug("HTTP server running")
+	go n.podInformerFactory.Start(ctx.Done())
+	go n.scmInformerFactory.Start(ctx.Done())
+	go n.pc.Run(ctx, n.workers) //nolint:errcheck
 
 	defer func() {
 		cancel()
@@ -110,7 +130,7 @@ func (n *Node) Run(ctx context.Context, workers int) (retErr error) {
 
 	log.G(ctx).Debug("pod controller ready")
 
-	go n.nc.Run(ctx) // nolint:errcheck
+	go n.nc.Run(ctx) //nolint:errcheck
 
 	defer func() {
 		cancel()
@@ -193,6 +213,9 @@ type NodeOpt func(c *NodeConfig) error
 // NodeConfig is used to hold configuration items for a Node.
 // It gets used in conjunction with NodeOpt in NewNodeFromClient
 type NodeConfig struct {
+	// Set the client to use, otherwise a client will be created from ClientsetFromEnv
+	Client kubernetes.Interface
+
 	// Set the node spec to register with Kubernetes
 	NodeSpec v1.Node
 	// Set the path to read a kubeconfig from for creating a client.
@@ -206,8 +229,9 @@ type NodeConfig struct {
 	// Set a custom API handler to use.
 	// You can use this to setup, for example, authentication middleware.
 	// If one is not provided a default one will be created.
-	// Pod routes will be attached to this handler when creating the node
-	HTTPHandler HTTPHandler
+	//
+	// Note: If you provide your own handler, you'll need to handle all auth, routes, etc.
+	Handler http.Handler
 	// Set the timeout for idle http streams
 	StreamIdleTimeout time.Duration
 	// Set the timeout for creating http streams
@@ -220,11 +244,12 @@ type NodeConfig struct {
 	// Specify the event recorder to use
 	// If this is not provided, a default one will be used.
 	EventRecorder record.EventRecorder
-}
 
-type HTTPHandler interface {
-	api.ServeMux
-	http.Handler
+	// Set the number of workers to reconcile pods
+	// The default value is derived from the number of cores available.
+	NumWorkers int
+
+	routeAttacher func(Provider, NodeConfig, corev1listers.PodLister)
 }
 
 // WithNodeConfig returns a NodeOpt which replaces the NodeConfig with the passed in value.
@@ -235,9 +260,12 @@ func WithNodeConfig(c NodeConfig) NodeOpt {
 	}
 }
 
-// NewNode calls NewNodeFromClient with a nil client
-func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node, error) {
-	return NewNodeFromClient(nil, name, newProvider, opts...)
+// WithClient returns a NodeOpt that sets the client that will be used to create/manage the node.
+func WithClient(c kubernetes.Interface) NodeOpt {
+	return func(cfg *NodeConfig) error {
+		cfg.Client = c
+		return nil
+	}
 }
 
 // NewNodeFromClient creates a new node using the provided client and name.
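With the client on the config, bootstrapping a node needs only a name, a provider constructor, and options. A rough sketch of a caller at this stage of the series (illustrative names; `newProvider` stands in for whatever NewProviderFunc the caller supplies):

	n, err := nodeutil.NewNodeFromClient("my-node", newProvider,
		nodeutil.WithClient(client), // optional; if omitted the client comes from ClientsetFromEnv
	)
	if err != nil {
		return err
	}
	return n.Run(ctx) // the worker count now comes from NodeConfig.NumWorkers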
@@ -247,9 +275,11 @@ func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node,
 // Some basic values are set for node status, you'll almost certainly want to modify it.
 //
 // If client is nil, this will construct a client using ClientsetFromEnv
-func NewNodeFromClient(client kubernetes.Interface, name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node, error) {
+//
+// It is up to the caller to configure auth on the HTTP handler.
+func NewNodeFromClient(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node, error) {
 	cfg := NodeConfig{
-		// TODO: this is what was set in the cli code... its not clear what a good value actually is.
+		NumWorkers:           runtime.NumCPU(),
 		InformerResyncPeriod: time.Minute,
 		KubeconfigPath:       os.Getenv("KUBECONFIG"),
 		HTTPListenAddr:       ":10250",
@@ -285,10 +315,7 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New
 		return nil, errors.Wrap(err, "error parsing http listen address")
 	}
 
-	if cfg.HTTPHandler == nil {
-		cfg.HTTPHandler = http.NewServeMux()
-	}
-
+	client := cfg.Client
 	if client == nil {
 		var err error
 		client, err = ClientsetFromEnv(cfg.KubeconfigPath)
@@ -324,17 +351,9 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New
 		return nil, errors.Wrap(err, "error creating provider")
 	}
 
-	api.AttachPodRoutes(api.PodHandlerConfig{
-		RunInContainer:   p.RunInContainer,
-		GetContainerLogs: p.GetContainerLogs,
-		GetPods:          p.GetPods,
-		GetPodsFromKubernetes: func(context.Context) ([]*v1.Pod, error) {
-			return podInformer.Lister().List(labels.Everything())
-		},
-		GetStatsSummary:       p.GetStatsSummary,
-		StreamIdleTimeout:     cfg.StreamIdleTimeout,
-		StreamCreationTimeout: cfg.StreamCreationTimeout,
-	}, cfg.HTTPHandler, cfg.DebugHTTP)
+	if cfg.routeAttacher != nil {
+		cfg.routeAttacher(p, cfg, podInformer.Lister())
+	}
 
 	var readyCb func(context.Context) error
 	if np == nil {
@@ -360,7 +379,7 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New
 
 	var eb record.EventBroadcaster
 	if cfg.EventRecorder == nil {
-		eb := record.NewBroadcaster()
+		eb = record.NewBroadcaster()
 		cfg.EventRecorder = eb.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(name, "pod-controller")})
 	}
 
@@ -388,8 +407,9 @@ func NewNodeFromClient(client kubernetes.Interface, name string, newProvider New
 		scmInformerFactory: scmInformerFactory,
 		client:             client,
 		tlsConfig:          cfg.TLSConfig,
-		httpHandler:        cfg.HTTPHandler,
+		h:                  cfg.Handler,
 		listenAddr:         cfg.HTTPListenAddr,
+		workers:            cfg.NumWorkers,
 	}, nil
 }
 
diff --git a/node/nodeutil/provider.go b/node/nodeutil/provider.go
index b85b454cb..0f6a72f03 100644
--- a/node/nodeutil/provider.go
+++ b/node/nodeutil/provider.go
@@ -8,6 +8,7 @@ import (
 	"github.com/virtual-kubelet/virtual-kubelet/node/api"
 	"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	corev1listers "k8s.io/client-go/listers/core/v1"
 )
 
@@ -45,3 +46,25 @@ type ProviderConfig struct {
 // NewProviderFunc is used from NewNodeFromClient to bootstrap a provider using the client/listers/etc created there.
 // If a nil node provider is returned a default one will be used.
 type NewProviderFunc func(ProviderConfig) (Provider, node.NodeProvider, error)
+
+// AttachProviderRoutes returns a NodeOpt which uses api.PodHandler to attach the routes to the provider functions.
+//
+// Note this only attaches routes; you'll still need to set the handler in the node config.
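+//
+// Rough usage, assuming the same mux is also set as the NodeConfig's Handler:
+//
+//	mux := http.NewServeMux()
+//	n, err := NewNodeFromClient(name, newProvider,
+//		AttachProviderRoutes(mux),
+//		func(cfg *NodeConfig) error {
+//			cfg.Handler = mux
+//			return nil
+//		},
+//	)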
+func AttachProviderRoutes(mux api.ServeMux) NodeOpt {
+	return func(cfg *NodeConfig) error {
+		cfg.routeAttacher = func(p Provider, cfg NodeConfig, pods corev1listers.PodLister) {
+			mux.Handle("/", api.PodHandler(api.PodHandlerConfig{
+				RunInContainer:   p.RunInContainer,
+				GetContainerLogs: p.GetContainerLogs,
+				GetPods:          p.GetPods,
+				GetPodsFromKubernetes: func(context.Context) ([]*v1.Pod, error) {
+					return pods.List(labels.Everything())
+				},
+				GetStatsSummary:       p.GetStatsSummary,
+				StreamIdleTimeout:     cfg.StreamIdleTimeout,
+				StreamCreationTimeout: cfg.StreamCreationTimeout,
+			}, true))
+		}
+		return nil
+	}
+}
diff --git a/node/nodeutil/tls.go b/node/nodeutil/tls.go
new file mode 100644
index 000000000..ac29e3f1b
--- /dev/null
+++ b/node/nodeutil/tls.go
@@ -0,0 +1,83 @@
+package nodeutil
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"io/ioutil"
+)
+
+// WithTLSConfig returns a NodeOpt which creates a base TLSConfig with the default cipher suites and TLS min version.
+// The tls config can be modified through functional options.
+func WithTLSConfig(opts ...func(*tls.Config) error) NodeOpt {
+	return func(cfg *NodeConfig) error {
+		tlsCfg := &tls.Config{
+			MinVersion:               tls.VersionTLS12,
+			PreferServerCipherSuites: true,
+			CipherSuites:             DefaultServerCiphers(),
+			ClientAuth:               tls.RequestClientCert,
+		}
+		for _, o := range opts {
+			if err := o(tlsCfg); err != nil {
+				return err
+			}
+		}
+
+		cfg.TLSConfig = tlsCfg
+		return nil
+	}
+}
+
+// WithCAFromPath makes a TLS config option to set up client auth using the path to a PEM encoded CA cert.
+func WithCAFromPath(p string) func(*tls.Config) error {
+	return func(cfg *tls.Config) error {
+		pem, err := ioutil.ReadFile(p)
+		if err != nil {
+			return fmt.Errorf("error reading ca cert pem: %w", err)
+		}
+		cfg.ClientAuth = tls.RequireAndVerifyClientCert
+		return WithCACert(pem)(cfg)
+	}
+}
+
+// WithKeyPairFromPath makes a TLS config option which loads the key pair from the provided paths and appends it to the tls config.
+func WithKeyPairFromPath(cert, key string) func(*tls.Config) error {
+	return func(cfg *tls.Config) error {
+		cert, err := tls.LoadX509KeyPair(cert, key)
+		if err != nil {
+			return err
+		}
+		cfg.Certificates = append(cfg.Certificates, cert)
+		return nil
+	}
+}
+
+// WithCACert makes a TLS config option which appends the provided PEM encoded bytes to the tls config's cert pool.
+// If a cert pool is not defined on the tls config an empty one will be created.
+func WithCACert(pem []byte) func(*tls.Config) error {
+	return func(cfg *tls.Config) error {
+		if cfg.ClientCAs == nil {
+			cfg.ClientCAs = x509.NewCertPool()
+		}
+		if !cfg.ClientCAs.AppendCertsFromPEM(pem) {
+			return fmt.Errorf("could not parse ca cert pem")
+		}
+		return nil
+	}
+}
+
+// DefaultServerCiphers is the list of accepted TLS ciphers, with known weak ciphers elided.
+// Note: this list is expected to change over time.
+func DefaultServerCiphers() []uint16 {
+	return []uint16{
+		tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
+		tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
+		tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
+		tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
+
+		tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
+		tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
+		tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
+		tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
+	}
+}

From c9c0d990642d78c0fc3bb43b2771ac6e130165f6 Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Wed, 9 Jun 2021 16:59:23 +0000
Subject: [PATCH 5/5] Rename NewNodeFromClient to just NewNode

Since the client is now stored on the config, callers no longer need to
pass a custom client to the constructor.
---
 .../internal/commands/root/root.go | 16 +++++++---------
 node/nodeutil/controller.go        | 21 +++++++++----------
 2 files changed, 15 insertions(+), 22 deletions(-)

diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go
index ade4daa61..88dd709b1 100644
--- a/cmd/virtual-kubelet/internal/commands/root/root.go
+++ b/cmd/virtual-kubelet/internal/commands/root/root.go
@@ -32,7 +32,6 @@ import (
 	"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apiserver/pkg/server/dynamiccertificates"
-	"k8s.io/client-go/kubernetes"
 )
 
 // NewCommand creates a new top-level command.
@@ -74,11 +73,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 		}
 	}
 
-	client, err := nodeutil.ClientsetFromEnv(c.KubeConfigPath)
-	if err != nil {
-		return err
-	}
-
 	mux := http.NewServeMux()
 	newProvider := func(cfg nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) {
 		rm, err := manager.NewResourceManager(cfg.Pods, cfg.Secrets, cfg.ConfigMaps, cfg.Services)
@@ -113,7 +107,8 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 		return err
 	}
 
-	cm, err := nodeutil.NewNodeFromClient(c.NodeName, newProvider, func(cfg *nodeutil.NodeConfig) error {
+	cm, err := nodeutil.NewNode(c.NodeName, newProvider, func(cfg *nodeutil.NodeConfig) error {
+		cfg.KubeconfigPath = c.KubeConfigPath
 		cfg.Handler = mux
 		cfg.InformerResyncPeriod = c.InformerResyncPeriod
@@ -132,8 +127,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 
 		return nil
 	},
-		nodeutil.WithClient(client),
-		setAuth(client, c.NodeName, apiConfig),
+		setAuth(c.NodeName, apiConfig),
 		nodeutil.WithTLSConfig(
 			nodeutil.WithKeyPairFromPath(apiConfig.CertPath, apiConfig.KeyPath),
 			maybeCA(apiConfig.CACertPath),
@@ -178,7 +172,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 	return nil
 }
 
-func setAuth(client kubernetes.Interface, node string, apiCfg *apiServerConfig) nodeutil.NodeOpt {
+func setAuth(node string, apiCfg *apiServerConfig) nodeutil.NodeOpt {
 	if apiCfg.CACertPath == "" {
 		return func(cfg *nodeutil.NodeConfig) error {
 			cfg.Handler = api.InstrumentHandler(nodeutil.WithAuth(nodeutil.NoAuth(), cfg.Handler))
@@ -187,7 +181,7 @@ func setAuth(client kubernetes.Interface, node string, apiCfg *apiServerConfig)
 	}
 
 	return func(cfg *nodeutil.NodeConfig) error {
-		auth, err := nodeutil.WebhookAuth(client, node, func(cfg *nodeutil.WebhookAuthConfig) error {
+		auth, err := nodeutil.WebhookAuth(cfg.Client, node, func(cfg *nodeutil.WebhookAuthConfig) error {
 			var err error
 			cfg.AuthnConfig.ClientCertificateCAContentProvider, err = dynamiccertificates.NewDynamicCAContentFromFile("ca-cert-bundle", apiCfg.CACertPath)
 			return err
diff --git a/node/nodeutil/controller.go b/node/nodeutil/controller.go
index 1edae926b..edb8c3c72 100644
--- a/node/nodeutil/controller.go
+++ b/node/nodeutil/controller.go
@@ -268,7 +268,7 @@ func WithClient(c kubernetes.Interface) NodeOpt {
 	}
 }
 
-// NewNodeFromClient creates a new node using the provided client and name.
+// NewNode creates a new node using the provided name; the client is taken from the NodeConfig.
 // This is intended for high-level/low boiler-plate usage.
 // Use the constructors in the `node` package for lower level configuration.
 //
@@ -277,7 +277,7 @@ func WithClient(c kubernetes.Interface) NodeOpt {
 // If client is nil, this will construct a client using ClientsetFromEnv
 //
 // It is up to the caller to configure auth on the HTTP handler.
-func NewNodeFromClient(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node, error) {
+func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node, error) {
 	cfg := NodeConfig{
 		NumWorkers:           runtime.NumCPU(),
 		InformerResyncPeriod: time.Minute,
@@ -315,23 +315,22 @@ func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node,
 		return nil, errors.Wrap(err, "error parsing http listen address")
 	}
 
-	client := cfg.Client
-	if client == nil {
+	if cfg.Client == nil {
 		var err error
-		client, err = ClientsetFromEnv(cfg.KubeconfigPath)
+		cfg.Client, err = ClientsetFromEnv(cfg.KubeconfigPath)
 		if err != nil {
 			return nil, errors.Wrap(err, "error creating clientset from env")
 		}
 	}
 
 	podInformerFactory := informers.NewSharedInformerFactoryWithOptions(
-		client,
+		cfg.Client,
 		cfg.InformerResyncPeriod,
 		PodInformerFilter(name),
 	)
 
 	scmInformerFactory := informers.NewSharedInformerFactoryWithOptions(
-		client,
+		cfg.Client,
 		cfg.InformerResyncPeriod,
 	)
 
@@ -370,8 +369,8 @@ func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node,
 	nc, err := node.NewNodeController(
 		np,
 		&cfg.NodeSpec,
-		client.CoreV1().Nodes(),
-		node.WithNodeEnableLeaseV1(NodeLeaseV1Client(client), node.DefaultLeaseDuration),
+		cfg.Client.CoreV1().Nodes(),
+		node.WithNodeEnableLeaseV1(NodeLeaseV1Client(cfg.Client), node.DefaultLeaseDuration),
 	)
 	if err != nil {
 		return nil, errors.Wrap(err, "error creating node controller")
@@ -384,7 +383,7 @@ func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node,
 	}
 
 	pc, err := node.NewPodController(node.PodControllerConfig{
-		PodClient:         client.CoreV1(),
+		PodClient:         cfg.Client.CoreV1(),
 		EventRecorder:     cfg.EventRecorder,
 		Provider:          p,
 		PodInformer:       podInformer,
@@ -405,7 +404,7 @@ func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node,
 		eb:                 eb,
 		podInformerFactory: podInformerFactory,
 		scmInformerFactory: scmInformerFactory,
-		client:             client,
+		client:             cfg.Client,
 		tlsConfig:          cfg.TLSConfig,
 		h:                  cfg.Handler,
 		listenAddr:         cfg.HTTPListenAddr,