Allow to delete pod in K8s if it is deleting in Provider
For example: Provier is a K8s provider, pod created by deployment would be evicted when node is not ready. If we do not delete pod in K8s, deployment would not create a new one. Add some tests for updateStatus
This commit is contained in:
24
node/pod.go
24
node/pod.go
@@ -216,14 +216,34 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
|
||||
kPod.Lock()
|
||||
podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
|
||||
kPod.Unlock()
|
||||
if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) {
|
||||
if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) && podFromProvider.DeletionTimestamp == nil {
|
||||
return nil
|
||||
}
|
||||
// Pod deleted by provider due some reasons. e.g. a K8s provider, pod created by deployment would be evicted when node is not ready.
|
||||
// If we do not delete pod in K8s, deployment would not create a new one.
|
||||
if podFromProvider.DeletionTimestamp != nil && podFromKubernetes.DeletionTimestamp == nil {
|
||||
deleteOptions := metav1.DeleteOptions{
|
||||
GracePeriodSeconds: podFromProvider.DeletionGracePeriodSeconds,
|
||||
}
|
||||
current := metav1.NewTime(time.Now())
|
||||
if podFromProvider.DeletionTimestamp.Before(¤t) {
|
||||
deleteOptions.GracePeriodSeconds = new(int64)
|
||||
}
|
||||
// check status here to avoid pod re-created deleted incorrectly. e.g. delete a pod from K8s and re-create it(statefulSet and so on),
|
||||
// pod in provider may not delete immediately. so deletionTimestamp is not nil. Then the re-created one would be deleted if we do not check pod status.
|
||||
if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) {
|
||||
if err := pc.client.Pods(podFromKubernetes.Namespace).Delete(ctx, podFromKubernetes.Name, deleteOptions); err != nil && !errors.IsNotFound(err) {
|
||||
span.SetStatus(err)
|
||||
return pkgerrors.Wrap(err, "error while delete pod in kubernetes")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We need to do this because the other parts of the pod can be updated elsewhere. Since we're only updating
|
||||
// the pod status, and we should be the sole writers of the pod status, we can blind overwrite it. Therefore
|
||||
// we need to copy the pod and set ResourceVersion to 0.
|
||||
podFromProvider.ResourceVersion = "0"
|
||||
if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(ctx, podFromProvider, metav1.UpdateOptions{}); err != nil {
|
||||
if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(ctx, podFromProvider, metav1.UpdateOptions{}); err != nil && !errors.IsNotFound(err) {
|
||||
span.SetStatus(err)
|
||||
return pkgerrors.Wrap(err, "error while updating pod status in kubernetes")
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@ package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -23,6 +25,7 @@ import (
|
||||
"gotest.tools/assert"
|
||||
is "gotest.tools/assert/cmp"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
@@ -52,6 +55,7 @@ func newTestController() *TestController {
|
||||
podStatusQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
|
||||
done: make(chan struct{}),
|
||||
ready: make(chan struct{}),
|
||||
knownPods: sync.Map{},
|
||||
podsInformer: iFactory.Core().V1().Pods(),
|
||||
podsLister: iFactory.Core().V1().Pods().Lister(),
|
||||
},
|
||||
@@ -242,6 +246,71 @@ func TestPodNoSpecChange(t *testing.T) {
|
||||
assert.Check(t, is.Equal(svr.mock.updates.read(), 0))
|
||||
}
|
||||
|
||||
func TestPodStatusDelete(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
c := newTestController()
|
||||
pod := &corev1.Pod{}
|
||||
pod.ObjectMeta.Namespace = "default"
|
||||
pod.ObjectMeta.Name = "nginx"
|
||||
pod.Spec = newPodSpec()
|
||||
fk8s := fake.NewSimpleClientset(pod)
|
||||
c.client = fk8s
|
||||
c.PodController.client = fk8s.CoreV1()
|
||||
podCopy := pod.DeepCopy()
|
||||
deleteTime := v1.Time{Time: time.Now().Add(30 * time.Second)}
|
||||
podCopy.DeletionTimestamp = &deleteTime
|
||||
key := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
|
||||
c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
|
||||
|
||||
// test pod in provider delete
|
||||
err := c.updatePodStatus(ctx, pod, key)
|
||||
if err != nil {
|
||||
t.Fatal("pod updated failed")
|
||||
}
|
||||
newPod, err := c.client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, v1.GetOptions{})
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
t.Fatalf("Get pod %v failed", key)
|
||||
}
|
||||
if newPod != nil && newPod.DeletionTimestamp == nil {
|
||||
t.Fatalf("Pod %v delete failed", key)
|
||||
}
|
||||
t.Logf("pod delete success")
|
||||
|
||||
// test pod in provider delete
|
||||
pod.DeletionTimestamp = &deleteTime
|
||||
if _, err = c.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, v1.CreateOptions{}); err != nil {
|
||||
t.Fatal("Parepare pod in k8s failed")
|
||||
}
|
||||
podCopy.Status.ContainerStatuses = []corev1.ContainerStatus{
|
||||
{
|
||||
State: corev1.ContainerState{
|
||||
Waiting: nil,
|
||||
Running: nil,
|
||||
Terminated: &corev1.ContainerStateTerminated{
|
||||
ExitCode: 1,
|
||||
Message: "Exit",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
|
||||
err = c.updatePodStatus(ctx, pod, key)
|
||||
if err != nil {
|
||||
t.Fatalf("pod updated failed %v", err)
|
||||
}
|
||||
newPod, err = c.client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, v1.GetOptions{})
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
t.Fatalf("Get pod %v failed", key)
|
||||
}
|
||||
if newPod.DeletionTimestamp == nil {
|
||||
t.Fatalf("Pod %v delete failed", key)
|
||||
}
|
||||
if newPod.Status.ContainerStatuses[0].State.Terminated == nil {
|
||||
t.Fatalf("Pod status %v update failed", key)
|
||||
}
|
||||
t.Logf("pod updated, container status: %+v, pod delete Time: %v", newPod.Status.ContainerStatuses[0].State.Terminated, newPod.DeletionTimestamp)
|
||||
}
|
||||
|
||||
func newPodSpec() corev1.PodSpec {
|
||||
return corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
|
||||
Reference in New Issue
Block a user