Use go-cmp to compare pods to suppress duplicate updates
Rather than copying the pods, this uses go-cmp and filters out the paths which should not be compared.
This commit is contained in:
@@ -17,10 +17,10 @@ package node
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/google/go-cmp/cmp"
|
||||||
pkgerrors "github.com/pkg/errors"
|
pkgerrors "github.com/pkg/errors"
|
||||||
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
|
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
|
||||||
"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
|
"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
|
||||||
@@ -241,14 +241,11 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
|
|||||||
},
|
},
|
||||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
// Create a copy of the old and new pod objects so we don't mutate the cache.
|
// Create a copy of the old and new pod objects so we don't mutate the cache.
|
||||||
oldPod := oldObj.(*corev1.Pod).DeepCopy()
|
oldPod := oldObj.(*corev1.Pod)
|
||||||
newPod := newObj.(*corev1.Pod).DeepCopy()
|
newPod := newObj.(*corev1.Pod)
|
||||||
// We want to check if the two objects differ in anything other than their resource versions.
|
|
||||||
// Hence, we make them equal so that this change isn't picked up by reflect.DeepEqual.
|
|
||||||
newPod.ResourceVersion = oldPod.ResourceVersion
|
|
||||||
// Skip adding this pod's key to the work queue if its .metadata (except .metadata.resourceVersion) and .spec fields haven't changed.
|
// Skip adding this pod's key to the work queue if its .metadata (except .metadata.resourceVersion) and .spec fields haven't changed.
|
||||||
// This guarantees that we don't attempt to sync the pod every time its .status field is updated.
|
// This guarantees that we don't attempt to sync the pod every time its .status field is updated.
|
||||||
if reflect.DeepEqual(oldPod.ObjectMeta, newPod.ObjectMeta) && reflect.DeepEqual(oldPod.Spec, newPod.Spec) {
|
if podsEffectivelyEqual(oldPod, newPod) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
|
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
|
||||||
@@ -503,3 +500,18 @@ func loggablePodName(pod *corev1.Pod) string {
|
|||||||
func loggablePodNameFromCoordinates(namespace, name string) string {
|
func loggablePodNameFromCoordinates(namespace, name string) string {
|
||||||
return fmt.Sprintf("%s/%s", namespace, name)
|
return fmt.Sprintf("%s/%s", namespace, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// podsEffectivelyEqual compares two pods, and ignores the pod status, and the resource version
|
||||||
|
func podsEffectivelyEqual(p1, p2 *corev1.Pod) bool {
|
||||||
|
filterForResourceVersion := func(p cmp.Path) bool {
|
||||||
|
if p.String() == "ObjectMeta.ResourceVersion" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if p.String() == "Status" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return cmp.Equal(p1, p2, cmp.FilterPath(filterForResourceVersion, cmp.Ignore()))
|
||||||
|
}
|
||||||
|
|||||||
@@ -6,6 +6,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gotest.tools/assert"
|
"gotest.tools/assert"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPodControllerExitOnContextCancel(t *testing.T) {
|
func TestPodControllerExitOnContextCancel(t *testing.T) {
|
||||||
@@ -42,3 +44,49 @@ func TestPodControllerExitOnContextCancel(t *testing.T) {
|
|||||||
}
|
}
|
||||||
assert.NilError(t, tc.Err())
|
assert.NilError(t, tc.Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCompareResourceVersion(t *testing.T) {
|
||||||
|
p1 := &corev1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
ResourceVersion: "1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
p2 := &corev1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
ResourceVersion: "2",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
assert.Assert(t, podsEffectivelyEqual(p1, p2))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCompareStatus(t *testing.T) {
|
||||||
|
p1 := &corev1.Pod{
|
||||||
|
Status: corev1.PodStatus{
|
||||||
|
Phase: corev1.PodRunning,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
p2 := &corev1.Pod{
|
||||||
|
Status: corev1.PodStatus{
|
||||||
|
Phase: corev1.PodFailed,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
assert.Assert(t, podsEffectivelyEqual(p1, p2))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCompareLabels(t *testing.T) {
|
||||||
|
p1 := &corev1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Labels: map[string]string{
|
||||||
|
"foo": "bar1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
p2 := &corev1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Labels: map[string]string{
|
||||||
|
"foo": "bar2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
assert.Assert(t, !podsEffectivelyEqual(p1, p2))
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user