From bb9ff1adf35e3737ce43549a796fa13dc40f2022 Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Tue, 10 Sep 2019 09:44:19 -0700
Subject: [PATCH] Adds Done() and Err() to pod controller (#735)

Allows callers to wait for pod controller exit in addition to readiness.
This means the caller does not have to deal with handling errors from the
pod controller running in a goroutine, since it can wait for exit via
`Done()` and check the error with `Err()`.
---
 .../internal/commands/root/root.go | 31 +++------
 node/doc.go                        |  7 +-
 node/lifecycle_test.go             | 68 ++++++-------------
 node/pod_test.go                   | 15 +++-
 node/podcontroller.go              | 53 ++++++++++++---
 node/podcontroller_test.go         | 44 ++++++++++++
 6 files changed, 136 insertions(+), 82 deletions(-)
 create mode 100644 node/podcontroller_test.go

diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go
index baaa99eca..5f56a8c21 100644
--- a/cmd/virtual-kubelet/internal/commands/root/root.go
+++ b/cmd/virtual-kubelet/internal/commands/root/root.go
@@ -18,7 +18,6 @@ import (
 	"context"
 	"os"
 	"path"
-	"time"
 
 	"github.com/pkg/errors"
 	"github.com/spf13/cobra"
@@ -207,11 +206,16 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 	}()
 
 	if c.StartupTimeout > 0 {
-		// If there is a startup timeout, it does two things:
-		// 1. It causes the VK to shutdown if we haven't gotten into an operational state in a time period
-		// 2. It prevents node advertisement from happening until we're in an operational state
-		err = waitFor(ctx, c.StartupTimeout, pc.Ready())
-		if err != nil {
+		ctx, cancel := context.WithTimeout(ctx, c.StartupTimeout)
+		log.G(ctx).Info("Waiting for pod controller / VK to be ready")
+		select {
+		case <-ctx.Done():
+			cancel()
+			return ctx.Err()
+		case <-pc.Ready():
+		}
+		cancel()
+		if err := pc.Err(); err != nil {
 			return err
 		}
 	}
@@ -228,21 +232,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 	return nil
 }
 
-func waitFor(ctx context.Context, time time.Duration, ready <-chan struct{}) error {
-	ctx, cancel := context.WithTimeout(ctx, time)
-	defer cancel()
-
-	// Wait for the VK / PC close the the ready channel, or time out and return
-	log.G(ctx).Info("Waiting for pod controller / VK to be ready")
-
-	select {
-	case <-ready:
-		return nil
-	case <-ctx.Done():
-		return errors.Wrap(ctx.Err(), "Error while starting up VK")
-	}
-}
-
 func newClient(configPath string) (*kubernetes.Clientset, error) {
 	var config *rest.Config
 
diff --git a/node/doc.go b/node/doc.go
index 34ffcac18..6bb76b309 100644
--- a/node/doc.go
+++ b/node/doc.go
@@ -27,9 +27,10 @@ There are two primary controllers, the node runner and the pod runner.
 
 	select {
 	case <-podRunner.Ready():
-		go nodeRunner.Run(ctx)
-	case <-ctx.Done()
-		return ctx.Err()
+	case <-podRunner.Done():
+	}
+	if podRunner.Err() != nil {
+		// handle error
 	}
 
 After calling start, cancelling the passed in context will shutdown the
diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go
index bf16e854f..6a4ea0325 100644
--- a/node/lifecycle_test.go
+++ b/node/lifecycle_test.go
@@ -237,33 +237,20 @@ func TestPodLifecycle(t *testing.T) {
 type testFunction func(ctx context.Context, s *system)
 
 type system struct {
-	retChan             chan error
 	pc                  *PodController
 	client              *fake.Clientset
 	podControllerConfig PodControllerConfig
 }
 
-func (s *system) start(ctx context.Context) chan error {
-	podControllerErrChan := make(chan error)
-	go func() {
-		podControllerErrChan <- s.pc.Run(ctx, podSyncWorkers)
-	}()
-
-	// We need to wait for the pod controller to start. If there is an error before the pod controller starts, or
-	// the context is cancelled. If the context is cancelled, the startup will be aborted, and the pod controller
-	// will return an error, so we don't need to wait on ctx.Done()
+func (s *system) start(ctx context.Context) error {
+	go s.pc.Run(ctx, podSyncWorkers) // nolint:errcheck
 	select {
 	case <-s.pc.Ready():
-		// This listens for errors, or exits in the future.
-		go func() {
-			podControllerErr := <-podControllerErrChan
-			s.retChan <- podControllerErr
-		}()
-	// If there is an error before things are ready, we need to forward it immediately
-	case podControllerErr := <-podControllerErrChan:
-		s.retChan <- podControllerErr
+	case <-s.pc.Done():
+	case <-ctx.Done():
+		return ctx.Err()
 	}
-	return s.retChan
+	return s.pc.Err()
 }
 
 func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunction) error {
@@ -305,8 +292,7 @@ func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunct
 	configMapInformer := sharedInformerFactory.Core().V1().ConfigMaps()
 	serviceInformer := sharedInformerFactory.Core().V1().Services()
 	sys := &system{
-		client:  client,
-		retChan: make(chan error, 1),
+		client: client,
 		podControllerConfig: PodControllerConfig{
 			PodClient:   client.CoreV1(),
 			PodInformer: podInformer,
@@ -338,7 +324,7 @@ func wireUpSystem(ctx context.Context, provider PodLifecycleHandler, f testFunct
 
 	// Shutdown the pod controller, and wait for it to exit
 	cancel()
-	return <-sys.retChan
+	return nil
 }
 
 func testFailedPodScenario(ctx context.Context, t *testing.T, s *system) {
@@ -359,7 +345,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
 	assert.NilError(t, e)
 	// Start the pod controller
-	s.start(ctx)
+	assert.NilError(t, s.start(ctx))
 
 	for s.pc.k8sQ.Len() > 0 {
 		time.Sleep(10 * time.Millisecond)
 	}
@@ -379,7 +365,7 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo
 	assert.NilError(t, m.CreatePod(ctx, pod))
 
 	// Start the pod controller
-	s.start(ctx)
+	assert.NilError(t, s.start(ctx))
 
 	assert.Assert(t, is.Equal(m.deletes.read(), 1))
 
@@ -444,8 +430,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
 		watchErrCh <- watchErr
 	}()
 
-	// Start the pod controller
-	podControllerErrCh := s.start(ctx)
+	assert.NilError(t, s.start(ctx))
 
 	// Wait for the pod to go into running
 	select {
@@ -453,9 +438,6 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
 		t.Fatalf("Context ended early: %s", ctx.Err().Error())
 	case err = <-watchErrCh:
 		assert.NilError(t, err)
-	case err = <-podControllerErrCh:
-		assert.NilError(t, err)
-		t.Fatal("Pod controller terminated early")
controller terminated early") } // Setup a watch prior to pod deletion @@ -483,12 +465,8 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, select { case <-ctx.Done(): t.Fatalf("Context ended early: %s", ctx.Err().Error()) - case err = <-podControllerErrCh: - assert.NilError(t, err) - t.Fatal("Pod controller exited prematurely without error") case err = <-watchErrCh: assert.NilError(t, err) - } } @@ -526,18 +504,14 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys }() // Start the pod controller - podControllerErrCh := s.start(ctx) + assert.NilError(t, s.start(ctx)) // Wait for pod to be in running select { case <-ctx.Done(): t.Fatalf("Context ended early: %s", ctx.Err().Error()) - case err = <-podControllerErrCh: - assert.NilError(t, err) - t.Fatal("Pod controller exited prematurely without error") case err = <-watchErrCh: assert.NilError(t, err) - } // Update the pod @@ -592,14 +566,14 @@ func testPodStatusMissingWhileRunningScenario(ctx context.Context, t *testing.T, }() // Start the pod controller - podControllerErrCh := s.start(ctx) + assert.NilError(t, s.start(ctx)) // Wait for pod to be in running select { case <-ctx.Done(): t.Fatalf("Context ended early: %s", ctx.Err().Error()) - case err = <-podControllerErrCh: - assert.NilError(t, err) + case <-s.pc.Done(): + assert.NilError(t, s.pc.Err()) t.Fatal("Pod controller exited prematurely without error") case err = <-watchErrCh: assert.NilError(t, err) @@ -627,8 +601,8 @@ func testPodStatusMissingWhileRunningScenario(ctx context.Context, t *testing.T, select { case <-ctx.Done(): t.Fatalf("Context ended early: %s", ctx.Err().Error()) - case err = <-podControllerErrCh: - assert.NilError(t, err) + case <-s.pc.Done(): + assert.NilError(t, s.pc.Err()) t.Fatal("Pod controller exited prematurely without error") case err = <-watchErrCh: assert.NilError(t, err) @@ -654,18 +628,14 @@ func benchmarkCreatePods(ctx context.Context, b *testing.B, s *system) { ctx, cancel := context.WithCancel(ctx) defer cancel() - errCh := s.start(ctx) + assert.NilError(b, s.start(ctx)) b.ResetTimer() for i := 0; i < b.N; i++ { pod := newPod(randomizeUID, randomizeName) _, err := s.client.CoreV1().Pods(pod.Namespace).Create(pod) assert.NilError(b, err) - select { - case err = <-errCh: - b.Fatalf("Benchmark terminated with error: %+v", err) - default: - } + assert.NilError(b, ctx.Err()) } } diff --git a/node/pod_test.go b/node/pod_test.go index 7fecc0826..675b4c0cb 100644 --- a/node/pod_test.go +++ b/node/pod_test.go @@ -17,6 +17,7 @@ package node import ( "context" "testing" + "time" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/errdefs" @@ -26,7 +27,9 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/util/workqueue" ) type TestController struct { @@ -40,19 +43,29 @@ func newTestController() *TestController { rm := testutil.FakeResourceManager() p := newMockProvider() - + iFactory := kubeinformers.NewSharedInformerFactoryWithOptions(fk8s, 10*time.Minute) return &TestController{ PodController: &PodController{ client: fk8s.CoreV1(), provider: p, resourceManager: rm, recorder: testutil.FakeEventRecorder(5), + k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + done: make(chan struct{}), + ready: make(chan struct{}), + podsInformer: iFactory.Core().V1().Pods(), }, mock: p, 
 		client: fk8s,
 	}
 }
 
+// Run starts the informer and runs the pod controller
+func (tc *TestController) Run(ctx context.Context, n int) error {
+	go tc.podsInformer.Informer().Run(ctx.Done())
+	return tc.PodController.Run(ctx, n)
+}
+
 func TestPodsEqual(t *testing.T) {
 	p1 := &corev1.Pod{
 		Spec: corev1.PodSpec{
diff --git a/node/podcontroller.go b/node/podcontroller.go
index 0aa089c00..99a641d13 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -100,10 +100,6 @@ type PodController struct {
 	// recorder is an event recorder for recording Event resources to the Kubernetes API.
 	recorder record.EventRecorder
 
-	// ready is a channel which will be closed once the pod controller is fully up and running.
-	// this channel will never be closed if there is an error on startup.
-	ready chan struct{}
-
 	client corev1client.PodsGetter
 
 	resourceManager *manager.ResourceManager
@@ -113,6 +109,22 @@ type PodController struct {
 	// From the time of creation, to termination the knownPods map will contain the pods key
 	// (derived from Kubernetes' cache library) -> a *knownPod struct.
 	knownPods sync.Map
+
+	// ready is a channel which will be closed once the pod controller is fully up and running.
+	// this channel will never be closed if there is an error on startup.
+	ready chan struct{}
+	// done is closed when Run returns
+	// Once done is closed `err` may be set to a non-nil value
+	done chan struct{}
+
+	mu sync.Mutex
+	// err is set if there is an error while running the pod controller.
+	// Typically this would be errors that occur during startup.
+	// Once err is set, `Run` should return.
+	//
+	// This is used since `pc.Run()` is typically called in a goroutine and managing
+	// this can be non-trivial for callers.
+	err error
 }
 
 type knownPod struct {
@@ -180,6 +192,7 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 		provider:        cfg.Provider,
 		resourceManager: rm,
 		ready:           make(chan struct{}),
+		done:            make(chan struct{}),
 		recorder:        cfg.EventRecorder,
 		k8sQ:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
 	}
@@ -187,10 +200,21 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 	return pc, nil
 }
 
-// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
-// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
-func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
-	defer pc.k8sQ.ShutDown()
+// Run will set up the event handlers for types we are interested in, as well
+// as syncing informer caches and starting workers. It will block until the
+// context is cancelled, at which point it will shutdown the work queue and
+// wait for workers to finish processing their current work items.
+//
+// Once this returns, you should not re-use the controller.
+func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
+	defer func() {
+		pc.k8sQ.ShutDown()
+
+		pc.mu.Lock()
+		pc.err = retErr
+		close(pc.done)
+		pc.mu.Unlock()
+	}()
 
 	podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
 	pc.runSyncFromProvider(ctx, podStatusQueue)
@@ -274,6 +298,19 @@ func (pc *PodController) Ready() <-chan struct{} {
 	return pc.ready
 }
 
+// Done returns a channel receiver which is closed when the pod controller has exited.
+// Once the pod controller has exited you can call `pc.Err()` to see if any error occurred.
+func (pc *PodController) Done() <-chan struct{} {
+	return pc.done
+}
+
+// Err returns any error that has occurred and caused the pod controller to exit.
+func (pc *PodController) Err() error {
+	pc.mu.Lock()
+	defer pc.mu.Unlock()
+	return pc.err
+}
+
 // runWorker is a long-running function that will continually call the processNextWorkItem function in order to read and process an item on the work queue.
 func (pc *PodController) runWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
 	for pc.processNextWorkItem(ctx, workerID, q) {
diff --git a/node/podcontroller_test.go b/node/podcontroller_test.go
new file mode 100644
index 000000000..e9ed7260d
--- /dev/null
+++ b/node/podcontroller_test.go
@@ -0,0 +1,44 @@
+package node
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"gotest.tools/assert"
+)
+
+func TestPodControllerExitOnContextCancel(t *testing.T) {
+	tc := newTestController()
+	ctx := context.Background()
+	ctxRun, cancel := context.WithCancel(ctx)
+
+	done := make(chan error)
+	go func() {
+		done <- tc.Run(ctxRun, 1)
+	}()
+
+	ctxT, cancelT := context.WithTimeout(ctx, 30*time.Second)
+	select {
+	case <-ctxT.Done():
+		assert.NilError(t, ctxT.Err())
+	case <-tc.Ready():
+	case <-tc.Done():
+	}
+	assert.NilError(t, tc.Err())
+
+	cancelT()
+
+	cancel()
+
+	ctxT, cancelT = context.WithTimeout(ctx, 30*time.Second)
+	defer cancelT()
+
+	select {
+	case <-ctxT.Done():
+		assert.NilError(t, ctxT.Err(), "timeout waiting for Run() to exit")
+	case err := <-done:
+		assert.NilError(t, err)
+	}
+	assert.NilError(t, tc.Err())
+}
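
For reference, a minimal sketch of how a caller might consume the Ready()/Done()/Err() API introduced by this patch. Only Run, Ready, Done, and Err come from the patch itself; the wrapper function and its name are hypothetical, and it assumes `pc` was built with node.NewPodController (imports: "context" and github.com/virtual-kubelet/virtual-kubelet/node):

	// runPodController is a hypothetical helper, not part of this patch.
	// It starts the controller, waits until it is ready (or has exited early),
	// then blocks until shutdown and reports the exit error via Err().
	func runPodController(ctx context.Context, pc *node.PodController, podSyncWorkers int) error {
		go pc.Run(ctx, podSyncWorkers) // nolint:errcheck // the error is surfaced via pc.Err()

		select {
		case <-pc.Ready():
			// Operational; dependent components (e.g. the node controller) can start now.
		case <-pc.Done():
			// Run returned before becoming ready; Err() reports why.
		case <-ctx.Done():
			return ctx.Err()
		}
		if err := pc.Err(); err != nil {
			return err
		}

		// Wait for the controller to exit (normally when ctx is cancelled).
		<-pc.Done()
		return pc.Err()
	}

This mirrors the call patterns used in root.go, node/doc.go, and the lifecycle tests changed by this patch.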