diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go index 2252d651e..6d21accf7 100644 --- a/cmd/virtual-kubelet/internal/commands/root/root.go +++ b/cmd/virtual-kubelet/internal/commands/root/root.go @@ -199,40 +199,31 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { } defer cancelHTTP() - go func() { - if err := pc.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled { - log.G(ctx).Fatal(err) - } - }() + cm := nodeutil.NewControllerManager(nodeRunner, pc) + go cm.Run(ctx, c.PodSyncWorkers) // nolint:errcheck - if c.StartupTimeout > 0 { - ctx, cancel := context.WithTimeout(ctx, c.StartupTimeout) - log.G(ctx).Info("Waiting for pod controller / VK to be ready") - select { - case <-ctx.Done(): - cancel() - return ctx.Err() - case <-pc.Ready(): - } + defer func() { + log.G(ctx).Debug("Waiting for controllers to be done") cancel() - if err := pc.Err(); err != nil { - return err - } - } - - go func() { - if err := nodeRunner.Run(ctx); err != nil { - log.G(ctx).Fatal(err) - } + <-cm.Done() }() + log.G(ctx).Info("Waiting for controller to be ready") + if err := cm.WaitReady(ctx, c.StartupTimeout); err != nil { + return err + } + setNodeReady(pNode) if err := np.UpdateStatus(ctx, pNode); err != nil { return errors.Wrap(err, "error marking the node as ready") } log.G(ctx).Info("Initialized") - <-ctx.Done() + select { + case <-ctx.Done(): + case <-cm.Done(): + return cm.Err() + } return nil } diff --git a/node/node.go b/node/node.go index 4b2f3aa79..232601d7b 100644 --- a/node/node.go +++ b/node/node.go @@ -84,6 +84,7 @@ func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface serverNode: node, nodes: nodes, chReady: make(chan struct{}), + chDone: make(chan struct{}), } for _, o := range opts { if err := o(n); err != nil { @@ -223,7 +224,12 @@ type NodeController struct { // nolint:golint nodeStatusUpdateErrorHandler ErrorHandler + 
// chReady is closed once the controller is ready to start the control loop chReady chan struct{} + // chDone is closed once the control loop has exited + chDone chan struct{} + errMu sync.Mutex + err error nodePingController *nodePingController pingTimeout *time.Duration @@ -249,7 +255,14 @@ const ( // node status update (because some things still expect the node to be updated // periodically), otherwise it will only use node status update with the configured // ping interval. -func (n *NodeController) Run(ctx context.Context) error { +func (n *NodeController) Run(ctx context.Context) (retErr error) { + defer func() { + n.errMu.Lock() + n.err = retErr + n.errMu.Unlock() + close(n.chDone) + }() + n.chStatusUpdate = make(chan *corev1.Node, 1) n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) { n.chStatusUpdate <- node @@ -273,6 +286,22 @@ func (n *NodeController) Run(ctx context.Context) error { return n.controlLoop(ctx, providerNode) } +// Done signals to the caller when the controller is done and the control loop is exited. +// +// Call n.Err() to find out if there was an error. +func (n *NodeController) Done() <-chan struct{} { + return n.chDone +} + +// Err returns any errors that have occurred that trigger the control loop to exit. +// +// Err only returns a non-nil error after `<-n.Done()` returns. +func (n *NodeController) Err() error { + n.errMu.Lock() + defer n.errMu.Unlock() + return n.err +} + func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.Node) (err error) { ctx, span := trace.StartSpan(ctx, "node.ensureNode") defer span.End() @@ -307,14 +336,12 @@ func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.No // Ready returns a channel that gets closed when the node is fully up and // running. Note that if there is an error on startup this channel will never -// be started. +// be closed. 
func (n *NodeController) Ready() <-chan struct{} { return n.chReady } func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.Node) error { - close(n.chReady) - defer n.group.Wait() var sleepInterval time.Duration @@ -355,6 +382,7 @@ func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.N return false } + close(n.chReady) for { shouldTerminate := loop() if shouldTerminate { diff --git a/node/node_test.go b/node/node_test.go index aa037e465..e4d97375a 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -65,16 +65,13 @@ func testNodeRun(t *testing.T, enableLease bool) { node, err := NewNodeController(testP, testNode, nodes, opts...) assert.NilError(t, err) - chErr := make(chan error) defer func() { cancel() - assert.NilError(t, <-chErr) + <-node.Done() + assert.NilError(t, node.Err()) }() - go func() { - chErr <- node.Run(ctx) - close(chErr) - }() + go node.Run(ctx) // nolint:errcheck nw := makeWatch(ctx, t, nodes, testNodeCopy.Name) defer nw.Stop() @@ -103,8 +100,8 @@ func testNodeRun(t *testing.T, enableLease bool) { case <-time.After(time.Second): t.Errorf("timeout waiting for event") continue - case err := <-chErr: - t.Fatal(err) // if this returns at all it is an error regardless if err is nil + case <-node.Done(): + t.Fatal(node.Err()) // if this returns at all it is an error regardless if err is nil case <-nr: nodeUpdates++ continue @@ -152,8 +149,8 @@ func testNodeRun(t *testing.T, enableLease bool) { defer eCancel() select { - case err := <-chErr: - t.Fatal(err) // if this returns at all it is an error regardless if err is nil + case <-node.Done(): + t.Fatal(node.Err()) // if this returns at all it is an error regardless if err is nil case err := <-waitForEvent(eCtx, nr, func(e watch.Event) bool { node := e.Object.(*corev1.Node) if len(node.Status.Conditions) == 0 { @@ -192,10 +189,7 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) { ) assert.NilError(t, err) - chErr := make(chan error, 1) - 
go func() { - chErr <- node.Run(ctx) - }() + go node.Run(ctx) // nolint:errcheck timer := time.NewTimer(10 * time.Second) defer timer.Stop() @@ -204,8 +198,8 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) { select { case <-timer.C: t.Fatal("timeout waiting for node to be ready") - case <-chErr: - t.Fatalf("node.Run returned earlier than expected: %v", err) + case <-node.Done(): + t.Fatalf("node.Run returned earlier than expected: %v", node.Err()) case <-node.Ready(): } @@ -218,8 +212,8 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) { defer timer.Stop() select { - case err := <-chErr: - assert.Equal(t, err, nil) + case <-node.Done(): + assert.NilError(t, node.Err()) case <-timer.C: t.Fatal("timeout waiting for node shutdown") } @@ -301,9 +295,11 @@ func TestPingAfterStatusUpdate(t *testing.T) { node, err := NewNodeController(testP, testNode, nodes, opts...) assert.NilError(t, err) - chErr := make(chan error, 1) - go func() { - chErr <- node.Run(ctx) + go node.Run(ctx) // nolint:errcheck + defer func() { + cancel() + <-node.Done() + assert.NilError(t, node.Err()) }() timer := time.NewTimer(10 * time.Second) @@ -313,10 +309,11 @@ func TestPingAfterStatusUpdate(t *testing.T) { select { case <-timer.C: t.Fatal("timeout waiting for node to be ready") - case <-chErr: - t.Fatalf("node.Run returned earlier than expected: %v", err) + case <-node.Done(): + t.Fatalf("node.Run returned earlier than expected: %v", node.Err()) case <-node.Ready(): } + timer.Stop() notifyTimer := time.After(interval * time.Duration(10)) <-notifyTimer @@ -360,16 +357,13 @@ func TestBeforeAnnotationsPreserved(t *testing.T) { node, err := NewNodeController(testP, testNode, nodes, opts...) 
assert.NilError(t, err) - chErr := make(chan error) defer func() { cancel() - assert.NilError(t, <-chErr) + <-node.Done() + assert.NilError(t, node.Err()) }() - go func() { - chErr <- node.Run(ctx) - close(chErr) - }() + go node.Run(ctx) // nolint:errcheck nw := makeWatch(ctx, t, nodes, testNodeCopy.Name) defer nw.Stop() @@ -427,16 +421,13 @@ func TestManualConditionsPreserved(t *testing.T) { node, err := NewNodeController(testP, testNode, nodes, opts...) assert.NilError(t, err) - chErr := make(chan error) defer func() { cancel() - assert.NilError(t, <-chErr) + <-node.Done() + assert.NilError(t, node.Err()) }() - go func() { - chErr <- node.Run(ctx) - close(chErr) - }() + go node.Run(ctx) // nolint:errcheck nw := makeWatch(ctx, t, nodes, testNodeCopy.Name) defer nw.Stop() diff --git a/node/nodeutil/controller.go b/node/nodeutil/controller.go new file mode 100644 index 000000000..5521f7935 --- /dev/null +++ b/node/nodeutil/controller.go @@ -0,0 +1,136 @@ +package nodeutil + +import ( + "context" + "fmt" + "time" + + "github.com/virtual-kubelet/virtual-kubelet/node" +) + +// ControllerManager helps manage the startup/shutdown procedure for other controllers. +// It is intended as a convenience to reduce boiler plate code for starting up controllers. +// +// Must be created with constructor `NewControllerManager`. +type ControllerManager struct { + nc *node.NodeController + pc *node.PodController + + ready chan struct{} + done chan struct{} + err error +} + +// NewControllerManager creates a new ControllerManager. +func NewControllerManager(nc *node.NodeController, pc *node.PodController) *ControllerManager { + return &ControllerManager{ + nc: nc, + pc: pc, + ready: make(chan struct{}), + done: make(chan struct{}), + } +} + +// NodeController returns the configured node controller. +func (c *ControllerManager) NodeController() *node.NodeController { + return c.nc +} + +// PodController returns the configured pod controller. 
+func (c *ControllerManager) PodController() *node.PodController {
+	return c.pc
+}
+
+// Run starts the pod controller, waits for it to become ready, then starts
+// the node controller and blocks until either controller exits.
+//
+// workers is passed through to the pod controller's Run. Both controllers run
+// under a context derived from ctx. Whenever Run returns, that context is
+// cancelled and Run waits for every controller it started to signal Done
+// before recording the returned error (see Err) and closing the Done channel.
+func (c *ControllerManager) Run(ctx context.Context, workers int) (retErr error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	go c.pc.Run(ctx, workers) // nolint:errcheck
+
+	// Registered before the node controller's deferred wait below, so it runs
+	// after it: the error is recorded and done is closed only once every
+	// started controller has stopped.
+	defer func() {
+		cancel()
+
+		<-c.pc.Done()
+
+		c.err = retErr
+		close(c.done)
+	}()
+
+	select {
+	case <-ctx.Done():
+		// c.err has not been assigned yet at this point, so returning it
+		// would report a cancelled startup as success. Return the context
+		// error; the deferred function stores it in c.err.
+		return ctx.Err()
+	case <-c.pc.Ready():
+	case <-c.pc.Done():
+		return c.pc.Err()
+	}
+
+	go c.nc.Run(ctx) // nolint:errcheck
+
+	defer func() {
+		cancel()
+		<-c.nc.Done()
+	}()
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-c.nc.Ready():
+	case <-c.nc.Done():
+		return c.nc.Err()
+	}
+
+	close(c.ready)
+
+	// The deferred functions cancel the shared context on return, which stops
+	// whichever controller is still running.
+	select {
+	case <-c.nc.Done():
+		return c.nc.Err()
+	case <-c.pc.Done():
+		return c.pc.Err()
+	}
+}
+
+// WaitReady blocks until the controller manager is ready, has exited, or ctx
+// is done.
+//
+// When timeout is greater than zero the wait is additionally bounded by it;
+// this is a convenience so the caller doesn't have to juggle an extra context.
+// WaitReady returns nil once the manager is ready, ctx.Err() when the context
+// or the timeout ends the wait, and an error (wrapping the error recorded by
+// Run, when there is one) if the manager exited without becoming ready.
+func (c *ControllerManager) WaitReady(ctx context.Context, timeout time.Duration) error {
+	if timeout > 0 {
+		var cancel func()
+		ctx, cancel = context.WithTimeout(ctx, timeout)
+		defer cancel()
+	}
+
+	select {
+	case <-c.ready:
+		return nil
+	case <-c.done:
+		// select chooses at random among ready cases, so done can be picked
+		// even though ready was closed first. Readiness takes precedence.
+		select {
+		case <-c.ready:
+			return nil
+		default:
+		}
+		if c.err == nil {
+			// Wrapping a nil error with %w would render as "%!w(<nil>)".
+			return fmt.Errorf("controller exited before ready")
+		}
+		return fmt.Errorf("controller exited before ready: %w", c.err)
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+// Ready returns a channel that will be closed after the controller is ready.
+func (c *ControllerManager) Ready() <-chan struct{} {
+	return c.ready
+}
+
+// Done returns a channel that will be closed when the controller has exited.
+func (c *ControllerManager) Done() <-chan struct{} {
+	return c.done
+}
+
+// Err returns the error, if any, that Run exited with.
+//
+// It always returns nil until the channel returned by Done has been closed.
+func (c *ControllerManager) Err() error {
+	select {
+	case <-c.Done():
+		return c.err
+	default:
+		return nil
+	}
+}