Merge pull request #884 from sargun/fix-provider-vs-server-node

Brian Goff
2020-10-05 10:35:24 -07:00
committed by GitHub
3 changed files with 63 additions and 51 deletions

View File

@@ -72,10 +72,10 @@ type NodeProvider interface { //nolint:golint
// underlying options, the last NodeControllerOpt will win.
func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface, opts ...NodeControllerOpt) (*NodeController, error) {
n := &NodeController{
-		p:       p,
-		n:       node,
-		nodes:   nodes,
-		chReady: make(chan struct{}),
+		p:          p,
+		serverNode: node,
+		nodes:      nodes,
+		chReady:    make(chan struct{}),
}
for _, o := range opts {
if err := o(n); err != nil {
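For orientation, a minimal caller of this constructor might look like the sketch below. Only the NewNodeController and Run signatures come from this file; the provider, clientset, and node name are assumptions for illustration.

```go
// Sketch of a hypothetical caller; provider, clientset, and the node name are assumptions.
func runNodeController(ctx context.Context, provider NodeProvider, clientset kubernetes.Interface) error {
	nodeObj := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "my-virtual-node"},
	}
	nc, err := NewNodeController(provider, nodeObj, clientset.CoreV1().Nodes())
	if err != nil {
		return err
	}
	// Run registers the node with the API server and then drives the control loop
	// until ctx is cancelled.
	return nc.Run(ctx)
}
```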
@@ -176,7 +176,9 @@ type ErrorHandler func(context.Context, error) error
// NodeController manages a single node entity.
type NodeController struct { // nolint: golint
p NodeProvider
-	n *corev1.Node
+	// serverNode should only be written to on initialization, or as the result of node creation.
+	serverNode *corev1.Node
leases v1beta1.LeaseInterface
nodes v1.NodeInterface
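Read as plain code rather than a diff, the relevant part of the struct now looks roughly like this (remaining fields elided):

```go
type NodeController struct { // nolint: golint
	p NodeProvider

	// serverNode should only be written to on initialization, or as the result of node creation.
	serverNode *corev1.Node

	leases v1beta1.LeaseInterface
	nodes  v1.NodeInterface
	// ... remaining fields elided ...
}
```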
@@ -219,16 +221,18 @@ func (n *NodeController) Run(ctx context.Context) error {
n.chStatusUpdate <- node
})
-	if err := n.ensureNode(ctx); err != nil {
+	providerNode := n.serverNode.DeepCopy()
+	if err := n.ensureNode(ctx, providerNode); err != nil {
return err
}
if n.leases == nil {
n.disableLease = true
-		return n.controlLoop(ctx)
+		return n.controlLoop(ctx, providerNode)
}
-	n.lease = newLease(ctx, n.lease, n.n, n.pingInterval)
+	n.lease = newLease(ctx, n.lease, n.serverNode, n.pingInterval)
l, err := ensureLease(ctx, n.leases, n.lease)
if err != nil {
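Taken together with the next hunk, Run() now has roughly the following shape; this is a condensed paraphrase (lease error handling and the not-found path are trimmed), not the literal function:

```go
// Condensed paraphrase of Run() after this change; runSketch is a hypothetical name.
func (n *NodeController) runSketch(ctx context.Context) error {
	providerNode := n.serverNode.DeepCopy() // the provider works on a copy of the server's object
	if err := n.ensureNode(ctx, providerNode); err != nil {
		return err
	}
	if n.leases == nil {
		n.disableLease = true
		return n.controlLoop(ctx, providerNode)
	}
	// Leases are still derived from serverNode, because their owner references must
	// point at the node object the API server actually stores.
	n.lease = newLease(ctx, n.lease, n.serverNode, n.pingInterval)
	l, err := ensureLease(ctx, n.leases, n.lease)
	if err != nil {
		return err
	}
	n.lease = l
	return n.controlLoop(ctx, providerNode)
}
```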
@@ -241,26 +245,32 @@ func (n *NodeController) Run(ctx context.Context) error {
n.lease = l
log.G(ctx).Debug("Created node lease")
-	return n.controlLoop(ctx)
+	return n.controlLoop(ctx, providerNode)
}
-func (n *NodeController) ensureNode(ctx context.Context) (err error) {
+func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.Node) (err error) {
ctx, span := trace.StartSpan(ctx, "node.ensureNode")
defer span.End()
defer func() {
span.SetStatus(err)
}()
-	err = n.updateStatus(ctx, true)
+	err = n.updateStatus(ctx, providerNode, true)
if err == nil || !errors.IsNotFound(err) {
return err
}
-	node, err := n.nodes.Create(ctx, n.n, metav1.CreateOptions{})
+	node, err := n.nodes.Create(ctx, n.serverNode, metav1.CreateOptions{})
if err != nil {
return pkgerrors.Wrap(err, "error registering node with kubernetes")
}
-	n.n = node
+	n.serverNode = node
+	// Bad things will happen if the node is deleted in k8s and recreated by someone else
+	// we rely on this persisting
+	providerNode.ObjectMeta.Name = node.Name
+	providerNode.ObjectMeta.Namespace = node.Namespace
+	providerNode.ObjectMeta.UID = node.UID
return nil
}
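The identity copy at the end of ensureNode is the linchpin of the change: the API server assigns the node's canonical Name, Namespace, and UID on creation, and everything the provider copy does afterwards (status patches, lease owner references) must resolve to that object. A condensed sketch of the registration step, with tracing and error wrapping elided and the helper name being mine:

```go
// Sketch of the registration path; registerNode is a hypothetical helper name.
func registerNode(ctx context.Context, nodes v1.NodeInterface, serverNode, providerNode *corev1.Node) (*corev1.Node, error) {
	created, err := nodes.Create(ctx, serverNode, metav1.CreateOptions{})
	if err != nil {
		return nil, err
	}
	// Propagate the server-assigned identity (UID in particular) onto the provider's
	// working copy so its later updates target the object that actually exists.
	providerNode.ObjectMeta.Name = created.Name
	providerNode.ObjectMeta.Namespace = created.Namespace
	providerNode.ObjectMeta.UID = created.UID
	return created, nil
}
```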
@@ -272,7 +282,7 @@ func (n *NodeController) Ready() <-chan struct{} {
return n.chReady
}
-func (n *NodeController) controlLoop(ctx context.Context) error {
+func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.Node) error {
pingTimer := time.NewTimer(n.pingInterval)
defer pingTimer.Stop()
@@ -281,7 +291,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
timerResetDuration := n.statusInterval
if n.disableLease {
// when resetting the timer after processing a status update, reset it to the ping interval
-		// (since it will be the ping timer as n.disableLease == true)
+		// (since it will be the ping timer as serverNode.disableLease == true)
timerResetDuration = n.pingInterval
// hack to make sure this channel always blocks since we won't be using it
@@ -318,25 +328,20 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
<-t.C
}
-			n.n.Status = updated.Status
-			n.n.ObjectMeta = metav1.ObjectMeta{
-				Annotations: updated.Annotations,
-				Labels: updated.Labels,
-				Name: n.n.ObjectMeta.Name,
-				Namespace: n.n.ObjectMeta.Namespace,
-				UID: n.n.ObjectMeta.UID,
-			}
-			if err := n.updateStatus(ctx, false); err != nil {
+			providerNode.Status = updated.Status
+			providerNode.ObjectMeta.Annotations = updated.Annotations
+			providerNode.ObjectMeta.Labels = updated.Labels
+			if err := n.updateStatus(ctx, providerNode, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
t.Reset(timerResetDuration)
case <-statusTimer.C:
-		if err := n.updateStatus(ctx, false); err != nil {
+		if err := n.updateStatus(ctx, providerNode, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
statusTimer.Reset(n.statusInterval)
case <-pingTimer.C:
-		if err := n.handlePing(ctx); err != nil {
+		if err := n.handlePing(ctx, providerNode); err != nil {
log.G(ctx).WithError(err).Error("Error while handling node ping")
} else {
log.G(ctx).Debug("Successful node ping")
@@ -354,7 +359,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
}
}
-func (n *NodeController) handlePing(ctx context.Context) (retErr error) {
+func (n *NodeController) handlePing(ctx context.Context, providerNode *corev1.Node) (retErr error) {
ctx, span := trace.StartSpan(ctx, "node.handlePing")
defer span.End()
defer func() {
@@ -373,7 +378,7 @@ func (n *NodeController) handlePing(ctx context.Context) (retErr error) {
}
if n.disableLease {
-		return n.updateStatus(ctx, false)
+		return n.updateStatus(ctx, providerNode, false)
}
// TODO(Sargun): Pass down the result / timestamp so we can accurately track when the ping actually occurred
@@ -381,7 +386,7 @@ func (n *NodeController) handlePing(ctx context.Context) (retErr error) {
}
func (n *NodeController) updateLease(ctx context.Context) error {
-	l, err := updateNodeLease(ctx, n.leases, newLease(ctx, n.lease, n.n, n.pingInterval))
+	l, err := updateNodeLease(ctx, n.leases, newLease(ctx, n.lease, n.serverNode, n.pingInterval))
if err != nil {
return err
}
@@ -390,16 +395,16 @@ func (n *NodeController) updateLease(ctx context.Context) error {
return nil
}
-func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) (err error) {
+func (n *NodeController) updateStatus(ctx context.Context, providerNode *corev1.Node, skipErrorCb bool) (err error) {
ctx, span := trace.StartSpan(ctx, "node.updateStatus")
defer span.End()
defer func() {
span.SetStatus(err)
}()
-	updateNodeStatusHeartbeat(n.n)
+	updateNodeStatusHeartbeat(providerNode)
-	node, err := updateNodeStatus(ctx, n.nodes, n.n)
+	node, err := updateNodeStatus(ctx, n.nodes, providerNode)
if err != nil {
if skipErrorCb || n.nodeStatusUpdateErrorHandler == nil {
return err
@@ -408,13 +413,14 @@ func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) (er
return err
}
-		node, err = updateNodeStatus(ctx, n.nodes, n.n)
+		// This might have recreated the node, which may cause problems with our leases until a node update succeeds
+		node, err = updateNodeStatus(ctx, n.nodes, providerNode)
if err != nil {
return err
}
}
-	n.n = node
+	n.serverNode = node
return nil
}
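Written out as one piece, the flow of updateStatus after this change looks roughly as below. The error-handler invocation sits between the two hunks above and is reconstructed here from the ErrorHandler type, so treat the whole thing as a paraphrase rather than the exact code:

```go
// Paraphrase of updateStatus; updateStatusSketch is a hypothetical name and the
// nodeStatusUpdateErrorHandler call is reconstructed, not quoted from the diff.
func (n *NodeController) updateStatusSketch(ctx context.Context, providerNode *corev1.Node, skipErrorCb bool) error {
	updateNodeStatusHeartbeat(providerNode)
	node, err := updateNodeStatus(ctx, n.nodes, providerNode)
	if err != nil {
		if skipErrorCb || n.nodeStatusUpdateErrorHandler == nil {
			return err
		}
		if cbErr := n.nodeStatusUpdateErrorHandler(ctx, err); cbErr != nil {
			return cbErr
		}
		// The handler may have recreated the node, which can upset the lease until an
		// update succeeds; retry once against the (possibly recreated) object.
		node, err = updateNodeStatus(ctx, n.nodes, providerNode)
		if err != nil {
			return err
		}
	}
	// Only the server's view is refreshed; providerNode remains the provider's copy.
	n.serverNode = node
	return nil
}
```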
@@ -637,7 +643,7 @@ func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, nodeFromProvi
}
// This will return a new lease. It will either update the base lease (and set the renewal time appropriately), or create a brand new lease
-func newLease(ctx context.Context, base *coord.Lease, node *corev1.Node, leaseRenewalInterval time.Duration) *coord.Lease {
+func newLease(ctx context.Context, base *coord.Lease, serverNode *corev1.Node, leaseRenewalInterval time.Duration) *coord.Lease {
var lease *coord.Lease
if base == nil {
lease = &coord.Lease{}
@@ -654,11 +660,11 @@ func newLease(ctx context.Context, base *coord.Lease, node *corev1.Node, leaseRe
}
if lease.Name == "" {
-		lease.Name = node.Name
+		lease.Name = serverNode.Name
}
if lease.Spec.HolderIdentity == nil {
// Let's do a copy here
-		name := node.Name
+		name := serverNode.Name
lease.Spec.HolderIdentity = &name
}
@@ -678,8 +684,8 @@ func newLease(ctx context.Context, base *coord.Lease, node *corev1.Node, leaseRe
{
APIVersion: corev1.SchemeGroupVersion.WithKind("Node").Version,
Kind: corev1.SchemeGroupVersion.WithKind("Node").Kind,
-			Name: node.Name,
-			UID: node.UID,
+			Name: serverNode.Name,
+			UID: serverNode.UID,
},
}
} else if l > 0 {
@@ -687,21 +693,20 @@ func newLease(ctx context.Context, base *coord.Lease, node *corev1.Node, leaseRe
for _, ref := range lease.OwnerReferences {
if ref.APIVersion == corev1.SchemeGroupVersion.WithKind("Node").Version && ref.Kind == corev1.SchemeGroupVersion.WithKind("Node").Kind {
foundAnyNode = true
-			if node.UID == ref.UID && node.Name == ref.Name {
+			if serverNode.UID == ref.UID && serverNode.Name == ref.Name {
return lease
-			} else {
-				log.G(ctx).WithFields(map[string]interface{}{
-					"node.UID": node.UID,
-					"ref.UID": ref.UID,
-					"node.Name": node.Name,
-					"ref.Name": ref.Name,
-				}).Warn("Found that lease had node in owner references that is not this node")
}
+			log.G(ctx).WithFields(map[string]interface{}{
+				"node.UID": serverNode.UID,
+				"ref.UID": ref.UID,
+				"node.Name": serverNode.Name,
+				"ref.Name": ref.Name,
+			}).Warn("Found that lease had node in owner references that is not this node")
}
}
if !foundAnyNode {
log.G(ctx).Warn("Found that lease had owner references, but no nodes in owner references")
}
}
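The UID comparison in the loop above is what catches the "deleted and recreated by someone else" case called out in ensureNode: a recreated node keeps its name but gets a fresh UID, so a stale lease's owner reference no longer matches and the warning fires. The check itself reduces to a small predicate; the helper name here is mine, not the repo's:

```go
// Equivalent predicate to the owner-reference check above (hypothetical helper).
func leaseOwnedByThisNode(serverNode *corev1.Node, ref metav1.OwnerReference) bool {
	return serverNode.UID == ref.UID && serverNode.Name == ref.Name
}
```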

View File

@@ -208,10 +208,10 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
case <-node.Ready():
}
-	err = nodes.Delete(ctx, node.n.Name, metav1.DeleteOptions{})
+	err = nodes.Delete(ctx, node.serverNode.Name, metav1.DeleteOptions{})
assert.NilError(t, err)
-	testP.triggerStatusUpdate(node.n.DeepCopy())
+	testP.triggerStatusUpdate(node.serverNode.DeepCopy())
timer = time.NewTimer(10 * time.Second)
defer timer.Stop()

View File

@@ -104,7 +104,10 @@ func TestPodEventFilter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
-	go tc.Run(ctx, 1)
+	errCh := make(chan error)
+	go func() {
+		errCh <- tc.Run(ctx, 1)
+	}()
ctxT, cancelT := context.WithTimeout(ctx, 30*time.Second)
defer cancelT()
@@ -115,6 +118,8 @@ func TestPodEventFilter(t *testing.T) {
case <-tc.Done():
t.Fatal(tc.Err())
case <-tc.Ready():
+	case err := <-errCh:
+		t.Fatal(err.Error())
}
pod := &corev1.Pod{}
@@ -142,6 +147,8 @@ func TestPodEventFilter(t *testing.T) {
case <-ctxT.Done():
t.Fatal(ctxT.Err())
case <-wait:
+	case err := <-errCh:
+		t.Fatal(err.Error())
}
}
}
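The test change above is an instance of a general pattern: run the controller in a goroutine, capture its return error on a channel, and include that channel in every select so a failing Run surfaces immediately instead of the test hanging until its timeout. A standalone sketch with illustrative names:

```go
// Illustrative helper, not repo code: run f in the background and expose its error.
func runInBackground(ctx context.Context, f func(context.Context) error) <-chan error {
	errCh := make(chan error, 1) // buffered so the goroutine can exit even if nobody reads
	go func() {
		errCh <- f(ctx)
	}()
	return errCh
}
```

The test above uses an unbuffered channel, which is fine while the selects keep reading from it; the buffered variant here additionally lets the goroutine finish even if the error is never consumed.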