diff --git a/node/node.go b/node/node.go
index ea643a0ce..503e82a43 100644
--- a/node/node.go
+++ b/node/node.go
@@ -72,10 +72,10 @@ type NodeProvider interface { //nolint:golint
 // underlying options, the last NodeControllerOpt will win.
 func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface, opts ...NodeControllerOpt) (*NodeController, error) {
 	n := &NodeController{
-		p:       p,
-		n:       node,
-		nodes:   nodes,
-		chReady: make(chan struct{}),
+		p:          p,
+		serverNode: node,
+		nodes:      nodes,
+		chReady:    make(chan struct{}),
 	}
 	for _, o := range opts {
 		if err := o(n); err != nil {
@@ -176,7 +176,9 @@ type ErrorHandler func(context.Context, error) error
 // NodeController manages a single node entity.
 type NodeController struct { // nolint: golint
 	p NodeProvider
-	n *corev1.Node
+
+	// serverNode should only be written to on initialization, or as the result of node creation.
+	serverNode *corev1.Node
 
 	leases v1beta1.LeaseInterface
 	nodes  v1.NodeInterface
@@ -219,16 +221,18 @@ func (n *NodeController) Run(ctx context.Context) error {
 		n.chStatusUpdate <- node
 	})
 
-	if err := n.ensureNode(ctx); err != nil {
+	providerNode := n.serverNode.DeepCopy()
+
+	if err := n.ensureNode(ctx, providerNode); err != nil {
 		return err
 	}
 
 	if n.leases == nil {
 		n.disableLease = true
-		return n.controlLoop(ctx)
+		return n.controlLoop(ctx, providerNode)
 	}
 
-	n.lease = newLease(ctx, n.lease, n.n, n.pingInterval)
+	n.lease = newLease(ctx, n.lease, n.serverNode, n.pingInterval)
 
 	l, err := ensureLease(ctx, n.leases, n.lease)
 	if err != nil {
@@ -241,26 +245,32 @@ func (n *NodeController) Run(ctx context.Context) error {
 	n.lease = l
 
 	log.G(ctx).Debug("Created node lease")
-	return n.controlLoop(ctx)
+	return n.controlLoop(ctx, providerNode)
 }
 
-func (n *NodeController) ensureNode(ctx context.Context) (err error) {
+func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.Node) (err error) {
 	ctx, span := trace.StartSpan(ctx, "node.ensureNode")
 	defer span.End()
 	defer func() {
 		span.SetStatus(err)
 	}()
 
-	err = n.updateStatus(ctx, true)
+	err = n.updateStatus(ctx, providerNode, true)
 	if err == nil || !errors.IsNotFound(err) {
 		return err
 	}
 
-	node, err := n.nodes.Create(ctx, n.n, metav1.CreateOptions{})
+	node, err := n.nodes.Create(ctx, n.serverNode, metav1.CreateOptions{})
 	if err != nil {
 		return pkgerrors.Wrap(err, "error registering node with kubernetes")
 	}
-	n.n = node
+
+	n.serverNode = node
+	// Bad things will happen if the node is deleted in k8s and recreated by someone else;
+	// we rely on this metadata persisting.
+	providerNode.ObjectMeta.Name = node.Name
+	providerNode.ObjectMeta.Namespace = node.Namespace
+	providerNode.ObjectMeta.UID = node.UID
 
 	return nil
 }
@@ -272,7 +282,7 @@ func (n *NodeController) Ready() <-chan struct{} {
 	return n.chReady
 }
 
-func (n *NodeController) controlLoop(ctx context.Context) error {
+func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.Node) error {
 	pingTimer := time.NewTimer(n.pingInterval)
 	defer pingTimer.Stop()
 
@@ -318,25 +328,20 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
 			<-t.C
 		}
 
-		n.n.Status = updated.Status
-		n.n.ObjectMeta = metav1.ObjectMeta{
-			Annotations: updated.Annotations,
-			Labels:      updated.Labels,
-			Name:        n.n.ObjectMeta.Name,
-			Namespace:   n.n.ObjectMeta.Namespace,
-			UID:         n.n.ObjectMeta.UID,
-		}
-		if err := n.updateStatus(ctx, false); err != nil {
+		providerNode.Status = updated.Status
+		providerNode.ObjectMeta.Annotations = updated.Annotations
+		providerNode.ObjectMeta.Labels = updated.Labels
+		if err := n.updateStatus(ctx, providerNode, false); err != nil {
 			log.G(ctx).WithError(err).Error("Error handling node status update")
 		}
 		t.Reset(timerResetDuration)
 	case <-statusTimer.C:
-		if err := n.updateStatus(ctx, false); err != nil {
+		if err := n.updateStatus(ctx, providerNode, false); err != nil {
 			log.G(ctx).WithError(err).Error("Error handling node status update")
 		}
 		statusTimer.Reset(n.statusInterval)
 	case <-pingTimer.C:
-		if err := n.handlePing(ctx); err != nil {
+		if err := n.handlePing(ctx, providerNode); err != nil {
 			log.G(ctx).WithError(err).Error("Error while handling node ping")
 		} else {
 			log.G(ctx).Debug("Successful node ping")
@@ -354,7 +359,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
 	}
 }
 
-func (n *NodeController) handlePing(ctx context.Context) (retErr error) {
+func (n *NodeController) handlePing(ctx context.Context, providerNode *corev1.Node) (retErr error) {
 	ctx, span := trace.StartSpan(ctx, "node.handlePing")
 	defer span.End()
 	defer func() {
@@ -373,7 +378,7 @@ func (n *NodeController) handlePing(ctx context.Context) (retErr error) {
 	}
 
 	if n.disableLease {
-		return n.updateStatus(ctx, false)
+		return n.updateStatus(ctx, providerNode, false)
 	}
 
 	// TODO(Sargun): Pass down the result / timestamp so we can accurately track when the ping actually occurred
@@ -381,7 +386,7 @@ func (n *NodeController) handlePing(ctx context.Context) (retErr error) {
 }
 
 func (n *NodeController) updateLease(ctx context.Context) error {
-	l, err := updateNodeLease(ctx, n.leases, newLease(ctx, n.lease, n.n, n.pingInterval))
+	l, err := updateNodeLease(ctx, n.leases, newLease(ctx, n.lease, n.serverNode, n.pingInterval))
 	if err != nil {
 		return err
 	}
@@ -390,16 +395,16 @@ func (n *NodeController) updateLease(ctx context.Context) error {
 	return nil
 }
 
-func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) (err error) {
+func (n *NodeController) updateStatus(ctx context.Context, providerNode *corev1.Node, skipErrorCb bool) (err error) {
 	ctx, span := trace.StartSpan(ctx, "node.updateStatus")
 	defer span.End()
 	defer func() {
 		span.SetStatus(err)
 	}()
 
-	updateNodeStatusHeartbeat(n.n)
+	updateNodeStatusHeartbeat(providerNode)
 
-	node, err := updateNodeStatus(ctx, n.nodes, n.n)
+	node, err := updateNodeStatus(ctx, n.nodes, providerNode)
 	if err != nil {
 		if skipErrorCb || n.nodeStatusUpdateErrorHandler == nil {
 			return err
@@ -408,13 +413,14 @@ func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) (er
 			return err
 		}
 
-		node, err = updateNodeStatus(ctx, n.nodes, n.n)
+		// This might have recreated the node, which may cause problems with our leases until a node update succeeds
+		node, err = updateNodeStatus(ctx, n.nodes, providerNode)
 		if err != nil {
 			return err
 		}
 	}
 
-	n.n = node
+	n.serverNode = node
 	return nil
 }
 
@@ -637,7 +643,7 @@ func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, nodeFromProvi
 }
 
 // This will return a new lease. It will either update the base lease (and set the renewal time appropriately), or create a brand new lease
-func newLease(ctx context.Context, base *coord.Lease, node *corev1.Node, leaseRenewalInterval time.Duration) *coord.Lease {
+func newLease(ctx context.Context, base *coord.Lease, serverNode *corev1.Node, leaseRenewalInterval time.Duration) *coord.Lease {
 	var lease *coord.Lease
 	if base == nil {
 		lease = &coord.Lease{}
@@ -654,11 +660,11 @@ func newLease(ctx context.Context, base *coord.Lease, node *corev1.Node, leaseRe
 	}
 
 	if lease.Name == "" {
-		lease.Name = node.Name
+		lease.Name = serverNode.Name
 	}
 	if lease.Spec.HolderIdentity == nil {
 		// Let's do a copy here
-		name := node.Name
+		name := serverNode.Name
 		lease.Spec.HolderIdentity = &name
 	}
@@ -678,8 +684,8 @@
 			{
 				APIVersion: corev1.SchemeGroupVersion.WithKind("Node").Version,
 				Kind:       corev1.SchemeGroupVersion.WithKind("Node").Kind,
-				Name:       node.Name,
-				UID:        node.UID,
+				Name:       serverNode.Name,
+				UID:        serverNode.UID,
 			},
 		}
 	} else if l > 0 {
@@ -687,21 +693,20 @@
 		for _, ref := range lease.OwnerReferences {
 			if ref.APIVersion == corev1.SchemeGroupVersion.WithKind("Node").Version && ref.Kind == corev1.SchemeGroupVersion.WithKind("Node").Kind {
 				foundAnyNode = true
-				if node.UID == ref.UID && node.Name == ref.Name {
+				if serverNode.UID == ref.UID && serverNode.Name == ref.Name {
 					return lease
-				} else {
-					log.G(ctx).WithFields(map[string]interface{}{
-						"node.UID":  node.UID,
-						"ref.UID":   ref.UID,
-						"node.Name": node.Name,
-						"ref.Name":  ref.Name,
-					}).Warn("Found that lease had node in owner references that is not this node")
 				}
+
+				log.G(ctx).WithFields(map[string]interface{}{
+					"node.UID":  serverNode.UID,
+					"ref.UID":   ref.UID,
+					"node.Name": serverNode.Name,
+					"ref.Name":  ref.Name,
+				}).Warn("Found that lease had node in owner references that is not this node")
 			}
 		}
 		if !foundAnyNode {
 			log.G(ctx).Warn("Found that lease had owner references, but no nodes in owner references")
 		}
 	}
diff --git a/node/node_test.go b/node/node_test.go
index 02df6577a..8014777a0 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -208,10 +208,10 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
 	case <-node.Ready():
 	}
 
-	err = nodes.Delete(ctx, node.n.Name, metav1.DeleteOptions{})
+	err = nodes.Delete(ctx, node.serverNode.Name, metav1.DeleteOptions{})
 	assert.NilError(t, err)
 
-	testP.triggerStatusUpdate(node.n.DeepCopy())
+	testP.triggerStatusUpdate(node.serverNode.DeepCopy())
 
 	timer = time.NewTimer(10 * time.Second)
 	defer timer.Stop()
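Usage sketch (not part of the patch): the serverNode/providerNode split is internal to the
controller, so callers construct and run it exactly as before. This is a minimal sketch,
assuming the package above is virtual-kubelet's node package, a NodeProvider implementation
p, and a client-go clientset; the helper name runNodeController and the node name
"my-virtual-node" are hypothetical. The signatures of NewNodeController, Run, and Ready are
taken from the code above.

package main

import (
	"context"
	"log"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"

	"github.com/virtual-kubelet/virtual-kubelet/node"
)

func runNodeController(ctx context.Context, client kubernetes.Interface, p node.NodeProvider) error {
	// The node passed in becomes the controller's serverNode; Run deep-copies it
	// into providerNode, so provider-driven status updates can no longer clobber
	// server-owned metadata (Name/Namespace/UID are re-pinned in ensureNode).
	nc, err := node.NewNodeController(p, &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "my-virtual-node"},
	}, client.CoreV1().Nodes())
	if err != nil {
		return err
	}

	go func() {
		if err := nc.Run(ctx); err != nil {
			log.Printf("node controller exited: %v", err)
		}
	}()

	// Ready unblocks once the controller reports the node as up.
	select {
	case <-nc.Ready():
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

Because providerNode is threaded through ensureNode, controlLoop, handlePing, and
updateStatus as a parameter rather than stored as a second field, only the control loop
goroutine ever touches it, while serverNode is reassigned only after successful API writes.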