Create stronger separation between provider node and server node
There were some (additional) bugs that were easy to introduce by interleaving the provider-supplied node with the updated node returned by the server. This removes the chance of that confusion.
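For context, here is a minimal sketch of the invariant this change enforces: the server's copy of the node is only ever overwritten with objects returned by the API server, while provider-driven updates are applied to a separate deep copy. The controller type, the updates channel, and the patchStatus helper below are simplified stand-ins for illustration, not the virtual-kubelet API.

package sketch

import (
    corev1 "k8s.io/api/core/v1"
)

// controller is a stand-in for NodeController, reduced to the two node copies.
type controller struct {
    // serverNode mirrors what the API server has accepted; it is only written
    // on registration or after a successful status update.
    serverNode *corev1.Node
}

// run keeps provider-driven changes on a private copy, so provider data and
// server responses are never interleaved on the same object.
func (c *controller) run(updates <-chan *corev1.Node) {
    providerNode := c.serverNode.DeepCopy()

    for updated := range updates {
        // Provider updates only ever touch the copy.
        providerNode.Status = updated.Status
        providerNode.ObjectMeta.Annotations = updated.Annotations
        providerNode.ObjectMeta.Labels = updated.Labels

        // Only an object returned by the API server flows back into serverNode.
        if node, err := c.patchStatus(providerNode); err == nil {
            c.serverNode = node
        }
    }
}

// patchStatus stands in for the real status update against the Kubernetes API.
func (c *controller) patchStatus(n *corev1.Node) (*corev1.Node, error) {
    return n.DeepCopy(), nil // placeholder: pretend the server accepted the update
}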
node/node.go
@@ -72,10 +72,10 @@ type NodeProvider interface { //nolint:golint
 // underlying options, the last NodeControllerOpt will win.
 func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface, opts ...NodeControllerOpt) (*NodeController, error) {
     n := &NodeController{
-        p:       p,
-        n:       node,
-        nodes:   nodes,
-        chReady: make(chan struct{}),
+        p:          p,
+        serverNode: node,
+        nodes:      nodes,
+        chReady:    make(chan struct{}),
     }
     for _, o := range opts {
         if err := o(n); err != nil {
@@ -176,7 +176,9 @@ type ErrorHandler func(context.Context, error) error
 // NodeController manages a single node entity.
 type NodeController struct { // nolint: golint
     p NodeProvider
-    n *corev1.Node
+
+    // serverNode should only be written to on initialization, or as the result of node creation.
+    serverNode *corev1.Node

     leases v1beta1.LeaseInterface
     nodes  v1.NodeInterface
@@ -219,16 +221,18 @@ func (n *NodeController) Run(ctx context.Context) error {
         n.chStatusUpdate <- node
     })

-    if err := n.ensureNode(ctx); err != nil {
+    providerNode := n.serverNode.DeepCopy()
+
+    if err := n.ensureNode(ctx, providerNode); err != nil {
         return err
     }

     if n.leases == nil {
         n.disableLease = true
-        return n.controlLoop(ctx)
+        return n.controlLoop(ctx, providerNode)
     }

-    n.lease = newLease(ctx, n.lease, n.n, n.pingInterval)
+    n.lease = newLease(ctx, n.lease, n.serverNode, n.pingInterval)

     l, err := ensureLease(ctx, n.leases, n.lease)
     if err != nil {
@@ -241,26 +245,32 @@ func (n *NodeController) Run(ctx context.Context) error {
     n.lease = l

     log.G(ctx).Debug("Created node lease")
-    return n.controlLoop(ctx)
+    return n.controlLoop(ctx, providerNode)
 }

-func (n *NodeController) ensureNode(ctx context.Context) (err error) {
+func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.Node) (err error) {
     ctx, span := trace.StartSpan(ctx, "node.ensureNode")
     defer span.End()
     defer func() {
         span.SetStatus(err)
     }()

-    err = n.updateStatus(ctx, true)
+    err = n.updateStatus(ctx, providerNode, true)
     if err == nil || !errors.IsNotFound(err) {
         return err
     }

-    node, err := n.nodes.Create(ctx, n.n, metav1.CreateOptions{})
+    node, err := n.nodes.Create(ctx, n.serverNode, metav1.CreateOptions{})
     if err != nil {
         return pkgerrors.Wrap(err, "error registering node with kubernetes")
     }
-    n.n = node
+
+    n.serverNode = node
+    // Bad things will happen if the node is deleted in k8s and recreated by someone else
+    // we rely on this persisting
+    providerNode.ObjectMeta.Name = node.Name
+    providerNode.ObjectMeta.Namespace = node.Namespace
+    providerNode.ObjectMeta.UID = node.UID

     return nil
 }
@@ -272,7 +282,7 @@ func (n *NodeController) Ready() <-chan struct{} {
     return n.chReady
 }

-func (n *NodeController) controlLoop(ctx context.Context) error {
+func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.Node) error {
     pingTimer := time.NewTimer(n.pingInterval)
     defer pingTimer.Stop()

@@ -281,7 +291,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
     timerResetDuration := n.statusInterval
     if n.disableLease {
         // when resetting the timer after processing a status update, reset it to the ping interval
-        // (since it will be the ping timer as n.disableLease == true)
+        // (since it will be the ping timer as serverNode.disableLease == true)
         timerResetDuration = n.pingInterval

         // hack to make sure this channel always blocks since we won't be using it
@@ -318,25 +328,20 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
                 <-t.C
             }

-            n.n.Status = updated.Status
-            n.n.ObjectMeta = metav1.ObjectMeta{
-                Annotations: updated.Annotations,
-                Labels:      updated.Labels,
-                Name:        n.n.ObjectMeta.Name,
-                Namespace:   n.n.ObjectMeta.Namespace,
-                UID:         n.n.ObjectMeta.UID,
-            }
-            if err := n.updateStatus(ctx, false); err != nil {
+            providerNode.Status = updated.Status
+            providerNode.ObjectMeta.Annotations = updated.Annotations
+            providerNode.ObjectMeta.Labels = updated.Labels
+            if err := n.updateStatus(ctx, providerNode, false); err != nil {
                 log.G(ctx).WithError(err).Error("Error handling node status update")
             }
             t.Reset(timerResetDuration)
         case <-statusTimer.C:
-            if err := n.updateStatus(ctx, false); err != nil {
+            if err := n.updateStatus(ctx, providerNode, false); err != nil {
                 log.G(ctx).WithError(err).Error("Error handling node status update")
             }
             statusTimer.Reset(n.statusInterval)
         case <-pingTimer.C:
-            if err := n.handlePing(ctx); err != nil {
+            if err := n.handlePing(ctx, providerNode); err != nil {
                 log.G(ctx).WithError(err).Error("Error while handling node ping")
             } else {
                 log.G(ctx).Debug("Successful node ping")
@@ -354,7 +359,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
     }
 }

-func (n *NodeController) handlePing(ctx context.Context) (retErr error) {
+func (n *NodeController) handlePing(ctx context.Context, providerNode *corev1.Node) (retErr error) {
     ctx, span := trace.StartSpan(ctx, "node.handlePing")
     defer span.End()
     defer func() {
@@ -373,7 +378,7 @@ func (n *NodeController) handlePing(ctx context.Context) (retErr error) {
     }

     if n.disableLease {
-        return n.updateStatus(ctx, false)
+        return n.updateStatus(ctx, providerNode, false)
     }

     // TODO(Sargun): Pass down the result / timestamp so we can accurately track when the ping actually occurred
@@ -381,7 +386,7 @@ func (n *NodeController) handlePing(ctx context.Context) (retErr error) {
 }

 func (n *NodeController) updateLease(ctx context.Context) error {
-    l, err := updateNodeLease(ctx, n.leases, newLease(ctx, n.lease, n.n, n.pingInterval))
+    l, err := updateNodeLease(ctx, n.leases, newLease(ctx, n.lease, n.serverNode, n.pingInterval))
     if err != nil {
         return err
     }
@@ -390,16 +395,16 @@ func (n *NodeController) updateLease(ctx context.Context) error {
     return nil
 }

-func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) (err error) {
+func (n *NodeController) updateStatus(ctx context.Context, providerNode *corev1.Node, skipErrorCb bool) (err error) {
     ctx, span := trace.StartSpan(ctx, "node.updateStatus")
     defer span.End()
     defer func() {
         span.SetStatus(err)
     }()

-    updateNodeStatusHeartbeat(n.n)
+    updateNodeStatusHeartbeat(providerNode)

-    node, err := updateNodeStatus(ctx, n.nodes, n.n)
+    node, err := updateNodeStatus(ctx, n.nodes, providerNode)
     if err != nil {
         if skipErrorCb || n.nodeStatusUpdateErrorHandler == nil {
             return err
@@ -408,13 +413,14 @@ func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) (er
             return err
         }

-        node, err = updateNodeStatus(ctx, n.nodes, n.n)
+        // This might have recreated the node, which may cause problems with our leases until a node update succeeds
+        node, err = updateNodeStatus(ctx, n.nodes, providerNode)
         if err != nil {
             return err
         }
     }

-    n.n = node
+    n.serverNode = node
     return nil
 }

@@ -637,7 +643,7 @@ func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, nodeFromProvi
 }

 // This will return a new lease. It will either update base lease (and the set the renewal time appropriately), or create a brand new lease
-func newLease(ctx context.Context, base *coord.Lease, node *corev1.Node, leaseRenewalInterval time.Duration) *coord.Lease {
+func newLease(ctx context.Context, base *coord.Lease, serverNode *corev1.Node, leaseRenewalInterval time.Duration) *coord.Lease {
     var lease *coord.Lease
     if base == nil {
         lease = &coord.Lease{}
@@ -654,11 +660,11 @@ func newLease(ctx context.Context, base *coord.Lease, node *corev1.Node, leaseRe
     }

     if lease.Name == "" {
-        lease.Name = node.Name
+        lease.Name = serverNode.Name
     }
     if lease.Spec.HolderIdentity == nil {
         // Let's do a copy here
-        name := node.Name
+        name := serverNode.Name
         lease.Spec.HolderIdentity = &name
     }

@@ -678,8 +684,8 @@ func newLease(ctx context.Context, base *coord.Lease, node *corev1.Node, leaseRe
             {
                 APIVersion: corev1.SchemeGroupVersion.WithKind("Node").Version,
                 Kind:       corev1.SchemeGroupVersion.WithKind("Node").Kind,
-                Name:       node.Name,
-                UID:        node.UID,
+                Name:       serverNode.Name,
+                UID:        serverNode.UID,
             },
         }
     } else if l > 0 {
@@ -687,21 +693,20 @@ func newLease(ctx context.Context, base *coord.Lease, node *corev1.Node, leaseRe
         for _, ref := range lease.OwnerReferences {
             if ref.APIVersion == corev1.SchemeGroupVersion.WithKind("Node").Version && ref.Kind == corev1.SchemeGroupVersion.WithKind("Node").Kind {
                 foundAnyNode = true
-                if node.UID == ref.UID && node.Name == ref.Name {
+                if serverNode.UID == ref.UID && serverNode.Name == ref.Name {
                     return lease
-                } else {
-                    log.G(ctx).WithFields(map[string]interface{}{
-                        "node.UID":  node.UID,
-                        "ref.UID":   ref.UID,
-                        "node.Name": node.Name,
-                        "ref.Name":  ref.Name,
-                    }).Warn("Found that lease had node in owner references that is not this node")
                 }
+
+                log.G(ctx).WithFields(map[string]interface{}{
+                    "node.UID":  serverNode.UID,
+                    "ref.UID":   ref.UID,
+                    "node.Name": serverNode.Name,
+                    "ref.Name":  ref.Name,
+                }).Warn("Found that lease had node in owner references that is not this node")
             }
         }
         if !foundAnyNode {
             log.G(ctx).Warn("Found that lease had owner references, but no nodes in owner references")

         }
     }
@@ -208,10 +208,10 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
     case <-node.Ready():
     }

-    err = nodes.Delete(ctx, node.n.Name, metav1.DeleteOptions{})
+    err = nodes.Delete(ctx, node.serverNode.Name, metav1.DeleteOptions{})
     assert.NilError(t, err)

-    testP.triggerStatusUpdate(node.n.DeepCopy())
+    testP.triggerStatusUpdate(node.serverNode.DeepCopy())

     timer = time.NewTimer(10 * time.Second)
     defer timer.Stop()
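For completeness, a hypothetical caller of the controller touched by this diff, using only the constructor and methods shown above (NewNodeController, Run, Ready). The import path, client wiring, and provider value are assumptions for illustration, not part of the commit.

package main

import (
    "context"
    "log"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"

    vknode "github.com/virtual-kubelet/virtual-kubelet/node" // assumed import path
)

func main() {
    ctx := context.Background()

    // Client wiring is a placeholder; any rest.Config would do.
    cfg, err := rest.InClusterConfig()
    if err != nil {
        log.Fatal(err)
    }
    clientset := kubernetes.NewForConfigOrDie(cfg)

    // The node passed in here becomes the controller's serverNode on registration.
    nodeObj := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "my-virtual-node"}}

    var p vknode.NodeProvider // placeholder: a real provider implementation goes here

    nc, err := vknode.NewNodeController(p, nodeObj, clientset.CoreV1().Nodes())
    if err != nil {
        log.Fatal(err)
    }

    go func() {
        if err := nc.Run(ctx); err != nil {
            log.Fatal(err)
        }
    }()

    // Ready unblocks once the controller has registered the node.
    <-nc.Ready()
}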