Merge pull request #760 from sargun/notify-pods-v7
Do not delete pods in a non-graceful manner
@@ -94,8 +94,7 @@ func TestPodLifecycle(t *testing.T) {

 ctx = log.WithLogger(ctx, log.L)

-// isPodDeletedPermanentlyFunc is a condition func that waits until the pod is _deleted_, which is the VK's
-// action when the pod is deleted from the provider
+// isPodDeletedPermanentlyFunc is a condition func that waits until the pod is _deleted_
 isPodDeletedPermanentlyFunc := func(ctx context.Context, watcher watch.Interface) error {
 _, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) {
 log.G(ctx).WithField("event", ev).Info("got event")
@@ -107,13 +106,23 @@ func TestPodLifecycle(t *testing.T) {
 return watchErr
 }

+// isPodDeletedGracefullyFunc is a condition func that waits until the pod is in a terminal state, which is the VK's
+// action when the pod is deleted from the provider
+isPodDeletedGracefullyFunc := func(ctx context.Context, watcher watch.Interface) error {
+_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) {
+log.G(ctx).WithField("event", ev).Info("got event")
+pod := ev.Object.(*corev1.Pod)
+return (pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded) && pod.Status.Reason == mockProviderPodDeletedReason, nil
+})
+return watchErr
+}
+
 // createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting
 // it gracefully
 t.Run("createStartDeleteScenario", func(t *testing.T) {

 t.Run("mockProvider", func(t *testing.T) {
 assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) {
-testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
+testCreateStartDeleteScenario(ctx, t, s, isPodDeletedGracefullyFunc, true)
 }))
 })
 })
@@ -124,7 +133,7 @@ func TestPodLifecycle(t *testing.T) {
 mp := newMockProvider()
 mp.errorOnDelete = errdefs.NotFound("not found")
 assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
+testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc, false)
 }))
 })

@@ -137,7 +146,7 @@ func TestPodLifecycle(t *testing.T) {
 }
 mp.errorOnDelete = errors.New("random error")
 assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-testCreateStartDeleteScenario(ctx, t, s, deletionFunc)
+testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false)
 pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
 assert.NilError(t, err)
 assert.Assert(t, is.Len(pods.Items, 1))
@@ -328,7 +337,7 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo

 }

-func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error) {
+func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error, waitForPermanentDeletion bool) {
 ctx, cancel := context.WithCancel(ctx)
 defer cancel()

@@ -396,6 +405,18 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
 assert.NilError(t, err)
 }

+// Setup a watch to look for the pod eventually going away completely
+watcher2, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
+assert.NilError(t, err)
+defer watcher2.Stop()
+waitDeleteCh := make(chan error)
+go func() {
+_, watchDeleteErr := watchutils.UntilWithoutRetry(ctx, watcher2, func(ev watch.Event) (bool, error) {
+return ev.Type == watch.Deleted, nil
+})
+waitDeleteCh <- watchDeleteErr
+}()
+
 // Setup a watch prior to pod deletion
 watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
 assert.NilError(t, err)
@@ -410,7 +431,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
 currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(p.Name, metav1.GetOptions{})
 assert.NilError(t, err)
 // 2. Set the pod's deletion timestamp, version, and so on
-var deletionGracePeriod int64 = 30
+var deletionGracePeriod int64 = 10
 currentPod.DeletionGracePeriodSeconds = &deletionGracePeriod
 deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
 currentPod.DeletionTimestamp = &deletionTimestamp
@@ -424,6 +445,15 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
 case err = <-watchErrCh:
 assert.NilError(t, err)
 }
+
+if waitForPermanentDeletion {
+select {
+case <-ctx.Done():
+t.Fatalf("Context ended early: %s", ctx.Err().Error())
+case err = <-waitDeleteCh:
+assert.NilError(t, err)
+}
+}
 }

 func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) {
@@ -526,6 +556,7 @@ func randomizeName(pod *corev1.Pod) {
 }

 func newPod(podmodifiers ...podModifier) *corev1.Pod {
+var terminationGracePeriodSeconds int64 = 30
 pod := &corev1.Pod{
 TypeMeta: metav1.TypeMeta{
 Kind: "Pod",
@@ -538,7 +569,14 @@ func newPod(podmodifiers ...podModifier) *corev1.Pod {
 ResourceVersion: "100",
 },
 Spec: corev1.PodSpec{
 NodeName: testNodeName,
+TerminationGracePeriodSeconds: &terminationGracePeriodSeconds,
+Containers: []corev1.Container{
+{
+Name: "my-container",
+Image: "nginx",
+},
+},
 },
 Status: corev1.PodStatus{
 Phase: corev1.PodPending,
@@ -12,6 +12,10 @@ import (
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )

+const (
+mockProviderPodDeletedReason = "MockProviderPodDeleted"
+)
+
 var (
 _ PodLifecycleHandler = (*mockProvider)(nil)
 )
@@ -171,15 +175,19 @@ func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
 log.G(ctx).Infof("receive DeletePod %q", pod.Name)

 p.attemptedDeletes.increment()
+key, err := buildKey(pod)
+if err != nil {
+return err
+}
+
+if errdefs.IsNotFound(p.errorOnDelete) {
+p.pods.Delete(key)
+}
 if p.errorOnDelete != nil {
 return p.errorOnDelete
 }

 p.deletes.increment()
-key, err := buildKey(pod)
-if err != nil {
-return err
-}

 if _, exists := p.pods.Load(key); !exists {
 return errdefs.NotFound("pod not found")
@@ -188,7 +196,7 @@ func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
 now := metav1.Now()

 pod.Status.Phase = v1.PodSucceeded
-pod.Status.Reason = "MockProviderPodDeleted"
+pod.Status.Reason = mockProviderPodDeletedReason

 for idx := range pod.Status.ContainerStatuses {
 pod.Status.ContainerStatuses[idx].Ready = false
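Note on the mock provider change above: DeletePod now computes the store key up front, drops the pod from the local map even when errorOnDelete is a NotFound, and tags the terminal status with the shared mockProviderPodDeletedReason constant, so repeated deletes of the same pod stay harmless. A minimal sketch of the same provider-side pattern follows; the toyProvider type and its reason string are illustrative, not part of this PR.

```go
package example

import (
	"context"
	"sync"

	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// toyProvider is a stand-in for a real provider; it only tracks pods in memory.
type toyProvider struct {
	pods sync.Map // "namespace/name" -> *corev1.Pod
}

// DeletePod is safe to call repeatedly: a second call for the same pod reports
// NotFound instead of failing, and a successful call records a terminal status
// that the controller can reconcile against.
func (p *toyProvider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
	key := pod.Namespace + "/" + pod.Name
	if _, ok := p.pods.Load(key); !ok {
		return errdefs.NotFound("pod not found")
	}
	p.pods.Delete(key)

	now := metav1.Now()
	pod.Status.Phase = corev1.PodSucceeded
	pod.Status.Reason = "ToyProviderPodDeleted" // illustrative reason string
	for i := range pod.Status.ContainerStatuses {
		pod.Status.ContainerStatuses[i].Ready = false
		pod.Status.ContainerStatuses[i].State = corev1.ContainerState{
			Terminated: &corev1.ContainerStateTerminated{
				Reason:     "ToyProviderPodDeleted",
				FinishedAt: now,
			},
		}
	}
	return nil
}
```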
node/pod.go (103 changed lines)
@@ -19,7 +19,6 @@ import (

 "github.com/google/go-cmp/cmp"
 pkgerrors "github.com/pkg/errors"
-"github.com/virtual-kubelet/virtual-kubelet/errdefs"
 "github.com/virtual-kubelet/virtual-kubelet/log"
 "github.com/virtual-kubelet/virtual-kubelet/trace"
 corev1 "k8s.io/api/core/v1"
@@ -133,61 +132,19 @@ func (pc *PodController) handleProviderError(ctx context.Context, span trace.Spa
 span.SetStatus(origErr)
 }

-func (pc *PodController) deletePod(ctx context.Context, namespace, name string) error {
+func (pc *PodController) deletePod(ctx context.Context, pod *corev1.Pod) error {
 ctx, span := trace.StartSpan(ctx, "deletePod")
 defer span.End()

-pod, err := pc.provider.GetPod(ctx, namespace, name)
-if err != nil {
-if errdefs.IsNotFound(err) {
-// The provider is not aware of the pod, but we must still delete the Kubernetes API resource.
-return pc.forceDeletePodResource(ctx, namespace, name)
-}
-return err
-}
-
-// NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't.
-if pod == nil {
-// The provider is not aware of the pod, but we must still delete the Kubernetes API resource.
-return pc.forceDeletePodResource(ctx, namespace, name)
-}
-
 ctx = addPodAttributes(ctx, span, pod)

-var delErr error
-if delErr = pc.provider.DeletePod(ctx, pod.DeepCopy()); delErr != nil && !errdefs.IsNotFound(delErr) {
-span.SetStatus(delErr)
-return delErr
+err := pc.provider.DeletePod(ctx, pod.DeepCopy())
+if err != nil {
+span.SetStatus(err)
+return err
 }

 log.G(ctx).Debug("Deleted pod from provider")

-if err := pc.forceDeletePodResource(ctx, namespace, name); err != nil {
-span.SetStatus(err)
-return err
-}
-log.G(ctx).Info("Deleted pod from Kubernetes")
-
-return nil
-}
-
-func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace, name string) error {
-ctx, span := trace.StartSpan(ctx, "forceDeletePodResource")
-defer span.End()
-ctx = span.WithFields(ctx, log.Fields{
-"namespace": namespace,
-"name": name,
-})
-
-var grace int64
-if err := pc.client.Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil {
-if errors.IsNotFound(err) {
-log.G(ctx).Debug("Pod does not exist in Kubernetes, nothing to delete")
-return nil
-}
-span.SetStatus(err)
-return pkgerrors.Wrap(err, "Failed to delete Kubernetes pod")
-}
 return nil
 }

@@ -213,8 +170,12 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
 }
 kPod := obj.(*knownPod)
 kPod.Lock()
-podFromProvider := kPod.lastPodStatusReceivedFromProvider
+podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
 kPod.Unlock()
+// We need to do this because the other parts of the pod can be updated elsewhere. Since we're only updating
+// the pod status, and we should be the sole writers of the pod status, we can blind overwrite it. Therefore
+// we need to copy the pod and set ResourceVersion to 0.
+podFromProvider.ResourceVersion = "0"

 if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(podFromProvider); err != nil {
 span.SetStatus(err)
@@ -276,3 +237,47 @@ func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retE

 return pc.updatePodStatus(ctx, pod, key)
 }
+
+func (pc *PodController) deletePodHandler(ctx context.Context, key string) error {
+ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem")
+defer span.End()
+
+namespace, name, err := cache.SplitMetaNamespaceKey(key)
+ctx = span.WithFields(ctx, log.Fields{
+"namespace": namespace,
+"name": name,
+})
+
+if err != nil {
+// Log the error as a warning, but do not requeue the key as it is invalid.
+log.G(ctx).Warn(pkgerrors.Wrapf(err, "invalid resource key: %q", key))
+span.SetStatus(err)
+return nil
+}
+
+// If the pod has been deleted from API server, we don't need to do anything.
+k8sPod, err := pc.podsLister.Pods(namespace).Get(name)
+if errors.IsNotFound(err) {
+return nil
+}
+if err != nil {
+span.SetStatus(err)
+return err
+}
+
+if running(&k8sPod.Status) {
+log.G(ctx).Error("Force deleting pod in running state")
+}
+
+// We don't check with the provider before doing this delete. At this point, even if an outstanding pod status update
+// was in progress,
+err = pc.client.Pods(namespace).Delete(name, metav1.NewDeleteOptions(0))
+if errors.IsNotFound(err) {
+return nil
+}
+if err != nil {
+span.SetStatus(err)
+return err
+}
+return nil
+}
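The updatePodStatus hunk above deep-copies the provider's pod and writes it back with ResourceVersion "0"; per the new comment, the controller should be the sole writer of .status, so a blind overwrite is acceptable. A small sketch of that idiom, assuming the pre-context client-go signatures vendored in this tree (the writeStatus helper is hypothetical):

```go
package example

import (
	corev1 "k8s.io/api/core/v1"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

// writeStatus overwrites the pod status held by the API server. Setting
// ResourceVersion to "0" lets the write go through without a version conflict,
// which, as the PR's comment explains, is fine only because nothing else is
// expected to write .status for this pod.
func writeStatus(pods typedcorev1.PodInterface, fromProvider *corev1.Pod) error {
	update := fromProvider.DeepCopy()
	update.ResourceVersion = "0"
	_, err := pods.UpdateStatus(update)
	return err
}
```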
@@ -19,14 +19,10 @@ import (
 "testing"
 "time"

-pkgerrors "github.com/pkg/errors"
-"github.com/virtual-kubelet/virtual-kubelet/errdefs"
 testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
 "gotest.tools/assert"
 is "gotest.tools/assert/cmp"
 corev1 "k8s.io/api/core/v1"
-"k8s.io/apimachinery/pkg/api/errors"
-metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 kubeinformers "k8s.io/client-go/informers"
 "k8s.io/client-go/kubernetes/fake"
 "k8s.io/client-go/util/workqueue"
@@ -51,6 +47,7 @@ func newTestController() *TestController {
 resourceManager: rm,
 recorder: testutil.FakeEventRecorder(5),
 k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+deletionQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 done: make(chan struct{}),
 ready: make(chan struct{}),
 podsInformer: iFactory.Core().V1().Pods(),
@@ -172,59 +169,6 @@ func TestPodNoSpecChange(t *testing.T) {
 assert.Check(t, is.Equal(svr.mock.updates.read(), 0))
 }

-func TestPodDelete(t *testing.T) {
-type testCase struct {
-desc string
-delErr error
-}
-
-cases := []testCase{
-{desc: "no error on delete", delErr: nil},
-{desc: "not found error on delete", delErr: errdefs.NotFound("not found")},
-{desc: "unknown error on delete", delErr: pkgerrors.New("random error")},
-}
-
-for _, tc := range cases {
-t.Run(tc.desc, func(t *testing.T) {
-c := newTestController()
-c.mock.errorOnDelete = tc.delErr
-
-pod := &corev1.Pod{}
-pod.ObjectMeta.Namespace = "default"
-pod.ObjectMeta.Name = "nginx"
-pod.Spec = newPodSpec()
-
-pc := c.client.CoreV1().Pods("default")
-
-p, err := pc.Create(pod)
-assert.NilError(t, err)
-
-ctx := context.Background()
-err = c.createOrUpdatePod(ctx, p.DeepCopy()) // make sure it's actually created
-assert.NilError(t, err)
-assert.Check(t, is.Equal(c.mock.creates.read(), 1))
-
-err = c.deletePod(ctx, pod.Namespace, pod.Name)
-assert.Equal(t, pkgerrors.Cause(err), err)
-
-var expectDeletes int
-if tc.delErr == nil {
-expectDeletes = 1
-}
-assert.Check(t, is.Equal(c.mock.deletes.read(), expectDeletes))
-
-expectDeleted := tc.delErr == nil || errdefs.IsNotFound(tc.delErr)
-
-_, err = pc.Get(pod.Name, metav1.GetOptions{})
-if expectDeleted {
-assert.Assert(t, errors.IsNotFound(err))
-} else {
-assert.NilError(t, err)
-}
-})
-}
-}
-
 func newPodSpec() corev1.PodSpec {
 return corev1.PodSpec{
 Containers: []corev1.Container{
@@ -17,10 +17,11 @@ package node
 import (
 "context"
 "fmt"
-"reflect"
 "strconv"
 "sync"
+"time"

+"github.com/google/go-cmp/cmp"
 pkgerrors "github.com/pkg/errors"
 "github.com/virtual-kubelet/virtual-kubelet/errdefs"
 "github.com/virtual-kubelet/virtual-kubelet/internal/manager"
@@ -51,7 +52,7 @@ type PodLifecycleHandler interface {

 // DeletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is
 // expected to call the NotifyPods callback with a terminal pod status where all the containers are in a terminal
-// state, as well as the pod.
+// state, as well as the pod. DeletePod may be called multiple times for the same pod.
 DeletePod(ctx context.Context, pod *corev1.Pod) error

 // GetPod retrieves a pod by name from the provider (can be cached).
@@ -101,6 +102,9 @@ type PodController struct {

 k8sQ workqueue.RateLimitingInterface

+// deletionQ is a queue on which pods are reconciled, and we check if pods are in API server after grace period
+deletionQ workqueue.RateLimitingInterface
+
 // From the time of creation, to termination the knownPods map will contain the pods key
 // (derived from Kubernetes' cache library) -> a *knownPod struct.
 knownPods sync.Map
@@ -127,6 +131,7 @@ type knownPod struct {
 // should be immutable to avoid having to hold the lock the entire time you're working with them
 sync.Mutex
 lastPodStatusReceivedFromProvider *corev1.Pod
+lastPodUsed *corev1.Pod
 }

 // PodControllerConfig is used to configure a new PodController.
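The DeletePod contract spelled out above (report a terminal status via the NotifyPods callback, and tolerate being called more than once for the same pod) can be sketched roughly as below. This assumes the PodNotifier-style callback registration used elsewhere in this package; the notifyingProvider type is illustrative only.

```go
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
)

// notifyingProvider sketches the provider side of the contract: remember the
// callback handed over at startup and push a terminal status when a pod is
// deleted from the provider.
type notifyingProvider struct {
	notify func(*corev1.Pod)
}

// NotifyPods registers the controller's callback (assumed signature).
func (p *notifyingProvider) NotifyPods(ctx context.Context, cb func(*corev1.Pod)) {
	p.notify = cb
}

// DeletePod marks the pod terminal and notifies; calling it again for the same
// pod should be a no-op or a NotFound, never a hard failure.
func (p *notifyingProvider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
	pod.Status.Phase = corev1.PodSucceeded // all containers would be terminal in a real provider
	if p.notify != nil {
		p.notify(pod)
	}
	return nil
}
```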
@@ -190,6 +195,7 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 done: make(chan struct{}),
 recorder: cfg.EventRecorder,
 k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
+deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"),
 }

 return pc, nil
@@ -207,7 +213,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er

 defer func() {
 pc.k8sQ.ShutDown()
+pc.deletionQ.ShutDown()
 pc.mu.Lock()
 pc.err = retErr
 close(pc.done)
@@ -241,16 +247,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 },
 UpdateFunc: func(oldObj, newObj interface{}) {
 // Create a copy of the old and new pod objects so we don't mutate the cache.
-oldPod := oldObj.(*corev1.Pod).DeepCopy()
-newPod := newObj.(*corev1.Pod).DeepCopy()
-// We want to check if the two objects differ in anything other than their resource versions.
-// Hence, we make them equal so that this change isn't picked up by reflect.DeepEqual.
-newPod.ResourceVersion = oldPod.ResourceVersion
-// Skip adding this pod's key to the work queue if its .metadata (except .metadata.resourceVersion) and .spec fields haven't changed.
-// This guarantees that we don't attempt to sync the pod every time its .status field is updated.
-if reflect.DeepEqual(oldPod.ObjectMeta, newPod.ObjectMeta) && reflect.DeepEqual(oldPod.Spec, newPod.Spec) {
-return
-}
+newPod := newObj.(*corev1.Pod)
 // At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
 if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
 log.G(ctx).Error(err)
@@ -264,6 +262,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 } else {
 pc.knownPods.Delete(key)
 pc.k8sQ.AddRateLimited(key)
+// If this pod was in the deletion queue, forget about it
+pc.deletionQ.Forget(key)
 }
 },
 })
@@ -295,6 +295,15 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 }()
 }

+for id := 0; id < podSyncWorkers; id++ {
+wg.Add(1)
+workerID := strconv.Itoa(id)
+go func() {
+defer wg.Done()
+pc.runDeletionReconcilationWorker(ctx, workerID, pc.deletionQ)
+}()
+}
+
 close(pc.ready)

 log.G(ctx).Info("started workers")
@@ -302,6 +311,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 log.G(ctx).Info("shutting down workers")
 pc.k8sQ.ShutDown()
 podStatusQueue.ShutDown()
+pc.deletionQ.ShutDown()

 wg.Wait()
 return nil
@@ -353,6 +363,7 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {

 // Add the current key as an attribute to the current span.
 ctx = span.WithField(ctx, "key", key)
+log.G(ctx).WithField("key", key).Debug("sync handled")

 // Convert the namespace/name string into a distinct namespace and name.
 namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -372,35 +383,81 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
 span.SetStatus(err)
 return err
 }
-// At this point we know the Pod resource doesn't exist, which most probably means it was deleted.
-// Hence, we must delete it from the provider if it still exists there.
-if err := pc.deletePod(ctx, namespace, name); err != nil {
-err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name))
+pod, err = pc.provider.GetPod(ctx, namespace, name)
+if err != nil && !errdefs.IsNotFound(err) {
+err = pkgerrors.Wrapf(err, "failed to fetch pod with key %q from provider", key)
 span.SetStatus(err)
 return err
 }
-return nil
+if errdefs.IsNotFound(err) || pod == nil {
+return nil
+}
+
+err = pc.provider.DeletePod(ctx, pod)
+if errdefs.IsNotFound(err) {
+return nil
+}
+if err != nil {
+err = pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name))
+span.SetStatus(err)
+}
+return err
+
 }
 // At this point we know the Pod resource has either been created or updated (which includes being marked for deletion).
-return pc.syncPodInProvider(ctx, pod)
+return pc.syncPodInProvider(ctx, pod, key)
 }

 // syncPodInProvider tries and reconciles the state of a pod by comparing its Kubernetes representation and the provider's representation.
-func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) error {
+func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod, key string) (retErr error) {
 ctx, span := trace.StartSpan(ctx, "syncPodInProvider")
 defer span.End()

 // Add the pod's attributes to the current span.
 ctx = addPodAttributes(ctx, span, pod)

+// If the pod('s containers) is no longer in a running state then we force-delete the pod from API server
+// more context is here: https://github.com/virtual-kubelet/virtual-kubelet/pull/760
+if pod.DeletionTimestamp != nil && !running(&pod.Status) {
+log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running")
+pc.deletionQ.Add(key)
+return nil
+}
+obj, ok := pc.knownPods.Load(key)
+if !ok {
+// That means the pod was deleted while we were working
+return nil
+}
+kPod := obj.(*knownPod)
+kPod.Lock()
+if kPod.lastPodUsed != nil && podsEffectivelyEqual(kPod.lastPodUsed, pod) {
+kPod.Unlock()
+return nil
+}
+kPod.Unlock()
+
+defer func() {
+if retErr == nil {
+kPod.Lock()
+defer kPod.Unlock()
+kPod.lastPodUsed = pod
+}
+}()
+
 // Check whether the pod has been marked for deletion.
 // If it does, guarantee it is deleted in the provider and Kubernetes.
 if pod.DeletionTimestamp != nil {
-if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
+log.G(ctx).Debug("Deleting pod in provider")
+if err := pc.deletePod(ctx, pod); errdefs.IsNotFound(err) {
+log.G(ctx).Debug("Pod not found in provider")
+} else if err != nil {
 err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod))
 span.SetStatus(err)
 return err
 }

+pc.deletionQ.AddAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
 return nil
 }

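In the hunk above, syncPodInProvider hands keys to deletionQ: immediately when the pod is already non-running, or via AddAfter once the deletion grace period has elapsed, and deletePodHandler later re-checks the API server. A self-contained sketch of that delayed-requeue pattern with client-go's workqueue package; the queue name, key, and handler body are illustrative only.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// A named rate-limiting queue, similar in shape to the controller's deletionQ.
	q := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), "example-deletion")
	defer q.ShutDown()

	// Park the key until the grace period has elapsed; only then does a
	// worker see it and get a chance to re-check the API server.
	q.AddAfter("default/nginx", 2*time.Second)

	done := make(chan struct{})
	go func() {
		defer close(done)
		key, shutdown := q.Get()
		if shutdown {
			return
		}
		// A real handler would force-delete the pod if it is still present
		// and no longer running; here we only print the key.
		fmt.Println("reconciling deletion for", key)
		q.Forget(key)
		q.Done(key)
	}()

	<-done
}
```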
@@ -419,6 +476,25 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod)
 return nil
 }

+// runDeletionReconcilationWorker is a long-running function that will continually call the processDeletionReconcilationWorkItem
+// function in order to read and process an item on the work queue that is generated by the pod informer.
+func (pc *PodController) runDeletionReconcilationWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
+for pc.processDeletionReconcilationWorkItem(ctx, workerID, q) {
+}
+}
+
+// processDeletionReconcilationWorkItem will read a single work item off the work queue and attempt to process it,by calling the deletionReconcilation.
+func (pc *PodController) processDeletionReconcilationWorkItem(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
+
+// We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
+ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem")
+defer span.End()
+
+// Add the ID of the current worker as an attribute to the current span.
+ctx = span.WithField(ctx, "workerId", workerID)
+return handleQueueItem(ctx, q, pc.deletePodHandler)
+}
+
 // deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them.
 func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int) {
 ctx, span := trace.StartSpan(ctx, "deleteDanglingPods")
@@ -474,7 +550,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int
 // Add the pod's attributes to the current span.
 ctx = addPodAttributes(ctx, span, pod)
 // Actually delete the pod.
-if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
+if err := pc.provider.DeletePod(ctx, pod.DeepCopy()); err != nil && !errdefs.IsNotFound(err) {
 span.SetStatus(err)
 log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod))
 } else {
@@ -503,3 +579,31 @@ func loggablePodName(pod *corev1.Pod) string {
 func loggablePodNameFromCoordinates(namespace, name string) string {
 return fmt.Sprintf("%s/%s", namespace, name)
 }
+
+// podsEffectivelyEqual compares two pods, and ignores the pod status, and the resource version
+func podsEffectivelyEqual(p1, p2 *corev1.Pod) bool {
+filterForResourceVersion := func(p cmp.Path) bool {
+if p.String() == "ObjectMeta.ResourceVersion" {
+return true
+}
+if p.String() == "Status" {
+return true
+}
+return false
+}
+
+return cmp.Equal(p1, p2, cmp.FilterPath(filterForResourceVersion, cmp.Ignore()))
+}
+
+// borrowed from https://github.com/kubernetes/kubernetes/blob/f64c631cd7aea58d2552ae2038c1225067d30dde/pkg/kubelet/kubelet_pods.go#L944-L953
+// running returns true, unless if every status is terminated or waiting, or the status list
+// is empty.
+func running(podStatus *corev1.PodStatus) bool {
+statuses := podStatus.ContainerStatuses
+for _, status := range statuses {
+if status.State.Terminated == nil && status.State.Waiting == nil {
+return true
+}
+}
+return false
+}
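podsEffectivelyEqual (added above) relies on go-cmp path filters: any field whose path matches the filter is ignored, which is how Status and ObjectMeta.ResourceVersion drop out of the comparison; the TestCompare* tests in the next file exercise exactly this. A tiny sketch of the same idiom on an illustrative struct:

```go
package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
)

type Config struct {
	Name            string
	ResourceVersion string // bookkeeping field we do not care about
}

func main() {
	// Ignore any field whose path matches the predicate, mirroring the
	// filterForResourceVersion closure in podsEffectivelyEqual.
	ignoreBookkeeping := cmp.FilterPath(func(p cmp.Path) bool {
		return p.String() == "ResourceVersion"
	}, cmp.Ignore())

	a := Config{Name: "web", ResourceVersion: "1"}
	b := Config{Name: "web", ResourceVersion: "2"}

	fmt.Println(cmp.Equal(a, b, ignoreBookkeeping)) // true: only the ignored field differs
}
```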
@@ -6,6 +6,8 @@ import (
 "time"

 "gotest.tools/assert"
+corev1 "k8s.io/api/core/v1"
+metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )

 func TestPodControllerExitOnContextCancel(t *testing.T) {
@@ -42,3 +44,49 @@ func TestPodControllerExitOnContextCancel(t *testing.T) {
 }
 assert.NilError(t, tc.Err())
 }
+
+func TestCompareResourceVersion(t *testing.T) {
+p1 := &corev1.Pod{
+ObjectMeta: metav1.ObjectMeta{
+ResourceVersion: "1",
+},
+}
+p2 := &corev1.Pod{
+ObjectMeta: metav1.ObjectMeta{
+ResourceVersion: "2",
+},
+}
+assert.Assert(t, podsEffectivelyEqual(p1, p2))
+}
+
+func TestCompareStatus(t *testing.T) {
+p1 := &corev1.Pod{
+Status: corev1.PodStatus{
+Phase: corev1.PodRunning,
+},
+}
+p2 := &corev1.Pod{
+Status: corev1.PodStatus{
+Phase: corev1.PodFailed,
+},
+}
+assert.Assert(t, podsEffectivelyEqual(p1, p2))
+}
+
+func TestCompareLabels(t *testing.T) {
+p1 := &corev1.Pod{
+ObjectMeta: metav1.ObjectMeta{
+Labels: map[string]string{
+"foo": "bar1",
+},
+},
+}
+p2 := &corev1.Pod{
+ObjectMeta: metav1.ObjectMeta{
+Labels: map[string]string{
+"foo": "bar2",
+},
+},
+}
+assert.Assert(t, !podsEffectivelyEqual(p1, p2))
+}