From 85292ef4ef18fb80a0cd97838f9c363ade17e31c Mon Sep 17 00:00:00 2001 From: Yash Desai Date: Wed, 3 Apr 2019 10:40:57 -0700 Subject: [PATCH] Patch the node status instead of updating it. (#557) * Patch the node status instead of updating it. Virtual-kubelet updates the node status periodically. This change proposes we use the `Patch` API instead of `Update` to update the node status. This avoids overwriting any node updates made by other controllers in the system, for example an attach-detach controller. The Patch API does a strategic merge instead of overwriting the entire object, which ensures parallel updates don't overwrite each other. Note: `PatchNodeStatus` reduces timestamp precision to the second level, so the tests were adjusted accordingly. Consider two controllers: CONTROLLER 1 (virtual kubelet) | CONTROLLER 2 oldNode := nodes.Get(nodename) | | node := nodes.Get(nodename) | // update status with attached volumes info | updateNode := Nodes.UpdateStatus(node) // update vkubelet info on node status | latestNode := Nodes.UpdateStatus(oldNode) | <-- latestNode does not contain the volume info added by the second controller. With my patch change: CONTROLLER 1 (virtual kubelet) | CONTROLLER 2 oldNode := Nodes.Get(nodename) | | node := Nodes.Get(nodename) | // update status with attached volumes info | updateNode := Nodes.UpdateStatus(node) node := oldNode.DeepCopy() | // update vkubelet info on node status | latestNode := util.PatchNodeStatus(oldNode, node) | <-- latestNode contains the volume info added by the second controller. Testing Done: make test * Introduce PatchNodeStatus into vkubelet. * Pass only the node interface. 
--- Gopkg.lock | 1 + vkubelet/node.go | 55 +++++++++++++++++++++++++++++++++++++++---- vkubelet/node_test.go | 4 ++-- 3 files changed, 54 insertions(+), 6 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index af2ec2f59..8d0b75f7d 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1883,6 +1883,7 @@ "k8s.io/apimachinery/pkg/types", "k8s.io/apimachinery/pkg/util/intstr", "k8s.io/apimachinery/pkg/util/net", + "k8s.io/apimachinery/pkg/util/strategicpatch", "k8s.io/apimachinery/pkg/util/uuid", "k8s.io/apimachinery/pkg/util/validation", "k8s.io/apimachinery/pkg/util/wait", diff --git a/vkubelet/node.go b/vkubelet/node.go index bf4e39f38..106d41248 100644 --- a/vkubelet/node.go +++ b/vkubelet/node.go @@ -2,6 +2,8 @@ package vkubelet import ( "context" + "encoding/json" + "fmt" "time" "github.com/cpuguy83/strongerrors/status/ocstatus" @@ -13,6 +15,8 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes/typed/coordination/v1beta1" v1 "k8s.io/client-go/kubernetes/typed/core/v1" ) @@ -302,16 +306,56 @@ func UpdateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease * // just so we don't have to allocate this on every get request var emptyGetOptions = metav1.GetOptions{} +// PatchNodeStatus patches node status. 
+// Copied from github.com/kubernetes/kubernetes/pkg/util/node +func PatchNodeStatus(nodes v1.NodeInterface, nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) (*corev1.Node, []byte, error) { + patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode) + if err != nil { + return nil, nil, err + } + + updatedNode, err := nodes.Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status") + if err != nil { + return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err) + } + return updatedNode, patchBytes, nil +} + +func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) ([]byte, error) { + oldData, err := json.Marshal(oldNode) + if err != nil { + return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err) + } + + // Reset spec to make sure only patch for Status or ObjectMeta is generated. + // Note that we don't reset ObjectMeta here, because: + // 1. This aligns with Nodes().UpdateStatus(). + // 2. Some component does use this to update node annotations. + newNode.Spec = oldNode.Spec + newData, err := json.Marshal(newNode) + if err != nil { + return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err) + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{}) + if err != nil { + return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err) + } + return patchBytes, nil +} + // UpdateNodeStatus triggers an update to the node status in Kubernetes. // It first fetches the current node details and then sets the status according // to the passed in node object. // // If you use this function, it is up to you to syncronize this with other operations. +// This reduces the time to second-level precision. 
func UpdateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (*corev1.Node, error) { ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus") defer span.End() + var node *corev1.Node - node, err := nodes.Get(n.Name, emptyGetOptions) + oldNode, err := nodes.Get(n.Name, emptyGetOptions) if err != nil { if !errors.IsNotFound(err) { span.SetStatus(ocstatus.FromError(err)) @@ -328,18 +372,21 @@ func UpdateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Nod } log.G(ctx).Debug("got node from api server") - + node = oldNode.DeepCopy() node.ResourceVersion = "" node.Status = n.Status ctx = addNodeAttributes(ctx, span, node) - updated, err := nodes.UpdateStatus(node) + // Patch the node status to merge other changes on the node. + updated, _, err := PatchNodeStatus(nodes, types.NodeName(n.Name), oldNode, node) if err != nil { return nil, err } - log.G(ctx).WithField("resourceVersion", updated.ResourceVersion).Debug("updated node status in api server") + log.G(ctx).WithField("node.resourceVersion", updated.ResourceVersion). + WithField("node.Status.Conditions", updated.Status.Conditions). 
+ Debug("updated node status in api server") return updated, nil } diff --git a/vkubelet/node_test.go b/vkubelet/node_test.go index 5d65a281e..ded48e1c8 100644 --- a/vkubelet/node_test.go +++ b/vkubelet/node_test.go @@ -111,7 +111,7 @@ func testNodeRun(t *testing.T, enableLease bool) { n := node.n.DeepCopy() newCondition := corev1.NodeCondition{ Type: corev1.NodeConditionType("UPDATED"), - LastTransitionTime: metav1.NewTime(time.Now()), + LastTransitionTime: metav1.Now().Rfc3339Copy(), } n.Status.Conditions = append(n.Status.Conditions, newCondition) @@ -170,7 +170,7 @@ func TestEnsureLease(t *testing.T) { func TestUpdateNodeStatus(t *testing.T) { n := testNode(t) n.Status.Conditions = append(n.Status.Conditions, corev1.NodeCondition{ - LastHeartbeatTime: metav1.NewTime(time.Now()), + LastHeartbeatTime: metav1.Now().Rfc3339Copy(), }) n.Status.Phase = corev1.NodePending nodes := testclient.NewSimpleClientset().CoreV1().Nodes()