Patch the node status instead of updating it. (#557)
* Patch the node status instead of updating it.
Virtual-kubelet updates the node status periodically.
This change proposes using the `Patch` API instead of `Update`
to update the node status. This avoids overwriting node updates
made by other controllers in the system, for example an
attach-detach controller. The `Patch` API performs a strategic
merge instead of overwriting the entire object, which ensures
that parallel updates do not overwrite each other.
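
For context, here is a minimal, self-contained sketch (not part of this change) of how a two-way strategic merge patch is computed with apimachinery; the node name and status value are made up for illustration:

package main

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	oldNode := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "vk-node"}}

	newNode := oldNode.DeepCopy()
	newNode.Status.Phase = corev1.NodeRunning

	oldData, err := json.Marshal(oldNode)
	if err != nil {
		panic(err)
	}
	newData, err := json.Marshal(newNode)
	if err != nil {
		panic(err)
	}

	// The resulting patch contains only the delta, here
	// {"status":{"phase":"Running"}}, so a PATCH request leaves
	// fields written by other controllers untouched.
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patch))
}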
Note: `PatchNodeStatus` reduces time precision to the second level,
because timestamps in the patch are serialized at RFC 3339 second
granularity, and therefore I corrected the affected tests.
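
To illustrate the precision note, metav1 timestamps can be truncated to second precision with Rfc3339Copy, which is what the updated tests below rely on:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	now := metav1.Now()

	// Rfc3339Copy round-trips the time through RFC 3339 formatting,
	// dropping sub-second precision, the same precision that survives
	// a round trip through the API server.
	truncated := now.Rfc3339Copy()

	fmt.Println("original: ", now.Time)
	fmt.Println("truncated:", truncated.Time)
}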
Consider two controllers:

CONTROLLER 1 (virtual kubelet)            | CONTROLLER 2
oldNode := nodes.Get(nodename)            |
                                          | node := nodes.Get(nodename)
                                          | // update status with attached volumes info
                                          | updatedNode := nodes.UpdateStatus(node)
// update vkubelet info on node status    |
latestNode := nodes.UpdateStatus(oldNode) |
<-- latestNode does not contain the volume info added by the second controller.
With this patch:

CONTROLLER 1 (virtual kubelet)                    | CONTROLLER 2
oldNode := nodes.Get(nodename)                    |
                                                  | node := nodes.Get(nodename)
                                                  | // update status with attached volumes info
                                                  | updatedNode := nodes.UpdateStatus(node)
node := oldNode.DeepCopy()                        |
// update vkubelet info on node status            |
latestNode := util.PatchNodeStatus(oldNode, node) |
<-- latestNode contains the volume info added by the second controller.
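
The scenario above can be reproduced against client-go's fake clientset; a minimal sketch, assuming the PatchNodeStatus helper from the diff below has been copied into scope (node and volume names are made up):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := fake.NewSimpleClientset(&corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "vk-node"},
	})
	nodes := client.CoreV1().Nodes()

	// Controller 1 (virtual kubelet) reads the node first.
	oldNode, _ := nodes.Get("vk-node", metav1.GetOptions{})

	// Controller 2 attaches a volume in the meantime.
	other, _ := nodes.Get("vk-node", metav1.GetOptions{})
	other.Status.VolumesAttached = append(other.Status.VolumesAttached,
		corev1.AttachedVolume{Name: "vol-1", DevicePath: "/dev/xvdf"})
	if _, err := nodes.UpdateStatus(other); err != nil {
		panic(err)
	}

	// Controller 1 patches only its own change; the volume survives.
	newNode := oldNode.DeepCopy()
	newNode.Status.Phase = corev1.NodeRunning
	latest, _, err := PatchNodeStatus(nodes, types.NodeName("vk-node"), oldNode, newNode)
	if err != nil {
		panic(err)
	}
	fmt.Println(latest.Status.Phase, latest.Status.VolumesAttached)
}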
Testing Done: make test
* Introduce PatchNodeStatus into vkubelet.
* Pass only the node interface.
Gopkg.lock (generated):

@@ -1883,6 +1883,7 @@
     "k8s.io/apimachinery/pkg/types",
     "k8s.io/apimachinery/pkg/util/intstr",
     "k8s.io/apimachinery/pkg/util/net",
+    "k8s.io/apimachinery/pkg/util/strategicpatch",
     "k8s.io/apimachinery/pkg/util/uuid",
     "k8s.io/apimachinery/pkg/util/validation",
     "k8s.io/apimachinery/pkg/util/wait",

@@ -2,6 +2,8 @@ package vkubelet
 
 import (
 	"context"
+	"encoding/json"
+	"fmt"
 	"time"
 
 	"github.com/cpuguy83/strongerrors/status/ocstatus"
@@ -13,6 +15,8 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
 	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
 )
@@ -302,16 +306,56 @@ func UpdateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *
 // just so we don't have to allocate this on every get request
 var emptyGetOptions = metav1.GetOptions{}
 
+// PatchNodeStatus patches node status.
+// Copied from github.com/kubernetes/kubernetes/pkg/util/node
+func PatchNodeStatus(nodes v1.NodeInterface, nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) (*corev1.Node, []byte, error) {
+	patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	updatedNode, err := nodes.Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
+	}
+	return updatedNode, patchBytes, nil
+}
+
+func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) ([]byte, error) {
+	oldData, err := json.Marshal(oldNode)
+	if err != nil {
+		return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err)
+	}
+
+	// Reset spec to make sure only patch for Status or ObjectMeta is generated.
+	// Note that we don't reset ObjectMeta here, because:
+	// 1. This aligns with Nodes().UpdateStatus().
+	// 2. Some component does use this to update node annotations.
+	newNode.Spec = oldNode.Spec
+	newData, err := json.Marshal(newNode)
+	if err != nil {
+		return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err)
+	}
+
+	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
+	if err != nil {
+		return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err)
+	}
+	return patchBytes, nil
+}
+
 // UpdateNodeStatus triggers an update to the node status in Kubernetes.
 // It first fetches the current node details and then sets the status according
 // to the passed in node object.
 //
 // If you use this function, it is up to you to syncronize this with other operations.
+// This reduces the time to second-level precision.
 func UpdateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (*corev1.Node, error) {
 	ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus")
 	defer span.End()
 	var node *corev1.Node
 
-	node, err := nodes.Get(n.Name, emptyGetOptions)
+	oldNode, err := nodes.Get(n.Name, emptyGetOptions)
 	if err != nil {
 		if !errors.IsNotFound(err) {
 			span.SetStatus(ocstatus.FromError(err))
@@ -328,18 +372,21 @@ func UpdateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node
 	}
 
 	log.G(ctx).Debug("got node from api server")
 
+	node = oldNode.DeepCopy()
 	node.ResourceVersion = ""
 	node.Status = n.Status
 
 	ctx = addNodeAttributes(ctx, span, node)
 
-	updated, err := nodes.UpdateStatus(node)
+	// Patch the node status to merge other changes on the node.
+	updated, _, err := PatchNodeStatus(nodes, types.NodeName(n.Name), oldNode, node)
 	if err != nil {
 		return nil, err
 	}
 
-	log.G(ctx).WithField("resourceVersion", updated.ResourceVersion).Debug("updated node status in api server")
+	log.G(ctx).WithField("node.resourceVersion", updated.ResourceVersion).
+		WithField("node.Status.Conditions", updated.Status.Conditions).
+		Debug("updated node status in api server")
 	return updated, nil
 }

@@ -111,7 +111,7 @@ func testNodeRun(t *testing.T, enableLease bool) {
 	n := node.n.DeepCopy()
 	newCondition := corev1.NodeCondition{
 		Type:               corev1.NodeConditionType("UPDATED"),
-		LastTransitionTime: metav1.NewTime(time.Now()),
+		LastTransitionTime: metav1.Now().Rfc3339Copy(),
 	}
 	n.Status.Conditions = append(n.Status.Conditions, newCondition)

@@ -170,7 +170,7 @@ func TestEnsureLease(t *testing.T) {
 func TestUpdateNodeStatus(t *testing.T) {
 	n := testNode(t)
 	n.Status.Conditions = append(n.Status.Conditions, corev1.NodeCondition{
-		LastHeartbeatTime: metav1.NewTime(time.Now()),
+		LastHeartbeatTime: metav1.Now().Rfc3339Copy(),
 	})
 	n.Status.Phase = corev1.NodePending
 	nodes := testclient.NewSimpleClientset().CoreV1().Nodes()
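
From the caller's perspective the function signature is unchanged; as a usage illustration, here is a sketch of a periodic status-sync loop built on the patched UpdateNodeStatus (the vkubelet import path, package name, and interval are assumptions):

package nodesync // illustrative package name

import (
	"context"
	"log"
	"time"

	corev1 "k8s.io/api/core/v1"
	v1 "k8s.io/client-go/kubernetes/typed/core/v1"

	"github.com/virtual-kubelet/virtual-kubelet/vkubelet" // assumed import path
)

// SyncNodeStatus pushes the provider's view of the node status on a fixed
// interval. Since UpdateNodeStatus now patches instead of updating, concurrent
// writers such as the attach-detach controller are no longer overwritten.
func SyncNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) {
	ticker := time.NewTicker(30 * time.Second) // illustrative interval
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if _, err := vkubelet.UpdateNodeStatus(ctx, nodes, n); err != nil {
				log.Printf("failed to update node status: %v", err)
			}
		}
	}
}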