Merge pull request #967 from cpuguy83/controller_manager2

Move some boiler plate startup logic to nodeutil
This commit is contained in:
Brian Goff
2021-06-01 12:05:59 -07:00
committed by GitHub
4 changed files with 209 additions and 63 deletions

View File

@@ -199,40 +199,31 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
} }
defer cancelHTTP() defer cancelHTTP()
go func() { cm := nodeutil.NewControllerManager(nodeRunner, pc)
if err := pc.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled { go cm.Run(ctx, c.PodSyncWorkers) // nolint:errcheck
log.G(ctx).Fatal(err)
}
}()
if c.StartupTimeout > 0 { defer func() {
ctx, cancel := context.WithTimeout(ctx, c.StartupTimeout) log.G(ctx).Debug("Waiting for controllers to be done")
log.G(ctx).Info("Waiting for pod controller / VK to be ready")
select {
case <-ctx.Done():
cancel()
return ctx.Err()
case <-pc.Ready():
}
cancel() cancel()
if err := pc.Err(); err != nil { <-cm.Done()
return err
}
}
go func() {
if err := nodeRunner.Run(ctx); err != nil {
log.G(ctx).Fatal(err)
}
}() }()
log.G(ctx).Info("Waiting for controller to be ready")
if err := cm.WaitReady(ctx, c.StartupTimeout); err != nil {
return err
}
setNodeReady(pNode) setNodeReady(pNode)
if err := np.UpdateStatus(ctx, pNode); err != nil { if err := np.UpdateStatus(ctx, pNode); err != nil {
return errors.Wrap(err, "error marking the node as ready") return errors.Wrap(err, "error marking the node as ready")
} }
log.G(ctx).Info("Initialized") log.G(ctx).Info("Initialized")
<-ctx.Done() select {
case <-ctx.Done():
case <-cm.Done():
return cm.Err()
}
return nil return nil
} }

View File

@@ -84,6 +84,7 @@ func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface
serverNode: node, serverNode: node,
nodes: nodes, nodes: nodes,
chReady: make(chan struct{}), chReady: make(chan struct{}),
chDone: make(chan struct{}),
} }
for _, o := range opts { for _, o := range opts {
if err := o(n); err != nil { if err := o(n); err != nil {
@@ -223,7 +224,12 @@ type NodeController struct { // nolint:golint
nodeStatusUpdateErrorHandler ErrorHandler nodeStatusUpdateErrorHandler ErrorHandler
// chReady is closed once the controller is ready to start the control loop
chReady chan struct{} chReady chan struct{}
// chDone is closed once the control loop has exited
chDone chan struct{}
errMu sync.Mutex
err error
nodePingController *nodePingController nodePingController *nodePingController
pingTimeout *time.Duration pingTimeout *time.Duration
@@ -249,7 +255,14 @@ const (
// node status update (because some things still expect the node to be updated // node status update (because some things still expect the node to be updated
// periodically), otherwise it will only use node status update with the configured // periodically), otherwise it will only use node status update with the configured
// ping interval. // ping interval.
func (n *NodeController) Run(ctx context.Context) error { func (n *NodeController) Run(ctx context.Context) (retErr error) {
defer func() {
n.errMu.Lock()
n.err = retErr
n.errMu.Unlock()
close(n.chDone)
}()
n.chStatusUpdate = make(chan *corev1.Node, 1) n.chStatusUpdate = make(chan *corev1.Node, 1)
n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) { n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) {
n.chStatusUpdate <- node n.chStatusUpdate <- node
@@ -273,6 +286,22 @@ func (n *NodeController) Run(ctx context.Context) error {
return n.controlLoop(ctx, providerNode) return n.controlLoop(ctx, providerNode)
} }
// Done signals to the caller when the controller is done and the control loop is exited.
//
// Call n.Err() to find out if there was an error.
func (n *NodeController) Done() <-chan struct{} {
return n.chDone
}
// Err returns any errors that have occurred that trigger the control loop to exit.
//
// Err only returns a non-nil error after `<-n.Done()` returns.
func (n *NodeController) Err() error {
n.errMu.Lock()
defer n.errMu.Unlock()
return n.err
}
func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.Node) (err error) { func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.Node) (err error) {
ctx, span := trace.StartSpan(ctx, "node.ensureNode") ctx, span := trace.StartSpan(ctx, "node.ensureNode")
defer span.End() defer span.End()
@@ -307,14 +336,12 @@ func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.No
// Ready returns a channel that gets closed when the node is fully up and // Ready returns a channel that gets closed when the node is fully up and
// running. Note that if there is an error on startup this channel will never // running. Note that if there is an error on startup this channel will never
// be started. // be closed.
func (n *NodeController) Ready() <-chan struct{} { func (n *NodeController) Ready() <-chan struct{} {
return n.chReady return n.chReady
} }
func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.Node) error { func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.Node) error {
close(n.chReady)
defer n.group.Wait() defer n.group.Wait()
var sleepInterval time.Duration var sleepInterval time.Duration
@@ -355,6 +382,7 @@ func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.N
return false return false
} }
close(n.chReady)
for { for {
shouldTerminate := loop() shouldTerminate := loop()
if shouldTerminate { if shouldTerminate {

View File

@@ -65,16 +65,13 @@ func testNodeRun(t *testing.T, enableLease bool) {
node, err := NewNodeController(testP, testNode, nodes, opts...) node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err) assert.NilError(t, err)
chErr := make(chan error)
defer func() { defer func() {
cancel() cancel()
assert.NilError(t, <-chErr) <-node.Done()
assert.NilError(t, node.Err())
}() }()
go func() { go node.Run(ctx) // nolint:errcheck
chErr <- node.Run(ctx)
close(chErr)
}()
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name) nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
defer nw.Stop() defer nw.Stop()
@@ -103,8 +100,8 @@ func testNodeRun(t *testing.T, enableLease bool) {
case <-time.After(time.Second): case <-time.After(time.Second):
t.Errorf("timeout waiting for event") t.Errorf("timeout waiting for event")
continue continue
case err := <-chErr: case <-node.Done():
t.Fatal(err) // if this returns at all it is an error regardless if err is nil t.Fatal(node.Err()) // if this returns at all it is an error regardless if err is nil
case <-nr: case <-nr:
nodeUpdates++ nodeUpdates++
continue continue
@@ -152,8 +149,8 @@ func testNodeRun(t *testing.T, enableLease bool) {
defer eCancel() defer eCancel()
select { select {
case err := <-chErr: case <-node.Done():
t.Fatal(err) // if this returns at all it is an error regardless if err is nil t.Fatal(node.Err()) // if this returns at all it is an error regardless if err is nil
case err := <-waitForEvent(eCtx, nr, func(e watch.Event) bool { case err := <-waitForEvent(eCtx, nr, func(e watch.Event) bool {
node := e.Object.(*corev1.Node) node := e.Object.(*corev1.Node)
if len(node.Status.Conditions) == 0 { if len(node.Status.Conditions) == 0 {
@@ -192,10 +189,7 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
) )
assert.NilError(t, err) assert.NilError(t, err)
chErr := make(chan error, 1) go node.Run(ctx) // nolint:errcheck
go func() {
chErr <- node.Run(ctx)
}()
timer := time.NewTimer(10 * time.Second) timer := time.NewTimer(10 * time.Second)
defer timer.Stop() defer timer.Stop()
@@ -204,8 +198,8 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
select { select {
case <-timer.C: case <-timer.C:
t.Fatal("timeout waiting for node to be ready") t.Fatal("timeout waiting for node to be ready")
case <-chErr: case <-node.Done():
t.Fatalf("node.Run returned earlier than expected: %v", err) t.Fatalf("node.Run returned earlier than expected: %v", node.Err())
case <-node.Ready(): case <-node.Ready():
} }
@@ -218,8 +212,8 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
defer timer.Stop() defer timer.Stop()
select { select {
case err := <-chErr: case <-node.Done():
assert.Equal(t, err, nil) assert.NilError(t, node.Err())
case <-timer.C: case <-timer.C:
t.Fatal("timeout waiting for node shutdown") t.Fatal("timeout waiting for node shutdown")
} }
@@ -301,9 +295,11 @@ func TestPingAfterStatusUpdate(t *testing.T) {
node, err := NewNodeController(testP, testNode, nodes, opts...) node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err) assert.NilError(t, err)
chErr := make(chan error, 1) go node.Run(ctx) // nolint:errcheck
go func() { defer func() {
chErr <- node.Run(ctx) cancel()
<-node.Done()
assert.NilError(t, node.Err())
}() }()
timer := time.NewTimer(10 * time.Second) timer := time.NewTimer(10 * time.Second)
@@ -313,10 +309,11 @@ func TestPingAfterStatusUpdate(t *testing.T) {
select { select {
case <-timer.C: case <-timer.C:
t.Fatal("timeout waiting for node to be ready") t.Fatal("timeout waiting for node to be ready")
case <-chErr: case <-node.Done():
t.Fatalf("node.Run returned earlier than expected: %v", err) t.Fatalf("node.Run returned earlier than expected: %v", node.Err())
case <-node.Ready(): case <-node.Ready():
} }
timer.Stop()
notifyTimer := time.After(interval * time.Duration(10)) notifyTimer := time.After(interval * time.Duration(10))
<-notifyTimer <-notifyTimer
@@ -360,16 +357,13 @@ func TestBeforeAnnotationsPreserved(t *testing.T) {
node, err := NewNodeController(testP, testNode, nodes, opts...) node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err) assert.NilError(t, err)
chErr := make(chan error)
defer func() { defer func() {
cancel() cancel()
assert.NilError(t, <-chErr) <-node.Done()
assert.NilError(t, node.Err())
}() }()
go func() { go node.Run(ctx) // nolint:errcheck
chErr <- node.Run(ctx)
close(chErr)
}()
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name) nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
defer nw.Stop() defer nw.Stop()
@@ -427,16 +421,13 @@ func TestManualConditionsPreserved(t *testing.T) {
node, err := NewNodeController(testP, testNode, nodes, opts...) node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err) assert.NilError(t, err)
chErr := make(chan error)
defer func() { defer func() {
cancel() cancel()
assert.NilError(t, <-chErr) <-node.Done()
assert.NilError(t, node.Err())
}() }()
go func() { go node.Run(ctx) // nolint:errcheck
chErr <- node.Run(ctx)
close(chErr)
}()
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name) nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
defer nw.Stop() defer nw.Stop()

136
node/nodeutil/controller.go Normal file
View File

@@ -0,0 +1,136 @@
package nodeutil
import (
"context"
"fmt"
"time"
"github.com/virtual-kubelet/virtual-kubelet/node"
)
// ControllerManager helps manage the startup/shutdown procedure for other controllers.
// It is intended as a convenience to reduce boiler plate code for starting up controllers.
//
// Must be created with constructor `NewControllerManager`.
type ControllerManager struct {
nc *node.NodeController
pc *node.PodController
ready chan struct{}
done chan struct{}
err error
}
// NewControllerManager creates a new ControllerManager.
func NewControllerManager(nc *node.NodeController, pc *node.PodController) *ControllerManager {
return &ControllerManager{
nc: nc,
pc: pc,
ready: make(chan struct{}),
done: make(chan struct{}),
}
}
// NodeController returns the configured node controller.
func (c *ControllerManager) NodeController() *node.NodeController {
return c.nc
}
// PodController returns the configured pod controller.
func (c *ControllerManager) PodController() *node.PodController {
return c.pc
}
// Run starts all the underlying controllers
func (c *ControllerManager) Run(ctx context.Context, workers int) (retErr error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go c.pc.Run(ctx, workers) // nolint:errcheck
defer func() {
cancel()
<-c.pc.Done()
c.err = retErr
close(c.done)
}()
select {
case <-ctx.Done():
return c.err
case <-c.pc.Ready():
case <-c.pc.Done():
return c.pc.Err()
}
go c.nc.Run(ctx) // nolint:errcheck
defer func() {
cancel()
<-c.nc.Done()
}()
select {
case <-ctx.Done():
c.err = ctx.Err()
return c.err
case <-c.nc.Ready():
case <-c.nc.Done():
return c.nc.Err()
}
close(c.ready)
select {
case <-c.nc.Done():
cancel()
return c.nc.Err()
case <-c.pc.Done():
cancel()
return c.pc.Err()
}
}
// WaitReady waits for the specified timeout for the controller to be ready.
//
// The timeout is for convenience so the caller doesn't have to juggle an extra context.
func (c *ControllerManager) WaitReady(ctx context.Context, timeout time.Duration) error {
if timeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
select {
case <-c.ready:
return nil
case <-c.done:
return fmt.Errorf("controller exited before ready: %w", c.err)
case <-ctx.Done():
return ctx.Err()
}
}
// Ready returns a channel that will be closed after the controller is ready.
func (c *ControllerManager) Ready() <-chan struct{} {
return c.ready
}
// Done returns a channel that will be closed when the controller has exited.
func (c *ControllerManager) Done() <-chan struct{} {
return c.done
}
// Err returns any error that occurred with the controller.
//
// This always return nil before `<-Done()`.
func (c *ControllerManager) Err() error {
select {
case <-c.Done():
return c.err
default:
return nil
}
}