diff --git a/cmd/virtual-kubelet/commands/root/root.go b/cmd/virtual-kubelet/commands/root/root.go
index c8813f1fe..0c91fe644 100644
--- a/cmd/virtual-kubelet/commands/root/root.go
+++ b/cmd/virtual-kubelet/commands/root/root.go
@@ -35,6 +35,7 @@ import (
 	kubeinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
 	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
@@ -140,13 +141,17 @@ func runRootCommand(ctx context.Context, c Opts) error {
 		"watchedNamespace": c.KubeNamespace,
 	}))
 
+	var leaseClient v1beta1.LeaseInterface
+	if c.EnableNodeLease {
+		leaseClient = client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
+	}
+
 	pNode := NodeFromProvider(ctx, c.NodeName, taint, p)
 	node, err := vkubelet.NewNode(
 		vkubelet.NaiveNodeProvider{},
 		pNode,
-		client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease),
 		client.CoreV1().Nodes(),
-		vkubelet.WithNodeDisableLease(!c.EnableNodeLease),
+		vkubelet.WithNodeEnableLeaseV1Beta1(leaseClient, nil),
 		vkubelet.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error {
 			if !k8serrors.IsNotFound(err) {
 				return err
diff --git a/vkubelet/node.go b/vkubelet/node.go
index 4fa73bc21..10e5b8f20 100644
--- a/vkubelet/node.go
+++ b/vkubelet/node.go
@@ -45,8 +45,11 @@ type NodeProvider interface {
 //
 // Use the node's `Run` method to register and run the loops to update the node
 // in Kubernetes.
-func NewNode(p NodeProvider, node *corev1.Node, leases v1beta1.LeaseInterface, nodes v1.NodeInterface, opts ...NodeOpt) (*Node, error) {
-	n := &Node{p: p, n: node, leases: leases, nodes: nodes, chReady: make(chan struct{})}
+//
+// Note: if multiple NodeOpts apply against the same underlying options,
+// the last NodeOpt wins.
+func NewNode(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface, opts ...NodeOpt) (*Node, error) {
+	n := &Node{p: p, n: node, nodes: nodes, chReady: make(chan struct{})}
 	for _, o := range opts {
 		if err := o(n); err != nil {
 			return nil, pkgerrors.Wrap(err, "error applying node option")
@@ -58,17 +61,31 @@ func NewNode(p NodeProvider, node *corev1.Node, leases v1beta1.LeaseInterface, n
 // NodeOpt are the functional options used for configuring a node
 type NodeOpt func(*Node) error
 
-// WithNodeDisableLease forces node leases to be disabled and to only update
-// using node status
-// Note that this will force status updates to occur on the ping interval frequency
-func WithNodeDisableLease(v bool) NodeOpt {
+// WithNodeEnableLeaseV1Beta1 enables support for v1beta1 leases.
+// If client is nil, leases will not be enabled.
+// If baseLease is nil, a default base lease will be used.
+//
+// The lease will be updated after each successful node ping. To change the
+// lease update interval, you must set the node ping interval.
+// See WithNodePingInterval().
+//
+// This also affects the frequency of node status updates:
+// - When leases are *not* enabled (or are disabled due to no support on the cluster),
+//   the node status is updated at every ping interval.
+// - When node leases are enabled, node status updates are controlled by the
+//   node status update interval option.
+// To set a custom node status update interval, see WithNodeStatusUpdateInterval().
+func WithNodeEnableLeaseV1Beta1(client v1beta1.LeaseInterface, baseLease *coord.Lease) NodeOpt {
 	return func(n *Node) error {
-		n.disableLease = v
+		n.leases = client
+		n.lease = baseLease
 		return nil
 	}
 }
 
-// WithNodePingInterval sets the inteval for checking node status
+// WithNodePingInterval sets the interval for checking node status.
+// If node leases are not supported (or not enabled), this is the frequency
+// with which the node status will be updated in Kubernetes.
 func WithNodePingInterval(d time.Duration) NodeOpt {
 	return func(n *Node) error {
 		n.pingInterval = d
@@ -79,6 +96,8 @@ func WithNodePingInterval(d time.Duration) NodeOpt {
 // WithNodeStatusUpdateInterval sets the interval for updating node status
 // This is only used when leases are supported and only for updating the actual
 // node status, not the node lease.
+// When node leases are not enabled (or are not supported on the cluster), this
+// has no effect and node status is updated on the "ping" interval.
 func WithNodeStatusUpdateInterval(d time.Duration) NodeOpt {
 	return func(n *Node) error {
 		n.statusInterval = d
@@ -105,15 +124,6 @@ func WithNodeStatusUpdateErrorHandler(h ErrorHandler) NodeOpt {
 // progress can continue (or a retry is possible).
 type ErrorHandler func(context.Context, error) error
 
-// WithNodeLease sets the base node lease to use.
-// If a lease time is set, it will be ignored.
-func WithNodeLease(l *coord.Lease) NodeOpt {
-	return func(n *Node) error {
-		n.lease = l
-		return nil
-	}
-}
-
 // Node deals with creating and managing a node object in Kubernetes.
 // It can register a node with Kubernetes and periodically update its status.
 type Node struct {
@@ -136,7 +146,7 @@ type Node struct {
 
 // The default intervals used for lease and status updates.
 const (
-	DefaultPingInterval         = 5 * time.Second
+	DefaultPingInterval         = 10 * time.Second
 	DefaultStatusUpdateInterval = 1 * time.Minute
 )
 
@@ -169,27 +179,25 @@ func (n *Node) Run(ctx context.Context) error {
 		return err
 	}
 
-	if !n.disableLease {
-		n.lease = newLease(n.lease)
-		setLeaseAttrs(n.lease, n.n, n.pingInterval*5)
+	if n.leases == nil {
+		n.disableLease = true
+		return n.controlLoop(ctx)
+	}
 
-		l, err := ensureLease(ctx, n.leases, n.lease)
-		if err != nil {
-			if errors.IsNotFound(err) {
-				n.disableLease = true
-			} else {
-				return pkgerrors.Wrap(err, "error creating node lease")
-			}
+	n.lease = newLease(n.lease)
+	setLeaseAttrs(n.lease, n.n, n.pingInterval*5)
+
+	l, err := ensureLease(ctx, n.leases, n.lease)
+	if err != nil {
+		if !errors.IsNotFound(err) {
+			return pkgerrors.Wrap(err, "error creating node lease")
 		}
-		log.G(ctx).Debug("Created node lease")
-
-		n.lease = l
-	}
-
-	if n.disableLease {
 		log.G(ctx).Info("Node leases not supported, falling back to only node status updates")
+		n.disableLease = true
 	}
+	n.lease = l
+	log.G(ctx).Debug("Created node lease")
 
 	return n.controlLoop(ctx)
 }
 
diff --git a/vkubelet/node_test.go b/vkubelet/node_test.go
index a6c69ab34..55f502355 100644
--- a/vkubelet/node_test.go
+++ b/vkubelet/node_test.go
@@ -33,11 +33,14 @@ func testNodeRun(t *testing.T, enableLease bool) {
 	leases := c.Coordination().Leases(corev1.NamespaceNodeLease)
 
 	interval := 1 * time.Millisecond
-	node, err := NewNode(testP, testNode(t), leases, nodes,
+	opts := []NodeOpt{
 		WithNodePingInterval(interval),
 		WithNodeStatusUpdateInterval(interval),
-		WithNodeDisableLease(!enableLease),
-	)
+	}
+	if enableLease {
+		opts = append(opts, WithNodeEnableLeaseV1Beta1(leases, nil))
+	}
+	node, err := NewNode(testP, testNode(t), nodes, opts...)
 	assert.NilError(t, err)
 
 	chErr := make(chan error)
@@ -157,12 +160,11 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	node, err := NewNode(testP, testNode(t), nil, nodes,
+	node, err := NewNode(testP, testNode(t), nodes,
 		WithNodeStatusUpdateErrorHandler(func(_ context.Context, err error) error {
 			cancel()
 			return nil
 		}),
-		WithNodeDisableLease(true),
 	)
 	assert.NilError(t, err)
 
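Usage sketch (not part of the diff): the snippet below shows how a caller outside this repo might migrate from WithNodeDisableLease/WithNodeLease to the new WithNodeEnableLeaseV1Beta1 option, mirroring the runRootCommand change above. The package name "example" and the helper newNodeWithOptionalLease are illustrative; only NewNode, NaiveNodeProvider, and WithNodeEnableLeaseV1Beta1 come from this diff.

// Package example is an illustrative caller of the vkubelet node API after
// this change; it is a sketch, not code from the PR.
package example

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"

	"github.com/virtual-kubelet/virtual-kubelet/vkubelet"
)

// newNodeWithOptionalLease follows the same pattern as runRootCommand above:
// the lease client is only constructed when leases are enabled, and passing a
// nil client to WithNodeEnableLeaseV1Beta1 leaves leases disabled.
func newNodeWithOptionalLease(client kubernetes.Interface, pNode *corev1.Node, enableLease bool) (*vkubelet.Node, error) {
	var leaseClient v1beta1.LeaseInterface
	if enableLease {
		leaseClient = client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
	}

	return vkubelet.NewNode(
		vkubelet.NaiveNodeProvider{},
		pNode,
		client.CoreV1().Nodes(),
		// A nil base lease lets the package build a default lease for the node.
		vkubelet.WithNodeEnableLeaseV1Beta1(leaseClient, nil),
	)
}

If the lease API turns out to be unavailable on the cluster, Run() now logs the fallback and continues with status-only updates, as shown in the node.go hunk above, so callers do not need to probe for lease support themselves.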