From d390dfce4393195facce933e1befc4588ac884e3 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Mon, 27 Jul 2020 20:59:10 -0700 Subject: [PATCH] Move node pinging to its own goroutine This moves the job of pinging the node provider into its own goroutine. If it takes a long time, it shouldn't slow down leases, and vice-versa. It also adds timeouts for node pings. One of the problems is that we don't know how long a node ping will take -- there could be a bunch of network calls underneath us. The point of the lease is to say whether or not the Kubelet is unreachable, not whether or not the node pings are "passing". Signed-off-by: Sargun Dhillon --- go.mod | 1 + node/node.go | 42 ++++++++++++-- node/node_ping_controller.go | 104 +++++++++++++++++++++++++++++++++++ node/node_test.go | 67 +++++++++++++++++++++- 4 files changed, 208 insertions(+), 6 deletions(-) create mode 100644 node/node_ping_controller.go diff --git a/go.mod b/go.mod index ebc3a6f3a..1952aad52 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/spf13/cobra v0.0.5 github.com/spf13/pflag v1.0.5 go.opencensus.io v0.21.0 + golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 gotest.tools v2.2.0+incompatible k8s.io/api v0.18.4 diff --git a/node/node.go b/node/node.go index 6f6f363e2..c7ab7448b 100644 --- a/node/node.go +++ b/node/node.go @@ -28,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/typed/coordination/v1beta1" v1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/util/retry" @@ -70,12 +71,19 @@ type NodeProvider interface { //nolint:golint // Note: When if there are multiple NodeControllerOpts which apply against the same // underlying options, the last NodeControllerOpt will win. func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface, opts ...NodeControllerOpt) (*NodeController, error) { - n := &NodeController{p: p, n: node, nodes: nodes, chReady: make(chan struct{})} + n := &NodeController{ + p: p, + n: node, + nodes: nodes, + chReady: make(chan struct{}), + } for _, o := range opts { if err := o(n); err != nil { return nil, pkgerrors.Wrap(err, "error applying node option") } } + n.nodePingController = newNodePingController(n.p, n.pingInterval, n.pingTimeout) + return n, nil } @@ -104,7 +112,17 @@ func WithNodeEnableLeaseV1Beta1(client v1beta1.LeaseInterface, baseLease *coord. } } -// WithNodePingInterval sets the interval for checking node status +// WithNodePingTimeout limits the amount of time that the virtual kubelet will wait for the node provider to +// respond to the ping callback. If it does not return within this time, it will be considered an error +// condition +func WithNodePingTimeout(timeout time.Duration) NodeControllerOpt { + return func(n *NodeController) error { + n.pingTimeout = &timeout + return nil + } +} + +// WithNodePingInterval sets the interval between checking for node statuses via Ping() // If node leases are not supported (or not enabled), this is the frequency // with which the node status will be updated in Kubernetes. func WithNodePingInterval(d time.Duration) NodeControllerOpt { @@ -164,6 +182,9 @@ type NodeController struct { // nolint: golint nodeStatusUpdateErrorHandler ErrorHandler chReady chan struct{} + + nodePingController *nodePingController + pingTimeout *time.Duration } // The default intervals used for lease and status updates. @@ -270,6 +291,10 @@ func (n *NodeController) controlLoop(ctx context.Context) error { close(n.chReady) + group := &wait.Group{} + group.StartWithContext(ctx, n.nodePingController.run) + defer group.Wait() + loop := func() bool { ctx, span := trace.StartSpan(ctx, "node.controlLoop.loop") defer span.End() @@ -319,6 +344,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error { } return false } + for { shouldTerminate := loop() if shouldTerminate { @@ -334,14 +360,22 @@ func (n *NodeController) handlePing(ctx context.Context) (retErr error) { span.SetStatus(retErr) }() - if err := n.p.Ping(ctx); err != nil { - return pkgerrors.Wrap(err, "error while pinging the node provider") + result, err := n.nodePingController.getResult(ctx) + if err != nil { + err = pkgerrors.Wrap(err, "error while fetching result of node ping") + return err + } + + if result.error != nil { + err = pkgerrors.Wrap(err, "node ping returned error on ping") + return err } if n.disableLease { return n.updateStatus(ctx, false) } + // TODO(Sargun): Pass down the result / timestamp so we can accurately track when the ping actually occurred return n.updateLease(ctx) } diff --git a/node/node_ping_controller.go b/node/node_ping_controller.go new file mode 100644 index 000000000..f1b435924 --- /dev/null +++ b/node/node_ping_controller.go @@ -0,0 +1,104 @@ +package node + +import ( + "context" + "sync" + "time" + + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/trace" + "golang.org/x/sync/singleflight" + "k8s.io/apimachinery/pkg/util/wait" +) + +type nodePingController struct { + nodeProvider NodeProvider + pingInterval time.Duration + firstPingCompleted chan struct{} + pingTimeout *time.Duration + + // "Results" + sync.Mutex + result pingResult +} + +type pingResult struct { + pingTime time.Time + error error +} + +func newNodePingController(node NodeProvider, pingInterval time.Duration, timeout *time.Duration) *nodePingController { + return &nodePingController{ + nodeProvider: node, + pingInterval: pingInterval, + firstPingCompleted: make(chan struct{}), + pingTimeout: timeout, + } +} + +func (npc *nodePingController) run(ctx context.Context) { + const key = "key" + sf := &singleflight.Group{} + + // 1. If the node is "stuck" and not responding to pings, we want to set the status + // to that the node provider has timed out responding to pings + // 2. We want it so that the context is cancelled, and whatever the node might have + // been stuck on uses context so it might be unstuck + // 3. We want to retry pinging the node, but we do not ever want more than one + // ping in flight at a time. + + mkContextFunc := context.WithCancel + + if npc.pingTimeout != nil { + mkContextFunc = func(ctx2 context.Context) (context.Context, context.CancelFunc) { + return context.WithTimeout(ctx2, *npc.pingTimeout) + } + } + + checkFunc := func(ctx context.Context) { + ctx, cancel := mkContextFunc(ctx) + defer cancel() + ctx, span := trace.StartSpan(ctx, "node.pingLoop") + defer span.End() + doChan := sf.DoChan(key, func() (interface{}, error) { + now := time.Now() + ctx, span := trace.StartSpan(ctx, "node.pingNode") + defer span.End() + err := npc.nodeProvider.Ping(ctx) + span.SetStatus(err) + return now, err + }) + + var pingResult pingResult + select { + case <-ctx.Done(): + pingResult.error = ctx.Err() + log.G(ctx).WithError(pingResult.error).Warn("Failed to ping node due to context cancellation") + case result := <-doChan: + pingResult.error = result.Err + pingResult.pingTime = result.Val.(time.Time) + } + + npc.Lock() + npc.result = pingResult + defer npc.Unlock() + span.SetStatus(pingResult.error) + } + + // Run the first check manually + checkFunc(ctx) + + close(npc.firstPingCompleted) + + wait.UntilWithContext(ctx, checkFunc, npc.pingInterval) +} + +func (npc *nodePingController) getResult(ctx context.Context) (*pingResult, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-npc.firstPingCompleted: + } + + return &npc.result, nil +} diff --git a/node/node_test.go b/node/node_test.go index c5b5de5f9..a2a2267b5 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -641,6 +641,64 @@ func TestManualConditionsPreserved(t *testing.T) { t.Log(newNode.Status.Conditions) } + +func TestNodePingSingleInflight(t *testing.T) { + testCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + const pingTimeout = 100 * time.Millisecond + c := testclient.NewSimpleClientset() + testP := &testNodeProviderPing{} + + calls := newWaitableInt() + finished := newWaitableInt() + + ctx, cancel := context.WithTimeout(testCtx, time.Second) + defer cancel() + + // The ping callback function is meant to block during the entire lifetime of the node ping controller. + // The point is to check whether or it allows callbacks to stack up. + testP.customPingFunction = func(context.Context) error { + calls.increment() + // This timer has to be longer than that of the context of the controller because we want to make sure + // that goroutines are not allowed to stack up. If this exits as soon as that timeout is up, finished + // will be incremented and we might miss goroutines stacking up, so we wait a tiny bit longer than + // the nodePingController control loop (we wait 2 seconds, the control loop only lasts 1 second) + + // This is the context tied to the lifetime of the node ping controller, not the context created + // for the specific invocation of this ping function + <-ctx.Done() + finished.increment() + return nil + } + + nodes := c.CoreV1().Nodes() + + testNode := testNode(t) + + node, err := NewNodeController(testP, testNode, nodes, WithNodePingInterval(10*time.Millisecond), WithNodePingTimeout(pingTimeout)) + assert.NilError(t, err) + + start := time.Now() + go node.nodePingController.run(ctx) + firstPing, err := node.nodePingController.getResult(ctx) + assert.NilError(t, err) + timeTakenToCompleteFirstPing := time.Since(start) + assert.Assert(t, timeTakenToCompleteFirstPing < pingTimeout*5, "Time taken to complete first ping: %v", timeTakenToCompleteFirstPing) + + assert.Assert(t, cmp.Error(firstPing.error, context.DeadlineExceeded.Error())) + assert.Assert(t, is.Equal(1, calls.read())) + assert.Assert(t, is.Equal(0, finished.read())) + + // Wait until the first sleep finishes (the test context is done) + assert.NilError(t, finished.until(testCtx, func(i int) bool { return i > 0 })) + + // Assert we didn't stack up goroutines, and that the one goroutine in flight finishd + assert.Assert(t, is.Equal(1, calls.read())) + assert.Assert(t, is.Equal(1, finished.read())) + +} + func testNode(t *testing.T) *corev1.Node { n := &corev1.Node{} n.Name = strings.ToLower(t.Name()) @@ -668,11 +726,16 @@ func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) { // testNodeProviderPing tracks the maximum time interval between calls to Ping type testNodeProviderPing struct { testNodeProvider - lastPingTime time.Time - maxPingInterval time.Duration + customPingFunction func(context.Context) error + lastPingTime time.Time + maxPingInterval time.Duration } func (tnp *testNodeProviderPing) Ping(ctx context.Context) error { + if tnp.customPingFunction != nil { + return tnp.customPingFunction(ctx) + } + now := time.Now() if tnp.lastPingTime.IsZero() { tnp.lastPingTime = now