diff --git a/Makefile b/Makefile
index 510c53593..e8a9f4b79 100644
--- a/Makefile
+++ b/Makefile
@@ -187,7 +187,7 @@ kubebuilder_2.3.1_${TEST_OS}_${TEST_ARCH}: kubebuilder_2.3.1_${TEST_OS}_${TEST_A
 envtest: kubebuilder_2.3.1_${TEST_OS}_${TEST_ARCH}
 # You can add klog flags for debugging, like: -klog.v=10 -klog.logtostderr
 # klogv2 flags just wraps our existing logrus.
-	KUBEBUILDER_ASSETS=$(PWD)/kubebuilder_2.3.1_${TEST_OS}_${TEST_ARCH}/bin go test -v ./internal/test/vk_envtest -envtest=true
+	KUBEBUILDER_ASSETS=$(PWD)/kubebuilder_2.3.1_${TEST_OS}_${TEST_ARCH}/bin go test -v -run=TestEnvtest ./node -envtest=true
 
 .PHONY: fmt
 fmt:
diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go
index d1f59a28c..2252d651e 100644
--- a/cmd/virtual-kubelet/internal/commands/root/root.go
+++ b/cmd/virtual-kubelet/internal/commands/root/root.go
@@ -32,7 +32,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	kubeinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/scheme"
-	"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
 	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/record"
 )
@@ -138,18 +137,9 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 		"watchedNamespace": c.KubeNamespace,
 	}))
 
-	var leaseClient v1beta1.LeaseInterface
-	if c.EnableNodeLease {
-		leaseClient = nodeutil.NodeLeaseV1Beta1Client(client)
-	}
-
 	pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version)
 	np := node.NewNaiveNodeProvider()
-	nodeRunner, err := node.NewNodeController(
-		np,
-		pNode,
-		client.CoreV1().Nodes(),
-		node.WithNodeEnableLeaseV1Beta1(leaseClient, nil),
+	additionalOptions := []node.NodeControllerOpt{
 		node.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error {
 			if !k8serrors.IsNotFound(err) {
 				return err
@@ -165,6 +155,17 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 			log.G(ctx).Debug("created new node")
 			return nil
 		}),
+	}
+	if c.EnableNodeLease {
+		leaseClient := nodeutil.NodeLeaseV1Client(client)
+		// 40 seconds is the default lease time in upstream kubelet
+		additionalOptions = append(additionalOptions, node.WithNodeEnableLeaseV1(leaseClient, 40))
+	}
+	nodeRunner, err := node.NewNodeController(
+		np,
+		pNode,
+		client.CoreV1().Nodes(),
+		additionalOptions...,
 	)
 	if err != nil {
 		log.G(ctx).Fatal(err)
diff --git a/internal/test/vk_envtest/env_test.go b/node/env_test.go
similarity index 80%
rename from internal/test/vk_envtest/env_test.go
rename to node/env_test.go
index 3f33fca6d..ae85ef0d5 100644
--- a/internal/test/vk_envtest/env_test.go
+++ b/node/env_test.go
@@ -1,8 +1,7 @@
-package e2e_test
+package node
 
 import (
 	"context"
-	"flag"
 	"os"
 	"strings"
 	"testing"
@@ -12,30 +11,16 @@ import (
 	"github.com/sirupsen/logrus"
 	"github.com/virtual-kubelet/virtual-kubelet/log"
 	logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
-	"github.com/virtual-kubelet/virtual-kubelet/node"
 	"gotest.tools/assert"
 	is "gotest.tools/assert/cmp"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
-	klogv1 "k8s.io/klog"
 	klogv2 "k8s.io/klog/v2"
 	"sigs.k8s.io/controller-runtime/pkg/envtest"
 )
 
-var enableEnvTest = flag.Bool("envtest", false, "Enable envtest based tests")
-
-func TestMain(m *testing.M) {
-	flagset := flag.NewFlagSet("klog", flag.PanicOnError)
-	klogv1.InitFlags(flagset)
-	flagset.VisitAll(func(f *flag.Flag) {
-
flag.Var(f.Value, "klog."+f.Name, f.Usage) - }) - flag.Parse() - os.Exit(m.Run()) -} - func TestEnvtest(t *testing.T) { if !*enableEnvTest || os.Getenv("VK_ENVTEST") != "" { t.Skip("test only runs when -envtest is passed or if VK_ENVTEST is set to a non-empty value") @@ -81,7 +66,7 @@ func testNodeE2ERun(t *testing.T, env *envtest.Environment, withLeases bool) { _, err = clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) assert.NilError(t, err) - testProvider := node.NewNaiveNodeProvider() + testProvider := NewNaiveNodeProvider() testNode := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -91,12 +76,12 @@ func testNodeE2ERun(t *testing.T, env *envtest.Environment, withLeases bool) { testNodeCopy := testNode.DeepCopy() - opts := []node.NodeControllerOpt{} - leasesClient := clientset.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease) + opts := []NodeControllerOpt{} + leasesClient := clientset.CoordinationV1().Leases(corev1.NamespaceNodeLease) if withLeases { - opts = append(opts, node.WithNodeEnableLeaseV1Beta1(leasesClient, nil)) + opts = append(opts, WithNodeEnableLeaseV1(leasesClient, 0)) } - node, err := node.NewNodeController(testProvider, testNode, nodes, opts...) + node, err := NewNodeController(testProvider, testNode, nodes, opts...) assert.NilError(t, err) chErr := make(chan error, 1) diff --git a/node/lease_controller_test.go b/node/lease_controller_test.go new file mode 100644 index 000000000..5c1fda511 --- /dev/null +++ b/node/lease_controller_test.go @@ -0,0 +1,13 @@ +package node + +import ( + "errors" + "testing" + + "gotest.tools/assert" +) + +func TestNotReadyError(t *testing.T) { + n := newNodeNotReadyError(nil) + assert.Assert(t, errors.Is(n, &nodeNotReadyError{})) +} diff --git a/node/lease_controller_v1.go b/node/lease_controller_v1.go new file mode 100644 index 000000000..aba55910f --- /dev/null +++ b/node/lease_controller_v1.go @@ -0,0 +1,345 @@ +package node + +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/trace" + coordinationv1 "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + coordclientset "k8s.io/client-go/kubernetes/typed/coordination/v1" + "k8s.io/utils/clock" + "k8s.io/utils/pointer" +) + +// Code heavily borrowed from: https://github.com/kubernetes/kubernetes/blob/v1.18.13/pkg/kubelet/nodelease/controller.go +// Primary changes: +// * Use our internal logging library rather than klog +// * Add tracing support +// * Allow for customization of intervals and such +// * Rather than using a node client, and having to build an independent node cache for the lease +// controller, we provide a cached version of the node object. 
+// * Use contexts for cancellation so the controller can be stopped versus running until the process terminates + +const ( + // DefaultRenewIntervalFraction is the fraction of lease duration to renew the lease + DefaultRenewIntervalFraction = 0.25 + // maxUpdateRetries is the number of immediate, successive retries the Kubelet will attempt + // when renewing the lease before it waits for the renewal interval before trying again, + // similar to what we do for node status retries + maxUpdateRetries = 5 + // maxBackoff is the maximum sleep time during backoff (e.g. in backoffEnsureLease) + maxBackoff = 7 * time.Second + + // DefaultLeaseDuration is from upstream kubelet, where the default lease duration is 40 seconds + DefaultLeaseDuration = 40 +) + +// leaseController is a v1 lease controller and responsible for maintaining a server-side lease as long as the node +// is healthy +type leaseController struct { + leaseClient coordclientset.LeaseInterface + leaseDurationSeconds int32 + renewInterval time.Duration + clock clock.Clock + nodeController *NodeController + // latestLease is the latest node lease which Kubelet updated or created + latestLease *coordinationv1.Lease +} + +// newLeaseControllerWithRenewInterval constructs and returns a v1 lease controller with a specific interval of how often to +// renew leases +func newLeaseControllerWithRenewInterval( + clock clock.Clock, + client coordclientset.LeaseInterface, + leaseDurationSeconds int32, + renewInterval time.Duration, + nodeController *NodeController) (*leaseController, error) { + + if leaseDurationSeconds <= 0 { + return nil, fmt.Errorf("Lease duration seconds %d is invalid, it must be > 0", leaseDurationSeconds) + } + + if renewInterval == 0 { + return nil, fmt.Errorf("Lease renew interval %s is invalid, it must be > 0", renewInterval.String()) + } + + if float64(leaseDurationSeconds) <= renewInterval.Seconds() { + return nil, fmt.Errorf("Lease renew interval %s is invalid, it must be less than lease duration seconds %d", renewInterval.String(), leaseDurationSeconds) + } + + return &leaseController{ + leaseClient: client, + leaseDurationSeconds: leaseDurationSeconds, + renewInterval: renewInterval, + clock: clock, + nodeController: nodeController, + }, nil +} + +// Run runs the controller +func (c *leaseController) Run(ctx context.Context) { + c.sync(ctx) + wait.UntilWithContext(ctx, c.sync, c.renewInterval) +} + +func (c *leaseController) sync(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var err error + ctx, span := trace.StartSpan(ctx, "lease.sync") + defer span.End() + + pingResult, err := c.nodeController.nodePingController.getResult(ctx) + if err != nil { + log.G(ctx).WithError(err).Error("Could not get ping status") + } + if pingResult.error != nil { + log.G(ctx).WithError(pingResult.error).Error("Ping result is not clean, not updating lease") + return + } + + node, err := c.nodeController.getServerNode(ctx) + if err != nil { + log.G(ctx).WithError(err).Error("Could not get server node") + span.SetStatus(err) + return + } + if node == nil { + err = errors.New("Servernode is null") + log.G(ctx).WithError(err).Error("servernode is null") + span.SetStatus(err) + return + } + + if c.latestLease != nil { + // As long as node lease is not (or very rarely) updated by any other agent than Kubelet, + // we can optimistically assume it didn't change since our last update and try updating + // based on the version from that time. 
Thanks to it we avoid GET call and reduce load + // on etcd and kube-apiserver. + // If at some point other agents will also be frequently updating the Lease object, this + // can result in performance degradation, because we will end up with calling additional + // GET/PUT - at this point this whole "if" should be removed. + err := c.retryUpdateLease(ctx, node, c.newLease(ctx, node, c.latestLease)) + if err == nil { + span.SetStatus(err) + return + } + log.G(ctx).WithError(err).Info("failed to update lease using latest lease, fallback to ensure lease") + } + + lease, created := c.backoffEnsureLease(ctx, node) + c.latestLease = lease + // we don't need to update the lease if we just created it + if !created && lease != nil { + if err := c.retryUpdateLease(ctx, node, lease); err != nil { + log.G(ctx).WithError(err).WithField("renewInterval", c.renewInterval).Errorf("Will retry after") + span.SetStatus(err) + } + } +} + +// backoffEnsureLease attempts to create the lease if it does not exist, +// and uses exponentially increasing waits to prevent overloading the API server +// with retries. Returns the lease, and true if this call created the lease, +// false otherwise. +func (c *leaseController) backoffEnsureLease(ctx context.Context, node *corev1.Node) (*coordinationv1.Lease, bool) { + ctx, span := trace.StartSpan(ctx, "lease.backoffEnsureLease") + defer span.End() + + var ( + lease *coordinationv1.Lease + created bool + err error + ) + sleep := 100 * time.Millisecond + for { + lease, created, err = c.ensureLease(ctx, node) + if err == nil { + break + } + sleep = minDuration(2*sleep, maxBackoff) + log.G(ctx).WithError(err).Errorf("failed to ensure node lease exists, will retry in %v", sleep) + // backoff wait + c.clock.Sleep(sleep) + timer := c.clock.NewTimer(sleep) + defer timer.Stop() + select { + case <-timer.C(): + case <-ctx.Done(): + return nil, false + } + } + return lease, created +} + +// ensureLease creates the lease if it does not exist. Returns the lease and +// a bool (true if this call created the lease), or any error that occurs. +func (c *leaseController) ensureLease(ctx context.Context, node *corev1.Node) (*coordinationv1.Lease, bool, error) { + ctx, span := trace.StartSpan(ctx, "lease.ensureLease") + defer span.End() + + lease, err := c.leaseClient.Get(ctx, node.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + // lease does not exist, create it. + leaseToCreate := c.newLease(ctx, node, nil) + if len(leaseToCreate.OwnerReferences) == 0 { + // We want to ensure that a lease will always have OwnerReferences set. + // Thus, given that we weren't able to set it correctly, we simply + // not create it this time - we will retry in the next iteration. 
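+			// Returning created=false with a nil error skips creation for now; the next sync pass calls ensureLease again and retries.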
+ return nil, false, nil + } + lease, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{}) + if err != nil { + span.SetStatus(err) + return nil, false, err + } + log.G(ctx).Debug("Successfully created lease") + return lease, true, nil + } else if err != nil { + // unexpected error getting lease + log.G(ctx).WithError(err).Error("Unexpected error getting lease") + span.SetStatus(err) + return nil, false, err + } + log.G(ctx).Debug("Successfully recovered existing lease") + // lease already existed + return lease, false, nil +} + +// retryUpdateLease attempts to update the lease for maxUpdateRetries, +// call this once you're sure the lease has been created +func (c *leaseController) retryUpdateLease(ctx context.Context, node *corev1.Node, base *coordinationv1.Lease) error { + ctx, span := trace.StartSpan(ctx, "controller.retryUpdateLease") + defer span.End() + + for i := 0; i < maxUpdateRetries; i++ { + lease, err := c.leaseClient.Update(ctx, c.newLease(ctx, node, base), metav1.UpdateOptions{}) + if err == nil { + log.G(ctx).WithField("retries", i).Debug("Successfully updated lease") + c.latestLease = lease + return nil + } + log.G(ctx).WithError(err).Error("failed to update node lease") + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + err = fmt.Errorf("failed after %d attempts to update node lease: %w", maxUpdateRetries, err) + span.SetStatus(err) + return err + } + // OptimisticLockError requires getting the newer version of lease to proceed. + if apierrors.IsConflict(err) { + base, _ = c.backoffEnsureLease(ctx, node) + continue + } + } + + err := fmt.Errorf("failed after %d attempts to update node lease", maxUpdateRetries) + span.SetStatus(err) + return err +} + +// newLease constructs a new lease if base is nil, or returns a copy of base +// with desired state asserted on the copy. +func (c *leaseController) newLease(ctx context.Context, node *corev1.Node, base *coordinationv1.Lease) *coordinationv1.Lease { + ctx, span := trace.StartSpan(ctx, "lease.newLease") + defer span.End() + // Use the bare minimum set of fields; other fields exist for debugging/legacy, + // but we don't need to make node heartbeats more complicated by using them. + var lease *coordinationv1.Lease + if base == nil { + lease = &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: node.Name, + Namespace: corev1.NamespaceNodeLease, + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: pointer.StringPtr(node.Name), + LeaseDurationSeconds: pointer.Int32Ptr(c.leaseDurationSeconds), + }, + } + } else { + lease = base.DeepCopy() + } + lease.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()} + + // Setting owner reference needs node's UID. Note that it is different from + // kubelet.nodeRef.UID. When lease is initially created, it is possible that + // the connection between master and node is not ready yet. So try to set + // owner reference every time when renewing the lease, until successful. 
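+	// The owner reference ties the Lease to the Node so a deleted Node garbage-collects its Lease;
+	// it is only set here when missing - an existing reference is never overwritten.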
+ if len(lease.OwnerReferences) == 0 { + lease.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: corev1.SchemeGroupVersion.WithKind("Node").Version, + Kind: corev1.SchemeGroupVersion.WithKind("Node").Kind, + Name: node.Name, + UID: node.UID, + }, + } + } + + ctx = span.WithFields(ctx, map[string]interface{}{ + "lease": lease, + }) + log.G(ctx).Debug("Generated lease") + return lease +} + +func minDuration(a, b time.Duration) time.Duration { + if a < b { + return a + } + return b +} + +// nodeNotReadyError indicates that the node was not ready / ping is failing +type nodeNotReadyError struct { + pingResult *pingResult +} + +func newNodeNotReadyError(pingResult *pingResult) error { + return &nodeNotReadyError{ + pingResult: pingResult, + } +} + +func (e *nodeNotReadyError) Unwrap() error { + return e.pingResult.error +} + +func (e *nodeNotReadyError) Is(target error) bool { + _, ok := target.(*nodeNotReadyError) + return ok +} + +func (e *nodeNotReadyError) As(target error) bool { + val, ok := target.(*nodeNotReadyError) + if ok { + *val = *e + } + return ok +} + +func (e *nodeNotReadyError) Error() string { + return fmt.Sprintf("New node not ready error: %s", e.pingResult.error) +} diff --git a/node/main_test.go b/node/main_test.go new file mode 100644 index 000000000..3cd740ad7 --- /dev/null +++ b/node/main_test.go @@ -0,0 +1,22 @@ +/* This file is just a place for the TestMain override function to live, plus whatever custom flags we are interested in */ +package node + +import ( + "flag" + "os" + "testing" + + klogv1 "k8s.io/klog" +) + +var enableEnvTest = flag.Bool("envtest", false, "Enable envtest based tests") + +func TestMain(m *testing.M) { + flagset := flag.NewFlagSet("klog", flag.PanicOnError) + klogv1.InitFlags(flagset) + flagset.VisitAll(func(f *flag.Flag) { + flag.Var(f.Value, "klog."+f.Name, f.Usage) + }) + flag.Parse() + os.Exit(m.Run()) +} diff --git a/node/node.go b/node/node.go index 5a61c8671..4b2f3aa79 100644 --- a/node/node.go +++ b/node/node.go @@ -17,21 +17,23 @@ package node import ( "context" "encoding/json" + "fmt" + "sync" "time" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" - coord "k8s.io/api/coordination/v1beta1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes/typed/coordination/v1beta1" + coordclientset "k8s.io/client-go/kubernetes/typed/coordination/v1" v1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/util/retry" + "k8s.io/utils/clock" ) const ( @@ -41,6 +43,12 @@ const ( virtualKubeletLastNodeAppliedObjectMeta = "virtual-kubelet.io/last-applied-object-meta" ) +var ( + // ErrConflictingLeaseControllerConfiguration is returned when the lease controller related options have been + // specified multiple times + ErrConflictingLeaseControllerConfiguration = pkgerrors.New("Multiple, conflicting lease configurations have been put into place") +) + // NodeProvider is the interface used for registering a node and updating its // status in Kubernetes. 
// @@ -98,24 +106,49 @@ func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface // NodeControllerOpt are the functional options used for configuring a node type NodeControllerOpt func(*NodeController) error // nolint:golint -// WithNodeEnableLeaseV1Beta1 enables support for v1beta1 leases. -// If client is nil, leases will not be enabled. -// If baseLease is nil, a default base lease will be used. -// -// The lease will be updated after each successful node ping. To change the -// lease update interval, you must set the node ping interval. -// See WithNodePingInterval(). -// -// This also affects the frequency of node status updates: -// - When leases are *not* enabled (or are disabled due to no support on the cluster) -// the node status is updated at every ping interval. -// - When node leases are enabled, node status updates are controlled by the -// node status update interval option. -// To set a custom node status update interval, see WithNodeStatusUpdateInterval(). -func WithNodeEnableLeaseV1Beta1(client v1beta1.LeaseInterface, baseLease *coord.Lease) NodeControllerOpt { +// WithNodeEnableLeaseV1 enables support for v1 leases. +// V1 Leases share all the same properties as v1beta1 leases, except they do not fallback like +// the v1beta1 lease controller does if the API server does not support it. If the lease duration is not specified (0) +// then DefaultLeaseDuration will be used +func WithNodeEnableLeaseV1(client coordclientset.LeaseInterface, leaseDurationSeconds int32) NodeControllerOpt { + if leaseDurationSeconds == 0 { + leaseDurationSeconds = DefaultLeaseDuration + } + + interval := float64(leaseDurationSeconds) * DefaultRenewIntervalFraction + intervalDuration := time.Second * time.Duration(int(interval)) + + return WithNodeEnableLeaseV1WithRenewInterval(client, leaseDurationSeconds, intervalDuration) +} + +// WithNodeEnableLeaseV1WithRenewInterval enables support for v1 leases, and sets a specific renew interval, +// as opposed to the standard multiplier specified by DefaultRenewIntervalFraction +func WithNodeEnableLeaseV1WithRenewInterval(client coordclientset.LeaseInterface, leaseDurationSeconds int32, interval time.Duration) NodeControllerOpt { + if client == nil { + panic("client is nil") + } + + if leaseDurationSeconds == 0 { + leaseDurationSeconds = DefaultLeaseDuration + } + return func(n *NodeController) error { - n.leases = client - n.lease = baseLease + if n.leaseController != nil { + return ErrConflictingLeaseControllerConfiguration + } + + leaseController, err := newLeaseControllerWithRenewInterval( + &clock.RealClock{}, + client, + leaseDurationSeconds, + interval, + n, + ) + if err != nil { + return fmt.Errorf("Unable to configure lease controller: %w", err) + } + + n.leaseController = leaseController return nil } } @@ -177,16 +210,15 @@ type ErrorHandler func(context.Context, error) error type NodeController struct { // nolint:golint p NodeProvider - // serverNode should only be written to on initialization, or as the result of node creation. 
- serverNode *corev1.Node + // serverNode must be updated each time it is updated in API Server + serverNodeLock sync.Mutex + serverNode *corev1.Node + nodes v1.NodeInterface - leases v1beta1.LeaseInterface - nodes v1.NodeInterface + leaseController *leaseController - disableLease bool pingInterval time.Duration statusInterval time.Duration - lease *coord.Lease chStatusUpdate chan *corev1.Node nodeStatusUpdateErrorHandler ErrorHandler @@ -195,6 +227,8 @@ type NodeController struct { // nolint:golint nodePingController *nodePingController pingTimeout *time.Duration + + group wait.Group } // The default intervals used for lease and status updates. @@ -221,30 +255,21 @@ func (n *NodeController) Run(ctx context.Context) error { n.chStatusUpdate <- node }) + n.group.StartWithContext(ctx, n.nodePingController.Run) + + n.serverNodeLock.Lock() providerNode := n.serverNode.DeepCopy() + n.serverNodeLock.Unlock() if err := n.ensureNode(ctx, providerNode); err != nil { return err } - if n.leases == nil { - n.disableLease = true - return n.controlLoop(ctx, providerNode) + if n.leaseController != nil { + log.G(ctx).WithField("leaseController", n.leaseController).Debug("Starting leasecontroller") + n.group.StartWithContext(ctx, n.leaseController.Run) } - n.lease = newLease(ctx, n.lease, n.serverNode, n.pingInterval) - - l, err := ensureLease(ctx, n.leases, n.lease) - if err != nil { - if !errors.IsNotFound(err) { - return pkgerrors.Wrap(err, "error creating node lease") - } - log.G(ctx).Info("Node leases not supported, falling back to only node status updates") - n.disableLease = true - } - n.lease = l - - log.G(ctx).Debug("Created node lease") return n.controlLoop(ctx, providerNode) } @@ -260,12 +285,17 @@ func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.No return err } - node, err := n.nodes.Create(ctx, n.serverNode, metav1.CreateOptions{}) + n.serverNodeLock.Lock() + serverNode := n.serverNode + n.serverNodeLock.Unlock() + node, err := n.nodes.Create(ctx, serverNode, metav1.CreateOptions{}) if err != nil { return pkgerrors.Wrap(err, "error registering node with kubernetes") } + n.serverNodeLock.Lock() n.serverNode = node + n.serverNodeLock.Unlock() // Bad things will happen if the node is deleted in k8s and recreated by someone else // we rely on this persisting providerNode.ObjectMeta.Name = node.Name @@ -283,50 +313,33 @@ func (n *NodeController) Ready() <-chan struct{} { } func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.Node) error { - pingTimer := time.NewTimer(n.pingInterval) - defer pingTimer.Stop() - - statusTimer := time.NewTimer(n.statusInterval) - defer statusTimer.Stop() - timerResetDuration := n.statusInterval - if n.disableLease { - // when resetting the timer after processing a status update, reset it to the ping interval - // (since it will be the ping timer as serverNode.disableLease == true) - timerResetDuration = n.pingInterval - - // hack to make sure this channel always blocks since we won't be using it - if !statusTimer.Stop() { - <-statusTimer.C - } - } - close(n.chReady) - group := &wait.Group{} - group.StartWithContext(ctx, n.nodePingController.run) - defer group.Wait() + defer n.group.Wait() + + var sleepInterval time.Duration + if n.leaseController == nil { + log.G(ctx).WithField("pingInterval", n.pingInterval).Debug("lease controller is not enabled, updating node status in Kube API server at Ping Time Interval") + sleepInterval = n.pingInterval + } else { + log.G(ctx).WithField("statusInterval", 
n.statusInterval).Debug("lease controller in use, updating at statusInterval") + sleepInterval = n.statusInterval + } loop := func() bool { ctx, span := trace.StartSpan(ctx, "node.controlLoop.loop") defer span.End() + var timer *time.Timer + ctx = span.WithField(ctx, "sleepTime", n.pingInterval) + timer = time.NewTimer(sleepInterval) + defer timer.Stop() + select { case <-ctx.Done(): return true case updated := <-n.chStatusUpdate: - var t *time.Timer - if n.disableLease { - t = pingTimer - } else { - t = statusTimer - } - log.G(ctx).Debug("Received node status update") - // Performing a status update so stop/reset the status update timer in this - // branch otherwise there could be an unnecessary status update. - if !t.Stop() { - <-t.C - } providerNode.Status = updated.Status providerNode.ObjectMeta.Annotations = updated.Annotations @@ -334,19 +347,10 @@ func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.N if err := n.updateStatus(ctx, providerNode, false); err != nil { log.G(ctx).WithError(err).Error("Error handling node status update") } - t.Reset(timerResetDuration) - case <-statusTimer.C: + case <-timer.C: if err := n.updateStatus(ctx, providerNode, false); err != nil { log.G(ctx).WithError(err).Error("Error handling node status update") } - statusTimer.Reset(n.statusInterval) - case <-pingTimer.C: - if err := n.handlePing(ctx, providerNode); err != nil { - log.G(ctx).WithError(err).Error("Error while handling node ping") - } else { - log.G(ctx).Debug("Successful node ping") - } - pingTimer.Reset(n.pingInterval) } return false } @@ -359,42 +363,6 @@ func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.N } } -func (n *NodeController) handlePing(ctx context.Context, providerNode *corev1.Node) (retErr error) { - ctx, span := trace.StartSpan(ctx, "node.handlePing") - defer span.End() - defer func() { - span.SetStatus(retErr) - }() - - result, err := n.nodePingController.getResult(ctx) - if err != nil { - err = pkgerrors.Wrap(err, "error while fetching result of node ping") - return err - } - - if result.error != nil { - err = pkgerrors.Wrap(err, "node ping returned error on ping") - return err - } - - if n.disableLease { - return n.updateStatus(ctx, providerNode, false) - } - - // TODO(Sargun): Pass down the result / timestamp so we can accurately track when the ping actually occurred - return n.updateLease(ctx) -} - -func (n *NodeController) updateLease(ctx context.Context) error { - l, err := updateNodeLease(ctx, n.leases, newLease(ctx, n.lease, n.serverNode, n.pingInterval)) - if err != nil { - return err - } - - n.lease = l - return nil -} - func (n *NodeController) updateStatus(ctx context.Context, providerNode *corev1.Node, skipErrorCb bool) (err error) { ctx, span := trace.StartSpan(ctx, "node.updateStatus") defer span.End() @@ -402,6 +370,12 @@ func (n *NodeController) updateStatus(ctx context.Context, providerNode *corev1. span.SetStatus(err) }() + if result, err := n.nodePingController.getResult(ctx); err != nil { + return err + } else if result.error != nil { + return fmt.Errorf("Not updating node status because node ping failed: %w", result.error) + } + updateNodeStatusHeartbeat(providerNode) node, err := updateNodeStatus(ctx, n.nodes, providerNode) @@ -420,64 +394,20 @@ func (n *NodeController) updateStatus(ctx context.Context, providerNode *corev1. 
} } + n.serverNodeLock.Lock() n.serverNode = node + n.serverNodeLock.Unlock() return nil } -func ensureLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *coord.Lease) (*coord.Lease, error) { - l, err := leases.Create(ctx, lease, metav1.CreateOptions{}) - if err != nil { - switch { - case errors.IsNotFound(err): - log.G(ctx).WithError(err).Info("Node lease not supported") - return nil, err - case errors.IsAlreadyExists(err), errors.IsConflict(err): - log.G(ctx).WithError(err).Warn("Error creating lease, deleting and recreating") - if err := leases.Delete(ctx, lease.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { - log.G(ctx).WithError(err).Error("could not delete old node lease") - return nil, pkgerrors.Wrap(err, "old lease exists but could not delete it") - } - l, err = leases.Create(ctx, lease, metav1.CreateOptions{}) - } +// Returns a copy of the server node object +func (n *NodeController) getServerNode(ctx context.Context) (*corev1.Node, error) { + n.serverNodeLock.Lock() + defer n.serverNodeLock.Unlock() + if n.serverNode == nil { + return nil, pkgerrors.New("Server node does not yet exist") } - - return l, err -} - -// updateNodeLease updates the node lease. -// -// If this function returns an errors.IsNotFound(err) error, this likely means -// that node leases are not supported, if this is the case, call updateNodeStatus -// instead. -func updateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *coord.Lease) (*coord.Lease, error) { - ctx, span := trace.StartSpan(ctx, "node.UpdateNodeLease") - defer span.End() - - ctx = span.WithFields(ctx, log.Fields{ - "lease.name": lease.Name, - "lease.time": lease.Spec.RenewTime, - }) - - if lease.Spec.LeaseDurationSeconds != nil { - ctx = span.WithField(ctx, "lease.expiresSeconds", *lease.Spec.LeaseDurationSeconds) - } - - l, err := leases.Update(ctx, lease, metav1.UpdateOptions{}) - if err != nil { - if errors.IsNotFound(err) { - log.G(ctx).Debug("lease not found") - l, err = ensureLease(ctx, leases, lease) - } - if err != nil { - span.SetStatus(err) - return nil, err - } - log.G(ctx).Debug("created new lease") - } else { - log.G(ctx).Debug("updated lease") - } - - return l, nil + return n.serverNode.DeepCopy(), nil } // just so we don't have to allocate this on every get request @@ -642,77 +572,6 @@ func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, nodeFromProvi return updatedNode, nil } -// This will return a new lease. It will either update base lease (and the set the renewal time appropriately), or create a brand new lease -func newLease(ctx context.Context, base *coord.Lease, serverNode *corev1.Node, leaseRenewalInterval time.Duration) *coord.Lease { - var lease *coord.Lease - if base == nil { - lease = &coord.Lease{} - } else { - lease = base.DeepCopy() - } - - lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()} - - if lease.Spec.LeaseDurationSeconds == nil { - // This is 25 due to historical reasons. It was supposed to be * 5, but...reasons - d := int32(leaseRenewalInterval.Seconds()) * 25 - lease.Spec.LeaseDurationSeconds = &d - } - - if lease.Name == "" { - lease.Name = serverNode.Name - } - if lease.Spec.HolderIdentity == nil { - // Let's do a copy here - name := serverNode.Name - lease.Spec.HolderIdentity = &name - } - - // Copied and pasted from: https://github.com/kubernetes/kubernetes/blob/442a69c3bdf6fe8e525b05887e57d89db1e2f3a5/pkg/kubelet/nodelease/controller.go#L213-L216 - // Setting owner reference needs node's UID. 
Note that it is different from - // kubelet.nodeRef.UID. When lease is initially created, it is possible that - // the connection between master and node is not ready yet. So try to set - // owner reference every time when renewing the lease, until successful. - // - // We have a special case to deal with in the node may be deleted and - // come back with a different UID. In this case the lease object should - // be deleted due to a owner reference cascading deletion, and when we renew - // lease again updateNodeLease will call ensureLease, and establish a new - // lease with the right node ID - if l := len(lease.OwnerReferences); l == 0 { - lease.OwnerReferences = []metav1.OwnerReference{ - { - APIVersion: corev1.SchemeGroupVersion.WithKind("Node").Version, - Kind: corev1.SchemeGroupVersion.WithKind("Node").Kind, - Name: serverNode.Name, - UID: serverNode.UID, - }, - } - } else if l > 0 { - var foundAnyNode bool - for _, ref := range lease.OwnerReferences { - if ref.APIVersion == corev1.SchemeGroupVersion.WithKind("Node").Version && ref.Kind == corev1.SchemeGroupVersion.WithKind("Node").Kind { - foundAnyNode = true - if serverNode.UID == ref.UID && serverNode.Name == ref.Name { - return lease - } - - log.G(ctx).WithFields(map[string]interface{}{ - "node.UID": serverNode.UID, - "ref.UID": ref.UID, - "node.Name": serverNode.Name, - "ref.Name": ref.Name, - }).Warn("Found that lease had node in owner references that is not this node") - } - } - if !foundAnyNode { - log.G(ctx).Warn("Found that lease had owner references, but no nodes in owner references") - } - } - - return lease -} - func updateNodeStatusHeartbeat(n *corev1.Node) { now := metav1.NewTime(time.Now()) for i := range n.Status.Conditions { diff --git a/node/node_ping_controller.go b/node/node_ping_controller.go index 0d603fd66..1efa6b605 100644 --- a/node/node_ping_controller.go +++ b/node/node_ping_controller.go @@ -11,6 +11,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" ) +// nodePingController is responsible for node pinging behaviour type nodePingController struct { nodeProvider NodeProvider pingInterval time.Duration @@ -18,11 +19,15 @@ type nodePingController struct { cond lock.MonitorVariable } +// pingResult encapsulates the result of the last ping. It is the time the ping was started, and the error. +// If there is a timeout, the error will be context.DeadlineExceeded type pingResult struct { - pingTime time.Time - error error + time time.Time + error error } +// newNodePingController creates a new node ping controller. pingInterval must be non-zero. 
Optionally, a timeout may be specfied on +// how long to wait for the provider to respond func newNodePingController(node NodeProvider, pingInterval time.Duration, timeout *time.Duration) *nodePingController { if pingInterval == 0 { panic("Node ping interval is 0") @@ -40,7 +45,8 @@ func newNodePingController(node NodeProvider, pingInterval time.Duration, timeou } } -func (npc *nodePingController) run(ctx context.Context) { +// Run runs the controller until context is cancelled +func (npc *nodePingController) Run(ctx context.Context) { const key = "key" sf := &singleflight.Group{} @@ -80,7 +86,7 @@ func (npc *nodePingController) run(ctx context.Context) { log.G(ctx).WithError(pingResult.error).Warn("Failed to ping node due to context cancellation") case result := <-doChan: pingResult.error = result.Err - pingResult.pingTime = result.Val.(time.Time) + pingResult.time = result.Val.(time.Time) } npc.cond.Set(&pingResult) @@ -93,7 +99,7 @@ func (npc *nodePingController) run(ctx context.Context) { wait.UntilWithContext(ctx, checkFunc, npc.pingInterval) } -// getResult returns the current ping result in a non-blocking fashion except for the first ping. It waits for the +// GetResult returns the current ping result in a non-blocking fashion except for the first ping. It waits for the // first ping to be successful before returning. If the context is cancelled while waiting for that value, it will // return immediately. func (npc *nodePingController) getResult(ctx context.Context) (*pingResult, error) { diff --git a/node/node_test.go b/node/node_test.go index aad5f91c4..aa037e465 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -21,10 +21,11 @@ import ( "testing" "time" + coordinationv1 "k8s.io/api/coordination/v1" + "gotest.tools/assert" "gotest.tools/assert/cmp" is "gotest.tools/assert/cmp" - coord "k8s.io/api/coordination/v1beta1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -47,7 +48,7 @@ func testNodeRun(t *testing.T, enableLease bool) { testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}} nodes := c.CoreV1().Nodes() - leases := c.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease) + leases := c.CoordinationV1().Leases(corev1.NamespaceNodeLease) interval := 1 * time.Millisecond opts := []NodeControllerOpt{ @@ -55,7 +56,7 @@ func testNodeRun(t *testing.T, enableLease bool) { WithNodeStatusUpdateInterval(interval), } if enableLease { - opts = append(opts, WithNodeEnableLeaseV1Beta1(leases, nil)) + opts = append(opts, WithNodeEnableLeaseV1WithRenewInterval(leases, 40, interval)) } testNode := testNode(t) // We have to refer to testNodeCopy during the course of the test. 
testNode is modified by the node controller @@ -84,7 +85,7 @@ func testNodeRun(t *testing.T, enableLease bool) { lr := lw.ResultChan() var ( - lBefore *coord.Lease + lBefore *coordinationv1.Lease nodeUpdates int leaseUpdates int @@ -94,7 +95,7 @@ func testNodeRun(t *testing.T, enableLease bool) { timeout := time.After(30 * time.Second) for i := 0; i < iters; i++ { - var l *coord.Lease + var l *coordinationv1.Lease select { case <-timeout: @@ -108,7 +109,7 @@ func testNodeRun(t *testing.T, enableLease bool) { nodeUpdates++ continue case le := <-lr: - l = le.Object.(*coord.Lease) + l = le.Object.(*coordinationv1.Lease) leaseUpdates++ assert.Assert(t, cmp.Equal(l.Spec.HolderIdentity != nil, true)) @@ -224,23 +225,6 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) { } } -func TestEnsureLease(t *testing.T) { - c := testclient.NewSimpleClientset().CoordinationV1beta1().Leases(corev1.NamespaceNodeLease) - n := testNode(t) - ctx := context.Background() - - lease := newLease(ctx, nil, n, 1*time.Second) - - l1, err := ensureLease(ctx, c, lease.DeepCopy()) - assert.NilError(t, err) - assert.Check(t, timeEqual(l1.Spec.RenewTime.Time, lease.Spec.RenewTime.Time)) - - l1.Spec.RenewTime.Time = time.Now().Add(1 * time.Second) - l2, err := ensureLease(ctx, c, l1.DeepCopy()) - assert.NilError(t, err) - assert.Check(t, timeEqual(l2.Spec.RenewTime.Time, l1.Spec.RenewTime.Time)) -} - func TestUpdateNodeStatus(t *testing.T) { n := testNode(t) n.Status.Conditions = append(n.Status.Conditions, corev1.NodeCondition{ @@ -277,53 +261,6 @@ func TestUpdateNodeStatus(t *testing.T) { assert.Equal(t, errors.IsNotFound(err), true, err) } -func TestUpdateNodeLease(t *testing.T) { - ctx := context.Background() - - leases := testclient.NewSimpleClientset().CoordinationV1beta1().Leases(corev1.NamespaceNodeLease) - n := testNode(t) - - lease := newLease(ctx, nil, n, time.Duration(0)) - - l, err := updateNodeLease(ctx, leases, lease) - assert.NilError(t, err) - assert.Equal(t, l.Name, lease.Name) - assert.Assert(t, cmp.DeepEqual(l.Spec.HolderIdentity, lease.Spec.HolderIdentity)) - - compare, err := leases.Get(ctx, l.Name, emptyGetOptions) - assert.NilError(t, err) - assert.Equal(t, l.Spec.RenewTime.Time.Unix(), compare.Spec.RenewTime.Time.Unix()) - assert.Equal(t, compare.Name, lease.Name) - assert.Assert(t, cmp.DeepEqual(compare.Spec.HolderIdentity, lease.Spec.HolderIdentity)) - - l.Spec.RenewTime.Time = time.Now().Add(10 * time.Second) - - compare, err = updateNodeLease(ctx, leases, l.DeepCopy()) - assert.NilError(t, err) - assert.Equal(t, compare.Spec.RenewTime.Time.Unix(), l.Spec.RenewTime.Time.Unix()) - assert.Equal(t, compare.Name, lease.Name) - assert.Assert(t, cmp.DeepEqual(compare.Spec.HolderIdentity, lease.Spec.HolderIdentity)) -} - -func TestFixNodeLeaseReferences(t *testing.T) { - ctx := context.Background() - n := testNode(t) - - lease1 := newLease(ctx, nil, n, time.Second) - // Let's break owner references - lease1.OwnerReferences = nil - time.Sleep(2 * time.Nanosecond) - lease2 := newLease(ctx, lease1, n, time.Second) - - // Make sure that newLease actually did its jobs - assert.Assert(t, lease2.Spec.RenewTime.Nanosecond() > lease1.Spec.RenewTime.Nanosecond()) - - // Let's check if owner references got set - assert.Assert(t, is.Len(lease2.OwnerReferences, 1)) - assert.Assert(t, is.Equal(lease2.OwnerReferences[0].UID, n.UID)) - assert.Assert(t, is.Equal(lease2.OwnerReferences[0].Name, n.Name)) -} - // TestPingAfterStatusUpdate checks that Ping continues to be called with the specified interval // after 
a node status update occurs, when leases are disabled.
 //
@@ -672,7 +609,7 @@ func TestNodePingSingleInflight(t *testing.T) {
 	assert.NilError(t, err)
 
 	start := time.Now()
-	go node.nodePingController.run(ctx)
+	go node.nodePingController.Run(ctx)
 	firstPing, err := node.nodePingController.getResult(ctx)
 	assert.NilError(t, err)
 	timeTakenToCompleteFirstPing := time.Since(start)
@@ -773,15 +710,6 @@ func before(x, y time.Time) cmp.Comparison {
 	}
 }
 
-func timeEqual(x, y time.Time) cmp.Comparison {
-	return func() cmp.Result {
-		if x.Equal(y) {
-			return cmp.ResultSuccess
-		}
-		return cmp.ResultFailureTemplate(failTemplate("!="), map[string]interface{}{"x": x, "y": y})
-	}
-}
-
 // waitForEvent waits for the `check` function to return true
 // `check` is run when an event is received
 // Cancelling the context will cancel the wait, with the context error sent on
diff --git a/node/nodeutil/client.go b/node/nodeutil/client.go
index da3d0ed7a..4274a4a4b 100644
--- a/node/nodeutil/client.go
+++ b/node/nodeutil/client.go
@@ -9,7 +9,7 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	kubeinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
+	coordclientset "k8s.io/client-go/kubernetes/typed/coordination/v1"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
 )
@@ -57,9 +57,9 @@ func PodInformerFilter(node string) kubeinformers.SharedInformerOption {
 	})
 }
 
-// NodeLeaseV1Beta1Client creates a v1beta1 Lease client for use with node leases from the passed in client.
+// NodeLeaseV1Client creates a V1 Lease client for use with node leases from the passed in client.
 //
-// Use this with node.WithNodeEnableLeaseV1Beta1 when creating a node controller.
-func NodeLeaseV1Beta1Client(client kubernetes.Interface) v1beta1.LeaseInterface {
-	return client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
+// Use this with node.WithNodeEnableLeaseV1 when creating a node controller.
+func NodeLeaseV1Client(client kubernetes.Interface) coordclientset.LeaseInterface {
+	return client.CoordinationV1().Leases(corev1.NamespaceNodeLease)
 }
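
For provider authors picking up this change, the root command diff above is the reference usage. The sketch below condenses it into a minimal, hypothetical helper (the function and variable names are illustrative and not part of this patch); it only uses APIs introduced or kept by this change: nodeutil.NodeLeaseV1Client, node.WithNodeEnableLeaseV1, node.DefaultLeaseDuration, and node.NewNodeController.

```go
package example

import (
	"context"

	"github.com/virtual-kubelet/virtual-kubelet/node"
	"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
)

// runNodeController is a minimal sketch of enabling v1 node leases with the
// options introduced in this change. client and pNode are assumed to be
// constructed elsewhere by the caller.
func runNodeController(ctx context.Context, client kubernetes.Interface, pNode *corev1.Node) error {
	np := node.NewNaiveNodeProvider()

	// A coordination.k8s.io/v1 Lease client scoped to the kube-node-lease namespace.
	leaseClient := nodeutil.NodeLeaseV1Client(client)

	nodeRunner, err := node.NewNodeController(
		np,
		pNode,
		client.CoreV1().Nodes(),
		// Passing 0 would also fall back to DefaultLeaseDuration (40s); the renew
		// interval defaults to DefaultRenewIntervalFraction of the lease duration.
		node.WithNodeEnableLeaseV1(leaseClient, node.DefaultLeaseDuration),
	)
	if err != nil {
		return err
	}
	return nodeRunner.Run(ctx)
}
```

As in the upstream kubelet, the controller renews at DefaultRenewIntervalFraction (0.25) of the lease duration, so the default 40-second lease is renewed roughly every 10 seconds; WithNodeEnableLeaseV1WithRenewInterval is the escape hatch when a different renewal cadence is required.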