diff --git a/go.mod b/go.mod
index ebc3a6f3a..1952aad52 100644
--- a/go.mod
+++ b/go.mod
@@ -19,6 +19,7 @@ require (
 	github.com/spf13/cobra v0.0.5
 	github.com/spf13/pflag v1.0.5
 	go.opencensus.io v0.21.0
+	golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
 	golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7
 	gotest.tools v2.2.0+incompatible
 	k8s.io/api v0.18.4
diff --git a/node/mock_test.go b/node/mock_test.go
index 297ad487a..5e679331b 100644
--- a/node/mock_test.go
+++ b/node/mock_test.go
@@ -20,51 +20,6 @@ var (
 	_ PodLifecycleHandler = (*mockProvider)(nil)
 )

-type waitableInt struct {
-	cond *sync.Cond
-	val  int
-}
-
-func newWaitableInt() *waitableInt {
-	return &waitableInt{
-		cond: sync.NewCond(&sync.Mutex{}),
-	}
-}
-
-func (w *waitableInt) read() int {
-	defer w.cond.L.Unlock()
-	w.cond.L.Lock()
-	return w.val
-}
-
-func (w *waitableInt) until(ctx context.Context, f func(int) bool) error {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	go func() {
-		<-ctx.Done()
-		w.cond.Broadcast()
-	}()
-
-	w.cond.L.Lock()
-	defer w.cond.L.Unlock()
-
-	for !f(w.val) {
-		if err := ctx.Err(); err != nil {
-			return err
-		}
-		w.cond.Wait()
-	}
-	return nil
-}
-
-func (w *waitableInt) increment() {
-	w.cond.L.Lock()
-	defer w.cond.L.Unlock()
-	w.val++
-	w.cond.Broadcast()
-}
-
 type mockProvider struct {
 	creates *waitableInt
 	updates *waitableInt
diff --git a/node/node.go b/node/node.go
index 6f6f363e2..c7ab7448b 100644
--- a/node/node.go
+++ b/node/node.go
@@ -28,6 +28,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
 	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/util/retry"
@@ -70,12 +71,19 @@ type NodeProvider interface { //nolint:golint
 // Note: When if there are multiple NodeControllerOpts which apply against the same
 // underlying options, the last NodeControllerOpt will win.
 func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface, opts ...NodeControllerOpt) (*NodeController, error) {
-	n := &NodeController{p: p, n: node, nodes: nodes, chReady: make(chan struct{})}
+	n := &NodeController{
+		p:       p,
+		n:       node,
+		nodes:   nodes,
+		chReady: make(chan struct{}),
+	}
 	for _, o := range opts {
 		if err := o(n); err != nil {
 			return nil, pkgerrors.Wrap(err, "error applying node option")
 		}
 	}
+	n.nodePingController = newNodePingController(n.p, n.pingInterval, n.pingTimeout)
+
 	return n, nil
 }

@@ -104,7 +112,17 @@ func WithNodeEnableLeaseV1Beta1(client v1beta1.LeaseInterface, baseLease *coord.
 	}
 }

-// WithNodePingInterval sets the interval for checking node status
+// WithNodePingTimeout limits the amount of time that the virtual kubelet will wait for the node provider to
+// respond to the ping callback. If it does not return within this time, it will be considered an error
+// condition
+func WithNodePingTimeout(timeout time.Duration) NodeControllerOpt {
+	return func(n *NodeController) error {
+		n.pingTimeout = &timeout
+		return nil
+	}
+}
+
+// WithNodePingInterval sets the interval between checking for node statuses via Ping()
 // If node leases are not supported (or not enabled), this is the frequency
 // with which the node status will be updated in Kubernetes.
 func WithNodePingInterval(d time.Duration) NodeControllerOpt {
@@ -164,6 +182,9 @@ type NodeController struct { // nolint: golint
 	nodeStatusUpdateErrorHandler ErrorHandler

 	chReady chan struct{}
+
+	nodePingController *nodePingController
+	pingTimeout        *time.Duration
 }

 // The default intervals used for lease and status updates.
@@ -270,6 +291,10 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
 	close(n.chReady)

+	group := &wait.Group{}
+	group.StartWithContext(ctx, n.nodePingController.run)
+	defer group.Wait()
+
 	loop := func() bool {
 		ctx, span := trace.StartSpan(ctx, "node.controlLoop.loop")
 		defer span.End()

@@ -319,6 +344,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
 		}
 		return false
 	}
+
 	for {
 		shouldTerminate := loop()
 		if shouldTerminate {
@@ -334,14 +360,22 @@ func (n *NodeController) handlePing(ctx context.Context) (retErr error) {
 		span.SetStatus(retErr)
 	}()

-	if err := n.p.Ping(ctx); err != nil {
-		return pkgerrors.Wrap(err, "error while pinging the node provider")
+	result, err := n.nodePingController.getResult(ctx)
+	if err != nil {
+		err = pkgerrors.Wrap(err, "error while fetching result of node ping")
+		return err
+	}
+
+	if result.error != nil {
+		err = pkgerrors.Wrap(result.error, "node ping returned error on ping")
+		return err
 	}

 	if n.disableLease {
 		return n.updateStatus(ctx, false)
 	}

+	// TODO(Sargun): Pass down the result / timestamp so we can accurately track when the ping actually occurred
 	return n.updateLease(ctx)
 }
diff --git a/node/node_ping_controller.go b/node/node_ping_controller.go
new file mode 100644
index 000000000..f1b435924
--- /dev/null
+++ b/node/node_ping_controller.go
@@ -0,0 +1,104 @@
+package node
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/virtual-kubelet/virtual-kubelet/log"
+	"github.com/virtual-kubelet/virtual-kubelet/trace"
+	"golang.org/x/sync/singleflight"
+	"k8s.io/apimachinery/pkg/util/wait"
+)
+
+type nodePingController struct {
+	nodeProvider       NodeProvider
+	pingInterval       time.Duration
+	firstPingCompleted chan struct{}
+	pingTimeout        *time.Duration
+
+	// "Results"
+	sync.Mutex
+	result pingResult
+}
+
+type pingResult struct {
+	pingTime time.Time
+	error    error
+}
+
+func newNodePingController(node NodeProvider, pingInterval time.Duration, timeout *time.Duration) *nodePingController {
+	return &nodePingController{
+		nodeProvider:       node,
+		pingInterval:       pingInterval,
+		firstPingCompleted: make(chan struct{}),
+		pingTimeout:        timeout,
+	}
+}
+
+func (npc *nodePingController) run(ctx context.Context) {
+	const key = "key"
+	sf := &singleflight.Group{}
+
+	// 1. If the node is "stuck" and not responding to pings, we want to record that the
+	//    node provider has timed out responding to the ping
+	// 2. We want the per-ping context to be cancelled on timeout, so that whatever the node
+	//    provider might be stuck on can observe the cancellation and become unstuck
+	// 3. We want to retry pinging the node, but we never want more than one
+	//    ping in flight at a time.
+
+	mkContextFunc := context.WithCancel
+
+	if npc.pingTimeout != nil {
+		mkContextFunc = func(ctx2 context.Context) (context.Context, context.CancelFunc) {
+			return context.WithTimeout(ctx2, *npc.pingTimeout)
+		}
+	}
+
+	checkFunc := func(ctx context.Context) {
+		ctx, cancel := mkContextFunc(ctx)
+		defer cancel()
+		ctx, span := trace.StartSpan(ctx, "node.pingLoop")
+		defer span.End()
+		doChan := sf.DoChan(key, func() (interface{}, error) {
+			now := time.Now()
+			ctx, span := trace.StartSpan(ctx, "node.pingNode")
+			defer span.End()
+			err := npc.nodeProvider.Ping(ctx)
+			span.SetStatus(err)
+			return now, err
+		})
+
+		var pingResult pingResult
+		select {
+		case <-ctx.Done():
+			pingResult.error = ctx.Err()
+			log.G(ctx).WithError(pingResult.error).Warn("Failed to ping node due to context cancellation")
+		case result := <-doChan:
+			pingResult.error = result.Err
+			pingResult.pingTime = result.Val.(time.Time)
+		}
+
+		npc.Lock()
+		npc.result = pingResult
+		defer npc.Unlock()
+		span.SetStatus(pingResult.error)
+	}
+
+	// Run the first check manually
+	checkFunc(ctx)
+
+	close(npc.firstPingCompleted)
+
+	wait.UntilWithContext(ctx, checkFunc, npc.pingInterval)
+}
+
+func (npc *nodePingController) getResult(ctx context.Context) (*pingResult, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case <-npc.firstPingCompleted:
+	}
+
+	return &npc.result, nil
+}
diff --git a/node/node_test.go b/node/node_test.go
index c5b5de5f9..a2a2267b5 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -641,6 +641,64 @@ func TestManualConditionsPreserved(t *testing.T) {
 	t.Log(newNode.Status.Conditions)
 }
+
+func TestNodePingSingleInflight(t *testing.T) {
+	testCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	const pingTimeout = 100 * time.Millisecond
+	c := testclient.NewSimpleClientset()
+	testP := &testNodeProviderPing{}
+
+	calls := newWaitableInt()
+	finished := newWaitableInt()
+
+	ctx, cancel := context.WithTimeout(testCtx, time.Second)
+	defer cancel()
+
+	// The ping callback function is meant to block during the entire lifetime of the node ping controller.
+	// The point is to check whether or not it allows callbacks to stack up.
+	testP.customPingFunction = func(context.Context) error {
+		calls.increment()
+		// This callback has to block for longer than the controller's ping timeout because we want to make
+		// sure that goroutines are not allowed to stack up. If it returned as soon as that timeout fired,
+		// finished would be incremented and we might miss goroutines stacking up, so we instead block until
+		// the nodePingController's own context is done (the ping timeout is 100ms, this context lasts 1 second).
+
+		// This is the context tied to the lifetime of the node ping controller, not the context created
+		// for the specific invocation of this ping function
+		<-ctx.Done()
+		finished.increment()
+		return nil
+	}
+
+	nodes := c.CoreV1().Nodes()
+
+	testNode := testNode(t)
+
+	node, err := NewNodeController(testP, testNode, nodes, WithNodePingInterval(10*time.Millisecond), WithNodePingTimeout(pingTimeout))
+	assert.NilError(t, err)
+
+	start := time.Now()
+	go node.nodePingController.run(ctx)
+	firstPing, err := node.nodePingController.getResult(ctx)
+	assert.NilError(t, err)
+	timeTakenToCompleteFirstPing := time.Since(start)
+	assert.Assert(t, timeTakenToCompleteFirstPing < pingTimeout*5, "Time taken to complete first ping: %v", timeTakenToCompleteFirstPing)
+
+	assert.Assert(t, cmp.Error(firstPing.error, context.DeadlineExceeded.Error()))
+	assert.Assert(t, is.Equal(1, calls.read()))
+	assert.Assert(t, is.Equal(0, finished.read()))
+
+	// Wait until the blocked ping callback finishes (i.e. once the controller's context is done)
+	assert.NilError(t, finished.until(testCtx, func(i int) bool { return i > 0 }))
+
+	// Assert we didn't stack up goroutines, and that the one goroutine in flight finished
+	assert.Assert(t, is.Equal(1, calls.read()))
+	assert.Assert(t, is.Equal(1, finished.read()))
+
+}

 func testNode(t *testing.T) *corev1.Node {
 	n := &corev1.Node{}
 	n.Name = strings.ToLower(t.Name())
@@ -668,11 +726,16 @@ func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
 // testNodeProviderPing tracks the maximum time interval between calls to Ping
 type testNodeProviderPing struct {
 	testNodeProvider
-	lastPingTime    time.Time
-	maxPingInterval time.Duration
+	customPingFunction func(context.Context) error
+	lastPingTime       time.Time
+	maxPingInterval    time.Duration
 }

 func (tnp *testNodeProviderPing) Ping(ctx context.Context) error {
+	if tnp.customPingFunction != nil {
+		return tnp.customPingFunction(ctx)
+	}
+
 	now := time.Now()
 	if tnp.lastPingTime.IsZero() {
 		tnp.lastPingTime = now
diff --git a/node/waitable_int_test.go b/node/waitable_int_test.go
new file mode 100644
index 000000000..6757f14ce
--- /dev/null
+++ b/node/waitable_int_test.go
@@ -0,0 +1,51 @@
+package node
+
+import (
+	"context"
+	"sync"
+)
+
+type waitableInt struct {
+	cond *sync.Cond
+	val  int
+}
+
+func newWaitableInt() *waitableInt {
+	return &waitableInt{
+		cond: sync.NewCond(&sync.Mutex{}),
+	}
+}
+
+func (w *waitableInt) read() int {
+	defer w.cond.L.Unlock()
+	w.cond.L.Lock()
+	return w.val
+}
+
+func (w *waitableInt) until(ctx context.Context, f func(int) bool) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	go func() {
+		<-ctx.Done()
+		w.cond.Broadcast()
+	}()
+
+	w.cond.L.Lock()
+	defer w.cond.L.Unlock()
+
+	for !f(w.val) {
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+		w.cond.Wait()
+	}
+	return nil
+}
+
+func (w *waitableInt) increment() {
+	w.cond.L.Lock()
+	defer w.cond.L.Unlock()
+	w.val++
+	w.cond.Broadcast()
+}
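
For context on the approach: the nodePingController leans on golang.org/x/sync/singleflight (hence the go.mod addition) so that at most one NodeProvider.Ping call is ever in flight, no matter how often the check fires or how slow the provider is. The standalone sketch below is illustrative only and not part of this change; the "ping" key, the 200ms sleep, and the counter are arbitrary values chosen just to show that concurrent DoChan calls sharing a key are coalesced into a single execution, which is the property TestNodePingSingleInflight asserts for the controller.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)

func main() {
	sf := &singleflight.Group{}
	var executions int32

	// Simulated slow provider Ping: we never want more than one of these
	// running at a time, no matter how many callers ask concurrently.
	slowPing := func() (interface{}, error) {
		atomic.AddInt32(&executions, 1)
		time.Sleep(200 * time.Millisecond)
		return time.Now(), nil
	}

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// All five goroutines use the same key, so they share one call
			// and all receive the same Result once it completes.
			res := <-sf.DoChan("ping", slowPing)
			fmt.Println("shared:", res.Shared, "err:", res.Err)
		}()
	}
	wg.Wait()

	// Typically prints 1: the five concurrent requests were coalesced.
	fmt.Println("underlying executions:", atomic.LoadInt32(&executions))
}

In the controller itself, checkFunc additionally wraps each attempt in context.WithTimeout when WithNodePingTimeout is set, so a hung Ping surfaces through getResult as a pingResult whose error is context.DeadlineExceeded rather than blocking handlePing.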