Support error handler callback for node status (#648)

This moves the logic for re-creating the a missing node up into the CLI.
We can make this optional, but for now I've just preserved existing
functionality.
This commit is contained in:
Brian Goff
2019-06-01 09:46:47 -07:00
committed by GitHub
parent 1542c4d2f4
commit 8340407f98
4 changed files with 154 additions and 36 deletions

View File

@@ -47,7 +47,7 @@ type NodeProvider interface {
// Use the node's `Run` method to register and run the loops to update the node
// in Kubernetes.
func NewNode(p NodeProvider, node *corev1.Node, leases v1beta1.LeaseInterface, nodes v1.NodeInterface, opts ...NodeOpt) (*Node, error) {
n := &Node{p: p, n: node, leases: leases, nodes: nodes}
n := &Node{p: p, n: node, leases: leases, nodes: nodes, chReady: make(chan struct{})}
for _, o := range opts {
if err := o(n); err != nil {
return nil, pkgerrors.Wrap(err, "error applying node option")
@@ -87,6 +87,25 @@ func WithNodeStatusUpdateInterval(d time.Duration) NodeOpt {
}
}
// WithNodeStatusUpdateErrorHandler adds an error handler for cases where there is an error
// when updating the node status.
// This allows the caller to have some control on how errors are dealt with when
// updating a node's status.
//
// The error passed to the handler will be the error received from kubernetes
// when updating node status.
func WithNodeStatusUpdateErrorHandler(h ErrorHandler) NodeOpt {
return func(n *Node) error {
n.nodeStatusUpdateErrorHandler = h
return nil
}
}
// ErrorHandler is a type of function used to allow callbacks for handling errors.
// It is expected that if a nil error is returned that the error is handled and
// progress can continue (or a retry is possible).
type ErrorHandler func(context.Context, error) error
// WithNodeLease sets the base node lease to use.
// If a lease time is set, it will be ignored.
func WithNodeLease(l *coord.Lease) NodeOpt {
@@ -110,6 +129,10 @@ type Node struct {
statusInterval time.Duration
lease *coord.Lease
chStatusUpdate chan *corev1.Node
nodeStatusUpdateErrorHandler ErrorHandler
chReady chan struct{}
}
// The default intervals used for lease and status updates.
@@ -138,16 +161,15 @@ func (n *Node) Run(ctx context.Context) error {
n.statusInterval = DefaultStatusUpdateInterval
}
if err := n.updateStatus(ctx); err != nil {
return pkgerrors.Wrap(err, "error registering node with kubernetes")
}
log.G(ctx).Info("Created node")
n.chStatusUpdate = make(chan *corev1.Node)
n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) {
n.chStatusUpdate <- node
})
if err := n.ensureNode(ctx); err != nil {
return err
}
if !n.disableLease {
n.lease = newLease(n.lease)
setLeaseAttrs(n.lease, n.n, n.pingInterval*5)
@@ -169,11 +191,32 @@ func (n *Node) Run(ctx context.Context) error {
log.G(ctx).Info("Node leases not supported, falling back to only node status updates")
}
n.controlLoop(ctx)
return n.controlLoop(ctx)
}
func (n *Node) ensureNode(ctx context.Context) error {
err := n.updateStatus(ctx, true)
if err == nil || !errors.IsNotFound(err) {
return err
}
node, err := n.nodes.Create(n.n)
if err != nil {
return pkgerrors.Wrap(err, "error registering node with kubernetes")
}
n.n = node
return nil
}
func (n *Node) controlLoop(ctx context.Context) {
// Ready returns a channel that gets closed when the node is fully up and
// running. Note that if there is an error on startup this channel will never
// be started.
func (n *Node) Ready() <-chan struct{} {
return n.chReady
}
func (n *Node) controlLoop(ctx context.Context) error {
pingTimer := time.NewTimer(n.pingInterval)
defer pingTimer.Stop()
@@ -186,10 +229,12 @@ func (n *Node) controlLoop(ctx context.Context) {
}
}
close(n.chReady)
for {
select {
case <-ctx.Done():
return
return nil
case updated := <-n.chStatusUpdate:
var t *time.Timer
if n.disableLease {
@@ -206,12 +251,12 @@ func (n *Node) controlLoop(ctx context.Context) {
}
n.n.Status = updated.Status
if err := n.updateStatus(ctx); err != nil {
if err := n.updateStatus(ctx, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
t.Reset(n.statusInterval)
case <-statusTimer.C:
if err := n.updateStatus(ctx); err != nil {
if err := n.updateStatus(ctx, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
statusTimer.Reset(n.statusInterval)
@@ -238,7 +283,7 @@ func (n *Node) handlePing(ctx context.Context) (retErr error) {
}
if n.disableLease {
return n.updateStatus(ctx)
return n.updateStatus(ctx, false)
}
return n.updateLease(ctx)
@@ -254,12 +299,22 @@ func (n *Node) updateLease(ctx context.Context) error {
return nil
}
func (n *Node) updateStatus(ctx context.Context) error {
func (n *Node) updateStatus(ctx context.Context, skipErrorCb bool) error {
updateNodeStatusHeartbeat(n.n)
node, err := UpdateNodeStatus(ctx, n.nodes, n.n)
if err != nil {
return err
if skipErrorCb || n.nodeStatusUpdateErrorHandler == nil {
return err
}
if err := n.nodeStatusUpdateErrorHandler(ctx, err); err != nil {
return err
}
node, err = UpdateNodeStatus(ctx, n.nodes, n.n)
if err != nil {
return err
}
}
n.n = node
@@ -370,27 +425,18 @@ func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Nod
//
// If you use this function, it is up to you to syncronize this with other operations.
// This reduces the time to second-level precision.
func UpdateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (*corev1.Node, error) {
func UpdateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (_ *corev1.Node, retErr error) {
ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus")
defer span.End()
defer func() {
span.End()
span.SetStatus(ocstatus.FromError(retErr))
}()
var node *corev1.Node
oldNode, err := nodes.Get(n.Name, emptyGetOptions)
if err != nil {
if !errors.IsNotFound(err) {
span.SetStatus(ocstatus.FromError(err))
return nil, err
}
log.G(ctx).Debug("node not found")
newNode := n.DeepCopy()
newNode.ResourceVersion = ""
node, err = nodes.Create(newNode)
if err != nil {
return nil, err
}
log.G(ctx).Debug("created new node")
return node, nil
return nil, err
}
log.G(ctx).Debug("got node from api server")
@@ -440,7 +486,7 @@ func setLeaseAttrs(l *coord.Lease, n *corev1.Node, dur time.Duration) {
func updateNodeStatusHeartbeat(n *corev1.Node) {
now := metav1.NewTime(time.Now())
for i, _ := range n.Status.Conditions {
for i := range n.Status.Conditions {
n.Status.Conditions[i].LastHeartbeatTime = now
}
}

View File

@@ -149,6 +149,56 @@ func testNodeRun(t *testing.T, enableLease bool) {
}
}
func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
c := testclient.NewSimpleClientset()
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
nodes := c.CoreV1().Nodes()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node, err := NewNode(testP, testNode(t), nil, nodes,
WithNodeStatusUpdateErrorHandler(func(_ context.Context, err error) error {
cancel()
return nil
}),
WithNodeDisableLease(true),
)
assert.NilError(t, err)
chErr := make(chan error, 1)
go func() {
chErr <- node.Run(ctx)
}()
timer := time.NewTimer(10 * time.Second)
defer timer.Stop()
// wait for the node to be ready
select {
case <-timer.C:
t.Fatal("timeout waiting for node to be ready")
case <-chErr:
t.Fatalf("node.Run returned earlier than expected: %v", err)
case <-node.Ready():
}
err = nodes.Delete(node.n.Name, nil)
assert.NilError(t, err)
testP.triggerStatusUpdate(node.n.DeepCopy())
timer = time.NewTimer(10 * time.Second)
defer timer.Stop()
select {
case err := <-chErr:
assert.Equal(t, err, nil)
case <-timer.C:
t.Fatal("timeout waiting for node shutdown")
}
}
func TestEnsureLease(t *testing.T) {
c := testclient.NewSimpleClientset().Coordination().Leases(corev1.NamespaceNodeLease)
n := testNode(t)
@@ -177,6 +227,14 @@ func TestUpdateNodeStatus(t *testing.T) {
ctx := context.Background()
updated, err := UpdateNodeStatus(ctx, nodes, n.DeepCopy())
assert.Equal(t, errors.IsNotFound(err), true, err)
_, err = nodes.Create(n)
assert.NilError(t, err)
updated, err = UpdateNodeStatus(ctx, nodes, n.DeepCopy())
assert.NilError(t, err)
assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(n.Status, updated.Status))
@@ -191,10 +249,8 @@ func TestUpdateNodeStatus(t *testing.T) {
_, err = nodes.Get(n.Name, metav1.GetOptions{})
assert.Equal(t, errors.IsNotFound(err), true, err)
updated, err = UpdateNodeStatus(ctx, nodes, updated.DeepCopy())
assert.NilError(t, err)
_, err = nodes.Get(n.Name, metav1.GetOptions{})
assert.NilError(t, err)
_, err = UpdateNodeStatus(ctx, nodes, updated.DeepCopy())
assert.Equal(t, errors.IsNotFound(err), true, err)
}
func TestUpdateNodeLease(t *testing.T) {