Merge pull request #786 from cpuguy83/add_sync_provider_support
Re-add support for sync providers
@@ -106,106 +106,121 @@ func TestPodLifecycle(t *testing.T) {
 			return watchErr
 		}
 
 		// isPodDeletedGracefullyFunc is a condition func that waits until the pod is in a terminal state, which is the VK's
-		// action when the pod is deleted from the provider
+		// action when the pod is deleted from the provider.
 		isPodDeletedGracefullyFunc := func(ctx context.Context, watcher watch.Interface) error {
 			_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) {
 				log.G(ctx).WithField("event", ev).Info("got event")
 				pod := ev.Object.(*corev1.Pod)
-				return (pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded) && pod.Status.Reason == mockProviderPodDeletedReason, nil
+				return (pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded) && (pod.Status.Reason == mockProviderPodDeletedReason || pod.Status.Reason == statusTerminatedReason), nil
 			})
 			return watchErr
 		}
 
-	// createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting
-	// it gracefully
-	t.Run("createStartDeleteScenario", func(t *testing.T) {
-		t.Run("mockProvider", func(t *testing.T) {
-			assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) {
-				testCreateStartDeleteScenario(ctx, t, s, isPodDeletedGracefullyFunc, true)
-			}))
-		})
-	})
-
-	// createStartDeleteScenarioWithDeletionErrorNotFound tests the flow if the pod was not found in the provider
-	// for some reason
-	t.Run("createStartDeleteScenarioWithDeletionErrorNotFound", func(t *testing.T) {
-		mp := newMockProvider()
-		mp.errorOnDelete = errdefs.NotFound("not found")
-		assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-			testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc, false)
-		}))
-	})
-
-	// createStartDeleteScenarioWithDeletionRandomError tests the flow if the pod was unable to be deleted in the
-	// provider
-	t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) {
-		mp := newMockProvider()
-		deletionFunc := func(ctx context.Context, watcher watch.Interface) error {
-			return mp.attemptedDeletes.until(ctx, func(v int) bool { return v >= 2 })
-		}
-		mp.errorOnDelete = errors.New("random error")
-		assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-			testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false)
-			pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
-			assert.NilError(t, err)
-			assert.Assert(t, is.Len(pods.Items, 1))
-			assert.Assert(t, pods.Items[0].DeletionTimestamp != nil)
-		}))
-	})
-
-	// danglingPodScenario tests if a pod is created in the provider prior to the pod controller starting,
-	// and ensures the pod controller deletes the pod prior to continuing.
-	t.Run("danglingPodScenario", func(t *testing.T) {
-		t.Run("mockProvider", func(t *testing.T) {
-			mp := newMockProvider()
-			assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-				testDanglingPodScenario(ctx, t, s, mp)
-			}))
-		})
-	})
-
-	// testDanglingPodScenarioWithDeletionTimestamp tests if a pod is created in the provider and on api server it had
-	// deletiontimestamp set. It ensures deletion occurs.
-	t.Run("testDanglingPodScenarioWithDeletionTimestamp", func(t *testing.T) {
-		t.Run("mockProvider", func(t *testing.T) {
-			mp := newMockProvider()
-			assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-				testDanglingPodScenarioWithDeletionTimestamp(ctx, t, s, mp)
-			}))
-		})
-	})
-
-	// failedPodScenario ensures that the VK ignores failed pods that were failed prior to the PC starting up
-	t.Run("failedPodScenario", func(t *testing.T) {
-		t.Run("mockProvider", func(t *testing.T) {
-			mp := newMockProvider()
-			assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-				testFailedPodScenario(ctx, t, s)
-			}))
-		})
-	})
-
-	// succeededPodScenario ensures that the VK ignores succeeded pods that were succeeded prior to the PC starting up.
-	t.Run("succeededPodScenario", func(t *testing.T) {
-		t.Run("mockProvider", func(t *testing.T) {
-			mp := newMockProvider()
-			assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-				testSucceededPodScenario(ctx, t, s)
-			}))
-		})
-	})
-
-	// updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated
-	// to the provider
-	t.Run("updatePodWhileRunningScenario", func(t *testing.T) {
-		t.Run("mockProvider", func(t *testing.T) {
-			mp := newMockProvider()
-			assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-				testUpdatePodWhileRunningScenario(ctx, t, s, mp)
-			}))
-		})
-	})
+	envs := map[string]func(*testing.T) testingProvider{
+		"Async": func(t *testing.T) testingProvider {
+			return newMockProvider()
+		},
+		"Sync": func(t *testing.T) testingProvider {
+			if testing.Short() {
+				t.Skip()
+			}
+			return newSyncMockProvider()
+		},
+	}
+
+	for env, h := range envs {
+		t.Run(env, func(t *testing.T) {
+
+			// createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting
+			// it gracefully.
+			t.Run("createStartDeleteScenario", func(t *testing.T) {
+				assert.NilError(t, wireUpSystem(ctx, h(t), func(ctx context.Context, s *system) {
+					testCreateStartDeleteScenario(ctx, t, s, isPodDeletedGracefullyFunc, true)
+				}))
+			})
+
+			// createStartDeleteScenarioWithDeletionErrorNotFound tests the flow if the pod was not found in the provider
+			// for some reason.
+			t.Run("createStartDeleteScenarioWithDeletionErrorNotFound", func(t *testing.T) {
+				mp := h(t)
+				mp.setErrorOnDelete(errdefs.NotFound("not found"))
+				assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+					testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc, false)
+				}))
+			})
+
+			// createStartDeleteScenarioWithDeletionRandomError tests the flow if the pod was unable to be deleted in the
+			// provider.
+			t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) {
+				mp := h(t)
+				deletionFunc := func(ctx context.Context, watcher watch.Interface) error {
+					return mp.getAttemptedDeletes().until(ctx, func(v int) bool { return v >= 2 })
+				}
+				mp.setErrorOnDelete(errors.New("random error"))
+				assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+					testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false)
+					pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
+					assert.NilError(t, err)
+					assert.Assert(t, is.Len(pods.Items, 1))
+					assert.Assert(t, pods.Items[0].DeletionTimestamp != nil)
+				}))
+			})
+
+			// danglingPodScenario tests if a pod is created in the provider prior to the pod controller starting,
+			// and ensures the pod controller deletes the pod prior to continuing.
+			t.Run("danglingPodScenario", func(t *testing.T) {
+				t.Run("mockProvider", func(t *testing.T) {
+					mp := h(t)
+					assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+						testDanglingPodScenario(ctx, t, s, mp)
+					}))
+				})
+			})
+
+			// testDanglingPodScenarioWithDeletionTimestamp tests if a pod is created in the provider and on api server it had
+			// deletiontimestamp set. It ensures deletion occurs.
+			t.Run("testDanglingPodScenarioWithDeletionTimestamp", func(t *testing.T) {
+				t.Run("mockProvider", func(t *testing.T) {
+					mp := h(t)
+					assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+						testDanglingPodScenarioWithDeletionTimestamp(ctx, t, s, mp)
+					}))
+				})
+			})
+
+			// failedPodScenario ensures that the VK ignores failed pods that were failed prior to the pod controller starting up.
+			t.Run("failedPodScenario", func(t *testing.T) {
+				t.Run("mockProvider", func(t *testing.T) {
+					mp := h(t)
+					assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+						testFailedPodScenario(ctx, t, s)
+					}))
+				})
+			})
+
+			// succeededPodScenario ensures that the VK ignores succeeded pods that were succeeded prior to the pod controller starting up.
+			t.Run("succeededPodScenario", func(t *testing.T) {
+				t.Run("mockProvider", func(t *testing.T) {
+					mp := h(t)
+					assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+						testSucceededPodScenario(ctx, t, s)
+					}))
+				})
+			})
+
+			// updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated
+			// to the provider.
+			t.Run("updatePodWhileRunningScenario", func(t *testing.T) {
+				t.Run("mockProvider", func(t *testing.T) {
+					mp := h(t)
+					assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+						testUpdatePodWhileRunningScenario(ctx, t, s, mp)
+					}))
+				})
+			})
+		})
+	}
 }
 
 type testFunction func(ctx context.Context, s *system)
@@ -331,7 +346,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
 	assert.DeepEqual(t, p1, p2)
 }
 
-func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) {
+func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m testingProvider) {
 	t.Parallel()
 
 	pod := newPod()
@@ -340,11 +355,11 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo
 	// Start the pod controller
 	assert.NilError(t, s.start(ctx))
 
-	assert.Assert(t, is.Equal(m.deletes.read(), 1))
+	assert.Assert(t, is.Equal(m.getDeletes().read(), 1))
 
 }
 
-func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testing.T, s *system, m *mockProvider) {
+func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testing.T, s *system, m testingProvider) {
 	t.Parallel()
 
 	pod := newPod()
@@ -506,7 +521,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
 	}
 }
 
-func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) {
+func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m testingProvider) {
 	t.Parallel()
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -563,7 +578,7 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
 	log.G(ctx).WithField("pod", p).Info("Updating pod")
 	_, err = s.client.CoreV1().Pods(p.Namespace).Update(p)
 	assert.NilError(t, err)
-	assert.NilError(t, m.updates.until(ctx, func(v int) bool { return v > 0 }))
+	assert.NilError(t, m.getUpdates().until(ctx, func(v int) bool { return v > 0 }))
 }
 
 func BenchmarkCreatePods(b *testing.B) {
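The test change above parameterizes every lifecycle scenario over a provider constructor, so the same subtests run once against the async mock and once against the new sync mock (the sync variant is skipped under -short). A minimal, self-contained sketch of the same table-driven pattern, using hypothetical names (widget, fastWidget, slowWidget) rather than anything from this repository:

package example

import "testing"

type widget interface{ Ping() error }

type fastWidget struct{}

func (fastWidget) Ping() error { return nil }

type slowWidget struct{}

func (slowWidget) Ping() error { return nil }

func TestWidget(t *testing.T) {
	// Each entry builds a fresh implementation so subtests stay independent.
	envs := map[string]func(*testing.T) widget{
		"Fast": func(t *testing.T) widget { return fastWidget{} },
		"Slow": func(t *testing.T) widget {
			if testing.Short() {
				t.Skip("slow variant skipped with -short")
			}
			return slowWidget{}
		},
	}
	for name, mk := range envs {
		t.Run(name, func(t *testing.T) {
			w := mk(t)
			if err := w.Ping(); err != nil {
				t.Fatal(err)
			}
		})
	}
}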
@@ -79,7 +79,15 @@ type mockProvider struct {
 }
 
 // newMockProvider creates a new mockProvider.
-func newMockProvider() *mockProvider {
+func newMockProvider() *mockProviderAsync {
+	provider := newSyncMockProvider()
+	// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
+	// it will be set, and then we'll call a real underlying implementation.
+	// This makes it easier in the sense we don't need to wrap each method.
+	return &mockProviderAsync{provider}
+}
+
+func newSyncMockProvider() *mockProvider {
 	provider := mockProvider{
 		startTime: time.Now(),
 		creates:   newWaitableInt(),
@@ -87,10 +95,6 @@ func newMockProvider() *mockProvider {
 		deletes:          newWaitableInt(),
 		attemptedDeletes: newWaitableInt(),
 	}
-	// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
-	// it will be set, and then we'll call a real underlying implementation.
-	// This makes it easier in the sense we don't need to wrap each method.
-
 	return &provider
 }
 
@@ -258,10 +262,24 @@ func (p *mockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
 	return pods, nil
 }
 
-// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
-// within the provider.
-func (p *mockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
-	p.realNotifier = notifier
+func (p *mockProvider) setErrorOnDelete(err error) {
+	p.errorOnDelete = err
+}
+
+func (p *mockProvider) getAttemptedDeletes() *waitableInt {
+	return p.attemptedDeletes
+}
+
+func (p *mockProvider) getCreates() *waitableInt {
+	return p.creates
+}
+
+func (p *mockProvider) getDeletes() *waitableInt {
+	return p.deletes
+}
+
+func (p *mockProvider) getUpdates() *waitableInt {
+	return p.updates
 }
 
 func buildKeyFromNames(namespace string, name string) (string, error) {
@@ -280,3 +298,22 @@ func buildKey(pod *v1.Pod) (string, error) {
 
 	return buildKeyFromNames(pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
 }
+
+type mockProviderAsync struct {
+	*mockProvider
+}
+
+// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
+// within the provider.
+func (p *mockProviderAsync) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
+	p.realNotifier = notifier
+}
+
+type testingProvider interface {
+	PodLifecycleHandler
+	setErrorOnDelete(error)
+	getAttemptedDeletes() *waitableInt
+	getDeletes() *waitableInt
+	getCreates() *waitableInt
+	getUpdates() *waitableInt
+}
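The new getters hand tests a *waitableInt, a counter the mock bumps on each provider call and that tests can block on via until(). Its definition is not part of this diff; a rough sketch of how such a counter could look, purely as an illustration of the idea (the real type in the test package may differ):

package example

import (
	"context"
	"sync"
	"time"
)

// waitableInt (hypothetical sketch): a counter that tests can block on until
// a condition over its value holds or the context is cancelled.
type waitableInt struct {
	mu sync.Mutex
	v  int
}

func newWaitableInt() *waitableInt { return &waitableInt{} }

func (w *waitableInt) read() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.v
}

func (w *waitableInt) increment() {
	w.mu.Lock()
	w.v++
	w.mu.Unlock()
}

// until polls the counter until f reports true or ctx is cancelled.
func (w *waitableInt) until(ctx context.Context, f func(int) bool) error {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		if f(w.read()) {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}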
node/pod.go (10 changed lines)
@@ -238,7 +238,7 @@ func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retE
 	return pc.updatePodStatus(ctx, pod, key)
 }
 
-func (pc *PodController) deletePodHandler(ctx context.Context, key string) error {
+func (pc *PodController) deletePodHandler(ctx context.Context, key string) (retErr error) {
 	ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem")
 	defer span.End()
 
@@ -255,6 +255,14 @@ func (pc *PodController) deletePodHandler(ctx context.Context, key string) error
 		return nil
 	}
 
+	defer func() {
+		if retErr == nil {
+			if w, ok := pc.provider.(syncWrapper); ok {
+				w._deletePodKey(ctx, key)
+			}
+		}
+	}()
+
 	// If the pod has been deleted from API server, we don't need to do anything.
 	k8sPod, err := pc.podsLister.Pods(namespace).Get(name)
 	if errors.IsNotFound(err) {
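The deletePodHandler change switches to a named return value so a deferred closure can tell whether the handler succeeded before cleaning the sync wrapper's deletion cache. A generic sketch of that "cleanup only on success" pattern, with hypothetical names rather than code from this repository:

package example

import "fmt"

// process runs its cleanup only when the function returns nil, by inspecting
// the named return value from a deferred closure.
func process(key string) (retErr error) {
	defer func() {
		if retErr == nil {
			fmt.Println("success, releasing resources for", key)
		}
	}()

	if key == "" {
		return fmt.Errorf("empty key")
	}
	return nil
}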
@@ -30,7 +30,7 @@ import (
 
 type TestController struct {
 	*PodController
-	mock   *mockProvider
+	mock   *mockProviderAsync
 	client *fake.Clientset
 }
 
@@ -72,7 +72,10 @@ type PodLifecycleHandler interface {
 	// concurrently outside of the calling goroutine. Therefore it is recommended
 	// to return a version after DeepCopy.
 	GetPods(context.Context) ([]*corev1.Pod, error)
+}
 
+// PodNotifier is used as an extension to PodLifecycleHandler to support async updates of pod statuses.
+type PodNotifier interface {
 	// NotifyPods instructs the notifier to call the passed in function when
 	// the pod status changes. It should be called when a pod's status changes.
 	//
@@ -157,6 +160,7 @@ type PodControllerConfig struct {
 	ServiceInformer corev1informers.ServiceInformer
 }
 
+// NewPodController creates a new pod controller with the provided config.
 func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 	if cfg.PodClient == nil {
 		return nil, errdefs.InvalidInput("missing core client")
@@ -201,6 +205,11 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 	return pc, nil
 }
 
+type asyncProvider interface {
+	PodLifecycleHandler
+	PodNotifier
+}
+
 // Run will set up the event handlers for types we are interested in, as well
 // as syncing informer caches and starting workers. It will block until the
 // context is cancelled, at which point it will shutdown the work queue and
@@ -220,10 +229,24 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 		pc.mu.Unlock()
 	}()
 
+	var provider asyncProvider
+	runProvider := func(context.Context) {}
+
+	if p, ok := pc.provider.(asyncProvider); ok {
+		provider = p
+	} else {
+		wrapped := &syncProviderWrapper{PodLifecycleHandler: pc.provider, l: pc.podsLister}
+		runProvider = wrapped.run
+		provider = wrapped
+		log.G(ctx).Debug("Wrapped non-async provider with async")
+	}
+	pc.provider = provider
+
 	podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
-	pc.provider.NotifyPods(ctx, func(pod *corev1.Pod) {
+	provider.NotifyPods(ctx, func(pod *corev1.Pod) {
 		pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
 	})
+	go runProvider(ctx)
+
 	defer podStatusQueue.ShutDown()
 
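Run now decides at startup whether the configured provider is already async (it implements PodNotifier) or needs to be wrapped in the polling syncProviderWrapper, whose run loop is then started alongside the controller. The general "optional capability via type assertion" shape, sketched with hypothetical interfaces rather than the real ones:

package example

import (
	"context"
	"fmt"
	"time"
)

// handler is the base capability every implementation has.
type handler interface {
	Handle(ctx context.Context, msg string) error
}

// notifier is the optional capability: push-style notifications.
type notifier interface {
	Notify(ctx context.Context, cb func(string))
}

// pollingWrapper adapts a plain handler to the notifier interface by polling.
type pollingWrapper struct {
	handler
	cb func(string)
}

func (p *pollingWrapper) Notify(ctx context.Context, cb func(string)) { p.cb = cb }

func (p *pollingWrapper) run(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if p.cb != nil {
				p.cb("tick") // in the real controller this is a pod status
			}
		}
	}
}

func start(ctx context.Context, h handler) {
	run := func(context.Context) {}
	var n notifier
	if asyncH, ok := h.(notifier); ok {
		// Already supports push notifications; use it directly.
		n = asyncH
	} else {
		// Wrap it and remember to start the wrapper's polling loop.
		w := &pollingWrapper{handler: h}
		n = w
		run = w.run
	}
	n.Notify(ctx, func(s string) { fmt.Println("got update:", s) })
	go run(ctx)
}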
node/sync.go (new file, 209 lines)
@@ -0,0 +1,209 @@
+package node
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
+	"github.com/virtual-kubelet/virtual-kubelet/log"
+	"github.com/virtual-kubelet/virtual-kubelet/trace"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	corev1listers "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+)
+
+const (
+	podStatusReasonNotFound          = "NotFound"
+	podStatusMessageNotFound         = "The pod status was not found and may have been deleted from the provider"
+	containerStatusReasonNotFound    = "NotFound"
+	containerStatusMessageNotFound   = "Container was not found and was likely deleted"
+	containerStatusExitCodeNotFound  = -137
+	statusTerminatedReason           = "Terminated"
+	containerStatusTerminatedMessage = "Container was terminated. The exit code may not reflect the real exit code"
+)
+
+// syncProviderWrapper wraps a PodLifecycleHandler to give it async-like pod status notification behavior.
+type syncProviderWrapper struct {
+	PodLifecycleHandler
+	notify func(*corev1.Pod)
+	l      corev1listers.PodLister
+
+	// deletedPods makes sure we don't set the "NotFound" status
+	// for pods which have been requested to be deleted.
+	// This is needed for our loop which just grabs pod statuses every 5 seconds.
+	deletedPods sync.Map
+}
+
+// This is used to clean up keys for deleted pods after they have been fully deleted in the API server.
+type syncWrapper interface {
+	_deletePodKey(context.Context, string)
+}
+
+func (p *syncProviderWrapper) NotifyPods(ctx context.Context, f func(*corev1.Pod)) {
+	p.notify = f
+}
+
+func (p *syncProviderWrapper) _deletePodKey(ctx context.Context, key string) {
+	log.G(ctx).WithField("key", key).Debug("Cleaning up pod from deletion cache")
+	p.deletedPods.Delete(key)
+}
+
+func (p *syncProviderWrapper) DeletePod(ctx context.Context, pod *corev1.Pod) error {
+	log.G(ctx).Debug("syncProviderWrappper.DeletePod")
+	key, err := cache.MetaNamespaceKeyFunc(pod)
+	if err != nil {
+		return err
+	}
+
+	p.deletedPods.Store(key, pod)
+	if err := p.PodLifecycleHandler.DeletePod(ctx, pod.DeepCopy()); err != nil {
+		log.G(ctx).WithField("key", key).WithError(err).Debug("Removed key from deleted pods cache")
+		// We aren't going to actually delete the pod from the provider since there is an error so delete it from our cache,
+		// otherwise we could end up leaking pods in our deletion cache.
+		// Delete will be retried by the pod controller.
+		p.deletedPods.Delete(key)
+		return err
+	}
+
+	if shouldSkipPodStatusUpdate(pod) {
+		log.G(ctx).Debug("skipping pod status update for terminated pod")
+		return nil
+	}
+
+	updated := pod.DeepCopy()
+	updated.Status.Phase = corev1.PodSucceeded
+	now := metav1.NewTime(time.Now())
+	for i, cs := range updated.Status.ContainerStatuses {
+		updated.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
+			Reason:     statusTerminatedReason,
+			Message:    containerStatusTerminatedMessage,
+			FinishedAt: now,
+		}
+		if cs.State.Running != nil {
+			updated.Status.ContainerStatuses[i].State.Terminated.StartedAt = cs.State.Running.StartedAt
+		}
+	}
+	updated.Status.Reason = statusTerminatedReason
+
+	p.notify(updated)
+	log.G(ctx).Debug("Notified pod terminal pod status")
+
+	return nil
+}
+
+func (p *syncProviderWrapper) run(ctx context.Context) {
+	interval := 5 * time.Second
+	timer := time.NewTimer(interval)
+
+	defer timer.Stop()
+
+	if !timer.Stop() {
+		<-timer.C
+	}
+
+	for {
+		log.G(ctx).Debug("Pod status update loop start")
+		timer.Reset(interval)
+		select {
+		case <-ctx.Done():
+			log.G(ctx).WithError(ctx.Err()).Debug("sync wrapper loop exiting")
+			return
+		case <-timer.C:
+		}
+		p.syncPodStatuses(ctx)
+	}
+}
+
+func (p *syncProviderWrapper) syncPodStatuses(ctx context.Context) {
+	ctx, span := trace.StartSpan(ctx, "syncProviderWrapper.syncPodStatuses")
+	defer span.End()
+
+	// Update all the pods with the provider status.
+	pods, err := p.l.List(labels.Everything())
+	if err != nil {
+		err = errors.Wrap(err, "error getting pod list from kubernetes")
+		span.SetStatus(err)
+		log.G(ctx).WithError(err).Error("Error updating pod statuses")
+		return
+	}
+	ctx = span.WithField(ctx, "nPods", int64(len(pods)))
+
+	for _, pod := range pods {
+		if shouldSkipPodStatusUpdate(pod) {
+			log.G(ctx).Debug("Skipping pod status update")
+			continue
+		}
+
+		if err := p.updatePodStatus(ctx, pod); err != nil {
+			log.G(ctx).WithFields(map[string]interface{}{
+				"name":      pod.Name,
+				"namespace": pod.Namespace,
+			}).WithError(err).Error("Could not fetch pod status")
+		}
+	}
+}
+
+func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubernetes *corev1.Pod) error {
+	ctx, span := trace.StartSpan(ctx, "syncProviderWrapper.updatePodStatus")
+	defer span.End()
+	ctx = addPodAttributes(ctx, span, podFromKubernetes)
+
+	podStatus, err := p.PodLifecycleHandler.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
+	if err != nil {
+		if !errdefs.IsNotFound(err) {
+			span.SetStatus(err)
+			return err
+		}
+	}
+	if podStatus != nil {
+		pod := podFromKubernetes.DeepCopy()
+		podStatus.DeepCopyInto(&pod.Status)
+		p.notify(pod)
+		return nil
+	}
+
+	key, err := cache.MetaNamespaceKeyFunc(podFromKubernetes)
+	if err != nil {
+		return err
+	}
+
+	if _, exists := p.deletedPods.Load(key); exists {
+		log.G(ctx).Debug("pod is in known deleted state, ignoring")
+		return nil
+	}
+
+	// Only change the status when the pod was already up.
+	// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
+	if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
+		// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
+		podStatus = podFromKubernetes.Status.DeepCopy()
+		podStatus.Phase = corev1.PodFailed
+		podStatus.Reason = podStatusReasonNotFound
+		podStatus.Message = podStatusMessageNotFound
+		now := metav1.NewTime(time.Now())
+		for i, c := range podStatus.ContainerStatuses {
+			if c.State.Running == nil {
+				continue
+			}
+			podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
+				ExitCode:    containerStatusExitCodeNotFound,
+				Reason:      containerStatusReasonNotFound,
+				Message:     containerStatusMessageNotFound,
+				FinishedAt:  now,
+				StartedAt:   c.State.Running.StartedAt,
+				ContainerID: c.ContainerID,
+			}
+			podStatus.ContainerStatuses[i].State.Running = nil
+		}
+		log.G(ctx).Debug("Setting pod not found on pod status")
+	}
+
+	pod := podFromKubernetes.DeepCopy()
+	podStatus.DeepCopyInto(&pod.Status)
+	p.notify(pod)
+	return nil
+}
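The wrapper's run loop starts from a stopped, drained timer and re-arms it at the top of every iteration, which appears intended to keep a slow syncPodStatuses pass from stacking up ticks. The drain-then-Reset idiom in isolation, as a standalone sketch rather than code from this repository:

package main

import (
	"context"
	"fmt"
	"time"
)

// pollEvery runs fn roughly once per interval, measured from the end of the
// previous run, until ctx is cancelled.
func pollEvery(ctx context.Context, interval time.Duration, fn func(context.Context)) {
	timer := time.NewTimer(interval)
	defer timer.Stop()

	// Start from a clean, drained timer so the first Reset below behaves the
	// same as every later one.
	if !timer.Stop() {
		<-timer.C
	}

	for {
		timer.Reset(interval)
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
		}
		fn(ctx)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	pollEvery(ctx, 200*time.Millisecond, func(context.Context) { fmt.Println("sync pass") })
}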