Merge pull request #835 from cwdsuzhou/June/avoid_enqueue
Avoid enqueue when status of k8s pods change
This commit is contained in:
24
node/pod.go
24
node/pod.go
@@ -119,6 +119,30 @@ func podsEqual(pod1, pod2 *corev1.Pod) bool {
|
||||
|
||||
}
|
||||
|
||||
func deleteGraceTimeEqual(old, new *int64) bool {
|
||||
if old == nil && new == nil {
|
||||
return true
|
||||
}
|
||||
if old != nil && new != nil {
|
||||
return *old == *new
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// podShouldEnqueue checks if two pods equal according according to podsEqual func and DeleteTimeStamp
|
||||
func podShouldEnqueue(oldPod, newPod *corev1.Pod) bool {
|
||||
if !podsEqual(oldPod, newPod) {
|
||||
return true
|
||||
}
|
||||
if !deleteGraceTimeEqual(oldPod.DeletionGracePeriodSeconds, newPod.DeletionGracePeriodSeconds) {
|
||||
return true
|
||||
}
|
||||
if !oldPod.DeletionTimestamp.Equal(newPod.DeletionTimestamp) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (pc *PodController) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) {
|
||||
podPhase := corev1.PodPending
|
||||
if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"gotest.tools/assert"
|
||||
is "gotest.tools/assert/cmp"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
@@ -108,6 +109,76 @@ func TestPodsDifferentIgnoreValue(t *testing.T) {
|
||||
assert.Assert(t, podsEqual(p1, p2))
|
||||
}
|
||||
|
||||
func TestPodShouldEnqueueDifferentDeleteTimeStamp(t *testing.T) {
|
||||
p1 := &corev1.Pod{
|
||||
Spec: newPodSpec(),
|
||||
}
|
||||
|
||||
p2 := p1.DeepCopy()
|
||||
now := v1.NewTime(time.Now())
|
||||
p2.DeletionTimestamp = &now
|
||||
assert.Assert(t, podShouldEnqueue(p1, p2))
|
||||
}
|
||||
|
||||
func TestPodShouldEnqueueDifferentLabel(t *testing.T) {
|
||||
p1 := &corev1.Pod{
|
||||
Spec: newPodSpec(),
|
||||
}
|
||||
|
||||
p2 := p1.DeepCopy()
|
||||
p2.Labels = map[string]string{"test": "test"}
|
||||
assert.Assert(t, podShouldEnqueue(p1, p2))
|
||||
}
|
||||
|
||||
func TestPodShouldEnqueueDifferentAnnotation(t *testing.T) {
|
||||
p1 := &corev1.Pod{
|
||||
Spec: newPodSpec(),
|
||||
}
|
||||
|
||||
p2 := p1.DeepCopy()
|
||||
p2.Annotations = map[string]string{"test": "test"}
|
||||
assert.Assert(t, podShouldEnqueue(p1, p2))
|
||||
}
|
||||
|
||||
func TestPodShouldNotEnqueueDifferentStatus(t *testing.T) {
|
||||
p1 := &corev1.Pod{
|
||||
Spec: newPodSpec(),
|
||||
}
|
||||
|
||||
p2 := p1.DeepCopy()
|
||||
p2.Status.Phase = corev1.PodSucceeded
|
||||
assert.Assert(t, !podShouldEnqueue(p1, p2))
|
||||
}
|
||||
|
||||
func TestPodShouldEnqueueDifferentDeleteGraceTime(t *testing.T) {
|
||||
p1 := &corev1.Pod{
|
||||
Spec: newPodSpec(),
|
||||
}
|
||||
|
||||
p2 := p1.DeepCopy()
|
||||
oldTime := v1.NewTime(time.Now().Add(5))
|
||||
newTime := v1.NewTime(time.Now().Add(10))
|
||||
oldGraceTime := int64(5)
|
||||
newGraceTime := int64(10)
|
||||
p1.DeletionGracePeriodSeconds = &oldGraceTime
|
||||
p2.DeletionTimestamp = &oldTime
|
||||
|
||||
p2.DeletionGracePeriodSeconds = &newGraceTime
|
||||
p2.DeletionTimestamp = &newTime
|
||||
assert.Assert(t, podShouldEnqueue(p1, p2))
|
||||
}
|
||||
|
||||
func TestPodShouldEnqueueGraceTimeChanged(t *testing.T) {
|
||||
p1 := &corev1.Pod{
|
||||
Spec: newPodSpec(),
|
||||
}
|
||||
|
||||
p2 := p1.DeepCopy()
|
||||
graceTime := int64(30)
|
||||
p2.DeletionGracePeriodSeconds = &graceTime
|
||||
assert.Assert(t, podShouldEnqueue(p1, p2))
|
||||
}
|
||||
|
||||
func TestPodCreateNewPod(t *testing.T) {
|
||||
svr := newTestController()
|
||||
|
||||
|
||||
@@ -270,13 +270,16 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
// Create a copy of the old and new pod objects so we don't mutate the cache.
|
||||
oldPod := oldObj.(*corev1.Pod)
|
||||
newPod := newObj.(*corev1.Pod)
|
||||
|
||||
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
|
||||
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
|
||||
log.G(ctx).Error(err)
|
||||
} else {
|
||||
pc.k8sQ.AddRateLimited(key)
|
||||
if podShouldEnqueue(oldPod, newPod) {
|
||||
pc.k8sQ.AddRateLimited(key)
|
||||
}
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(pod interface{}) {
|
||||
|
||||
Reference in New Issue
Block a user