From 8340407f98616b6496888c3c4ad593a6ddfcdb0d Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Sat, 1 Jun 2019 09:46:47 -0700
Subject: [PATCH] Support error handler callback for node status (#648)

This moves the logic for re-creating a missing node up into the CLI.
We can make this optional, but for now I've just preserved the existing
functionality.
---
 cmd/virtual-kubelet/commands/root/root.go |  16 ++++
 test/e2e/node_test.go                     |   2 +-
 vkubelet/node.go                          | 108 +++++++++++++++-------
 vkubelet/node_test.go                     |  64 ++++++++++++-
 4 files changed, 154 insertions(+), 36 deletions(-)

diff --git a/cmd/virtual-kubelet/commands/root/root.go b/cmd/virtual-kubelet/commands/root/root.go
index 4ba7b9d2c..aa64890a8 100644
--- a/cmd/virtual-kubelet/commands/root/root.go
+++ b/cmd/virtual-kubelet/commands/root/root.go
@@ -29,6 +29,7 @@ import (
 	"github.com/virtual-kubelet/virtual-kubelet/providers/register"
 	"github.com/virtual-kubelet/virtual-kubelet/vkubelet"
 	corev1 "k8s.io/api/core/v1"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	kubeinformers "k8s.io/client-go/informers"
@@ -146,6 +147,21 @@ func runRootCommand(ctx context.Context, c Opts) error {
 		client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease),
 		client.CoreV1().Nodes(),
 		vkubelet.WithNodeDisableLease(!c.EnableNodeLease),
+		vkubelet.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error {
+			if !k8serrors.IsNotFound(err) {
+				return err
+			}
+
+			log.G(ctx).Debug("node not found")
+			newNode := pNode.DeepCopy()
+			newNode.ResourceVersion = ""
+			_, err = client.CoreV1().Nodes().Create(newNode)
+			if err != nil {
+				return err
+			}
+			log.G(ctx).Debug("created new node")
+			return nil
+		}),
 	)
 	if err != nil {
 		log.G(ctx).Fatal(err)
diff --git a/test/e2e/node_test.go b/test/e2e/node_test.go
index 584f502d4..5c81022b4 100644
--- a/test/e2e/node_test.go
+++ b/test/e2e/node_test.go
@@ -26,7 +26,7 @@ func TestNodeCreateAfterDelete(t *testing.T) {
 	})
 	assert.NilError(t, err)
 
-	assert.Assert(t, is.Len(podList.Items, 0), "Kubernetes does not allow node deletion with dependent objects (pods) in existence: %v", podList.Items)
+	assert.Assert(t, is.Len(podList.Items, 0), "Kubernetes does not allow node deletion with dependent objects (pods) in existence")
 
 	chErr := make(chan error, 1)
 
diff --git a/vkubelet/node.go b/vkubelet/node.go
index 145315822..12c1923cf 100644
--- a/vkubelet/node.go
+++ b/vkubelet/node.go
@@ -47,7 +47,7 @@ type NodeProvider interface {
 // Use the node's `Run` method to register and run the loops to update the node
 // in Kubernetes.
 func NewNode(p NodeProvider, node *corev1.Node, leases v1beta1.LeaseInterface, nodes v1.NodeInterface, opts ...NodeOpt) (*Node, error) {
-	n := &Node{p: p, n: node, leases: leases, nodes: nodes}
+	n := &Node{p: p, n: node, leases: leases, nodes: nodes, chReady: make(chan struct{})}
 	for _, o := range opts {
 		if err := o(n); err != nil {
 			return nil, pkgerrors.Wrap(err, "error applying node option")
@@ -87,6 +87,25 @@ func WithNodeStatusUpdateInterval(d time.Duration) NodeOpt {
 	}
 }
 
+// WithNodeStatusUpdateErrorHandler adds an error handler for cases where
+// updating the node status in kubernetes fails.
+// This gives the caller some control over how such errors are handled.
+//
+// The error passed to the handler will be the error received from kubernetes
+// when updating the node status.
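+// If the handler returns a nil error, the error is considered handled and
+// the node status update is retried once.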
+func WithNodeStatusUpdateErrorHandler(h ErrorHandler) NodeOpt {
+	return func(n *Node) error {
+		n.nodeStatusUpdateErrorHandler = h
+		return nil
+	}
+}
+
+// ErrorHandler is the type of function used for callbacks that handle errors.
+// If a nil error is returned, the error is considered handled and progress
+// can continue (or a retry is possible).
+type ErrorHandler func(context.Context, error) error
+
 // WithNodeLease sets the base node lease to use.
 // If a lease time is set, it will be ignored.
 func WithNodeLease(l *coord.Lease) NodeOpt {
@@ -110,6 +129,10 @@ type Node struct {
 	statusInterval time.Duration
 	lease          *coord.Lease
 	chStatusUpdate chan *corev1.Node
+
+	nodeStatusUpdateErrorHandler ErrorHandler
+
+	chReady chan struct{}
 }
 
 // The default intervals used for lease and status updates.
@@ -138,16 +161,15 @@ func (n *Node) Run(ctx context.Context) error {
 		n.statusInterval = DefaultStatusUpdateInterval
 	}
 
-	if err := n.updateStatus(ctx); err != nil {
-		return pkgerrors.Wrap(err, "error registering node with kubernetes")
-	}
-	log.G(ctx).Info("Created node")
-
 	n.chStatusUpdate = make(chan *corev1.Node)
 	n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) {
 		n.chStatusUpdate <- node
 	})
 
+	if err := n.ensureNode(ctx); err != nil {
+		return err
+	}
+
 	if !n.disableLease {
 		n.lease = newLease(n.lease)
 		setLeaseAttrs(n.lease, n.n, n.pingInterval*5)
@@ -169,11 +191,32 @@ func (n *Node) Run(ctx context.Context) error {
 		log.G(ctx).Info("Node leases not supported, falling back to only node status updates")
 	}
 
-	n.controlLoop(ctx)
+	return n.controlLoop(ctx)
+}
+
+func (n *Node) ensureNode(ctx context.Context) error {
+	err := n.updateStatus(ctx, true)
+	if err == nil || !errors.IsNotFound(err) {
+		return err
+	}
+
+	node, err := n.nodes.Create(n.n)
+	if err != nil {
+		return pkgerrors.Wrap(err, "error registering node with kubernetes")
+	}
+	n.n = node
+	return nil
 }
 
-func (n *Node) controlLoop(ctx context.Context) {
+// Ready returns a channel that gets closed when the node is fully up and
+// running. Note that if there is an error on startup this channel will never
+// be closed.
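+// Callers waiting on this channel should therefore also watch the error
+// returned by Run.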
+func (n *Node) Ready() <-chan struct{} {
+	return n.chReady
+}
+
+func (n *Node) controlLoop(ctx context.Context) error {
 	pingTimer := time.NewTimer(n.pingInterval)
 	defer pingTimer.Stop()
 
@@ -186,10 +229,12 @@ func (n *Node) controlLoop(ctx context.Context) {
 		}
 	}
 
+	close(n.chReady)
+
 	for {
 		select {
 		case <-ctx.Done():
-			return
+			return nil
 		case updated := <-n.chStatusUpdate:
 			var t *time.Timer
 			if n.disableLease {
@@ -206,12 +251,12 @@ func (n *Node) controlLoop(ctx context.Context) {
 			}
 
 			n.n.Status = updated.Status
-			if err := n.updateStatus(ctx); err != nil {
+			if err := n.updateStatus(ctx, false); err != nil {
 				log.G(ctx).WithError(err).Error("Error handling node status update")
 			}
 			t.Reset(n.statusInterval)
 		case <-statusTimer.C:
-			if err := n.updateStatus(ctx); err != nil {
+			if err := n.updateStatus(ctx, false); err != nil {
 				log.G(ctx).WithError(err).Error("Error handling node status update")
 			}
 			statusTimer.Reset(n.statusInterval)
@@ -238,7 +283,7 @@ func (n *Node) handlePing(ctx context.Context) (retErr error) {
 	}
 
 	if n.disableLease {
-		return n.updateStatus(ctx)
+		return n.updateStatus(ctx, false)
 	}
 
 	return n.updateLease(ctx)
@@ -254,12 +299,22 @@ func (n *Node) updateLease(ctx context.Context) error {
 	return nil
 }
 
-func (n *Node) updateStatus(ctx context.Context) error {
+func (n *Node) updateStatus(ctx context.Context, skipErrorCb bool) error {
 	updateNodeStatusHeartbeat(n.n)
 
 	node, err := UpdateNodeStatus(ctx, n.nodes, n.n)
 	if err != nil {
-		return err
+		if skipErrorCb || n.nodeStatusUpdateErrorHandler == nil {
+			return err
+		}
+		if err := n.nodeStatusUpdateErrorHandler(ctx, err); err != nil {
+			return err
+		}
+
+		node, err = UpdateNodeStatus(ctx, n.nodes, n.n)
+		if err != nil {
+			return err
+		}
 	}
 
 	n.n = node
@@ -370,27 +425,18 @@ func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Nod
 //
 // If you use this function, it is up to you to synchronize this with other operations.
 // This reduces the time to second-level precision.
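+// If the node does not exist, a NotFound error is returned; callers that need
+// the node re-created can do so from a WithNodeStatusUpdateErrorHandler callback.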
-func UpdateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (*corev1.Node, error) {
+func UpdateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (_ *corev1.Node, retErr error) {
 	ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus")
-	defer span.End()
+	defer func() {
+		span.SetStatus(ocstatus.FromError(retErr))
+		span.End()
+	}()
 
+	var node *corev1.Node
 	oldNode, err := nodes.Get(n.Name, emptyGetOptions)
 	if err != nil {
-		if !errors.IsNotFound(err) {
-			span.SetStatus(ocstatus.FromError(err))
-			return nil, err
-		}
-
-		log.G(ctx).Debug("node not found")
-		newNode := n.DeepCopy()
-		newNode.ResourceVersion = ""
-		node, err = nodes.Create(newNode)
-		if err != nil {
-			return nil, err
-		}
-		log.G(ctx).Debug("created new node")
-		return node, nil
+		return nil, err
 	}
 
 	log.G(ctx).Debug("got node from api server")
@@ -440,7 +486,7 @@ func setLeaseAttrs(l *coord.Lease, n *corev1.Node, dur time.Duration) {
 
 func updateNodeStatusHeartbeat(n *corev1.Node) {
 	now := metav1.NewTime(time.Now())
-	for i, _ := range n.Status.Conditions {
+	for i := range n.Status.Conditions {
 		n.Status.Conditions[i].LastHeartbeatTime = now
 	}
 }
diff --git a/vkubelet/node_test.go b/vkubelet/node_test.go
index ddd6bf8c5..a6c69ab34 100644
--- a/vkubelet/node_test.go
+++ b/vkubelet/node_test.go
@@ -149,6 +149,56 @@ func testNodeRun(t *testing.T, enableLease bool) {
 	}
 }
 
+func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
+	c := testclient.NewSimpleClientset()
+	testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
+	nodes := c.CoreV1().Nodes()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	node, err := NewNode(testP, testNode(t), nil, nodes,
+		WithNodeStatusUpdateErrorHandler(func(_ context.Context, _ error) error {
+			cancel()
+			return nil
+		}),
+		WithNodeDisableLease(true),
+	)
+	assert.NilError(t, err)
+
+	chErr := make(chan error, 1)
+	go func() {
+		chErr <- node.Run(ctx)
+	}()
+
+	timer := time.NewTimer(10 * time.Second)
+	defer timer.Stop()
+
+	// wait for the node to be ready
+	select {
+	case <-timer.C:
+		t.Fatal("timeout waiting for node to be ready")
+	case err := <-chErr:
+		t.Fatalf("node.Run returned earlier than expected: %v", err)
+	case <-node.Ready():
+	}
+
+	err = nodes.Delete(node.n.Name, nil)
+	assert.NilError(t, err)
+
+	testP.triggerStatusUpdate(node.n.DeepCopy())
+
+	timer = time.NewTimer(10 * time.Second)
+	defer timer.Stop()
+
+	select {
+	case err := <-chErr:
+		assert.NilError(t, err)
+	case <-timer.C:
+		t.Fatal("timeout waiting for node shutdown")
+	}
+}
+
 func TestEnsureLease(t *testing.T) {
 	c := testclient.NewSimpleClientset().Coordination().Leases(corev1.NamespaceNodeLease)
 	n := testNode(t)
@@ -177,6 +227,14 @@ func TestUpdateNodeStatus(t *testing.T) {
 	ctx := context.Background()
 
 	updated, err := UpdateNodeStatus(ctx, nodes, n.DeepCopy())
+	assert.Equal(t, errors.IsNotFound(err), true, err)
+
+	_, err = nodes.Create(n)
+	assert.NilError(t, err)
+
+	updated, err = UpdateNodeStatus(ctx, nodes, n.DeepCopy())
 	assert.NilError(t, err)
 	assert.Check(t, cmp.DeepEqual(n.Status, updated.Status))
 
@@ -191,10 +249,8 @@ func TestUpdateNodeStatus(t *testing.T) {
 	_, err = nodes.Get(n.Name, metav1.GetOptions{})
 	assert.Equal(t, errors.IsNotFound(err), true, err)
 
-	updated, err = UpdateNodeStatus(ctx, nodes, updated.DeepCopy())
-	assert.NilError(t, err)
-
-	_, err = nodes.Get(n.Name, metav1.GetOptions{})
-	assert.NilError(t, err)
+	_, err = UpdateNodeStatus(ctx, nodes, updated.DeepCopy())
+	assert.Equal(t, errors.IsNotFound(err), true, err)
 }
 
 func TestUpdateNodeLease(t *testing.T) {
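For reference, a minimal sketch of how an embedder might wire the new error
handler together with the readiness channel. This is illustrative only, not
part of the patch: the provider p, the node object pNode, and the kubernetes
client are assumed to be built elsewhere (as in the root.go hunk above), and
runNode is a hypothetical helper.

	package example

	import (
		"context"

		"github.com/virtual-kubelet/virtual-kubelet/log"
		"github.com/virtual-kubelet/virtual-kubelet/vkubelet"
		corev1 "k8s.io/api/core/v1"
		k8serrors "k8s.io/apimachinery/pkg/api/errors"
		"k8s.io/client-go/kubernetes"
	)

	func runNode(ctx context.Context, p vkubelet.NodeProvider, pNode *corev1.Node, client kubernetes.Interface) error {
		node, err := vkubelet.NewNode(p, pNode,
			client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease),
			client.CoreV1().Nodes(),
			vkubelet.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error {
				// Only recover from the node being deleted out from under us.
				if !k8serrors.IsNotFound(err) {
					return err
				}
				// Re-create the node; ResourceVersion must be cleared before Create.
				newNode := pNode.DeepCopy()
				newNode.ResourceVersion = ""
				if _, err := client.CoreV1().Nodes().Create(newNode); err != nil {
					return err
				}
				// Returning nil marks the error handled; the status update is retried.
				return nil
			}),
		)
		if err != nil {
			return err
		}

		chErr := make(chan error, 1)
		go func() {
			chErr <- node.Run(ctx)
		}()

		// Ready() closes once the node is registered and the control loop is
		// running. It never closes if startup fails, so watch Run's error too.
		select {
		case <-node.Ready():
			log.G(ctx).Info("node is ready")
		case err := <-chErr:
			return err
		}

		return <-chErr
	}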