Merge pull request #838 from sargun/threeway-patch

Introduce three-way patch for proper handling of updates
Brian Goff
2020-07-07 12:52:14 -07:00
committed by GitHub
2 changed files with 423 additions and 48 deletions


@@ -17,7 +17,6 @@ package node
import (
"context"
"encoding/json"
"fmt"
"time"
pkgerrors "github.com/pkg/errors"
@@ -31,6 +30,14 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
)
const (
// Annotations with the JSON-serialized last applied node status and object metadata. Based on kubectl apply's
// last-applied-configuration annotation. Used to calculate the three-way patch.
virtualKubeletLastNodeAppliedNodeStatus = "virtual-kubelet.io/last-applied-node-status"
virtualKubeletLastNodeAppliedObjectMeta = "virtual-kubelet.io/last-applied-object-meta"
)
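
These annotations play the same role as kubectl's kubectl.kubernetes.io/last-applied-configuration: they let the controller recover what it last wrote without trusting server-side state. A minimal sketch of the round trip, with illustrative helper names that are not part of this change:

// stampLastApplied records exactly what is about to be applied. It assumes
// node.Annotations is non-nil (simplestObjectMetadata below guarantees this).
// ObjectMeta is marshalled before the status annotation is set; otherwise the
// metadata snapshot would capture the status snapshot inside itself, the
// ordering hazard called out in the implementation below.
func stampLastApplied(node *corev1.Node) error {
	metaJSON, err := json.Marshal(node.ObjectMeta)
	if err != nil {
		return err
	}
	node.Annotations[virtualKubeletLastNodeAppliedObjectMeta] = string(metaJSON)

	statusJSON, err := json.Marshal(node.Status)
	if err != nil {
		return err
	}
	node.Annotations[virtualKubeletLastNodeAppliedNodeStatus] = string(statusJSON)
	return nil
}

// lastApplied recovers the previously applied state from the server's copy of
// the node. ok is false on first sync, after an upgrade, or when someone else
// created the node.
func lastApplied(apiServerNode *corev1.Node) (old corev1.Node, ok bool) {
	metaJSON, ok1 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta]
	statusJSON, ok2 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus]
	if !ok1 || !ok2 {
		return old, false
	}
	if err := json.Unmarshal([]byte(metaJSON), &old.ObjectMeta); err != nil {
		return old, false
	}
	if err := json.Unmarshal([]byte(statusJSON), &old.Status); err != nil {
		return old, false
	}
	return old, true
}
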
// NodeProvider is the interface used for registering a node and updating its
@@ -287,6 +294,13 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
}
n.n.Status = updated.Status
n.n.ObjectMeta = metav1.ObjectMeta{
Annotations: updated.Annotations,
Labels: updated.Labels,
Name: n.n.ObjectMeta.Name,
Namespace: n.n.ObjectMeta.Namespace,
UID: n.n.ObjectMeta.UID,
}
if err := n.updateStatus(ctx, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
@@ -428,42 +442,117 @@ func updateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *
// just so we don't have to allocate this on every get request
var emptyGetOptions = metav1.GetOptions{}
-// patchNodeStatus patches node status.
-// Copied from github.com/kubernetes/kubernetes/pkg/util/node
-func patchNodeStatus(nodes v1.NodeInterface, nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) (*corev1.Node, []byte, error) {
-patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
-if err != nil {
-return nil, nil, err
func prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode *corev1.Node) ([]byte, error) {
// We use these two values to calculate a patch. We use a three-way patch in order to avoid
// causing state regression server side. For example, let's consider the scenario:
/*
UML Source:
@startuml
participant VK
participant K8s
participant ExternalUpdater
note right of VK: Updates internal node conditions to [A, B]
VK->K8s: Patch Upsert [A, B]
note left of K8s: Node conditions are [A, B]
ExternalUpdater->K8s: Patch Upsert [C]
note left of K8s: Node Conditions are [A, B, C]
note right of VK: Updates internal node conditions to [A]
VK->K8s: Patch: delete B, upsert A\nThis is where things go wrong,\nbecause the patch is written to replace all node conditions\nit overwrites (drops) [C]
note left of K8s: Node Conditions are [A]\nNode condition C from ExternalUpdater is no longer present
@enduml
,--. ,---. ,---------------.
|VK| |K8s| |ExternalUpdater|
`+-' `-+-' `-------+-------'
| ,------------------------------------------!. | |
| |Updates internal node conditions to [A, B]|_\ | |
| `--------------------------------------------' | |
| Patch Upsert [A, B] | |
| -----------------------------------------------------------> |
| | |
| ,--------------------------!. | |
| |Node conditions are [A, B]|_\| |
| `----------------------------'| |
| | Patch Upsert [C] |
| | <-------------------
| | |
| ,-----------------------------!. | |
| |Node Conditions are [A, B, C]|_\| |
| `-------------------------------'| |
| ,---------------------------------------!. | |
| |Updates internal node conditions to [A]|_\ | |
| `-----------------------------------------' | |
| Patch: delete B, upsert A | |
| This is where things go wrong, | |
| because the patch is written to replace all node conditions| |
| it overwrites (drops) [C] | |
| -----------------------------------------------------------> |
| | |
,----------------------------------------------------------!. | |
|Node Conditions are [A] |_\| |
|Node condition C from ExternalUpdater is no longer present || |
`------------------------------------------------------------'
,--.                                                          ,-+-.          ,-------+-------.
|VK|                                                          |K8s|          |ExternalUpdater|
`--'                                                          `---'          `---------------'
*/
// In order to calculate that last patch (delete B, upsert A) without dropping C, we need to know that C was
// added by "someone else". So, we keep track of our last applied value, and our current value. We then generate
// our patch based on the diff of these and *not* server side state.
oldVKStatus, ok1 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus]
oldVKObjectMeta, ok2 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta]
oldNode := corev1.Node{}
// Check if there were no annotations, which means someone else probably created the node, or this is an upgrade.
// Either way, we will consider ourselves as never having written the node object before, so oldNode will be left
// empty. We will overwrite values if our new node conditions / status / objectmeta have them
if ok1 && ok2 {
err := json.Unmarshal([]byte(oldVKObjectMeta), &oldNode.ObjectMeta)
if err != nil {
return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node object metadata (key: %q): %q", virtualKubeletLastNodeAppliedObjectMeta, oldVKObjectMeta)
}
err = json.Unmarshal([]byte(oldVKStatus), &oldNode.Status)
if err != nil {
return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node status (key: %q): %q", virtualKubeletLastNodeAppliedNodeStatus, oldVKStatus)
}
}
-updatedNode, err := nodes.Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
-if err != nil {
-return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
-}
-return updatedNode, patchBytes, nil
-}
// newNode is the representation of the node the provider "wants"
newNode := corev1.Node{}
newNode.ObjectMeta = simplestObjectMetadata(&apiServerNode.ObjectMeta, &nodeFromProvider.ObjectMeta)
nodeFromProvider.Status.DeepCopyInto(&newNode.Status)
-func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) ([]byte, error) {
-oldData, err := json.Marshal(oldNode)
// virtualKubeletLastNodeAppliedObjectMeta must always be set before virtualKubeletLastNodeAppliedNodeStatus,
// otherwise we capture virtualKubeletLastNodeAppliedNodeStatus in virtualKubeletLastNodeAppliedObjectMeta,
// which is wrong
virtualKubeletLastNodeAppliedObjectMetaBytes, err := json.Marshal(newNode.ObjectMeta)
if err != nil {
return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err)
return nil, pkgerrors.Wrap(err, "Cannot marshal object meta from provider")
}
newNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta] = string(virtualKubeletLastNodeAppliedObjectMetaBytes)
-// Reset spec to make sure only patch for Status or ObjectMeta is generated.
-// Note that we don't reset ObjectMeta here, because:
-// 1. This aligns with Nodes().UpdateStatus().
-// 2. Some component does use this to update node annotations.
-newNode.Spec = oldNode.Spec
-newData, err := json.Marshal(newNode)
virtualKubeletLastNodeAppliedNodeStatusBytes, err := json.Marshal(newNode.Status)
if err != nil {
return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err)
return nil, pkgerrors.Wrap(err, "Cannot marshal node status from provider")
}
-patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
newNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus] = string(virtualKubeletLastNodeAppliedNodeStatusBytes)
// Generate three way patch from oldNode -> newNode, without deleting the changes in api server
// Should we use the Kubernetes serialization / deserialization libraries here?
oldNodeBytes, err := json.Marshal(oldNode)
if err != nil {
return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err)
return nil, pkgerrors.Wrap(err, "Cannot marshal old node bytes")
}
-return patchBytes, nil
newNodeBytes, err := json.Marshal(newNode)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal new node bytes")
}
apiServerNodeBytes, err := json.Marshal(apiServerNode)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal api server node")
}
schema, err := strategicpatch.NewPatchMetaFromStruct(&corev1.Node{})
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot get patch schema from node")
}
return strategicpatch.CreateThreeWayMergePatch(oldNodeBytes, newNodeBytes, apiServerNodeBytes, schema, true)
}
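
To make the scenario in the comment above concrete, here is a self-contained sketch (not part of the change) that runs the diagram's condition sets through both patch strategies, using the same strategic-merge-patch API:

package main

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

// nodeWithConditions marshals a Node whose status carries the given condition types.
func nodeWithConditions(types ...corev1.NodeConditionType) []byte {
	var n corev1.Node
	for _, t := range types {
		n.Status.Conditions = append(n.Status.Conditions,
			corev1.NodeCondition{Type: t, Status: corev1.ConditionTrue})
	}
	b, _ := json.Marshal(n)
	return b
}

func main() {
	prevApplied := nodeWithConditions("A", "B")   // what VK last wrote
	desired := nodeWithConditions("A")            // what the provider wants now
	onServer := nodeWithConditions("A", "B", "C") // ExternalUpdater added C

	// Old approach: diff server state against desired state. The resulting
	// patch deletes both B and C, clobbering the external update.
	twoWay, _ := strategicpatch.CreateTwoWayMergePatch(onServer, desired, corev1.Node{})
	fmt.Println("two-way: ", string(twoWay))

	// New approach: diff our last-applied state against desired state,
	// reconciled with server state. Only B is deleted; C survives.
	schema, _ := strategicpatch.NewPatchMetaFromStruct(&corev1.Node{})
	threeWay, _ := strategicpatch.CreateThreeWayMergePatch(prevApplied, desired, onServer, schema, true)
	fmt.Println("three-way:", string(threeWay))
}
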
// updateNodeStatus triggers an update to the node status in Kubernetes.
@@ -472,37 +561,44 @@ func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Nod
//
// If you use this function, it is up to you to synchronize this with other operations.
// This reduces the time to second-level precision.
-func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (_ *corev1.Node, retErr error) {
func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, nodeFromProvider *corev1.Node) (_ *corev1.Node, retErr error) {
ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus")
defer func() {
span.End()
span.SetStatus(retErr)
}()
-var node *corev1.Node
var updatedNode *corev1.Node
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
apiServerNode, err := nodes.Get(nodeFromProvider.Name, emptyGetOptions)
if err != nil {
return err
}
ctx = addNodeAttributes(ctx, span, apiServerNode)
log.G(ctx).Debug("got node from api server")
patchBytes, err := prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode)
if err != nil {
return pkgerrors.Wrap(err, "Cannot generate patch")
}
log.G(ctx).WithField("patch", string(patchBytes)).Debug("Generated three way patch")
updatedNode, err = nodes.Patch(nodeFromProvider.Name, types.StrategicMergePatchType, patchBytes, "status")
if err != nil {
// We cannot wrap this error because the kubernetes error module doesn't understand wrapping
log.G(ctx).WithField("patch", string(patchBytes)).WithError(err).Warn("Failed to patch node status")
return err
}
return nil
})
-oldNode, err := nodes.Get(n.Name, emptyGetOptions)
-if err != nil {
-return nil, err
-}
-log.G(ctx).Debug("got node from api server")
-node = oldNode.DeepCopy()
-node.ResourceVersion = ""
-node.Status = n.Status
-ctx = addNodeAttributes(ctx, span, node)
-// Patch the node status to merge other changes on the node.
-updated, _, err := patchNodeStatus(nodes, types.NodeName(n.Name), oldNode, node)
-if err != nil {
-return nil, err
-}
-log.G(ctx).WithField("node.resourceVersion", updated.ResourceVersion).
-WithField("node.Status.Conditions", updated.Status.Conditions).
log.G(ctx).WithField("node.resourceVersion", updatedNode.ResourceVersion).
WithField("node.Status.Conditions", updatedNode.Status.Conditions).
Debug("updated node status in api server")
-return updated, nil
return updatedNode, nil
}
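
The "we cannot wrap this error" comment above is load-bearing: retry.RetryOnConflict decides whether to retry by calling errors.IsConflict on the returned error, and the apimachinery release in use here inspects the concrete error type without unwrapping. A sketch of the failure mode (version-dependent; newer apimachinery releases can unwrap via errors.As):

package main

import (
	"fmt"

	pkgerrors "github.com/pkg/errors"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	conflict := apierrors.NewConflict(schema.GroupResource{Resource: "nodes"}, "my-node", nil)

	// retry.RetryOnConflict keeps retrying only while this reports true.
	fmt.Println(apierrors.IsConflict(conflict)) // true

	// Wrapping hides the concrete *apierrors.StatusError from the reason
	// check, so the retry loop would surface a conflict as a fatal error.
	fmt.Println(apierrors.IsConflict(pkgerrors.Wrap(conflict, "cannot patch node"))) // false here
}
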
func newLease(base *coord.Lease) *coord.Lease {
@@ -577,3 +673,31 @@ func addNodeAttributes(ctx context.Context, span trace.Span, n *corev1.Node) con
"node.taints": taintsStringer(n.Spec.Taints),
})
}
// simplestObjectMetadata provides the simplest object metadata that matches the previous object: name, namespace,
// and UID. It copies labels and annotations from the second object if defined, excluding the last-applied
// bookkeeping annotations.
func simplestObjectMetadata(baseObjectMeta, objectMetaWithLabelsAndAnnotations *metav1.ObjectMeta) metav1.ObjectMeta {
ret := metav1.ObjectMeta{
Namespace: baseObjectMeta.Namespace,
Name: baseObjectMeta.Name,
UID: baseObjectMeta.UID,
Annotations: make(map[string]string),
}
if objectMetaWithLabelsAndAnnotations != nil {
if objectMetaWithLabelsAndAnnotations.Labels != nil {
ret.Labels = objectMetaWithLabelsAndAnnotations.Labels
} else {
ret.Labels = make(map[string]string)
}
if objectMetaWithLabelsAndAnnotations.Annotations != nil {
// We want to copy over all annotations except the special embedded ones.
for key := range objectMetaWithLabelsAndAnnotations.Annotations {
if key == virtualKubeletLastNodeAppliedNodeStatus || key == virtualKubeletLastNodeAppliedObjectMeta {
continue
}
ret.Annotations[key] = objectMetaWithLabelsAndAnnotations.Annotations[key]
}
}
}
return ret
}
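
A hypothetical call, with invented names and values, to make the filtering concrete:

base := metav1.ObjectMeta{Name: "vk-node", UID: "1234"}
fromProvider := metav1.ObjectMeta{
	Labels: map[string]string{"kubernetes.io/role": "agent"},
	Annotations: map[string]string{
		"example.io/owned-by":                   "my-provider",
		virtualKubeletLastNodeAppliedNodeStatus: `{"stale":"bookkeeping"}`,
	},
}

meta := simplestObjectMetadata(&base, &fromProvider)
// meta.Name == "vk-node", meta.UID == "1234", labels copied as-is.
// meta.Annotations contains only "example.io/owned-by": the last-applied
// bookkeeping annotation is skipped, so stale bookkeeping never leaks into
// the metadata that gets re-serialized on the next sync.
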


@@ -22,12 +22,14 @@ import (
"gotest.tools/assert"
"gotest.tools/assert/cmp"
is "gotest.tools/assert/cmp"
coord "k8s.io/api/coordination/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
watch "k8s.io/apimachinery/pkg/watch"
testclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/retry"
)
func TestNodeRun(t *testing.T) {
@@ -374,6 +376,252 @@ func TestPingAfterStatusUpdate(t *testing.T) {
assert.Assert(t, testP.maxPingInterval < maxAllowedInterval, "maximum time between node pings (%v) was greater than the maximum expected interval (%v)", testP.maxPingInterval, maxAllowedInterval)
}
// Are annotations that were created before the VK existed preserved?
func TestBeforeAnnotationsPreserved(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := testclient.NewSimpleClientset()
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
nodes := c.CoreV1().Nodes()
interval := 10 * time.Millisecond
opts := []NodeControllerOpt{
WithNodePingInterval(interval),
}
testNode := testNode(t)
testNodeCreateCopy := testNode.DeepCopy()
testNodeCreateCopy.Annotations = map[string]string{
"beforeAnnotation": "value",
}
_, err := nodes.Create(testNodeCreateCopy)
assert.NilError(t, err)
// We have to refer to testNodeCopy for the rest of the test. testNode is modified by the node controller,
// so referring to it directly would trip the race detector.
testNodeCopy := testNode.DeepCopy()
node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error)
defer func() {
cancel()
assert.NilError(t, <-chErr)
}()
go func() {
chErr <- node.Run(ctx)
close(chErr)
}()
nw := makeWatch(t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr := nw.ResultChan()
t.Log("Waiting for node to exist")
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
if e.Object == nil {
return false
}
return true
}))
testP.notifyNodeStatus(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"testAnnotation": "value",
},
},
})
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
if e.Object == nil {
return false
}
_, ok := e.Object.(*corev1.Node).Annotations["testAnnotation"]
return ok
}))
newNode, err := nodes.Get(testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Assert(t, is.Contains(newNode.Annotations, "testAnnotation"))
assert.Assert(t, is.Contains(newNode.Annotations, "beforeAnnotation"))
}
// Are conditions set by systems outside of VK preserved?
func TestManualConditionsPreserved(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := testclient.NewSimpleClientset()
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
nodes := c.CoreV1().Nodes()
interval := 10 * time.Millisecond
opts := []NodeControllerOpt{
WithNodePingInterval(interval),
}
testNode := testNode(t)
// We have to refer to testNodeCopy for the rest of the test. testNode is modified by the node controller,
// so referring to it directly would trip the race detector.
testNodeCopy := testNode.DeepCopy()
node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error)
defer func() {
cancel()
assert.NilError(t, <-chErr)
}()
go func() {
chErr <- node.Run(ctx)
close(chErr)
}()
nw := makeWatch(t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr := nw.ResultChan()
t.Log("Waiting for node to exist")
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
if e.Object == nil {
return false
}
receivedNode := e.Object.(*corev1.Node)
if len(receivedNode.Status.Conditions) != 0 {
return false
}
return true
}))
newNode, err := nodes.Get(testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Assert(t, is.Len(newNode.Status.Conditions, 0))
baseCondition := corev1.NodeCondition{
Type: "BaseCondition",
Status: "Ok",
Reason: "NA",
Message: "This is the base condition. It is set by VK, and should always be there.",
}
testP.notifyNodeStatus(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"testAnnotation": "value",
},
},
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
baseCondition,
},
},
})
// Wait for this (node with condition) to show up
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
receivedNode := e.Object.(*corev1.Node)
for _, condition := range receivedNode.Status.Conditions {
if condition.Type == baseCondition.Type {
return true
}
}
return false
}))
newNode, err = nodes.Get(testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Assert(t, is.Len(newNode.Status.Conditions, 1))
assert.Assert(t, is.Contains(newNode.Annotations, "testAnnotation"))
// Add a new condition (and annotation) manually, from outside of VK
manuallyAddedCondition := corev1.NodeCondition{
Type: "ManuallyAddedCondition",
Status: "Ok",
Reason: "NA",
Message: "This is a manually added condition. Outside of VK. It should not be removed.",
}
assert.NilError(t, retry.RetryOnConflict(retry.DefaultRetry, func() error {
newNode, err = nodes.Get(testNodeCopy.Name, emptyGetOptions)
if err != nil {
return err
}
newNode.Annotations["manuallyAddedAnnotation"] = "value"
newNode.Status.Conditions = append(newNode.Status.Conditions, manuallyAddedCondition)
_, err = nodes.UpdateStatus(newNode)
return err
}))
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
receivedNode := e.Object.(*corev1.Node)
for _, condition := range receivedNode.Status.Conditions {
if condition.Type == manuallyAddedCondition.Type {
return true
}
}
assert.Assert(t, is.Contains(receivedNode.Annotations, "testAnnotation"))
assert.Assert(t, is.Contains(newNode.Annotations, "manuallyAddedAnnotation"))
return false
}))
// Let's have the VK have a new condition.
newCondition := corev1.NodeCondition{
Type: "NewCondition",
Status: "Ok",
Reason: "NA",
Message: "This is a newly added condition. It should only show up *with* / *after* ManuallyAddedCondition. It is set by the VK.",
}
// Annotations are deliberately left empty: testAnnotation was last applied by VK, so the three-way patch
// should delete it, while the externally added manuallyAddedAnnotation should survive.
testP.notifyNodeStatus(&corev1.Node{
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
baseCondition,
newCondition,
},
},
})
i := 0
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
receivedNode := e.Object.(*corev1.Node)
for _, condition := range receivedNode.Status.Conditions {
if condition.Type == newCondition.Type {
// Wait until the new condition has shown up in a few successive updates / patches
if i > 2 {
return true
}
i++
}
}
return false
}))
// Make sure that all three conditions are there.
newNode, err = nodes.Get(testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
seenConditionTypes := make([]corev1.NodeConditionType, len(newNode.Status.Conditions))
for idx := range newNode.Status.Conditions {
seenConditionTypes[idx] = newNode.Status.Conditions[idx].Type
}
assert.Assert(t, is.Contains(seenConditionTypes, baseCondition.Type))
assert.Assert(t, is.Contains(seenConditionTypes, newCondition.Type))
assert.Assert(t, is.Contains(seenConditionTypes, manuallyAddedCondition.Type))
assert.Assert(t, is.Equal(newNode.Annotations["testAnnotation"], ""))
assert.Assert(t, is.Contains(newNode.Annotations, "manuallyAddedAnnotation"))
t.Log(newNode.Status.Conditions)
}
func testNode(t *testing.T) *corev1.Node {
n := &corev1.Node{}
n.Name = strings.ToLower(t.Name())
@@ -383,16 +631,19 @@ func testNode(t *testing.T) *corev1.Node {
type testNodeProvider struct {
NodeProvider
-statusHandlers []func(*corev1.Node)
// Callback to VK
notifyNodeStatus func(*corev1.Node)
}
func (p *testNodeProvider) NotifyNodeStatus(ctx context.Context, h func(*corev1.Node)) {
-p.statusHandlers = append(p.statusHandlers, h)
p.notifyNodeStatus = h
}
func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
-for _, h := range p.statusHandlers {
-h(n)
-}
p.notifyNodeStatus(n)
}
// testNodeProviderPing tracks the maximum time interval between calls to Ping