Move some boilerplate startup logic to nodeutil
This adds a controller manager that handles startup for the node and pod controllers. If we later add an "API controller", it can be added here as well. This is part of reducing boilerplate code so that it is easier to move off of node-cli.
This commit is contained in:
@@ -199,40 +199,31 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
|
|||||||
}
|
}
|
||||||
defer cancelHTTP()
|
defer cancelHTTP()
|
||||||
|
|
||||||
go func() {
|
cm := nodeutil.NewControllerManager(nodeRunner, pc)
|
||||||
if err := pc.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled {
|
go cm.Run(ctx, c.PodSyncWorkers) // nolint:errcheck
|
||||||
log.G(ctx).Fatal(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
if c.StartupTimeout > 0 {
|
defer func() {
|
||||||
ctx, cancel := context.WithTimeout(ctx, c.StartupTimeout)
|
log.G(ctx).Debug("Waiting for controllers to be done")
|
||||||
log.G(ctx).Info("Waiting for pod controller / VK to be ready")
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
cancel()
|
|
||||||
return ctx.Err()
|
|
||||||
case <-pc.Ready():
|
|
||||||
}
|
|
||||||
cancel()
|
cancel()
|
||||||
if err := pc.Err(); err != nil {
|
<-cm.Done()
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
if err := nodeRunner.Run(ctx); err != nil {
|
|
||||||
log.G(ctx).Fatal(err)
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
log.G(ctx).Info("Waiting for controller to be ready")
|
||||||
|
if err := cm.WaitReady(ctx, c.StartupTimeout); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
setNodeReady(pNode)
|
setNodeReady(pNode)
|
||||||
if err := np.UpdateStatus(ctx, pNode); err != nil {
|
if err := np.UpdateStatus(ctx, pNode); err != nil {
|
||||||
return errors.Wrap(err, "error marking the node as ready")
|
return errors.Wrap(err, "error marking the node as ready")
|
||||||
}
|
}
|
||||||
log.G(ctx).Info("Initialized")
|
log.G(ctx).Info("Initialized")
|
||||||
|
|
||||||
<-ctx.Done()
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-cm.Done():
|
||||||
|
return cm.Err()
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
36
node/node.go
36
node/node.go
@@ -84,6 +84,7 @@ func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface
|
|||||||
serverNode: node,
|
serverNode: node,
|
||||||
nodes: nodes,
|
nodes: nodes,
|
||||||
chReady: make(chan struct{}),
|
chReady: make(chan struct{}),
|
||||||
|
chDone: make(chan struct{}),
|
||||||
}
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
if err := o(n); err != nil {
|
if err := o(n); err != nil {
|
||||||
@@ -223,7 +224,12 @@ type NodeController struct { // nolint:golint
|
|||||||
|
|
||||||
nodeStatusUpdateErrorHandler ErrorHandler
|
nodeStatusUpdateErrorHandler ErrorHandler
|
||||||
|
|
||||||
|
// chReady is closed once the controller is ready to start the control loop
|
||||||
chReady chan struct{}
|
chReady chan struct{}
|
||||||
|
// chDone is closed once the control loop has exited
|
||||||
|
chDone chan struct{}
|
||||||
|
errMu sync.Mutex
|
||||||
|
err error
|
||||||
|
|
||||||
nodePingController *nodePingController
|
nodePingController *nodePingController
|
||||||
pingTimeout *time.Duration
|
pingTimeout *time.Duration
|
||||||
@@ -249,7 +255,14 @@ const (
|
|||||||
// node status update (because some things still expect the node to be updated
|
// node status update (because some things still expect the node to be updated
|
||||||
// periodically), otherwise it will only use node status update with the configured
|
// periodically), otherwise it will only use node status update with the configured
|
||||||
// ping interval.
|
// ping interval.
|
||||||
func (n *NodeController) Run(ctx context.Context) error {
|
func (n *NodeController) Run(ctx context.Context) (retErr error) {
|
||||||
|
defer func() {
|
||||||
|
n.errMu.Lock()
|
||||||
|
n.err = retErr
|
||||||
|
n.errMu.Unlock()
|
||||||
|
close(n.chDone)
|
||||||
|
}()
|
||||||
|
|
||||||
n.chStatusUpdate = make(chan *corev1.Node, 1)
|
n.chStatusUpdate = make(chan *corev1.Node, 1)
|
||||||
n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) {
|
n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) {
|
||||||
n.chStatusUpdate <- node
|
n.chStatusUpdate <- node
|
||||||
@@ -273,6 +286,22 @@ func (n *NodeController) Run(ctx context.Context) error {
|
|||||||
return n.controlLoop(ctx, providerNode)
|
return n.controlLoop(ctx, providerNode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Done signals to the caller when the controller is done and the control loop is exited.
|
||||||
|
//
|
||||||
|
// Call n.Err() to find out if there was an error.
|
||||||
|
func (n *NodeController) Done() <-chan struct{} {
|
||||||
|
return n.chDone
|
||||||
|
}
|
||||||
|
|
||||||
|
// Err returns any errors that have occurred that trigger the control loop to exit.
|
||||||
|
//
|
||||||
|
// Err only returns a non-nil error after `<-n.Done()` returns.
|
||||||
|
func (n *NodeController) Err() error {
|
||||||
|
n.errMu.Lock()
|
||||||
|
defer n.errMu.Unlock()
|
||||||
|
return n.err
|
||||||
|
}
|
||||||
|
|
||||||
func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.Node) (err error) {
|
func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.Node) (err error) {
|
||||||
ctx, span := trace.StartSpan(ctx, "node.ensureNode")
|
ctx, span := trace.StartSpan(ctx, "node.ensureNode")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
@@ -307,14 +336,12 @@ func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.No
|
|||||||
|
|
||||||
// Ready returns a channel that gets closed when the node is fully up and
|
// Ready returns a channel that gets closed when the node is fully up and
|
||||||
// running. Note that if there is an error on startup this channel will never
|
// running. Note that if there is an error on startup this channel will never
|
||||||
// be started.
|
// be closed.
|
||||||
func (n *NodeController) Ready() <-chan struct{} {
|
func (n *NodeController) Ready() <-chan struct{} {
|
||||||
return n.chReady
|
return n.chReady
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.Node) error {
|
func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.Node) error {
|
||||||
close(n.chReady)
|
|
||||||
|
|
||||||
defer n.group.Wait()
|
defer n.group.Wait()
|
||||||
|
|
||||||
var sleepInterval time.Duration
|
var sleepInterval time.Duration
|
||||||
@@ -355,6 +382,7 @@ func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.N
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
close(n.chReady)
|
||||||
for {
|
for {
|
||||||
shouldTerminate := loop()
|
shouldTerminate := loop()
|
||||||
if shouldTerminate {
|
if shouldTerminate {
|
||||||
|
|||||||
@@ -65,16 +65,13 @@ func testNodeRun(t *testing.T, enableLease bool) {
|
|||||||
node, err := NewNodeController(testP, testNode, nodes, opts...)
|
node, err := NewNodeController(testP, testNode, nodes, opts...)
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
chErr := make(chan error)
|
|
||||||
defer func() {
|
defer func() {
|
||||||
cancel()
|
cancel()
|
||||||
assert.NilError(t, <-chErr)
|
<-node.Done()
|
||||||
|
assert.NilError(t, node.Err())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go node.Run(ctx) // nolint:errcheck
|
||||||
chErr <- node.Run(ctx)
|
|
||||||
close(chErr)
|
|
||||||
}()
|
|
||||||
|
|
||||||
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
|
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
|
||||||
defer nw.Stop()
|
defer nw.Stop()
|
||||||
@@ -103,8 +100,8 @@ func testNodeRun(t *testing.T, enableLease bool) {
|
|||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
t.Errorf("timeout waiting for event")
|
t.Errorf("timeout waiting for event")
|
||||||
continue
|
continue
|
||||||
case err := <-chErr:
|
case <-node.Done():
|
||||||
t.Fatal(err) // if this returns at all it is an error regardless if err is nil
|
t.Fatal(node.Err()) // if this returns at all it is an error regardless if err is nil
|
||||||
case <-nr:
|
case <-nr:
|
||||||
nodeUpdates++
|
nodeUpdates++
|
||||||
continue
|
continue
|
||||||
@@ -152,8 +149,8 @@ func testNodeRun(t *testing.T, enableLease bool) {
|
|||||||
defer eCancel()
|
defer eCancel()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case err := <-chErr:
|
case <-node.Done():
|
||||||
t.Fatal(err) // if this returns at all it is an error regardless if err is nil
|
t.Fatal(node.Err()) // if this returns at all it is an error regardless if err is nil
|
||||||
case err := <-waitForEvent(eCtx, nr, func(e watch.Event) bool {
|
case err := <-waitForEvent(eCtx, nr, func(e watch.Event) bool {
|
||||||
node := e.Object.(*corev1.Node)
|
node := e.Object.(*corev1.Node)
|
||||||
if len(node.Status.Conditions) == 0 {
|
if len(node.Status.Conditions) == 0 {
|
||||||
@@ -192,10 +189,7 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
|
|||||||
)
|
)
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
chErr := make(chan error, 1)
|
go node.Run(ctx) // nolint:errcheck
|
||||||
go func() {
|
|
||||||
chErr <- node.Run(ctx)
|
|
||||||
}()
|
|
||||||
|
|
||||||
timer := time.NewTimer(10 * time.Second)
|
timer := time.NewTimer(10 * time.Second)
|
||||||
defer timer.Stop()
|
defer timer.Stop()
|
||||||
@@ -204,8 +198,8 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
|
|||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
t.Fatal("timeout waiting for node to be ready")
|
t.Fatal("timeout waiting for node to be ready")
|
||||||
case <-chErr:
|
case <-node.Done():
|
||||||
t.Fatalf("node.Run returned earlier than expected: %v", err)
|
t.Fatalf("node.Run returned earlier than expected: %v", node.Err())
|
||||||
case <-node.Ready():
|
case <-node.Ready():
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -218,8 +212,8 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
|
|||||||
defer timer.Stop()
|
defer timer.Stop()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case err := <-chErr:
|
case <-node.Done():
|
||||||
assert.Equal(t, err, nil)
|
assert.NilError(t, node.Err())
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
t.Fatal("timeout waiting for node shutdown")
|
t.Fatal("timeout waiting for node shutdown")
|
||||||
}
|
}
|
||||||
@@ -301,9 +295,11 @@ func TestPingAfterStatusUpdate(t *testing.T) {
|
|||||||
node, err := NewNodeController(testP, testNode, nodes, opts...)
|
node, err := NewNodeController(testP, testNode, nodes, opts...)
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
chErr := make(chan error, 1)
|
go node.Run(ctx) // nolint:errcheck
|
||||||
go func() {
|
defer func() {
|
||||||
chErr <- node.Run(ctx)
|
cancel()
|
||||||
|
<-node.Done()
|
||||||
|
assert.NilError(t, node.Err())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
timer := time.NewTimer(10 * time.Second)
|
timer := time.NewTimer(10 * time.Second)
|
||||||
@@ -313,10 +309,11 @@ func TestPingAfterStatusUpdate(t *testing.T) {
|
|||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
t.Fatal("timeout waiting for node to be ready")
|
t.Fatal("timeout waiting for node to be ready")
|
||||||
case <-chErr:
|
case <-node.Done():
|
||||||
t.Fatalf("node.Run returned earlier than expected: %v", err)
|
t.Fatalf("node.Run returned earlier than expected: %v", node.Err())
|
||||||
case <-node.Ready():
|
case <-node.Ready():
|
||||||
}
|
}
|
||||||
|
timer.Stop()
|
||||||
|
|
||||||
notifyTimer := time.After(interval * time.Duration(10))
|
notifyTimer := time.After(interval * time.Duration(10))
|
||||||
<-notifyTimer
|
<-notifyTimer
|
||||||
@@ -360,16 +357,13 @@ func TestBeforeAnnotationsPreserved(t *testing.T) {
|
|||||||
node, err := NewNodeController(testP, testNode, nodes, opts...)
|
node, err := NewNodeController(testP, testNode, nodes, opts...)
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
chErr := make(chan error)
|
|
||||||
defer func() {
|
defer func() {
|
||||||
cancel()
|
cancel()
|
||||||
assert.NilError(t, <-chErr)
|
<-node.Done()
|
||||||
|
assert.NilError(t, node.Err())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go node.Run(ctx) // nolint:errcheck
|
||||||
chErr <- node.Run(ctx)
|
|
||||||
close(chErr)
|
|
||||||
}()
|
|
||||||
|
|
||||||
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
|
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
|
||||||
defer nw.Stop()
|
defer nw.Stop()
|
||||||
@@ -427,16 +421,13 @@ func TestManualConditionsPreserved(t *testing.T) {
|
|||||||
node, err := NewNodeController(testP, testNode, nodes, opts...)
|
node, err := NewNodeController(testP, testNode, nodes, opts...)
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
chErr := make(chan error)
|
|
||||||
defer func() {
|
defer func() {
|
||||||
cancel()
|
cancel()
|
||||||
assert.NilError(t, <-chErr)
|
<-node.Done()
|
||||||
|
assert.NilError(t, node.Err())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go node.Run(ctx) // nolint:errcheck
|
||||||
chErr <- node.Run(ctx)
|
|
||||||
close(chErr)
|
|
||||||
}()
|
|
||||||
|
|
||||||
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
|
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
|
||||||
defer nw.Stop()
|
defer nw.Stop()
|
||||||
|
|||||||
136
node/nodeutil/controller.go
Normal file
136
node/nodeutil/controller.go
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
package nodeutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/virtual-kubelet/virtual-kubelet/node"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ControllerManager helps manage the startup/shutdown procedure for other controllers.
|
||||||
|
// It is intended as a convenience to reduce boiler plate code for starting up controllers.
|
||||||
|
//
|
||||||
|
// Must be created with constructor `NewControllerManager`.
|
||||||
|
type ControllerManager struct {
|
||||||
|
nc *node.NodeController
|
||||||
|
pc *node.PodController
|
||||||
|
|
||||||
|
ready chan struct{}
|
||||||
|
done chan struct{}
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewControllerManager creates a new ControllerManager.
|
||||||
|
func NewControllerManager(nc *node.NodeController, pc *node.PodController) *ControllerManager {
|
||||||
|
return &ControllerManager{
|
||||||
|
nc: nc,
|
||||||
|
pc: pc,
|
||||||
|
ready: make(chan struct{}),
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NodeController returns the configured node controller.
|
||||||
|
func (c *ControllerManager) NodeController() *node.NodeController {
|
||||||
|
return c.nc
|
||||||
|
}
|
||||||
|
|
||||||
|
// PodController returns the configured pod controller.
|
||||||
|
func (c *ControllerManager) PodController() *node.PodController {
|
||||||
|
return c.pc
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts all the underlying controllers
|
||||||
|
func (c *ControllerManager) Run(ctx context.Context, workers int) (retErr error) {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
go c.pc.Run(ctx, workers) // nolint:errcheck
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
<-c.pc.Done()
|
||||||
|
|
||||||
|
c.err = retErr
|
||||||
|
close(c.done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return c.err
|
||||||
|
case <-c.pc.Ready():
|
||||||
|
case <-c.pc.Done():
|
||||||
|
return c.pc.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
go c.nc.Run(ctx) // nolint:errcheck
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
cancel()
|
||||||
|
<-c.nc.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
c.err = ctx.Err()
|
||||||
|
return c.err
|
||||||
|
case <-c.nc.Ready():
|
||||||
|
case <-c.nc.Done():
|
||||||
|
return c.nc.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
close(c.ready)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-c.nc.Done():
|
||||||
|
cancel()
|
||||||
|
return c.nc.Err()
|
||||||
|
case <-c.pc.Done():
|
||||||
|
cancel()
|
||||||
|
return c.pc.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitReady waits for the specified timeout for the controller to be ready.
|
||||||
|
//
|
||||||
|
// The timeout is for convenience so the caller doesn't have to juggle an extra context.
|
||||||
|
func (c *ControllerManager) WaitReady(ctx context.Context, timeout time.Duration) error {
|
||||||
|
if timeout > 0 {
|
||||||
|
var cancel func()
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, timeout)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-c.ready:
|
||||||
|
return nil
|
||||||
|
case <-c.done:
|
||||||
|
return fmt.Errorf("controller exited before ready: %w", c.err)
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ready returns a channel that will be closed after the controller is ready.
|
||||||
|
func (c *ControllerManager) Ready() <-chan struct{} {
|
||||||
|
return c.ready
|
||||||
|
}
|
||||||
|
|
||||||
|
// Done returns a channel that will be closed when the controller has exited.
|
||||||
|
func (c *ControllerManager) Done() <-chan struct{} {
|
||||||
|
return c.done
|
||||||
|
}
|
||||||
|
|
||||||
|
// Err returns any error that occurred with the controller.
|
||||||
|
//
|
||||||
|
// This always return nil before `<-Done()`.
|
||||||
|
func (c *ControllerManager) Err() error {
|
||||||
|
select {
|
||||||
|
case <-c.Done():
|
||||||
|
return c.err
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user