Move node pinging to its own goroutine

This moves the job of pinging the node provider into its own
goroutine. If it takes a long time, it shouldn't slow down
leases, and vice-versa.

It also adds timeouts for node pings. One of the problems
is that we don't know how long a node ping will take --
there could be a bunch of network calls underneath us.

The point of the lease is to say whether or not the
Kubelet is unreachable, not whether or not the node
pings are "passing".

Signed-off-by: Sargun Dhillon <sargun@sargun.me>
Sargun Dhillon
2020-07-27 20:59:10 -07:00
parent 49c596c5ca
commit d390dfce43
4 changed files with 208 additions and 6 deletions
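For context, here is a minimal sketch (not part of this commit) of how a provider integration might opt in to the new ping timeout. The provider value, node object, and typed client are placeholders; only WithNodePingInterval and WithNodePingTimeout come from this change, and the controller's Run entry point is assumed to match upstream virtual-kubelet.

import (
    "context"
    "time"

    "github.com/virtual-kubelet/virtual-kubelet/node"
    corev1 "k8s.io/api/core/v1"
    v1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

// startNodeController is illustrative; p, n, and nodes are assumed to already exist.
func startNodeController(ctx context.Context, p node.NodeProvider, n *corev1.Node, nodes v1.NodeInterface) error {
    nc, err := node.NewNodeController(p, n, nodes,
        // How often the provider is pinged (and, when leases are disabled,
        // how often the node status is refreshed).
        node.WithNodePingInterval(10*time.Second),
        // New in this commit: a single ping is abandoned after this long instead
        // of holding up the lease/status loop behind a slow provider.
        node.WithNodePingTimeout(5*time.Second),
    )
    if err != nil {
        return err
    }
    return nc.Run(ctx)
}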

go.mod

@@ -19,6 +19,7 @@ require (
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
go.opencensus.io v0.21.0
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7
gotest.tools v2.2.0+incompatible
k8s.io/api v0.18.4


@@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
@@ -70,12 +71,19 @@ type NodeProvider interface { //nolint:golint
// Note: If there are multiple NodeControllerOpts which apply against the same
// underlying options, the last NodeControllerOpt will win.
func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface, opts ...NodeControllerOpt) (*NodeController, error) {
n := &NodeController{p: p, n: node, nodes: nodes, chReady: make(chan struct{})}
n := &NodeController{
p: p,
n: node,
nodes: nodes,
chReady: make(chan struct{}),
}
for _, o := range opts {
if err := o(n); err != nil {
return nil, pkgerrors.Wrap(err, "error applying node option")
}
}
n.nodePingController = newNodePingController(n.p, n.pingInterval, n.pingTimeout)
return n, nil
}
@@ -104,7 +112,17 @@ func WithNodeEnableLeaseV1Beta1(client v1beta1.LeaseInterface, baseLease *coord.
}
}
// WithNodePingInterval sets the interval for checking node status
// WithNodePingTimeout limits the amount of time that the virtual kubelet will wait for the node provider to
// respond to the ping callback. If it does not return within this time, it will be considered an error
// condition
func WithNodePingTimeout(timeout time.Duration) NodeControllerOpt {
return func(n *NodeController) error {
n.pingTimeout = &timeout
return nil
}
}
// WithNodePingInterval sets the interval between checking for node statuses via Ping()
// If node leases are not supported (or not enabled), this is the frequency
// with which the node status will be updated in Kubernetes.
func WithNodePingInterval(d time.Duration) NodeControllerOpt {
@@ -164,6 +182,9 @@ type NodeController struct { // nolint: golint
nodeStatusUpdateErrorHandler ErrorHandler
chReady chan struct{}
nodePingController *nodePingController
pingTimeout *time.Duration
}
// The default intervals used for lease and status updates.
@@ -270,6 +291,10 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
close(n.chReady)
group := &wait.Group{}
group.StartWithContext(ctx, n.nodePingController.run)
defer group.Wait()
loop := func() bool {
ctx, span := trace.StartSpan(ctx, "node.controlLoop.loop")
defer span.End()
@@ -319,6 +344,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
}
return false
}
for {
shouldTerminate := loop()
if shouldTerminate {
@@ -334,14 +360,22 @@ func (n *NodeController) handlePing(ctx context.Context) (retErr error) {
span.SetStatus(retErr)
}()
if err := n.p.Ping(ctx); err != nil {
return pkgerrors.Wrap(err, "error while pinging the node provider")
result, err := n.nodePingController.getResult(ctx)
if err != nil {
err = pkgerrors.Wrap(err, "error while fetching result of node ping")
return err
}
if result.error != nil {
err = pkgerrors.Wrap(err, "node ping returned error on ping")
return err
}
if n.disableLease {
return n.updateStatus(ctx, false)
}
// TODO(Sargun): Pass down the result / timestamp so we can accurately track when the ping actually occurred
return n.updateLease(ctx)
}
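The timeout above only helps if the provider's Ping respects context cancellation (see the design notes in the new ping controller below). A hypothetical provider that probes an HTTP backend might handle this as follows; the type name and health URL field are made up for illustration.

import (
    "context"
    "fmt"
    "net/http"
)

type myProvider struct {
    healthURL string // hypothetical backend health endpoint
}

func (p *myProvider) Ping(ctx context.Context) error {
    // Tie the outbound request to ctx so that a cancelled or timed-out ping unblocks promptly.
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, p.healthURL, nil)
    if err != nil {
        return err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("backend health check failed: %s", resp.Status)
    }
    return nil
}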


@@ -0,0 +1,104 @@
package node
import (
"context"
"sync"
"time"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"golang.org/x/sync/singleflight"
"k8s.io/apimachinery/pkg/util/wait"
)
type nodePingController struct {
nodeProvider NodeProvider
pingInterval time.Duration
firstPingCompleted chan struct{}
pingTimeout *time.Duration
// "Results"
sync.Mutex
result pingResult
}
type pingResult struct {
pingTime time.Time
error error
}
func newNodePingController(node NodeProvider, pingInterval time.Duration, timeout *time.Duration) *nodePingController {
return &nodePingController{
nodeProvider: node,
pingInterval: pingInterval,
firstPingCompleted: make(chan struct{}),
pingTimeout: timeout,
}
}
func (npc *nodePingController) run(ctx context.Context) {
const key = "key"
sf := &singleflight.Group{}
// 1. If the node is "stuck" and not responding to pings, we want to set the status
// to indicate that the node provider has timed out responding to pings
// 2. We want the context to be cancelled, so that whatever the node provider might
// be stuck on can observe the cancellation and get unstuck
// 3. We want to retry pinging the node, but we never want more than one
// ping in flight at a time.
mkContextFunc := context.WithCancel
if npc.pingTimeout != nil {
mkContextFunc = func(ctx2 context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx2, *npc.pingTimeout)
}
}
checkFunc := func(ctx context.Context) {
ctx, cancel := mkContextFunc(ctx)
defer cancel()
ctx, span := trace.StartSpan(ctx, "node.pingLoop")
defer span.End()
doChan := sf.DoChan(key, func() (interface{}, error) {
now := time.Now()
ctx, span := trace.StartSpan(ctx, "node.pingNode")
defer span.End()
err := npc.nodeProvider.Ping(ctx)
span.SetStatus(err)
return now, err
})
var pingResult pingResult
select {
case <-ctx.Done():
pingResult.error = ctx.Err()
log.G(ctx).WithError(pingResult.error).Warn("Failed to ping node due to context cancellation")
case result := <-doChan:
pingResult.error = result.Err
pingResult.pingTime = result.Val.(time.Time)
}
npc.Lock()
defer npc.Unlock()
npc.result = pingResult
span.SetStatus(pingResult.error)
}
// Run the first check manually
checkFunc(ctx)
close(npc.firstPingCompleted)
wait.UntilWithContext(ctx, checkFunc, npc.pingInterval)
}
func (npc *nodePingController) getResult(ctx context.Context) (*pingResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-npc.firstPingCompleted:
}
return &npc.result, nil
}
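A stripped-down, standalone sketch of the singleflight-plus-timeout pattern used above, with made-up names: DoChan collapses concurrent callers onto a single in-flight probe, while the per-attempt timeout lets the caller report an error without waiting for a stuck probe to return.

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/singleflight"
)

// probeOnce runs (or joins) a single in-flight probe and waits at most timeout for it.
func probeOnce(ctx context.Context, sf *singleflight.Group, probe func(context.Context) error, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    ch := sf.DoChan("probe", func() (interface{}, error) {
        return nil, probe(ctx)
    })

    select {
    case <-ctx.Done():
        // Stop waiting; the stuck probe still owns the singleflight key, so later
        // calls join it instead of launching additional probes.
        return ctx.Err()
    case res := <-ch:
        return res.Err
    }
}

func main() {
    sf := &singleflight.Group{}
    stuck := func(ctx context.Context) error { <-ctx.Done(); return ctx.Err() } // only returns once ctx ends
    fmt.Println(probeOnce(context.Background(), sf, stuck, 100*time.Millisecond)) // context deadline exceeded
}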


@@ -641,6 +641,64 @@ func TestManualConditionsPreserved(t *testing.T) {
t.Log(newNode.Status.Conditions)
}
func TestNodePingSingleInflight(t *testing.T) {
testCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
const pingTimeout = 100 * time.Millisecond
c := testclient.NewSimpleClientset()
testP := &testNodeProviderPing{}
calls := newWaitableInt()
finished := newWaitableInt()
ctx, cancel := context.WithTimeout(testCtx, time.Second)
defer cancel()
// The ping callback function is meant to block during the entire lifetime of the node ping controller.
// The point is to check whether or not it allows callbacks to stack up.
testP.customPingFunction = func(context.Context) error {
calls.increment()
// This timer has to be longer than that of the context of the controller because we want to make sure
// that goroutines are not allowed to stack up. If this exits as soon as that timeout is up, finished
// will be incremented and we might miss goroutines stacking up, so we wait a tiny bit longer than
// the nodePingController control loop (we wait 2 seconds, the control loop only lasts 1 second)
// This is the context tied to the lifetime of the node ping controller, not the context created
// for the specific invocation of this ping function
<-ctx.Done()
finished.increment()
return nil
}
nodes := c.CoreV1().Nodes()
testNode := testNode(t)
node, err := NewNodeController(testP, testNode, nodes, WithNodePingInterval(10*time.Millisecond), WithNodePingTimeout(pingTimeout))
assert.NilError(t, err)
start := time.Now()
go node.nodePingController.run(ctx)
firstPing, err := node.nodePingController.getResult(ctx)
assert.NilError(t, err)
timeTakenToCompleteFirstPing := time.Since(start)
assert.Assert(t, timeTakenToCompleteFirstPing < pingTimeout*5, "Time taken to complete first ping: %v", timeTakenToCompleteFirstPing)
assert.Assert(t, cmp.Error(firstPing.error, context.DeadlineExceeded.Error()))
assert.Assert(t, is.Equal(1, calls.read()))
assert.Assert(t, is.Equal(0, finished.read()))
// Wait until the first sleep finishes (the test context is done)
assert.NilError(t, finished.until(testCtx, func(i int) bool { return i > 0 }))
// Assert we didn't stack up goroutines, and that the one goroutine in flight finished
assert.Assert(t, is.Equal(1, calls.read()))
assert.Assert(t, is.Equal(1, finished.read()))
}
func testNode(t *testing.T) *corev1.Node {
n := &corev1.Node{}
n.Name = strings.ToLower(t.Name())
@@ -668,11 +726,16 @@ func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
// testNodeProviderPing tracks the maximum time interval between calls to Ping
type testNodeProviderPing struct {
testNodeProvider
lastPingTime time.Time
maxPingInterval time.Duration
customPingFunction func(context.Context) error
lastPingTime time.Time
maxPingInterval time.Duration
}
func (tnp *testNodeProviderPing) Ping(ctx context.Context) error {
if tnp.customPingFunction != nil {
return tnp.customPingFunction(ctx)
}
now := time.Now()
if tnp.lastPingTime.IsZero() {
tnp.lastPingTime = now