Adds Done() and Err() to pod controller (#735)

Allows callers to wait for pod controller exit in addition to readiness.
This means the caller does not have to handle errors from the pod
controller running in a goroutine itself, since it can wait for exit via
`Done()` and check the error with `Err()`.
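
As a rough sketch of the calling pattern this enables (the helper name `startAndWait` and its signature are illustrative, not part of the library; the import path assumes the `node` package lives at its usual location in this repo):

    package main

    import (
    	"context"

    	"github.com/virtual-kubelet/virtual-kubelet/node"
    )

    // startAndWait is an illustrative helper, not part of the API: it runs the
    // pod controller in a goroutine and waits for it to become ready or exit.
    func startAndWait(ctx context.Context, pc *node.PodController, podSyncWorkers int) error {
    	go pc.Run(ctx, podSyncWorkers) // nolint:errcheck

    	select {
    	case <-pc.Ready():
    		// Controller is up; callers can start dependent components (e.g. the node runner).
    	case <-pc.Done():
    		// Controller exited before becoming ready; Err() below reports why.
    	case <-ctx.Done():
    		return ctx.Err()
    	}
    	return pc.Err()
    }
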
Author: Brian Goff
Date: 2019-09-10 09:44:19 -07:00
Committed by: Pires
Parent: db146a0e01
Commit: bb9ff1adf3
6 changed files with 136 additions and 82 deletions


@@ -18,7 +18,6 @@ import (
 	"context"
 	"os"
 	"path"
-	"time"

 	"github.com/pkg/errors"
 	"github.com/spf13/cobra"
@@ -207,11 +206,16 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 	}()

 	if c.StartupTimeout > 0 {
-		// If there is a startup timeout, it does two things:
-		// 1. It causes the VK to shutdown if we haven't gotten into an operational state in a time period
-		// 2. It prevents node advertisement from happening until we're in an operational state
-		err = waitFor(ctx, c.StartupTimeout, pc.Ready())
-		if err != nil {
+		ctx, cancel := context.WithTimeout(ctx, c.StartupTimeout)
+		log.G(ctx).Info("Waiting for pod controller / VK to be ready")
+		select {
+		case <-ctx.Done():
+			cancel()
+			return ctx.Err()
+		case <-pc.Ready():
+		}
+		cancel()
+		if err := pc.Err(); err != nil {
 			return err
 		}
 	}
@@ -228,21 +232,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 	return nil
 }

-func waitFor(ctx context.Context, time time.Duration, ready <-chan struct{}) error {
-	ctx, cancel := context.WithTimeout(ctx, time)
-	defer cancel()
-
-	// Wait for the VK / PC close the the ready channel, or time out and return
-	log.G(ctx).Info("Waiting for pod controller / VK to be ready")
-
-	select {
-	case <-ready:
-		return nil
-	case <-ctx.Done():
-		return errors.Wrap(ctx.Err(), "Error while starting up VK")
-	}
-}
-
 func newClient(configPath string) (*kubernetes.Clientset, error) {
 	var config *rest.Config


@@ -27,9 +27,10 @@ There are two primary controllers, the node runner and the pod runner.

 	select {
 	case <-podRunner.Ready():
-		go nodeRunner.Run(ctx)
-	case <-ctx.Done():
-		return ctx.Err()
+	case <-podRunner.Done():
 	}
+	if podRunner.Err() != nil {
+		// handle error
+	}

 After calling start, cancelling the passed in context will shutdown the


@@ -237,33 +237,20 @@ func TestPodLifecycle(t *testing.T) {
 type testFunction func(ctx context.Context, s *system)

 type system struct {
-	retChan             chan error
 	pc                  *PodController
 	client              *fake.Clientset
 	podControllerConfig PodControllerConfig
 }

-func (s *system) start(ctx context.Context) chan error {
-	podControllerErrChan := make(chan error)
-	go func() {
-		podControllerErrChan <- s.pc.Run(ctx, podSyncWorkers)
-	}()
-
-	// We need to wait for the pod controller to start. If there is an error before the pod controller starts, or
-	// the context is cancelled. If the context is cancelled, the startup will be aborted, and the pod controller
-	// will return an error, so we don't need to wait on ctx.Done()
+func (s *system) start(ctx context.Context) error {
+	go s.pc.Run(ctx, podSyncWorkers) // nolint:errcheck
 	select {
 	case <-s.pc.Ready():
-		// This listens for errors, or exits in the future.
-		go func() {
-			podControllerErr := <-podControllerErrChan
-			s.retChan <- podControllerErr
-		}()
-	// If there is an error before things are ready, we need to forward it immediately
-	case podControllerErr := <-podControllerErrChan:
-		s.retChan <- podControllerErr
+	case <-s.pc.Done():
+	case <-ctx.Done():
+		return ctx.Err()
 	}
-	return s.retChan
+	return s.pc.Err()
 }

 func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunction) error {
@@ -305,8 +292,7 @@ func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunct
 	configMapInformer := sharedInformerFactory.Core().V1().ConfigMaps()
 	serviceInformer := sharedInformerFactory.Core().V1().Services()
 	sys := &system{
 		client:  client,
-		retChan: make(chan error, 1),
 		podControllerConfig: PodControllerConfig{
 			PodClient:   client.CoreV1(),
 			PodInformer: podInformer,
@@ -338,7 +324,7 @@ func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunct
 	// Shutdown the pod controller, and wait for it to exit
 	cancel()
-	return <-sys.retChan
+	return nil
 }

 func testFailedPodScenario(ctx context.Context, t *testing.T, s *system) {
@@ -359,7 +345,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
 	assert.NilError(t, e)

 	// Start the pod controller
-	s.start(ctx)
+	assert.NilError(t, s.start(ctx))

 	for s.pc.k8sQ.Len() > 0 {
 		time.Sleep(10 * time.Millisecond)
@@ -379,7 +365,7 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo
 	assert.NilError(t, m.CreatePod(ctx, pod))

 	// Start the pod controller
-	s.start(ctx)
+	assert.NilError(t, s.start(ctx))

 	assert.Assert(t, is.Equal(m.deletes.read(), 1))
@@ -444,8 +430,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
 		watchErrCh <- watchErr
 	}()

-	// Start the pod controller
-	podControllerErrCh := s.start(ctx)
+	assert.NilError(t, s.start(ctx))

 	// Wait for the pod to go into running
 	select {
@@ -453,9 +438,6 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
 		t.Fatalf("Context ended early: %s", ctx.Err().Error())
 	case err = <-watchErrCh:
 		assert.NilError(t, err)
-	case err = <-podControllerErrCh:
-		assert.NilError(t, err)
-		t.Fatal("Pod controller terminated early")
 	}

 	// Setup a watch prior to pod deletion
@@ -483,12 +465,8 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
 	select {
 	case <-ctx.Done():
 		t.Fatalf("Context ended early: %s", ctx.Err().Error())
-	case err = <-podControllerErrCh:
-		assert.NilError(t, err)
-		t.Fatal("Pod controller exited prematurely without error")
 	case err = <-watchErrCh:
 		assert.NilError(t, err)
 	}
 }
@@ -526,18 +504,14 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
 	}()

 	// Start the pod controller
-	podControllerErrCh := s.start(ctx)
+	assert.NilError(t, s.start(ctx))

 	// Wait for pod to be in running
 	select {
 	case <-ctx.Done():
 		t.Fatalf("Context ended early: %s", ctx.Err().Error())
-	case err = <-podControllerErrCh:
-		assert.NilError(t, err)
-		t.Fatal("Pod controller exited prematurely without error")
 	case err = <-watchErrCh:
 		assert.NilError(t, err)
 	}

 	// Update the pod
@@ -592,14 +566,14 @@ func testPodStatusMissingWhileRunningScenario(ctx context.Context, t *testing.T,
 	}()

 	// Start the pod controller
-	podControllerErrCh := s.start(ctx)
+	assert.NilError(t, s.start(ctx))

 	// Wait for pod to be in running
 	select {
 	case <-ctx.Done():
 		t.Fatalf("Context ended early: %s", ctx.Err().Error())
-	case err = <-podControllerErrCh:
-		assert.NilError(t, err)
+	case <-s.pc.Done():
+		assert.NilError(t, s.pc.Err())
 		t.Fatal("Pod controller exited prematurely without error")
 	case err = <-watchErrCh:
 		assert.NilError(t, err)
@@ -627,8 +601,8 @@ func testPodStatusMissingWhileRunningScenario(ctx context.Context, t *testing.T,
 	select {
 	case <-ctx.Done():
 		t.Fatalf("Context ended early: %s", ctx.Err().Error())
-	case err = <-podControllerErrCh:
-		assert.NilError(t, err)
+	case <-s.pc.Done():
+		assert.NilError(t, s.pc.Err())
 		t.Fatal("Pod controller exited prematurely without error")
 	case err = <-watchErrCh:
 		assert.NilError(t, err)
@@ -654,18 +628,14 @@ func benchmarkCreatePods(ctx context.Context, b *testing.B, s *system) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()

-	errCh := s.start(ctx)
+	assert.NilError(b, s.start(ctx))

 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		pod := newPod(randomizeUID, randomizeName)
 		_, err := s.client.CoreV1().Pods(pod.Namespace).Create(pod)
 		assert.NilError(b, err)
-		select {
-		case err = <-errCh:
-			b.Fatalf("Benchmark terminated with error: %+v", err)
-		default:
-		}
+		assert.NilError(b, ctx.Err())
 	}
 }


@@ -17,6 +17,7 @@ package node
 import (
 	"context"
 	"testing"
+	"time"

 	pkgerrors "github.com/pkg/errors"
 	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
@@ -26,7 +27,9 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	kubeinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/util/workqueue"
 )

 type TestController struct {
@@ -40,19 +43,29 @@ func newTestController() *TestController {
 	rm := testutil.FakeResourceManager()
 	p := newMockProvider()

+	iFactory := kubeinformers.NewSharedInformerFactoryWithOptions(fk8s, 10*time.Minute)
+
 	return &TestController{
 		PodController: &PodController{
 			client:          fk8s.CoreV1(),
 			provider:        p,
 			resourceManager: rm,
 			recorder:        testutil.FakeEventRecorder(5),
+			k8sQ:            workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+			done:            make(chan struct{}),
+			ready:           make(chan struct{}),
+			podsInformer:    iFactory.Core().V1().Pods(),
 		},
 		mock:   p,
 		client: fk8s,
 	}
 }

+// Run starts the informer and runs the pod controller
+func (tc *TestController) Run(ctx context.Context, n int) error {
+	go tc.podsInformer.Informer().Run(ctx.Done())
+	return tc.PodController.Run(ctx, n)
+}
+
 func TestPodsEqual(t *testing.T) {
 	p1 := &corev1.Pod{
 		Spec: corev1.PodSpec{


@@ -100,10 +100,6 @@ type PodController struct {
 	// recorder is an event recorder for recording Event resources to the Kubernetes API.
 	recorder record.EventRecorder

-	// ready is a channel which will be closed once the pod controller is fully up and running.
-	// this channel will never be closed if there is an error on startup.
-	ready chan struct{}
-
 	client corev1client.PodsGetter

 	resourceManager *manager.ResourceManager
@@ -113,6 +109,22 @@ type PodController struct {
 	// From the time of creation, to termination the knownPods map will contain the pods key
 	// (derived from Kubernetes' cache library) -> a *knownPod struct.
 	knownPods sync.Map
+
+	// ready is a channel which will be closed once the pod controller is fully up and running.
+	// this channel will never be closed if there is an error on startup.
+	ready chan struct{}
+	// done is closed when Run returns
+	// Once done is closed `err` may be set to a non-nil value
+	done chan struct{}
+
+	mu sync.Mutex
+	// err is set if there is an error while running the pod controller.
+	// Typically this would be errors that occur during startup.
+	// Once err is set, `Run` should return.
+	//
+	// This is used since `pc.Run()` is typically called in a goroutine and managing
+	// this can be non-trivial for callers.
+	err error
 }

 type knownPod struct {
@@ -180,6 +192,7 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 		provider:        cfg.Provider,
 		resourceManager: rm,
 		ready:           make(chan struct{}),
+		done:            make(chan struct{}),
 		recorder:        cfg.EventRecorder,
 		k8sQ:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
 	}
@@ -187,10 +200,21 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 	return pc, nil
 }

-// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
-// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
-func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
-	defer pc.k8sQ.ShutDown()
+// Run will set up the event handlers for types we are interested in, as well
+// as syncing informer caches and starting workers. It will block until the
+// context is cancelled, at which point it will shutdown the work queue and
+// wait for workers to finish processing their current work items.
+//
+// Once this returns, you should not re-use the controller.
+func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
+	defer func() {
+		pc.k8sQ.ShutDown()
+
+		pc.mu.Lock()
+		pc.err = retErr
+		close(pc.done)
+		pc.mu.Unlock()
+	}()

 	podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
 	pc.runSyncFromProvider(ctx, podStatusQueue)
@@ -274,6 +298,19 @@ func (pc *PodController) Ready() <-chan struct{} {
 	return pc.ready
 }

+// Done returns a channel receiver which is closed when the pod controller has exited.
+// Once the pod controller has exited you can call `pc.Err()` to see if any error occurred.
+func (pc *PodController) Done() <-chan struct{} {
+	return pc.done
+}
+
+// Err returns any error that has occurred and caused the pod controller to exit.
+func (pc *PodController) Err() error {
+	pc.mu.Lock()
+	defer pc.mu.Unlock()
+	return pc.err
+}
+
 // runWorker is a long-running function that will continually call the processNextWorkItem function in order to read and process an item on the work queue.
 func (pc *PodController) runWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
 	for pc.processNextWorkItem(ctx, workerID, q) {


@@ -0,0 +1,44 @@
+package node
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"gotest.tools/assert"
+)
+
+func TestPodControllerExitOnContextCancel(t *testing.T) {
+	tc := newTestController()
+	ctx := context.Background()
+
+	ctxRun, cancel := context.WithCancel(ctx)
+	done := make(chan error)
+	go func() {
+		done <- tc.Run(ctxRun, 1)
+	}()
+
+	ctxT, cancelT := context.WithTimeout(ctx, 30*time.Second)
+	select {
+	case <-ctxT.Done():
+		assert.NilError(t, ctxT.Err())
+	case <-tc.Ready():
+	case <-tc.Done():
+	}
+	assert.NilError(t, tc.Err())
+	cancelT()
+
+	cancel()
+
+	ctxT, cancelT = context.WithTimeout(ctx, 30*time.Second)
+	defer cancelT()
+	select {
+	case <-ctxT.Done():
+		assert.NilError(t, ctxT.Err(), "timeout waiting for Run() to exit")
+	case err := <-done:
+		assert.NilError(t, err)
+	}
+
+	assert.NilError(t, tc.Err())
+}