diff --git a/node/node.go b/node/node.go index 4f392f88a..44e0cb1d7 100644 --- a/node/node.go +++ b/node/node.go @@ -17,7 +17,6 @@ package node import ( "context" "encoding/json" - "fmt" "time" pkgerrors "github.com/pkg/errors" @@ -31,6 +30,14 @@ import ( "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes/typed/coordination/v1beta1" v1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/util/retry" +) + +const ( + // Annotation with the JSON-serialized last applied node conditions. Based on kube ctl apply. Used to calculate + // the three-way patch + virtualKubeletLastNodeAppliedNodeStatus = "virtual-kubelet.io/last-applied-node-status" + virtualKubeletLastNodeAppliedObjectMeta = "virtual-kubelet.io/last-applied-object-meta" ) // NodeProvider is the interface used for registering a node and updating its @@ -287,6 +294,13 @@ func (n *NodeController) controlLoop(ctx context.Context) error { } n.n.Status = updated.Status + n.n.ObjectMeta = metav1.ObjectMeta{ + Annotations: updated.Annotations, + Labels: updated.Labels, + Name: n.n.ObjectMeta.Name, + Namespace: n.n.ObjectMeta.Namespace, + UID: n.n.ObjectMeta.UID, + } if err := n.updateStatus(ctx, false); err != nil { log.G(ctx).WithError(err).Error("Error handling node status update") } @@ -428,42 +442,117 @@ func updateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease * // just so we don't have to allocate this on every get request var emptyGetOptions = metav1.GetOptions{} -// patchNodeStatus patches node status. 
-// Copied from github.com/kubernetes/kubernetes/pkg/util/node -func patchNodeStatus(nodes v1.NodeInterface, nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) (*corev1.Node, []byte, error) { - patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode) - if err != nil { - return nil, nil, err +func prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode *corev1.Node) ([]byte, error) { + // We use these two values to calculate a patch. We use a three-way patch in order to avoid + // causing state regression server side. For example, let's consider the scenario: + /* + UML Source: + @startuml + participant VK + participant K8s + participant ExternalUpdater + note right of VK: Updates internal node conditions to [A, B] + VK->K8s: Patch Upsert [A, B] + note left of K8s: Node conditions are [A, B] + ExternalUpdater->K8s: Patch Upsert [C] + note left of K8s: Node Conditions are [A, B, C] + note right of VK: Updates internal node conditions to [A] + VK->K8s: Patch: delete B, upsert A\nThis is where things go wrong,\nbecause the patch is written to replace all node conditions\nit overwrites (drops) [C] + note left of K8s: Node Conditions are [A]\nNode condition C from ExternalUpdater is no longer present + @enduml + ,--. ,---. ,---------------. + |VK| |K8s| |ExternalUpdater| + `+-' `-+-' `-------+-------' + | ,------------------------------------------!. | | + | |Updates internal node conditions to [A, B]|_\ | | + | `--------------------------------------------' | | + | Patch Upsert [A, B] | | + | -----------------------------------------------------------> | + | | | + | ,--------------------------!. | | + | |Node conditions are [A, B]|_\| | + | `----------------------------'| | + | | Patch Upsert [C] | + | | <------------------- + | | | + | ,-----------------------------!. | | + | |Node Conditions are [A, B, C]|_\| | + | `-------------------------------'| | + | ,---------------------------------------!. 
| | + | |Updates internal node conditions to [A]|_\ | | + | `-----------------------------------------' | | + | Patch: delete B, upsert A | | + | This is where things go wrong, | | + | because the patch is written to replace all node conditions| | + | it overwrites (drops) [C] | | + | -----------------------------------------------------------> | + | | | + ,----------------------------------------------------------!. | | + |Node Conditions are [A] |_\| | + |Node condition C from ExternalUpdater is no longer present || | + `------------------------------------------------------------'+-. ,-------+-------. + |VK| |K8s| |ExternalUpdater| + `--' `---' `---------------' + */ + // In order to calculate that last patch so that it deletes B and upserts A without dropping C, we need to know that C was added by + // "someone else". So, we keep track of our last applied value, and our current value. We then generate + // our patch based on the diff of these and *not* server side state. + oldVKStatus, ok1 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus] + oldVKObjectMeta, ok2 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta] + + oldNode := corev1.Node{} + // Check if the last-applied annotations are missing, which means someone else probably created the node, or this is an upgrade. Either way, we will consider + // ourselves as never having written the node object before, so oldNode will be left empty. 
We will overwrite values if + // our new node conditions / status / objectmeta have them + if ok1 && ok2 { + err := json.Unmarshal([]byte(oldVKObjectMeta), &oldNode.ObjectMeta) + if err != nil { + return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node object metadata (key: %q): %q", virtualKubeletLastNodeAppliedObjectMeta, oldVKObjectMeta) + } + err = json.Unmarshal([]byte(oldVKStatus), &oldNode.Status) + if err != nil { + return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node status (key: %q): %q", virtualKubeletLastNodeAppliedNodeStatus, oldVKStatus) + } } - updatedNode, err := nodes.Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status") - if err != nil { - return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err) - } - return updatedNode, patchBytes, nil -} + // newNode is the representation of the node the provider "wants" + newNode := corev1.Node{} + newNode.ObjectMeta = simplestObjectMetadata(&apiServerNode.ObjectMeta, &nodeFromProvider.ObjectMeta) + nodeFromProvider.Status.DeepCopyInto(&newNode.Status) -func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) ([]byte, error) { - oldData, err := json.Marshal(oldNode) + // virtualKubeletLastNodeAppliedObjectMeta must always be set before virtualKubeletLastNodeAppliedNodeStatus, + // otherwise we capture virtualKubeletLastNodeAppliedNodeStatus in virtualKubeletLastNodeAppliedObjectMeta, + // which is wrong + virtualKubeletLastNodeAppliedObjectMetaBytes, err := json.Marshal(newNode.ObjectMeta) if err != nil { - return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err) + return nil, pkgerrors.Wrap(err, "Cannot marshal object meta from provider") } + newNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta] = string(virtualKubeletLastNodeAppliedObjectMetaBytes) - // Reset spec to make sure only patch for Status or ObjectMeta is generated. 
- // Note that we don't reset ObjectMeta here, because: - // 1. This aligns with Nodes().UpdateStatus(). - // 2. Some component does use this to update node annotations. - newNode.Spec = oldNode.Spec - newData, err := json.Marshal(newNode) + virtualKubeletLastNodeAppliedNodeStatusBytes, err := json.Marshal(newNode.Status) if err != nil { - return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err) + return nil, pkgerrors.Wrap(err, "Cannot marshal node status from provider") } - - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{}) + newNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus] = string(virtualKubeletLastNodeAppliedNodeStatusBytes) + // Generate three way patch from oldNode -> newNode, without deleting the changes in api server + // Should we use the Kubernetes serialization / deserialization libraries here? + oldNodeBytes, err := json.Marshal(oldNode) if err != nil { - return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err) + return nil, pkgerrors.Wrap(err, "Cannot marshal old node bytes") } - return patchBytes, nil + newNodeBytes, err := json.Marshal(newNode) + if err != nil { + return nil, pkgerrors.Wrap(err, "Cannot marshal new node bytes") + } + apiServerNodeBytes, err := json.Marshal(apiServerNode) + if err != nil { + return nil, pkgerrors.Wrap(err, "Cannot marshal api server node") + } + schema, err := strategicpatch.NewPatchMetaFromStruct(&corev1.Node{}) + if err != nil { + return nil, pkgerrors.Wrap(err, "Cannot get patch schema from node") + } + return strategicpatch.CreateThreeWayMergePatch(oldNodeBytes, newNodeBytes, apiServerNodeBytes, schema, true) } // updateNodeStatus triggers an update to the node status in Kubernetes. @@ -472,37 +561,44 @@ func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Nod // // If you use this function, it is up to you to synchronize this with other operations. 
// This reduces the time to second-level precision. -func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (_ *corev1.Node, retErr error) { +func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, nodeFromProvider *corev1.Node) (_ *corev1.Node, retErr error) { ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus") defer func() { span.End() span.SetStatus(retErr) }() - var node *corev1.Node + var updatedNode *corev1.Node + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + apiServerNode, err := nodes.Get(nodeFromProvider.Name, emptyGetOptions) + if err != nil { + return err + } + ctx = addNodeAttributes(ctx, span, apiServerNode) + log.G(ctx).Debug("got node from api server") + + patchBytes, err := prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode) + if err != nil { + return pkgerrors.Wrap(err, "Cannot generate patch") + } + log.G(ctx).WithError(err).WithField("patch", string(patchBytes)).Debug("Generated three way patch") + + updatedNode, err = nodes.Patch(nodeFromProvider.Name, types.StrategicMergePatchType, patchBytes, "status") + if err != nil { + // We cannot wrap this error because the kubernetes error module doesn't understand wrapping + log.G(ctx).WithField("patch", string(patchBytes)).WithError(err).Warn("Failed to patch node status") + return err + } + return nil + }) - oldNode, err := nodes.Get(n.Name, emptyGetOptions) if err != nil { return nil, err } - - log.G(ctx).Debug("got node from api server") - node = oldNode.DeepCopy() - node.ResourceVersion = "" - node.Status = n.Status - - ctx = addNodeAttributes(ctx, span, node) - - // Patch the node status to merge other changes on the node. - updated, _, err := patchNodeStatus(nodes, types.NodeName(n.Name), oldNode, node) - if err != nil { - return nil, err - } - - log.G(ctx).WithField("node.resourceVersion", updated.ResourceVersion). - WithField("node.Status.Conditions", updated.Status.Conditions). 
+ log.G(ctx).WithField("node.resourceVersion", updatedNode.ResourceVersion). + WithField("node.Status.Conditions", updatedNode.Status.Conditions). Debug("updated node status in api server") - return updated, nil + return updatedNode, nil } func newLease(base *coord.Lease) *coord.Lease { @@ -577,3 +673,31 @@ func addNodeAttributes(ctx context.Context, span trace.Span, n *corev1.Node) con "node.taints": taintsStringer(n.Spec.Taints), }) } + +// Provides the simplest object metadata to match the previous object. Name, namespace, UID. It copies labels and +// annotations from the second object if defined. It exempts the patch metadata +func simplestObjectMetadata(baseObjectMeta, objectMetaWithLabelsAndAnnotations *metav1.ObjectMeta) metav1.ObjectMeta { + ret := metav1.ObjectMeta{ + Namespace: baseObjectMeta.Namespace, + Name: baseObjectMeta.Name, + UID: baseObjectMeta.UID, + Annotations: make(map[string]string), + } + if objectMetaWithLabelsAndAnnotations != nil { + if objectMetaWithLabelsAndAnnotations.Labels != nil { + ret.Labels = objectMetaWithLabelsAndAnnotations.Labels + } else { + ret.Labels = make(map[string]string) + } + if objectMetaWithLabelsAndAnnotations.Annotations != nil { + // We want to copy over all annotations except the special embedded ones. 
+ for key := range objectMetaWithLabelsAndAnnotations.Annotations { + if key == virtualKubeletLastNodeAppliedNodeStatus || key == virtualKubeletLastNodeAppliedObjectMeta { + continue + } + ret.Annotations[key] = objectMetaWithLabelsAndAnnotations.Annotations[key] + } + } + } + return ret +} diff --git a/node/node_test.go b/node/node_test.go index f1195b90d..ab8cc6a9e 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -22,12 +22,14 @@ import ( "gotest.tools/assert" "gotest.tools/assert/cmp" + is "gotest.tools/assert/cmp" coord "k8s.io/api/coordination/v1beta1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" watch "k8s.io/apimachinery/pkg/watch" testclient "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/util/retry" ) func TestNodeRun(t *testing.T) { @@ -374,6 +376,252 @@ func TestPingAfterStatusUpdate(t *testing.T) { assert.Assert(t, testP.maxPingInterval < maxAllowedInterval, "maximum time between node pings (%v) was greater than the maximum expected interval (%v)", testP.maxPingInterval, maxAllowedInterval) } +// Are annotations that were created before the VK existed preserved? +func TestBeforeAnnotationsPreserved(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := testclient.NewSimpleClientset() + + testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}} + + nodes := c.CoreV1().Nodes() + + interval := 10 * time.Millisecond + opts := []NodeControllerOpt{ + WithNodePingInterval(interval), + } + + testNode := testNode(t) + testNodeCreateCopy := testNode.DeepCopy() + testNodeCreateCopy.Annotations = map[string]string{ + "beforeAnnotation": "value", + } + _, err := nodes.Create(testNodeCreateCopy) + assert.NilError(t, err) + + // We have to refer to testNodeCopy during the course of the test. testNode is modified by the node controller + // so it will trigger the race detector. 
+ testNodeCopy := testNode.DeepCopy() + node, err := NewNodeController(testP, testNode, nodes, opts...) + assert.NilError(t, err) + + chErr := make(chan error) + defer func() { + cancel() + assert.NilError(t, <-chErr) + }() + + go func() { + chErr <- node.Run(ctx) + close(chErr) + }() + + nw := makeWatch(t, nodes, testNodeCopy.Name) + defer nw.Stop() + nr := nw.ResultChan() + + t.Log("Waiting for node to exist") + assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool { + if e.Object == nil { + return false + } + return true + })) + + testP.notifyNodeStatus(&corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "testAnnotation": "value", + }, + }, + }) + + assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool { + if e.Object == nil { + return false + } + _, ok := e.Object.(*corev1.Node).Annotations["testAnnotation"] + + return ok + })) + + newNode, err := nodes.Get(testNodeCopy.Name, emptyGetOptions) + assert.NilError(t, err) + + assert.Assert(t, is.Contains(newNode.Annotations, "testAnnotation")) + assert.Assert(t, is.Contains(newNode.Annotations, "beforeAnnotation")) +} + +// Are conditions set by systems outside of VK preserved? +func TestManualConditionsPreserved(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := testclient.NewSimpleClientset() + + testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}} + + nodes := c.CoreV1().Nodes() + + interval := 10 * time.Millisecond + opts := []NodeControllerOpt{ + WithNodePingInterval(interval), + } + + testNode := testNode(t) + // We have to refer to testNodeCopy during the course of the test. testNode is modified by the node controller + // so it will trigger the race detector. + testNodeCopy := testNode.DeepCopy() + node, err := NewNodeController(testP, testNode, nodes, opts...) 
+ assert.NilError(t, err) + + chErr := make(chan error) + defer func() { + cancel() + assert.NilError(t, <-chErr) + }() + + go func() { + chErr <- node.Run(ctx) + close(chErr) + }() + + nw := makeWatch(t, nodes, testNodeCopy.Name) + defer nw.Stop() + nr := nw.ResultChan() + + t.Log("Waiting for node to exist") + assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool { + if e.Object == nil { + return false + } + receivedNode := e.Object.(*corev1.Node) + if len(receivedNode.Status.Conditions) != 0 { + return false + } + return true + })) + + newNode, err := nodes.Get(testNodeCopy.Name, emptyGetOptions) + assert.NilError(t, err) + assert.Assert(t, is.Len(newNode.Status.Conditions, 0)) + + baseCondition := corev1.NodeCondition{ + Type: "BaseCondition", + Status: "Ok", + Reason: "NA", + Message: "This is the base condition. It is set by VK, and should always be there.", + } + + testP.notifyNodeStatus(&corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "testAnnotation": "value", + }, + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + baseCondition, + }, + }, + }) + + // Wait for this (node with condition) to show up + assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool { + receivedNode := e.Object.(*corev1.Node) + for _, condition := range receivedNode.Status.Conditions { + if condition.Type == baseCondition.Type { + return true + } + } + return false + })) + + newNode, err = nodes.Get(testNodeCopy.Name, emptyGetOptions) + assert.NilError(t, err) + assert.Assert(t, is.Len(newNode.Status.Conditions, 1)) + assert.Assert(t, is.Contains(newNode.Annotations, "testAnnotation")) + + // Add a new event manually + manuallyAddedCondition := corev1.NodeCondition{ + Type: "ManuallyAddedCondition", + Status: "Ok", + Reason: "NA", + Message: "This is a manually added condition. Outside of VK. 
It should not be removed.", + } + assert.NilError(t, retry.RetryOnConflict(retry.DefaultRetry, func() error { + newNode, err = nodes.Get(testNodeCopy.Name, emptyGetOptions) + if err != nil { + return err + } + newNode.Annotations["manuallyAddedAnnotation"] = "value" + newNode.Status.Conditions = append(newNode.Status.Conditions, manuallyAddedCondition) + _, err = nodes.UpdateStatus(newNode) + return err + })) + + assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool { + receivedNode := e.Object.(*corev1.Node) + for _, condition := range receivedNode.Status.Conditions { + if condition.Type == manuallyAddedCondition.Type { + return true + } + } + assert.Assert(t, is.Contains(receivedNode.Annotations, "testAnnotation")) + assert.Assert(t, is.Contains(newNode.Annotations, "manuallyAddedAnnotation")) + + return false + })) + + // Let's have the VK have a new condition. + newCondition := corev1.NodeCondition{ + Type: "NewCondition", + Status: "Ok", + Reason: "NA", + Message: "This is a newly added condition. It should only show up *with* / *after* ManuallyAddedCondition. It is set by the VK.", + } + + // Everything but node status is ignored here + testP.notifyNodeStatus(&corev1.Node{ + // Annotations is left empty + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + baseCondition, + newCondition, + }, + }, + }) + i := 0 + assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool { + receivedNode := e.Object.(*corev1.Node) + for _, condition := range receivedNode.Status.Conditions { + if condition.Type == newCondition.Type { + // Wait for 2 updates / patches + if i > 2 { + return true + } + i++ + } + } + return false + })) + + // Make sure that all three conditions are there. 
+ newNode, err = nodes.Get(testNodeCopy.Name, emptyGetOptions) + assert.NilError(t, err) + seenConditionTypes := make([]corev1.NodeConditionType, len(newNode.Status.Conditions)) + for idx := range newNode.Status.Conditions { + seenConditionTypes[idx] = newNode.Status.Conditions[idx].Type + } + assert.Assert(t, is.Contains(seenConditionTypes, baseCondition.Type)) + assert.Assert(t, is.Contains(seenConditionTypes, newCondition.Type)) + assert.Assert(t, is.Contains(seenConditionTypes, manuallyAddedCondition.Type)) + assert.Assert(t, is.Equal(newNode.Annotations["testAnnotation"], "")) + assert.Assert(t, is.Contains(newNode.Annotations, "manuallyAddedAnnotation")) + + t.Log(newNode.Status.Conditions) +} func testNode(t *testing.T) *corev1.Node { n := &corev1.Node{} n.Name = strings.ToLower(t.Name()) @@ -383,16 +631,19 @@ func testNode(t *testing.T) *corev1.Node { type testNodeProvider struct { NodeProvider statusHandlers []func(*corev1.Node) + // Callback to VK + notifyNodeStatus func(*corev1.Node) } func (p *testNodeProvider) NotifyNodeStatus(ctx context.Context, h func(*corev1.Node)) { - p.statusHandlers = append(p.statusHandlers, h) + p.notifyNodeStatus = h } func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) { for _, h := range p.statusHandlers { h(n) } + p.notifyNodeStatus(n) } // testNodeProviderPing tracks the maximum time interval between calls to Ping