This adds the v1 lease controller

This refactors the lease controller to use the coordination v1 API instead of
v1beta1. It introduces two functional changes to the lease controller:
* It no longer ties lease updates to node pings or node status updates
* There is no longer a fallback to node status updates when the API server does not support leases

This also moves the vk_envtest tests into the node package, allowing for
future brown-box testing of the lease controller with envtest.
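
For reference, wiring the new controller in from a caller looks roughly like the sketch below. This is a minimal example based on the cmd changes in this diff, not a drop-in snippet; client (a kubernetes.Interface), pNode, and ctx are assumed to already exist in the caller.

	// node and nodeutil are this repository's packages.
	leaseClient := nodeutil.NodeLeaseV1Client(client) // coordination/v1 Leases in the node-lease namespace
	opts := []node.NodeControllerOpt{
		// Passing 0 uses DefaultLeaseDuration (40s); the renew interval defaults to 1/4 of the lease duration.
		node.WithNodeEnableLeaseV1(leaseClient, 0),
	}
	nodeRunner, err := node.NewNodeController(node.NewNaiveNodeProvider(), pNode, client.CoreV1().Nodes(), opts...)
	if err != nil {
		log.G(ctx).Fatal(err)
	}
	go func() {
		if err := nodeRunner.Run(ctx); err != nil {
			log.G(ctx).Fatal(err)
		}
	}()
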
Sargun Dhillon
2020-12-07 10:37:17 -08:00
parent 8affa1c42a
commit 735eb34829
10 changed files with 525 additions and 366 deletions


@@ -187,7 +187,7 @@ kubebuilder_2.3.1_${TEST_OS}_${TEST_ARCH}: kubebuilder_2.3.1_${TEST_OS}_${TEST_A
envtest: kubebuilder_2.3.1_${TEST_OS}_${TEST_ARCH}
# You can add klog flags for debugging, like: -klog.v=10 -klog.logtostderr
# klogv2 flags just wraps our existing logrus.
KUBEBUILDER_ASSETS=$(PWD)/kubebuilder_2.3.1_${TEST_OS}_${TEST_ARCH}/bin go test -v ./internal/test/vk_envtest -envtest=true
KUBEBUILDER_ASSETS=$(PWD)/kubebuilder_2.3.1_${TEST_OS}_${TEST_ARCH}/bin go test -v -run=TestEnvtest ./node -envtest=true
.PHONY: fmt
fmt:


@@ -32,7 +32,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
)
@@ -138,18 +137,9 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
"watchedNamespace": c.KubeNamespace,
}))
var leaseClient v1beta1.LeaseInterface
if c.EnableNodeLease {
leaseClient = nodeutil.NodeLeaseV1Beta1Client(client)
}
pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version)
np := node.NewNaiveNodeProvider()
nodeRunner, err := node.NewNodeController(
np,
pNode,
client.CoreV1().Nodes(),
node.WithNodeEnableLeaseV1Beta1(leaseClient, nil),
additionalOptions := []node.NodeControllerOpt{
node.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error {
if !k8serrors.IsNotFound(err) {
return err
@@ -165,6 +155,17 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
log.G(ctx).Debug("created new node")
return nil
}),
}
if c.EnableNodeLease {
leaseClient := nodeutil.NodeLeaseV1Client(client)
// 40 seconds is the default lease time in upstream kubelet
additionalOptions = append(additionalOptions, node.WithNodeEnableLeaseV1(leaseClient, 40))
}
nodeRunner, err := node.NewNodeController(
np,
pNode,
client.CoreV1().Nodes(),
additionalOptions...,
)
if err != nil {
log.G(ctx).Fatal(err)


@@ -1,8 +1,7 @@
package e2e_test
package node
import (
"context"
"flag"
"os"
"strings"
"testing"
@@ -12,30 +11,16 @@ import (
"github.com/sirupsen/logrus"
"github.com/virtual-kubelet/virtual-kubelet/log"
logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
"github.com/virtual-kubelet/virtual-kubelet/node"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
klogv1 "k8s.io/klog"
klogv2 "k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/envtest"
)
var enableEnvTest = flag.Bool("envtest", false, "Enable envtest based tests")
func TestMain(m *testing.M) {
flagset := flag.NewFlagSet("klog", flag.PanicOnError)
klogv1.InitFlags(flagset)
flagset.VisitAll(func(f *flag.Flag) {
flag.Var(f.Value, "klog."+f.Name, f.Usage)
})
flag.Parse()
os.Exit(m.Run())
}
func TestEnvtest(t *testing.T) {
if !*enableEnvTest && os.Getenv("VK_ENVTEST") == "" {
t.Skip("test only runs when -envtest is passed or if VK_ENVTEST is set to a non-empty value")
@@ -81,7 +66,7 @@ func testNodeE2ERun(t *testing.T, env *envtest.Environment, withLeases bool) {
_, err = clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
assert.NilError(t, err)
testProvider := node.NewNaiveNodeProvider()
testProvider := NewNaiveNodeProvider()
testNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
@@ -91,12 +76,12 @@ func testNodeE2ERun(t *testing.T, env *envtest.Environment, withLeases bool) {
testNodeCopy := testNode.DeepCopy()
opts := []node.NodeControllerOpt{}
leasesClient := clientset.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
opts := []NodeControllerOpt{}
leasesClient := clientset.CoordinationV1().Leases(corev1.NamespaceNodeLease)
if withLeases {
opts = append(opts, node.WithNodeEnableLeaseV1Beta1(leasesClient, nil))
opts = append(opts, WithNodeEnableLeaseV1(leasesClient, 0))
}
node, err := node.NewNodeController(testProvider, testNode, nodes, opts...)
node, err := NewNodeController(testProvider, testNode, nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error, 1)


@@ -0,0 +1,13 @@
package node
import (
"errors"
"testing"
"gotest.tools/assert"
)
func TestNotReadyError(t *testing.T) {
n := newNodeNotReadyError(nil)
assert.Assert(t, errors.Is(n, &nodeNotReadyError{}))
}

node/lease_controller_v1.go (new file, 345 lines)

@@ -0,0 +1,345 @@
package node
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"context"
"errors"
"fmt"
"time"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
coordclientset "k8s.io/client-go/kubernetes/typed/coordination/v1"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
)
// Code heavily borrowed from: https://github.com/kubernetes/kubernetes/blob/v1.18.13/pkg/kubelet/nodelease/controller.go
// Primary changes:
// * Use our internal logging library rather than klog
// * Add tracing support
// * Allow for customization of intervals and such
// * Rather than using a node client, and having to build an independent node cache for the lease
// controller, we provide a cached version of the node object.
// * Use contexts for cancellation so the controller can be stopped versus running until the process terminates
const (
// DefaultRenewIntervalFraction is the fraction of lease duration to renew the lease
DefaultRenewIntervalFraction = 0.25
// maxUpdateRetries is the number of immediate, successive retries the Kubelet will attempt
// when renewing the lease before it waits for the renewal interval before trying again,
// similar to what we do for node status retries
maxUpdateRetries = 5
// maxBackoff is the maximum sleep time during backoff (e.g. in backoffEnsureLease)
maxBackoff = 7 * time.Second
// DefaultLeaseDuration is from upstream kubelet, where the default lease duration is 40 seconds
DefaultLeaseDuration = 40
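// For reference, with the defaults above (DefaultLeaseDuration = 40 and DefaultRenewIntervalFraction = 0.25),
// WithNodeEnableLeaseV1 derives a renew interval of 40 * 0.25 = 10 seconds.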
)
// leaseController is a v1 lease controller, responsible for maintaining a server-side lease for as long as the node
// is healthy
type leaseController struct {
leaseClient coordclientset.LeaseInterface
leaseDurationSeconds int32
renewInterval time.Duration
clock clock.Clock
nodeController *NodeController
// latestLease is the latest node lease which Kubelet updated or created
latestLease *coordinationv1.Lease
}
// newLeaseControllerWithRenewInterval constructs and returns a v1 lease controller with a specific interval at which to
// renew leases
func newLeaseControllerWithRenewInterval(
clock clock.Clock,
client coordclientset.LeaseInterface,
leaseDurationSeconds int32,
renewInterval time.Duration,
nodeController *NodeController) (*leaseController, error) {
if leaseDurationSeconds <= 0 {
return nil, fmt.Errorf("Lease duration seconds %d is invalid, it must be > 0", leaseDurationSeconds)
}
if renewInterval == 0 {
return nil, fmt.Errorf("Lease renew interval %s is invalid, it must be > 0", renewInterval.String())
}
if float64(leaseDurationSeconds) <= renewInterval.Seconds() {
return nil, fmt.Errorf("Lease renew interval %s is invalid, it must be less than lease duration seconds %d", renewInterval.String(), leaseDurationSeconds)
}
return &leaseController{
leaseClient: client,
leaseDurationSeconds: leaseDurationSeconds,
renewInterval: renewInterval,
clock: clock,
nodeController: nodeController,
}, nil
}
// Run runs the controller
func (c *leaseController) Run(ctx context.Context) {
c.sync(ctx)
wait.UntilWithContext(ctx, c.sync, c.renewInterval)
}
func (c *leaseController) sync(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var err error
ctx, span := trace.StartSpan(ctx, "lease.sync")
defer span.End()
pingResult, err := c.nodeController.nodePingController.getResult(ctx)
if err != nil {
log.G(ctx).WithError(err).Error("Could not get ping status")
span.SetStatus(err)
return
}
if pingResult.error != nil {
log.G(ctx).WithError(pingResult.error).Error("Ping result is not clean, not updating lease")
return
}
node, err := c.nodeController.getServerNode(ctx)
if err != nil {
log.G(ctx).WithError(err).Error("Could not get server node")
span.SetStatus(err)
return
}
if node == nil {
err = errors.New("Servernode is null")
log.G(ctx).WithError(err).Error("servernode is null")
span.SetStatus(err)
return
}
if c.latestLease != nil {
// As long as node lease is not (or very rarely) updated by any other agent than Kubelet,
// we can optimistically assume it didn't change since our last update and try updating
// based on the version from that time. Thanks to it we avoid GET call and reduce load
// on etcd and kube-apiserver.
// If at some point other agents will also be frequently updating the Lease object, this
// can result in performance degradation, because we will end up with calling additional
// GET/PUT - at this point this whole "if" should be removed.
err := c.retryUpdateLease(ctx, node, c.newLease(ctx, node, c.latestLease))
if err == nil {
span.SetStatus(err)
return
}
log.G(ctx).WithError(err).Info("failed to update lease using latest lease, fallback to ensure lease")
}
lease, created := c.backoffEnsureLease(ctx, node)
c.latestLease = lease
// we don't need to update the lease if we just created it
if !created && lease != nil {
if err := c.retryUpdateLease(ctx, node, lease); err != nil {
log.G(ctx).WithError(err).WithField("renewInterval", c.renewInterval).Errorf("Will retry after")
span.SetStatus(err)
}
}
}
// backoffEnsureLease attempts to create the lease if it does not exist,
// and uses exponentially increasing waits to prevent overloading the API server
// with retries. Returns the lease, and true if this call created the lease,
// false otherwise.
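// For example, starting from the initial 100ms sleep and doubling up to maxBackoff, the waits between failed
// attempts are roughly 200ms, 400ms, 800ms, 1.6s, 3.2s, 6.4s, and then 7s for every retry after that.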
func (c *leaseController) backoffEnsureLease(ctx context.Context, node *corev1.Node) (*coordinationv1.Lease, bool) {
ctx, span := trace.StartSpan(ctx, "lease.backoffEnsureLease")
defer span.End()
var (
lease *coordinationv1.Lease
created bool
err error
)
sleep := 100 * time.Millisecond
for {
lease, created, err = c.ensureLease(ctx, node)
if err == nil {
break
}
sleep = minDuration(2*sleep, maxBackoff)
log.G(ctx).WithError(err).Errorf("failed to ensure node lease exists, will retry in %v", sleep)
// backoff wait, using a timer so the wait can be cut short by context cancellation
timer := c.clock.NewTimer(sleep)
defer timer.Stop()
select {
case <-timer.C():
case <-ctx.Done():
return nil, false
}
}
return lease, created
}
// ensureLease creates the lease if it does not exist. Returns the lease and
// a bool (true if this call created the lease), or any error that occurs.
func (c *leaseController) ensureLease(ctx context.Context, node *corev1.Node) (*coordinationv1.Lease, bool, error) {
ctx, span := trace.StartSpan(ctx, "lease.ensureLease")
defer span.End()
lease, err := c.leaseClient.Get(ctx, node.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
// lease does not exist, create it.
leaseToCreate := c.newLease(ctx, node, nil)
if len(leaseToCreate.OwnerReferences) == 0 {
// We want to ensure that a lease will always have OwnerReferences set.
// Thus, given that we weren't able to set it correctly, we simply do
// not create it this time - we will retry in the next iteration.
return nil, false, nil
}
lease, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{})
if err != nil {
span.SetStatus(err)
return nil, false, err
}
log.G(ctx).Debug("Successfully created lease")
return lease, true, nil
} else if err != nil {
// unexpected error getting lease
log.G(ctx).WithError(err).Error("Unexpected error getting lease")
span.SetStatus(err)
return nil, false, err
}
log.G(ctx).Debug("Successfully recovered existing lease")
// lease already existed
return lease, false, nil
}
// retryUpdateLease attempts to update the lease for up to maxUpdateRetries attempts;
// call this once you're sure the lease has been created
func (c *leaseController) retryUpdateLease(ctx context.Context, node *corev1.Node, base *coordinationv1.Lease) error {
ctx, span := trace.StartSpan(ctx, "controller.retryUpdateLease")
defer span.End()
for i := 0; i < maxUpdateRetries; i++ {
lease, err := c.leaseClient.Update(ctx, c.newLease(ctx, node, base), metav1.UpdateOptions{})
if err == nil {
log.G(ctx).WithField("retries", i).Debug("Successfully updated lease")
c.latestLease = lease
return nil
}
log.G(ctx).WithError(err).Error("failed to update node lease")
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
err = fmt.Errorf("failed after %d attempts to update node lease: %w", maxUpdateRetries, err)
span.SetStatus(err)
return err
}
// OptimisticLockError requires getting the newer version of lease to proceed.
if apierrors.IsConflict(err) {
base, _ = c.backoffEnsureLease(ctx, node)
continue
}
}
err := fmt.Errorf("failed after %d attempts to update node lease", maxUpdateRetries)
span.SetStatus(err)
return err
}
// newLease constructs a new lease if base is nil, or returns a copy of base
// with desired state asserted on the copy.
func (c *leaseController) newLease(ctx context.Context, node *corev1.Node, base *coordinationv1.Lease) *coordinationv1.Lease {
ctx, span := trace.StartSpan(ctx, "lease.newLease")
defer span.End()
// Use the bare minimum set of fields; other fields exist for debugging/legacy,
// but we don't need to make node heartbeats more complicated by using them.
var lease *coordinationv1.Lease
if base == nil {
lease = &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: node.Name,
Namespace: corev1.NamespaceNodeLease,
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: pointer.StringPtr(node.Name),
LeaseDurationSeconds: pointer.Int32Ptr(c.leaseDurationSeconds),
},
}
} else {
lease = base.DeepCopy()
}
lease.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()}
// Setting owner reference needs node's UID. Note that it is different from
// kubelet.nodeRef.UID. When lease is initially created, it is possible that
// the connection between master and node is not ready yet. So try to set
// owner reference every time when renewing the lease, until successful.
if len(lease.OwnerReferences) == 0 {
lease.OwnerReferences = []metav1.OwnerReference{
{
APIVersion: corev1.SchemeGroupVersion.WithKind("Node").Version,
Kind: corev1.SchemeGroupVersion.WithKind("Node").Kind,
Name: node.Name,
UID: node.UID,
},
}
}
ctx = span.WithFields(ctx, map[string]interface{}{
"lease": lease,
})
log.G(ctx).Debug("Generated lease")
return lease
}
func minDuration(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}
// nodeNotReadyError indicates that the node was not ready / ping is failing
type nodeNotReadyError struct {
pingResult *pingResult
}
func newNodeNotReadyError(pingResult *pingResult) error {
return &nodeNotReadyError{
pingResult: pingResult,
}
}
func (e *nodeNotReadyError) Unwrap() error {
return e.pingResult.error
}
func (e *nodeNotReadyError) Is(target error) bool {
_, ok := target.(*nodeNotReadyError)
return ok
}
func (e *nodeNotReadyError) As(target error) bool {
val, ok := target.(*nodeNotReadyError)
if ok {
*val = *e
}
return ok
}
func (e *nodeNotReadyError) Error() string {
return fmt.Sprintf("New node not ready error: %s", e.pingResult.error)
}

node/main_test.go (new file, 22 lines)

@@ -0,0 +1,22 @@
/* This file is just a place for the TestMain override function to live, plus whatever custom flags we are interested in */
package node
import (
"flag"
"os"
"testing"
klogv1 "k8s.io/klog"
)
var enableEnvTest = flag.Bool("envtest", false, "Enable envtest based tests")
func TestMain(m *testing.M) {
flagset := flag.NewFlagSet("klog", flag.PanicOnError)
klogv1.InitFlags(flagset)
flagset.VisitAll(func(f *flag.Flag) {
flag.Var(f.Value, "klog."+f.Name, f.Usage)
})
flag.Parse()
os.Exit(m.Run())
}


@@ -17,21 +17,23 @@ package node
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
coord "k8s.io/api/coordination/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
coordclientset "k8s.io/client-go/kubernetes/typed/coordination/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
"k8s.io/utils/clock"
)
const (
@@ -41,6 +43,12 @@ const (
virtualKubeletLastNodeAppliedObjectMeta = "virtual-kubelet.io/last-applied-object-meta"
)
var (
// ErrConflictingLeaseControllerConfiguration is returned when the lease controller related options have been
// specified multiple times
ErrConflictingLeaseControllerConfiguration = pkgerrors.New("Multiple, conflicting lease configurations have been put into place")
)
// NodeProvider is the interface used for registering a node and updating its
// status in Kubernetes.
//
@@ -98,24 +106,49 @@ func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface
// NodeControllerOpt are the functional options used for configuring a node
type NodeControllerOpt func(*NodeController) error // nolint:golint
// WithNodeEnableLeaseV1Beta1 enables support for v1beta1 leases.
// If client is nil, leases will not be enabled.
// If baseLease is nil, a default base lease will be used.
//
// The lease will be updated after each successful node ping. To change the
// lease update interval, you must set the node ping interval.
// See WithNodePingInterval().
//
// This also affects the frequency of node status updates:
// - When leases are *not* enabled (or are disabled due to no support on the cluster)
// the node status is updated at every ping interval.
// - When node leases are enabled, node status updates are controlled by the
// node status update interval option.
// To set a custom node status update interval, see WithNodeStatusUpdateInterval().
func WithNodeEnableLeaseV1Beta1(client v1beta1.LeaseInterface, baseLease *coord.Lease) NodeControllerOpt {
// WithNodeEnableLeaseV1 enables support for v1 leases.
// V1 leases share the same properties as v1beta1 leases, except that this controller does not fall back to node
// status updates if the API server does not support leases, unlike the v1beta1 lease controller. If the lease
// duration is not specified (0), DefaultLeaseDuration is used.
func WithNodeEnableLeaseV1(client coordclientset.LeaseInterface, leaseDurationSeconds int32) NodeControllerOpt {
if leaseDurationSeconds == 0 {
leaseDurationSeconds = DefaultLeaseDuration
}
interval := float64(leaseDurationSeconds) * DefaultRenewIntervalFraction
intervalDuration := time.Second * time.Duration(int(interval))
return WithNodeEnableLeaseV1WithRenewInterval(client, leaseDurationSeconds, intervalDuration)
}
// WithNodeEnableLeaseV1WithRenewInterval enables support for v1 leases, and sets a specific renew interval,
// as opposed to the standard multiplier specified by DefaultRenewIntervalFraction
func WithNodeEnableLeaseV1WithRenewInterval(client coordclientset.LeaseInterface, leaseDurationSeconds int32, interval time.Duration) NodeControllerOpt {
if client == nil {
panic("client is nil")
}
if leaseDurationSeconds == 0 {
leaseDurationSeconds = DefaultLeaseDuration
}
return func(n *NodeController) error {
n.leases = client
n.lease = baseLease
if n.leaseController != nil {
return ErrConflictingLeaseControllerConfiguration
}
leaseController, err := newLeaseControllerWithRenewInterval(
&clock.RealClock{},
client,
leaseDurationSeconds,
interval,
n,
)
if err != nil {
return fmt.Errorf("Unable to configure lease controller: %w", err)
}
n.leaseController = leaseController
return nil
}
}
@@ -177,16 +210,15 @@ type ErrorHandler func(context.Context, error) error
type NodeController struct { // nolint:golint
p NodeProvider
// serverNode should only be written to on initialization, or as the result of node creation.
serverNode *corev1.Node
// serverNode must be updated each time it is updated in API Server
serverNodeLock sync.Mutex
serverNode *corev1.Node
nodes v1.NodeInterface
leases v1beta1.LeaseInterface
nodes v1.NodeInterface
leaseController *leaseController
disableLease bool
pingInterval time.Duration
statusInterval time.Duration
lease *coord.Lease
chStatusUpdate chan *corev1.Node
nodeStatusUpdateErrorHandler ErrorHandler
@@ -195,6 +227,8 @@ type NodeController struct { // nolint:golint
nodePingController *nodePingController
pingTimeout *time.Duration
group wait.Group
}
// The default intervals used for lease and status updates.
@@ -221,30 +255,21 @@ func (n *NodeController) Run(ctx context.Context) error {
n.chStatusUpdate <- node
})
n.group.StartWithContext(ctx, n.nodePingController.Run)
n.serverNodeLock.Lock()
providerNode := n.serverNode.DeepCopy()
n.serverNodeLock.Unlock()
if err := n.ensureNode(ctx, providerNode); err != nil {
return err
}
if n.leases == nil {
n.disableLease = true
return n.controlLoop(ctx, providerNode)
if n.leaseController != nil {
log.G(ctx).WithField("leaseController", n.leaseController).Debug("Starting leasecontroller")
n.group.StartWithContext(ctx, n.leaseController.Run)
}
n.lease = newLease(ctx, n.lease, n.serverNode, n.pingInterval)
l, err := ensureLease(ctx, n.leases, n.lease)
if err != nil {
if !errors.IsNotFound(err) {
return pkgerrors.Wrap(err, "error creating node lease")
}
log.G(ctx).Info("Node leases not supported, falling back to only node status updates")
n.disableLease = true
}
n.lease = l
log.G(ctx).Debug("Created node lease")
return n.controlLoop(ctx, providerNode)
}
@@ -260,12 +285,17 @@ func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.No
return err
}
node, err := n.nodes.Create(ctx, n.serverNode, metav1.CreateOptions{})
n.serverNodeLock.Lock()
serverNode := n.serverNode
n.serverNodeLock.Unlock()
node, err := n.nodes.Create(ctx, serverNode, metav1.CreateOptions{})
if err != nil {
return pkgerrors.Wrap(err, "error registering node with kubernetes")
}
n.serverNodeLock.Lock()
n.serverNode = node
n.serverNodeLock.Unlock()
// Bad things will happen if the node is deleted in k8s and recreated by someone else
// we rely on this persisting
providerNode.ObjectMeta.Name = node.Name
@@ -283,50 +313,33 @@ func (n *NodeController) Ready() <-chan struct{} {
}
func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.Node) error {
pingTimer := time.NewTimer(n.pingInterval)
defer pingTimer.Stop()
statusTimer := time.NewTimer(n.statusInterval)
defer statusTimer.Stop()
timerResetDuration := n.statusInterval
if n.disableLease {
// when resetting the timer after processing a status update, reset it to the ping interval
// (since it will be the ping timer as serverNode.disableLease == true)
timerResetDuration = n.pingInterval
// hack to make sure this channel always blocks since we won't be using it
if !statusTimer.Stop() {
<-statusTimer.C
}
}
close(n.chReady)
group := &wait.Group{}
group.StartWithContext(ctx, n.nodePingController.run)
defer group.Wait()
defer n.group.Wait()
var sleepInterval time.Duration
if n.leaseController == nil {
log.G(ctx).WithField("pingInterval", n.pingInterval).Debug("lease controller is not enabled, updating node status in Kube API server at Ping Time Interval")
sleepInterval = n.pingInterval
} else {
log.G(ctx).WithField("statusInterval", n.statusInterval).Debug("lease controller in use, updating at statusInterval")
sleepInterval = n.statusInterval
}
loop := func() bool {
ctx, span := trace.StartSpan(ctx, "node.controlLoop.loop")
defer span.End()
var timer *time.Timer
ctx = span.WithField(ctx, "sleepTime", n.pingInterval)
timer = time.NewTimer(sleepInterval)
defer timer.Stop()
select {
case <-ctx.Done():
return true
case updated := <-n.chStatusUpdate:
var t *time.Timer
if n.disableLease {
t = pingTimer
} else {
t = statusTimer
}
log.G(ctx).Debug("Received node status update")
// Performing a status update so stop/reset the status update timer in this
// branch otherwise there could be an unnecessary status update.
if !t.Stop() {
<-t.C
}
providerNode.Status = updated.Status
providerNode.ObjectMeta.Annotations = updated.Annotations
@@ -334,19 +347,10 @@ func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.N
if err := n.updateStatus(ctx, providerNode, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
t.Reset(timerResetDuration)
case <-statusTimer.C:
case <-timer.C:
if err := n.updateStatus(ctx, providerNode, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
statusTimer.Reset(n.statusInterval)
case <-pingTimer.C:
if err := n.handlePing(ctx, providerNode); err != nil {
log.G(ctx).WithError(err).Error("Error while handling node ping")
} else {
log.G(ctx).Debug("Successful node ping")
}
pingTimer.Reset(n.pingInterval)
}
return false
}
@@ -359,42 +363,6 @@ func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.N
}
}
func (n *NodeController) handlePing(ctx context.Context, providerNode *corev1.Node) (retErr error) {
ctx, span := trace.StartSpan(ctx, "node.handlePing")
defer span.End()
defer func() {
span.SetStatus(retErr)
}()
result, err := n.nodePingController.getResult(ctx)
if err != nil {
err = pkgerrors.Wrap(err, "error while fetching result of node ping")
return err
}
if result.error != nil {
err = pkgerrors.Wrap(err, "node ping returned error on ping")
return err
}
if n.disableLease {
return n.updateStatus(ctx, providerNode, false)
}
// TODO(Sargun): Pass down the result / timestamp so we can accurately track when the ping actually occurred
return n.updateLease(ctx)
}
func (n *NodeController) updateLease(ctx context.Context) error {
l, err := updateNodeLease(ctx, n.leases, newLease(ctx, n.lease, n.serverNode, n.pingInterval))
if err != nil {
return err
}
n.lease = l
return nil
}
func (n *NodeController) updateStatus(ctx context.Context, providerNode *corev1.Node, skipErrorCb bool) (err error) {
ctx, span := trace.StartSpan(ctx, "node.updateStatus")
defer span.End()
@@ -402,6 +370,12 @@ func (n *NodeController) updateStatus(ctx context.Context, providerNode *corev1.
span.SetStatus(err)
}()
if result, err := n.nodePingController.getResult(ctx); err != nil {
return err
} else if result.error != nil {
return fmt.Errorf("Not updating node status because node ping failed: %w", result.error)
}
updateNodeStatusHeartbeat(providerNode)
node, err := updateNodeStatus(ctx, n.nodes, providerNode)
@@ -420,64 +394,20 @@ func (n *NodeController) updateStatus(ctx context.Context, providerNode *corev1.
}
}
n.serverNodeLock.Lock()
n.serverNode = node
n.serverNodeLock.Unlock()
return nil
}
func ensureLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *coord.Lease) (*coord.Lease, error) {
l, err := leases.Create(ctx, lease, metav1.CreateOptions{})
if err != nil {
switch {
case errors.IsNotFound(err):
log.G(ctx).WithError(err).Info("Node lease not supported")
return nil, err
case errors.IsAlreadyExists(err), errors.IsConflict(err):
log.G(ctx).WithError(err).Warn("Error creating lease, deleting and recreating")
if err := leases.Delete(ctx, lease.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
log.G(ctx).WithError(err).Error("could not delete old node lease")
return nil, pkgerrors.Wrap(err, "old lease exists but could not delete it")
}
l, err = leases.Create(ctx, lease, metav1.CreateOptions{})
}
// Returns a copy of the server node object
func (n *NodeController) getServerNode(ctx context.Context) (*corev1.Node, error) {
n.serverNodeLock.Lock()
defer n.serverNodeLock.Unlock()
if n.serverNode == nil {
return nil, pkgerrors.New("Server node does not yet exist")
}
return l, err
}
// updateNodeLease updates the node lease.
//
// If this function returns an errors.IsNotFound(err) error, this likely means
// that node leases are not supported, if this is the case, call updateNodeStatus
// instead.
func updateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *coord.Lease) (*coord.Lease, error) {
ctx, span := trace.StartSpan(ctx, "node.UpdateNodeLease")
defer span.End()
ctx = span.WithFields(ctx, log.Fields{
"lease.name": lease.Name,
"lease.time": lease.Spec.RenewTime,
})
if lease.Spec.LeaseDurationSeconds != nil {
ctx = span.WithField(ctx, "lease.expiresSeconds", *lease.Spec.LeaseDurationSeconds)
}
l, err := leases.Update(ctx, lease, metav1.UpdateOptions{})
if err != nil {
if errors.IsNotFound(err) {
log.G(ctx).Debug("lease not found")
l, err = ensureLease(ctx, leases, lease)
}
if err != nil {
span.SetStatus(err)
return nil, err
}
log.G(ctx).Debug("created new lease")
} else {
log.G(ctx).Debug("updated lease")
}
return l, nil
return n.serverNode.DeepCopy(), nil
}
// just so we don't have to allocate this on every get request
@@ -642,77 +572,6 @@ func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, nodeFromProvi
return updatedNode, nil
}
// This will return a new lease. It will either update base lease (and the set the renewal time appropriately), or create a brand new lease
func newLease(ctx context.Context, base *coord.Lease, serverNode *corev1.Node, leaseRenewalInterval time.Duration) *coord.Lease {
var lease *coord.Lease
if base == nil {
lease = &coord.Lease{}
} else {
lease = base.DeepCopy()
}
lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
if lease.Spec.LeaseDurationSeconds == nil {
// This is 25 due to historical reasons. It was supposed to be * 5, but...reasons
d := int32(leaseRenewalInterval.Seconds()) * 25
lease.Spec.LeaseDurationSeconds = &d
}
if lease.Name == "" {
lease.Name = serverNode.Name
}
if lease.Spec.HolderIdentity == nil {
// Let's do a copy here
name := serverNode.Name
lease.Spec.HolderIdentity = &name
}
// Copied and pasted from: https://github.com/kubernetes/kubernetes/blob/442a69c3bdf6fe8e525b05887e57d89db1e2f3a5/pkg/kubelet/nodelease/controller.go#L213-L216
// Setting owner reference needs node's UID. Note that it is different from
// kubelet.nodeRef.UID. When lease is initially created, it is possible that
// the connection between master and node is not ready yet. So try to set
// owner reference every time when renewing the lease, until successful.
//
// We have a special case to deal with in the node may be deleted and
// come back with a different UID. In this case the lease object should
// be deleted due to a owner reference cascading deletion, and when we renew
// lease again updateNodeLease will call ensureLease, and establish a new
// lease with the right node ID
if l := len(lease.OwnerReferences); l == 0 {
lease.OwnerReferences = []metav1.OwnerReference{
{
APIVersion: corev1.SchemeGroupVersion.WithKind("Node").Version,
Kind: corev1.SchemeGroupVersion.WithKind("Node").Kind,
Name: serverNode.Name,
UID: serverNode.UID,
},
}
} else if l > 0 {
var foundAnyNode bool
for _, ref := range lease.OwnerReferences {
if ref.APIVersion == corev1.SchemeGroupVersion.WithKind("Node").Version && ref.Kind == corev1.SchemeGroupVersion.WithKind("Node").Kind {
foundAnyNode = true
if serverNode.UID == ref.UID && serverNode.Name == ref.Name {
return lease
}
log.G(ctx).WithFields(map[string]interface{}{
"node.UID": serverNode.UID,
"ref.UID": ref.UID,
"node.Name": serverNode.Name,
"ref.Name": ref.Name,
}).Warn("Found that lease had node in owner references that is not this node")
}
}
if !foundAnyNode {
log.G(ctx).Warn("Found that lease had owner references, but no nodes in owner references")
}
}
return lease
}
func updateNodeStatusHeartbeat(n *corev1.Node) {
now := metav1.NewTime(time.Now())
for i := range n.Status.Conditions {


@@ -11,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
)
// nodePingController is responsible for node pinging behaviour
type nodePingController struct {
nodeProvider NodeProvider
pingInterval time.Duration
@@ -18,11 +19,15 @@ type nodePingController struct {
cond lock.MonitorVariable
}
// pingResult encapsulates the result of the last ping. It is the time the ping was started, and the error.
// If there is a timeout, the error will be context.DeadlineExceeded
type pingResult struct {
pingTime time.Time
error error
time time.Time
error error
}
// newNodePingController creates a new node ping controller. pingInterval must be non-zero. Optionally, a timeout may be specified for
// how long to wait for the provider to respond
func newNodePingController(node NodeProvider, pingInterval time.Duration, timeout *time.Duration) *nodePingController {
if pingInterval == 0 {
panic("Node ping interval is 0")
@@ -40,7 +45,8 @@ func newNodePingController(node NodeProvider, pingInterval time.Duration, timeou
}
}
func (npc *nodePingController) run(ctx context.Context) {
// Run runs the controller until context is cancelled
func (npc *nodePingController) Run(ctx context.Context) {
const key = "key"
sf := &singleflight.Group{}
@@ -80,7 +86,7 @@ func (npc *nodePingController) run(ctx context.Context) {
log.G(ctx).WithError(pingResult.error).Warn("Failed to ping node due to context cancellation")
case result := <-doChan:
pingResult.error = result.Err
pingResult.pingTime = result.Val.(time.Time)
pingResult.time = result.Val.(time.Time)
}
npc.cond.Set(&pingResult)
@@ -93,7 +99,7 @@ func (npc *nodePingController) run(ctx context.Context) {
wait.UntilWithContext(ctx, checkFunc, npc.pingInterval)
}
// getResult returns the current ping result in a non-blocking fashion except for the first ping. It waits for the
// GetResult returns the current ping result in a non-blocking fashion except for the first ping. It waits for the
// first ping to be successful before returning. If the context is cancelled while waiting for that value, it will
// return immediately.
func (npc *nodePingController) getResult(ctx context.Context) (*pingResult, error) {


@@ -21,10 +21,11 @@ import (
"testing"
"time"
coordinationv1 "k8s.io/api/coordination/v1"
"gotest.tools/assert"
"gotest.tools/assert/cmp"
is "gotest.tools/assert/cmp"
coord "k8s.io/api/coordination/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -47,7 +48,7 @@ func testNodeRun(t *testing.T, enableLease bool) {
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
nodes := c.CoreV1().Nodes()
leases := c.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
leases := c.CoordinationV1().Leases(corev1.NamespaceNodeLease)
interval := 1 * time.Millisecond
opts := []NodeControllerOpt{
@@ -55,7 +56,7 @@ func testNodeRun(t *testing.T, enableLease bool) {
WithNodeStatusUpdateInterval(interval),
}
if enableLease {
opts = append(opts, WithNodeEnableLeaseV1Beta1(leases, nil))
opts = append(opts, WithNodeEnableLeaseV1WithRenewInterval(leases, 40, interval))
}
testNode := testNode(t)
// We have to refer to testNodeCopy during the course of the test. testNode is modified by the node controller
@@ -84,7 +85,7 @@ func testNodeRun(t *testing.T, enableLease bool) {
lr := lw.ResultChan()
var (
lBefore *coord.Lease
lBefore *coordinationv1.Lease
nodeUpdates int
leaseUpdates int
@@ -94,7 +95,7 @@ func testNodeRun(t *testing.T, enableLease bool) {
timeout := time.After(30 * time.Second)
for i := 0; i < iters; i++ {
var l *coord.Lease
var l *coordinationv1.Lease
select {
case <-timeout:
@@ -108,7 +109,7 @@ func testNodeRun(t *testing.T, enableLease bool) {
nodeUpdates++
continue
case le := <-lr:
l = le.Object.(*coord.Lease)
l = le.Object.(*coordinationv1.Lease)
leaseUpdates++
assert.Assert(t, cmp.Equal(l.Spec.HolderIdentity != nil, true))
@@ -224,23 +225,6 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
}
}
func TestEnsureLease(t *testing.T) {
c := testclient.NewSimpleClientset().CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
n := testNode(t)
ctx := context.Background()
lease := newLease(ctx, nil, n, 1*time.Second)
l1, err := ensureLease(ctx, c, lease.DeepCopy())
assert.NilError(t, err)
assert.Check(t, timeEqual(l1.Spec.RenewTime.Time, lease.Spec.RenewTime.Time))
l1.Spec.RenewTime.Time = time.Now().Add(1 * time.Second)
l2, err := ensureLease(ctx, c, l1.DeepCopy())
assert.NilError(t, err)
assert.Check(t, timeEqual(l2.Spec.RenewTime.Time, l1.Spec.RenewTime.Time))
}
func TestUpdateNodeStatus(t *testing.T) {
n := testNode(t)
n.Status.Conditions = append(n.Status.Conditions, corev1.NodeCondition{
@@ -277,53 +261,6 @@ func TestUpdateNodeStatus(t *testing.T) {
assert.Equal(t, errors.IsNotFound(err), true, err)
}
func TestUpdateNodeLease(t *testing.T) {
ctx := context.Background()
leases := testclient.NewSimpleClientset().CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
n := testNode(t)
lease := newLease(ctx, nil, n, time.Duration(0))
l, err := updateNodeLease(ctx, leases, lease)
assert.NilError(t, err)
assert.Equal(t, l.Name, lease.Name)
assert.Assert(t, cmp.DeepEqual(l.Spec.HolderIdentity, lease.Spec.HolderIdentity))
compare, err := leases.Get(ctx, l.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Equal(t, l.Spec.RenewTime.Time.Unix(), compare.Spec.RenewTime.Time.Unix())
assert.Equal(t, compare.Name, lease.Name)
assert.Assert(t, cmp.DeepEqual(compare.Spec.HolderIdentity, lease.Spec.HolderIdentity))
l.Spec.RenewTime.Time = time.Now().Add(10 * time.Second)
compare, err = updateNodeLease(ctx, leases, l.DeepCopy())
assert.NilError(t, err)
assert.Equal(t, compare.Spec.RenewTime.Time.Unix(), l.Spec.RenewTime.Time.Unix())
assert.Equal(t, compare.Name, lease.Name)
assert.Assert(t, cmp.DeepEqual(compare.Spec.HolderIdentity, lease.Spec.HolderIdentity))
}
func TestFixNodeLeaseReferences(t *testing.T) {
ctx := context.Background()
n := testNode(t)
lease1 := newLease(ctx, nil, n, time.Second)
// Let's break owner references
lease1.OwnerReferences = nil
time.Sleep(2 * time.Nanosecond)
lease2 := newLease(ctx, lease1, n, time.Second)
// Make sure that newLease actually did its jobs
assert.Assert(t, lease2.Spec.RenewTime.Nanosecond() > lease1.Spec.RenewTime.Nanosecond())
// Let's check if owner references got set
assert.Assert(t, is.Len(lease2.OwnerReferences, 1))
assert.Assert(t, is.Equal(lease2.OwnerReferences[0].UID, n.UID))
assert.Assert(t, is.Equal(lease2.OwnerReferences[0].Name, n.Name))
}
// TestPingAfterStatusUpdate checks that Ping continues to be called with the specified interval
// after a node status update occurs, when leases are disabled.
//
@@ -672,7 +609,7 @@ func TestNodePingSingleInflight(t *testing.T) {
assert.NilError(t, err)
start := time.Now()
go node.nodePingController.run(ctx)
go node.nodePingController.Run(ctx)
firstPing, err := node.nodePingController.getResult(ctx)
assert.NilError(t, err)
timeTakenToCompleteFirstPing := time.Since(start)
@@ -773,15 +710,6 @@ func before(x, y time.Time) cmp.Comparison {
}
}
func timeEqual(x, y time.Time) cmp.Comparison {
return func() cmp.Result {
if x.Equal(y) {
return cmp.ResultSuccess
}
return cmp.ResultFailureTemplate(failTemplate("!="), map[string]interface{}{"x": x, "y": y})
}
}
// waitForEvent waits for the `check` function to return true
// `check` is run when an event is received
// Cancelling the context will cancel the wait, with the context error sent on


@@ -9,7 +9,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
coordclientset "k8s.io/client-go/kubernetes/typed/coordination/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
@@ -57,9 +57,9 @@ func PodInformerFilter(node string) kubeinformers.SharedInformerOption {
})
}
// NodeLeaseV1Beta1Client creates a v1beta1 Lease client for use with node leases from the passed in client.
// NodeLeaseV1Client creates a V1 Lease client for use with node leases from the passed in client.
//
// Use this with node.WithNodeEnableLeaseV1 when creating a node controller.
func NodeLeaseV1Beta1Client(client kubernetes.Interface) v1beta1.LeaseInterface {
return client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
func NodeLeaseV1Client(client kubernetes.Interface) coordclientset.LeaseInterface {
return client.CoordinationV1().Leases(corev1.NamespaceNodeLease)
}