Allow providers to update pod statuses

We previously added an optimization that dedupes pod status updates coming
from the provider. That check only compared the status itself and ignored
metadata fields (annotations, labels, and finalizers) that can be updated
along with the status.

Because the details of subresource updates are somewhat API-server-centric,
I wrote an envtest that checks for this behaviour.

Signed-off-by: Sargun Dhillon <sargun@sargun.me>
Sargun Dhillon
2021-02-15 17:34:01 -08:00
parent 7feb175720
commit c4582ccfbc
4 changed files with 89 additions and 10 deletions
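For context, here is a minimal, self-contained sketch of the kind of comparison described above. This is not the PodController code itself; podEffectivelyEqual and the sample pods are made up for illustration, assuming the k8s.io/api and github.com/google/go-cmp modules are available.

package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
	corev1 "k8s.io/api/core/v1"
)

// podEffectivelyEqual is a made-up helper (not part of virtual-kubelet) that
// illustrates the comparison the dedup path needs to make: a provider update
// is only a no-op when the status AND the provider-writable metadata
// (annotations, labels, finalizers) are all unchanged.
func podEffectivelyEqual(fromKube, fromProvider *corev1.Pod) bool {
	return cmp.Equal(fromKube.Status, fromProvider.Status) &&
		cmp.Equal(fromKube.Annotations, fromProvider.Annotations) &&
		cmp.Equal(fromKube.Labels, fromProvider.Labels) &&
		cmp.Equal(fromKube.Finalizers, fromProvider.Finalizers)
}

func main() {
	kubePod := &corev1.Pod{}
	kubePod.Status.Phase = corev1.PodRunning

	providerPod := kubePod.DeepCopy()
	providerPod.Annotations = map[string]string{"testannotation": "testvalue"}

	// Identical status but different annotations: a status-only check would
	// skip this update, while the widened check correctly reports a change.
	fmt.Println(podEffectivelyEqual(kubePod, providerPod)) // prints "false"
}

The diff below widens the existing cmp.Equal check in the pod controller in the same way, and adds an envtest that drives the change end to end.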


@@ -16,6 +16,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
klogv2 "k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/envtest"
@@ -42,9 +43,11 @@ func TestEnvtest(t *testing.T) {
t.Run("E2ERunWithLeases", wrapE2ETest(ctx, env, func(ctx context.Context, t *testing.T, environment *envtest.Environment) {
testNodeE2ERun(t, env, true)
}))
t.Run("E2EPodStatusUpdate", wrapE2ETest(ctx, env, testPodStatusUpdate))
}
-func nodeNameForTest(t *testing.T) string {
+func kubernetesNameForTest(t *testing.T) string {
name := t.Name()
name = strings.ToLower(name)
name = strings.ReplaceAll(name, "/", "-")
@@ -67,6 +70,72 @@ func wrapE2ETest(ctx context.Context, env *envtest.Environment, f func(context.C
}
}
func testPodStatusUpdate(ctx context.Context, t *testing.T, env *envtest.Environment) {
provider := newMockProvider()
clientset, err := kubernetes.NewForConfig(env.Config)
assert.NilError(t, err)
pods := clientset.CoreV1().Pods(testNamespace)
assert.NilError(t, wireUpSystemWithClient(ctx, provider, clientset, func(ctx context.Context, s *system) {
p := newPod(forRealAPIServer, nameBasedOnTest(t))
// Against a real API server, we don't set the resource version
p.ResourceVersion = ""
newPod, err := pods.Create(ctx, p, metav1.CreateOptions{})
assert.NilError(t, err)
key, err := buildKey(newPod)
assert.NilError(t, err)
listOptions := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(),
}
// Set up a watch to check whether the pod reaches the Running phase
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
assert.NilError(t, err)
defer watcher.Stop()
// Start the pod controller
assert.NilError(t, s.start(ctx))
var serverPod *corev1.Pod
for {
select {
case <-ctx.Done():
t.Fatalf("Context ended early: %s", ctx.Err().Error())
case ev := <-watcher.ResultChan():
serverPod = ev.Object.(*corev1.Pod)
if serverPod.Status.Phase == corev1.PodRunning {
goto running
}
}
}
running:
t.Log("Observed pod in running state")
providerPod, ok := provider.pods.Load(key)
assert.Assert(t, ok)
providerPodCopy := providerPod.(*corev1.Pod).DeepCopy()
providerPodCopy.Status = serverPod.Status
if providerPodCopy.Annotations == nil {
providerPodCopy.Annotations = make(map[string]string, 1)
}
providerPodCopy.Annotations["testannotation"] = "testvalue"
provider.notifier(providerPodCopy)
for {
select {
case <-ctx.Done():
t.Fatalf("Context ended early: %s", ctx.Err().Error())
case ev := <-watcher.ResultChan():
annotations := ev.Object.(*corev1.Pod).Annotations
if annotations != nil && annotations["testannotation"] == "testvalue" {
return
}
}
}
}))
}
func testNodeE2ERun(t *testing.T, env *envtest.Environment, withLeases bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -81,7 +150,7 @@ func testNodeE2ERun(t *testing.T, env *envtest.Environment, withLeases bool) {
testNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
-Name: nodeNameForTest(t),
+Name: kubernetesNameForTest(t),
},
}


@@ -628,6 +628,17 @@ func randomizeName(pod *corev1.Pod) {
pod.Name = name
}
func forRealAPIServer(pod *corev1.Pod) {
pod.ResourceVersion = ""
pod.ObjectMeta.UID = ""
}
func nameBasedOnTest(t *testing.T) podModifier {
return func(pod *corev1.Pod) {
pod.Name = kubernetesNameForTest(t)
}
}
func newPod(podmodifiers ...podModifier) *corev1.Pod {
var terminationGracePeriodSeconds int64 = 30
pod := &corev1.Pod{


@@ -215,14 +215,7 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
}
kPod := obj.(*knownPod)
kPod.Lock()
podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) && podFromProvider.DeletionTimestamp == nil {
kPod.lastPodStatusUpdateSkipped = true
kPod.Unlock()
return nil
}
kPod.lastPodStatusUpdateSkipped = false
kPod.Unlock()
// Pod deleted by the provider for some reason, e.g. with a K8s provider, a pod created by a deployment would be evicted when the node is not ready.
// If we do not delete the pod in K8s, the deployment would not create a new one.
@@ -326,9 +319,11 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, pod *corev1
kpod := obj.(*knownPod)
kpod.Lock()
if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) {
kpod.lastPodStatusUpdateSkipped = true
kpod.Unlock()
return
}
kpod.lastPodStatusUpdateSkipped = false
kpod.lastPodStatusReceivedFromProvider = pod
kpod.Unlock()
pc.syncPodStatusFromProvider.Enqueue(ctx, key)


@@ -339,7 +339,11 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
kPod := obj.(*knownPod)
kPod.Lock()
-if kPod.lastPodStatusUpdateSkipped && !cmp.Equal(newPod.Status, kPod.lastPodStatusReceivedFromProvider.Status) {
+if kPod.lastPodStatusUpdateSkipped &&
+	(!cmp.Equal(newPod.Status, kPod.lastPodStatusReceivedFromProvider.Status) ||
+		!cmp.Equal(newPod.Annotations, kPod.lastPodStatusReceivedFromProvider.Annotations) ||
+		!cmp.Equal(newPod.Labels, kPod.lastPodStatusReceivedFromProvider.Labels) ||
+		!cmp.Equal(newPod.Finalizers, kPod.lastPodStatusReceivedFromProvider.Finalizers)) {
// The last pod from the provider -> kube api server was skipped, but we see they no longer match.
// This means that the pod in API server was changed by someone else [this can be okay], but we skipped
// a status update on our side because we compared the status received from the provider to the status