Introduce three-way patch for proper handling of out-of-band status updates
As described in the patch itself, there is a case that if a node is updated out of band (e.g. node-problem-detector (https://github.com/kubernetes/node-problem-detector)), we will overwrite the patch in our typicaly two-way strategic patch for node status updates. The reason why the standard kubelet can do this is because the flow goes: apiserver->kubelet: Fetch current node kubelet->kubelet: Update apiserver's snapshot with local state changes kubelet->apiserver: patch We don't have this luxury, as we rely on providers making a callback into us in order to get the most recent pod status. They do not have a way to do that merge operation themselves, and a two-way merge doesn't give us enough metadata. In order to work around this, we perform a three-way merge on behalf of the user. We do this by stashing the contents of the last update inside of it. We then fetch that status back, and use that for the future update itself. In the upgrade case, or the case where the VK has been created by "someone else", we do not know which attributes were created by or written by us, so we cannot generate a three way patch. In this case, we will do our best to avoid deleting any attributes, and only overwrite them. We will consider all current api server values written by "someone else", and not edit them. This is done by considering the "old node" to be empty.
This commit is contained in:
218
node/node.go
218
node/node.go
@@ -17,7 +17,6 @@ package node
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
pkgerrors "github.com/pkg/errors"
|
||||
@@ -31,6 +30,14 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
|
||||
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/util/retry"
|
||||
)
|
||||
|
||||
const (
|
||||
// Annotation with the JSON-serialized last applied node conditions. Based on kube ctl apply. Used to calculate
|
||||
// the three-way patch
|
||||
virtualKubeletLastNodeAppliedNodeStatus = "virtual-kubelet.io/last-applied-node-status"
|
||||
virtualKubeletLastNodeAppliedObjectMeta = "virtual-kubelet.io/last-applied-object-meta"
|
||||
)
|
||||
|
||||
// NodeProvider is the interface used for registering a node and updating its
|
||||
@@ -287,6 +294,13 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
|
||||
}
|
||||
|
||||
n.n.Status = updated.Status
|
||||
n.n.ObjectMeta = metav1.ObjectMeta{
|
||||
Annotations: updated.Annotations,
|
||||
Labels: updated.Labels,
|
||||
Name: n.n.ObjectMeta.Name,
|
||||
Namespace: n.n.ObjectMeta.Namespace,
|
||||
UID: n.n.ObjectMeta.UID,
|
||||
}
|
||||
if err := n.updateStatus(ctx, false); err != nil {
|
||||
log.G(ctx).WithError(err).Error("Error handling node status update")
|
||||
}
|
||||
@@ -428,42 +442,117 @@ func updateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *
|
||||
// just so we don't have to allocate this on every get request
|
||||
var emptyGetOptions = metav1.GetOptions{}
|
||||
|
||||
// patchNodeStatus patches node status.
|
||||
// Copied from github.com/kubernetes/kubernetes/pkg/util/node
|
||||
func patchNodeStatus(nodes v1.NodeInterface, nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) (*corev1.Node, []byte, error) {
|
||||
patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
func prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode *corev1.Node) ([]byte, error) {
|
||||
// We use these two values to calculate a patch. We use a three-way patch in order to avoid
|
||||
// causing state regression server side. For example, let's consider the scenario:
|
||||
/*
|
||||
UML Source:
|
||||
@startuml
|
||||
participant VK
|
||||
participant K8s
|
||||
participant ExternalUpdater
|
||||
note right of VK: Updates internal node conditions to [A, B]
|
||||
VK->K8s: Patch Upsert [A, B]
|
||||
note left of K8s: Node conditions are [A, B]
|
||||
ExternalUpdater->K8s: Patch Upsert [C]
|
||||
note left of K8s: Node Conditions are [A, B, C]
|
||||
note right of VK: Updates internal node conditions to [A]
|
||||
VK->K8s: Patch: delete B, upsert A\nThis is where things go wrong,\nbecause the patch is written to replace all node conditions\nit overwrites (drops) [C]
|
||||
note left of K8s: Node Conditions are [A]\nNode condition C from ExternalUpdater is no longer present
|
||||
@enduml
|
||||
,--. ,---. ,---------------.
|
||||
|VK| |K8s| |ExternalUpdater|
|
||||
`+-' `-+-' `-------+-------'
|
||||
| ,------------------------------------------!. | |
|
||||
| |Updates internal node conditions to [A, B]|_\ | |
|
||||
| `--------------------------------------------' | |
|
||||
| Patch Upsert [A, B] | |
|
||||
| -----------------------------------------------------------> |
|
||||
| | |
|
||||
| ,--------------------------!. | |
|
||||
| |Node conditions are [A, B]|_\| |
|
||||
| `----------------------------'| |
|
||||
| | Patch Upsert [C] |
|
||||
| | <-------------------
|
||||
| | |
|
||||
| ,-----------------------------!. | |
|
||||
| |Node Conditions are [A, B, C]|_\| |
|
||||
| `-------------------------------'| |
|
||||
| ,---------------------------------------!. | |
|
||||
| |Updates internal node conditions to [A]|_\ | |
|
||||
| `-----------------------------------------' | |
|
||||
| Patch: delete B, upsert A | |
|
||||
| This is where things go wrong, | |
|
||||
| because the patch is written to replace all node conditions| |
|
||||
| it overwrites (drops) [C] | |
|
||||
| -----------------------------------------------------------> |
|
||||
| | |
|
||||
,----------------------------------------------------------!. | |
|
||||
|Node Conditions are [A] |_\| |
|
||||
|Node condition C from ExternalUpdater is no longer present || |
|
||||
`------------------------------------------------------------'+-. ,-------+-------.
|
||||
|VK| |K8s| |ExternalUpdater|
|
||||
`--' `---' `---------------'
|
||||
*/
|
||||
// In order to calculate that last patch to delete B, and upsert C, we need to know that C was added by
|
||||
// "someone else". So, we keep track of our last applied value, and our current value. We then generate
|
||||
// our patch based on the diff of these and *not* server side state.
|
||||
oldVKStatus, ok1 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus]
|
||||
oldVKObjectMeta, ok2 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta]
|
||||
|
||||
oldNode := corev1.Node{}
|
||||
// Check if there were no labels, which means someone else probably created the node, or this is an upgrade. Either way, we will consider
|
||||
// ourselves as never having written the node object before, so oldNode will be left empty. We will overwrite values if
|
||||
// our new node conditions / status / objectmeta have them
|
||||
if ok1 && ok2 {
|
||||
err := json.Unmarshal([]byte(oldVKObjectMeta), &oldNode.ObjectMeta)
|
||||
if err != nil {
|
||||
return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node object metadata (key: %q): %q", virtualKubeletLastNodeAppliedObjectMeta, oldVKObjectMeta)
|
||||
}
|
||||
err = json.Unmarshal([]byte(oldVKStatus), &oldNode.Status)
|
||||
if err != nil {
|
||||
return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node status (key: %q): %q", virtualKubeletLastNodeAppliedNodeStatus, oldVKStatus)
|
||||
}
|
||||
}
|
||||
|
||||
updatedNode, err := nodes.Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
|
||||
}
|
||||
return updatedNode, patchBytes, nil
|
||||
}
|
||||
// newNode is the representation of the node the provider "wants"
|
||||
newNode := corev1.Node{}
|
||||
newNode.ObjectMeta = simplestObjectMetadata(&apiServerNode.ObjectMeta, &nodeFromProvider.ObjectMeta)
|
||||
nodeFromProvider.Status.DeepCopyInto(&newNode.Status)
|
||||
|
||||
func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) ([]byte, error) {
|
||||
oldData, err := json.Marshal(oldNode)
|
||||
// virtualKubeletLastNodeAppliedObjectMeta must always be set before virtualKubeletLastNodeAppliedNodeStatus,
|
||||
// otherwise we capture virtualKubeletLastNodeAppliedNodeStatus in virtualKubeletLastNodeAppliedObjectMeta,
|
||||
// which is wrong
|
||||
virtualKubeletLastNodeAppliedObjectMetaBytes, err := json.Marshal(newNode.ObjectMeta)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err)
|
||||
return nil, pkgerrors.Wrap(err, "Cannot marshal object meta from provider")
|
||||
}
|
||||
newNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta] = string(virtualKubeletLastNodeAppliedObjectMetaBytes)
|
||||
|
||||
// Reset spec to make sure only patch for Status or ObjectMeta is generated.
|
||||
// Note that we don't reset ObjectMeta here, because:
|
||||
// 1. This aligns with Nodes().UpdateStatus().
|
||||
// 2. Some component does use this to update node annotations.
|
||||
newNode.Spec = oldNode.Spec
|
||||
newData, err := json.Marshal(newNode)
|
||||
virtualKubeletLastNodeAppliedNodeStatusBytes, err := json.Marshal(newNode.Status)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err)
|
||||
return nil, pkgerrors.Wrap(err, "Cannot marshal node status from provider")
|
||||
}
|
||||
|
||||
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
|
||||
newNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus] = string(virtualKubeletLastNodeAppliedNodeStatusBytes)
|
||||
// Generate three way patch from oldNode -> newNode, without deleting the changes in api server
|
||||
// Should we use the Kubernetes serialization / deserialization libraries here?
|
||||
oldNodeBytes, err := json.Marshal(oldNode)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err)
|
||||
return nil, pkgerrors.Wrap(err, "Cannot marshal old node bytes")
|
||||
}
|
||||
return patchBytes, nil
|
||||
newNodeBytes, err := json.Marshal(newNode)
|
||||
if err != nil {
|
||||
return nil, pkgerrors.Wrap(err, "Cannot marshal new node bytes")
|
||||
}
|
||||
apiServerNodeBytes, err := json.Marshal(apiServerNode)
|
||||
if err != nil {
|
||||
return nil, pkgerrors.Wrap(err, "Cannot marshal api server node")
|
||||
}
|
||||
schema, err := strategicpatch.NewPatchMetaFromStruct(&corev1.Node{})
|
||||
if err != nil {
|
||||
return nil, pkgerrors.Wrap(err, "Cannot get patch schema from node")
|
||||
}
|
||||
return strategicpatch.CreateThreeWayMergePatch(oldNodeBytes, newNodeBytes, apiServerNodeBytes, schema, true)
|
||||
}
|
||||
|
||||
// updateNodeStatus triggers an update to the node status in Kubernetes.
|
||||
@@ -472,37 +561,44 @@ func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Nod
|
||||
//
|
||||
// If you use this function, it is up to you to synchronize this with other operations.
|
||||
// This reduces the time to second-level precision.
|
||||
func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (_ *corev1.Node, retErr error) {
|
||||
func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, nodeFromProvider *corev1.Node) (_ *corev1.Node, retErr error) {
|
||||
ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus")
|
||||
defer func() {
|
||||
span.End()
|
||||
span.SetStatus(retErr)
|
||||
}()
|
||||
|
||||
var node *corev1.Node
|
||||
var updatedNode *corev1.Node
|
||||
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
apiServerNode, err := nodes.Get(nodeFromProvider.Name, emptyGetOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx = addNodeAttributes(ctx, span, apiServerNode)
|
||||
log.G(ctx).Debug("got node from api server")
|
||||
|
||||
patchBytes, err := prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode)
|
||||
if err != nil {
|
||||
return pkgerrors.Wrap(err, "Cannot generate patch")
|
||||
}
|
||||
log.G(ctx).WithError(err).WithField("patch", string(patchBytes)).Debug("Generated three way patch")
|
||||
|
||||
updatedNode, err = nodes.Patch(nodeFromProvider.Name, types.StrategicMergePatchType, patchBytes, "status")
|
||||
if err != nil {
|
||||
// We cannot wrap this error because the kubernetes error module doesn't understand wrapping
|
||||
log.G(ctx).WithField("patch", string(patchBytes)).WithError(err).Warn("Failed to patch node status")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
oldNode, err := nodes.Get(n.Name, emptyGetOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.G(ctx).Debug("got node from api server")
|
||||
node = oldNode.DeepCopy()
|
||||
node.ResourceVersion = ""
|
||||
node.Status = n.Status
|
||||
|
||||
ctx = addNodeAttributes(ctx, span, node)
|
||||
|
||||
// Patch the node status to merge other changes on the node.
|
||||
updated, _, err := patchNodeStatus(nodes, types.NodeName(n.Name), oldNode, node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.G(ctx).WithField("node.resourceVersion", updated.ResourceVersion).
|
||||
WithField("node.Status.Conditions", updated.Status.Conditions).
|
||||
log.G(ctx).WithField("node.resourceVersion", updatedNode.ResourceVersion).
|
||||
WithField("node.Status.Conditions", updatedNode.Status.Conditions).
|
||||
Debug("updated node status in api server")
|
||||
return updated, nil
|
||||
return updatedNode, nil
|
||||
}
|
||||
|
||||
func newLease(base *coord.Lease) *coord.Lease {
|
||||
@@ -577,3 +673,31 @@ func addNodeAttributes(ctx context.Context, span trace.Span, n *corev1.Node) con
|
||||
"node.taints": taintsStringer(n.Spec.Taints),
|
||||
})
|
||||
}
|
||||
|
||||
// Provides the simplest object metadata to match the previous object. Name, namespace, UID. It copies labels and
|
||||
// annotations from the second object if defined. It exempts the patch metadata
|
||||
func simplestObjectMetadata(baseObjectMeta, objectMetaWithLabelsAndAnnotations *metav1.ObjectMeta) metav1.ObjectMeta {
|
||||
ret := metav1.ObjectMeta{
|
||||
Namespace: baseObjectMeta.Namespace,
|
||||
Name: baseObjectMeta.Name,
|
||||
UID: baseObjectMeta.UID,
|
||||
Annotations: make(map[string]string),
|
||||
}
|
||||
if objectMetaWithLabelsAndAnnotations != nil {
|
||||
if objectMetaWithLabelsAndAnnotations.Labels != nil {
|
||||
ret.Labels = objectMetaWithLabelsAndAnnotations.Labels
|
||||
} else {
|
||||
ret.Labels = make(map[string]string)
|
||||
}
|
||||
if objectMetaWithLabelsAndAnnotations.Annotations != nil {
|
||||
// We want to copy over all annotations except the special embedded ones.
|
||||
for key := range objectMetaWithLabelsAndAnnotations.Annotations {
|
||||
if key == virtualKubeletLastNodeAppliedNodeStatus || key == virtualKubeletLastNodeAppliedObjectMeta {
|
||||
continue
|
||||
}
|
||||
ret.Annotations[key] = objectMetaWithLabelsAndAnnotations.Annotations[key]
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
@@ -22,12 +22,14 @@ import (
|
||||
|
||||
"gotest.tools/assert"
|
||||
"gotest.tools/assert/cmp"
|
||||
is "gotest.tools/assert/cmp"
|
||||
coord "k8s.io/api/coordination/v1beta1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
watch "k8s.io/apimachinery/pkg/watch"
|
||||
testclient "k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/util/retry"
|
||||
)
|
||||
|
||||
func TestNodeRun(t *testing.T) {
|
||||
@@ -374,6 +376,252 @@ func TestPingAfterStatusUpdate(t *testing.T) {
|
||||
assert.Assert(t, testP.maxPingInterval < maxAllowedInterval, "maximum time between node pings (%v) was greater than the maximum expected interval (%v)", testP.maxPingInterval, maxAllowedInterval)
|
||||
}
|
||||
|
||||
// Are annotations that were created before the VK existed preserved?
|
||||
func TestBeforeAnnotationsPreserved(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
c := testclient.NewSimpleClientset()
|
||||
|
||||
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
|
||||
|
||||
nodes := c.CoreV1().Nodes()
|
||||
|
||||
interval := 10 * time.Millisecond
|
||||
opts := []NodeControllerOpt{
|
||||
WithNodePingInterval(interval),
|
||||
}
|
||||
|
||||
testNode := testNode(t)
|
||||
testNodeCreateCopy := testNode.DeepCopy()
|
||||
testNodeCreateCopy.Annotations = map[string]string{
|
||||
"beforeAnnotation": "value",
|
||||
}
|
||||
_, err := nodes.Create(testNodeCreateCopy)
|
||||
assert.NilError(t, err)
|
||||
|
||||
// We have to refer to testNodeCopy during the course of the test. testNode is modified by the node controller
|
||||
// so it will trigger the race detector.
|
||||
testNodeCopy := testNode.DeepCopy()
|
||||
node, err := NewNodeController(testP, testNode, nodes, opts...)
|
||||
assert.NilError(t, err)
|
||||
|
||||
chErr := make(chan error)
|
||||
defer func() {
|
||||
cancel()
|
||||
assert.NilError(t, <-chErr)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
chErr <- node.Run(ctx)
|
||||
close(chErr)
|
||||
}()
|
||||
|
||||
nw := makeWatch(t, nodes, testNodeCopy.Name)
|
||||
defer nw.Stop()
|
||||
nr := nw.ResultChan()
|
||||
|
||||
t.Log("Waiting for node to exist")
|
||||
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
|
||||
if e.Object == nil {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}))
|
||||
|
||||
testP.notifyNodeStatus(&corev1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Annotations: map[string]string{
|
||||
"testAnnotation": "value",
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
|
||||
if e.Object == nil {
|
||||
return false
|
||||
}
|
||||
_, ok := e.Object.(*corev1.Node).Annotations["testAnnotation"]
|
||||
|
||||
return ok
|
||||
}))
|
||||
|
||||
newNode, err := nodes.Get(testNodeCopy.Name, emptyGetOptions)
|
||||
assert.NilError(t, err)
|
||||
|
||||
assert.Assert(t, is.Contains(newNode.Annotations, "testAnnotation"))
|
||||
assert.Assert(t, is.Contains(newNode.Annotations, "beforeAnnotation"))
|
||||
}
|
||||
|
||||
// Are conditions set by systems outside of VK preserved?
|
||||
func TestManualConditionsPreserved(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
c := testclient.NewSimpleClientset()
|
||||
|
||||
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
|
||||
|
||||
nodes := c.CoreV1().Nodes()
|
||||
|
||||
interval := 10 * time.Millisecond
|
||||
opts := []NodeControllerOpt{
|
||||
WithNodePingInterval(interval),
|
||||
}
|
||||
|
||||
testNode := testNode(t)
|
||||
// We have to refer to testNodeCopy during the course of the test. testNode is modified by the node controller
|
||||
// so it will trigger the race detector.
|
||||
testNodeCopy := testNode.DeepCopy()
|
||||
node, err := NewNodeController(testP, testNode, nodes, opts...)
|
||||
assert.NilError(t, err)
|
||||
|
||||
chErr := make(chan error)
|
||||
defer func() {
|
||||
cancel()
|
||||
assert.NilError(t, <-chErr)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
chErr <- node.Run(ctx)
|
||||
close(chErr)
|
||||
}()
|
||||
|
||||
nw := makeWatch(t, nodes, testNodeCopy.Name)
|
||||
defer nw.Stop()
|
||||
nr := nw.ResultChan()
|
||||
|
||||
t.Log("Waiting for node to exist")
|
||||
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
|
||||
if e.Object == nil {
|
||||
return false
|
||||
}
|
||||
receivedNode := e.Object.(*corev1.Node)
|
||||
if len(receivedNode.Status.Conditions) != 0 {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}))
|
||||
|
||||
newNode, err := nodes.Get(testNodeCopy.Name, emptyGetOptions)
|
||||
assert.NilError(t, err)
|
||||
assert.Assert(t, is.Len(newNode.Status.Conditions, 0))
|
||||
|
||||
baseCondition := corev1.NodeCondition{
|
||||
Type: "BaseCondition",
|
||||
Status: "Ok",
|
||||
Reason: "NA",
|
||||
Message: "This is the base condition. It is set by VK, and should always be there.",
|
||||
}
|
||||
|
||||
testP.notifyNodeStatus(&corev1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Annotations: map[string]string{
|
||||
"testAnnotation": "value",
|
||||
},
|
||||
},
|
||||
Status: corev1.NodeStatus{
|
||||
Conditions: []corev1.NodeCondition{
|
||||
baseCondition,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// Wait for this (node with condition) to show up
|
||||
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
|
||||
receivedNode := e.Object.(*corev1.Node)
|
||||
for _, condition := range receivedNode.Status.Conditions {
|
||||
if condition.Type == baseCondition.Type {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}))
|
||||
|
||||
newNode, err = nodes.Get(testNodeCopy.Name, emptyGetOptions)
|
||||
assert.NilError(t, err)
|
||||
assert.Assert(t, is.Len(newNode.Status.Conditions, 1))
|
||||
assert.Assert(t, is.Contains(newNode.Annotations, "testAnnotation"))
|
||||
|
||||
// Add a new event manually
|
||||
manuallyAddedCondition := corev1.NodeCondition{
|
||||
Type: "ManuallyAddedCondition",
|
||||
Status: "Ok",
|
||||
Reason: "NA",
|
||||
Message: "This is a manually added condition. Outside of VK. It should not be removed.",
|
||||
}
|
||||
assert.NilError(t, retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
newNode, err = nodes.Get(testNodeCopy.Name, emptyGetOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newNode.Annotations["manuallyAddedAnnotation"] = "value"
|
||||
newNode.Status.Conditions = append(newNode.Status.Conditions, manuallyAddedCondition)
|
||||
_, err = nodes.UpdateStatus(newNode)
|
||||
return err
|
||||
}))
|
||||
|
||||
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
|
||||
receivedNode := e.Object.(*corev1.Node)
|
||||
for _, condition := range receivedNode.Status.Conditions {
|
||||
if condition.Type == manuallyAddedCondition.Type {
|
||||
return true
|
||||
}
|
||||
}
|
||||
assert.Assert(t, is.Contains(receivedNode.Annotations, "testAnnotation"))
|
||||
assert.Assert(t, is.Contains(newNode.Annotations, "manuallyAddedAnnotation"))
|
||||
|
||||
return false
|
||||
}))
|
||||
|
||||
// Let's have the VK have a new condition.
|
||||
newCondition := corev1.NodeCondition{
|
||||
Type: "NewCondition",
|
||||
Status: "Ok",
|
||||
Reason: "NA",
|
||||
Message: "This is a newly added condition. It should only show up *with* / *after* ManuallyAddedCondition. It is set by the VK.",
|
||||
}
|
||||
|
||||
// Everything but node status is ignored here
|
||||
testP.notifyNodeStatus(&corev1.Node{
|
||||
// Annotations is left empty
|
||||
Status: corev1.NodeStatus{
|
||||
Conditions: []corev1.NodeCondition{
|
||||
baseCondition,
|
||||
newCondition,
|
||||
},
|
||||
},
|
||||
})
|
||||
i := 0
|
||||
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
|
||||
receivedNode := e.Object.(*corev1.Node)
|
||||
for _, condition := range receivedNode.Status.Conditions {
|
||||
if condition.Type == newCondition.Type {
|
||||
// Wait for 2 updates / patches
|
||||
if i > 2 {
|
||||
return true
|
||||
}
|
||||
i++
|
||||
}
|
||||
}
|
||||
return false
|
||||
}))
|
||||
|
||||
// Make sure that all three conditions are there.
|
||||
newNode, err = nodes.Get(testNodeCopy.Name, emptyGetOptions)
|
||||
assert.NilError(t, err)
|
||||
seenConditionTypes := make([]corev1.NodeConditionType, len(newNode.Status.Conditions))
|
||||
for idx := range newNode.Status.Conditions {
|
||||
seenConditionTypes[idx] = newNode.Status.Conditions[idx].Type
|
||||
}
|
||||
assert.Assert(t, is.Contains(seenConditionTypes, baseCondition.Type))
|
||||
assert.Assert(t, is.Contains(seenConditionTypes, newCondition.Type))
|
||||
assert.Assert(t, is.Contains(seenConditionTypes, manuallyAddedCondition.Type))
|
||||
assert.Assert(t, is.Equal(newNode.Annotations["testAnnotation"], ""))
|
||||
assert.Assert(t, is.Contains(newNode.Annotations, "manuallyAddedAnnotation"))
|
||||
|
||||
t.Log(newNode.Status.Conditions)
|
||||
}
|
||||
func testNode(t *testing.T) *corev1.Node {
|
||||
n := &corev1.Node{}
|
||||
n.Name = strings.ToLower(t.Name())
|
||||
@@ -383,16 +631,19 @@ func testNode(t *testing.T) *corev1.Node {
|
||||
type testNodeProvider struct {
|
||||
NodeProvider
|
||||
statusHandlers []func(*corev1.Node)
|
||||
// Callback to VK
|
||||
notifyNodeStatus func(*corev1.Node)
|
||||
}
|
||||
|
||||
func (p *testNodeProvider) NotifyNodeStatus(ctx context.Context, h func(*corev1.Node)) {
|
||||
p.statusHandlers = append(p.statusHandlers, h)
|
||||
p.notifyNodeStatus = h
|
||||
}
|
||||
|
||||
func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
|
||||
for _, h := range p.statusHandlers {
|
||||
h(n)
|
||||
}
|
||||
p.notifyNodeStatus(n)
|
||||
}
|
||||
|
||||
// testNodeProviderPing tracks the maximum time interval between calls to Ping
|
||||
|
||||
Reference in New Issue
Block a user