Merge pull request #863 from sargun/lease-v3

Brian Goff authored on 2020-08-10 16:36:01 -07:00; committed by GitHub
6 changed files with 259 additions and 51 deletions

go.mod (1 addition)

@@ -19,6 +19,7 @@ require (
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
go.opencensus.io v0.21.0
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7
gotest.tools v2.2.0+incompatible
k8s.io/api v0.18.4


@@ -20,51 +20,6 @@ var (
_ PodLifecycleHandler = (*mockProvider)(nil)
)
type waitableInt struct {
cond *sync.Cond
val int
}
func newWaitableInt() *waitableInt {
return &waitableInt{
cond: sync.NewCond(&sync.Mutex{}),
}
}
func (w *waitableInt) read() int {
defer w.cond.L.Unlock()
w.cond.L.Lock()
return w.val
}
func (w *waitableInt) until(ctx context.Context, f func(int) bool) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
<-ctx.Done()
w.cond.Broadcast()
}()
w.cond.L.Lock()
defer w.cond.L.Unlock()
for !f(w.val) {
if err := ctx.Err(); err != nil {
return err
}
w.cond.Wait()
}
return nil
}
func (w *waitableInt) increment() {
w.cond.L.Lock()
defer w.cond.L.Unlock()
w.val++
w.cond.Broadcast()
}
type mockProvider struct {
creates *waitableInt
updates *waitableInt


@@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
@@ -70,12 +71,19 @@ type NodeProvider interface { //nolint:golint
// Note: If there are multiple NodeControllerOpts that apply to the same
// underlying options, the last NodeControllerOpt wins.
func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface, opts ...NodeControllerOpt) (*NodeController, error) {
n := &NodeController{p: p, n: node, nodes: nodes, chReady: make(chan struct{})}
n := &NodeController{
p: p,
n: node,
nodes: nodes,
chReady: make(chan struct{}),
}
for _, o := range opts {
if err := o(n); err != nil {
return nil, pkgerrors.Wrap(err, "error applying node option")
}
}
n.nodePingController = newNodePingController(n.p, n.pingInterval, n.pingTimeout)
return n, nil
}
@@ -104,7 +112,17 @@ func WithNodeEnableLeaseV1Beta1(client v1beta1.LeaseInterface, baseLease *coord.
}
}
// WithNodePingInterval sets the interval for checking node status
// WithNodePingTimeout limits the amount of time that the virtual kubelet will wait for the node provider
// to respond to the ping callback. If it does not return within this time, it is considered an error condition.
func WithNodePingTimeout(timeout time.Duration) NodeControllerOpt {
return func(n *NodeController) error {
n.pingTimeout = &timeout
return nil
}
}
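For illustration, the two ping-related options might be wired together as follows. This is a hypothetical sketch, not part of this diff: p (a NodeProvider implementation), nodeObj (*corev1.Node), nodeClient (a client-go NodeInterface), and ctx are assumed to come from the caller's own setup.
// Hypothetical usage of WithNodePingTimeout together with WithNodePingInterval (illustration only).
nc, err := node.NewNodeController(p, nodeObj, nodeClient,
	// Ping the provider every 10 seconds.
	node.WithNodePingInterval(10*time.Second),
	// Treat any single ping that takes longer than 5 seconds as an error.
	node.WithNodePingTimeout(5*time.Second),
)
if err != nil {
	return err
}
go nc.Run(ctx) // Run is the node package's existing entry point for the controller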
// WithNodePingInterval sets the interval between node status checks via Ping().
// If node leases are not supported (or not enabled), this is the frequency
// with which the node status will be updated in Kubernetes.
func WithNodePingInterval(d time.Duration) NodeControllerOpt {
@@ -164,6 +182,9 @@ type NodeController struct { // nolint: golint
nodeStatusUpdateErrorHandler ErrorHandler
chReady chan struct{}
nodePingController *nodePingController
pingTimeout *time.Duration
}
// The default intervals used for lease and status updates.
@@ -270,6 +291,10 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
close(n.chReady)
group := &wait.Group{}
group.StartWithContext(ctx, n.nodePingController.run)
defer group.Wait()
loop := func() bool {
ctx, span := trace.StartSpan(ctx, "node.controlLoop.loop")
defer span.End()
@@ -319,6 +344,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
}
return false
}
for {
shouldTerminate := loop()
if shouldTerminate {
@@ -334,14 +360,22 @@ func (n *NodeController) handlePing(ctx context.Context) (retErr error) {
span.SetStatus(retErr)
}()
if err := n.p.Ping(ctx); err != nil {
return pkgerrors.Wrap(err, "error while pinging the node provider")
result, err := n.nodePingController.getResult(ctx)
if err != nil {
err = pkgerrors.Wrap(err, "error while fetching result of node ping")
return err
}
if result.error != nil {
err = pkgerrors.Wrap(result.error, "node ping returned error on ping")
return err
}
if n.disableLease {
return n.updateStatus(ctx, false)
}
// TODO(Sargun): Pass down the result / timestamp so we can accurately track when the ping actually occurred
return n.updateLease(ctx)
}


@@ -0,0 +1,104 @@
package node
import (
"context"
"sync"
"time"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"golang.org/x/sync/singleflight"
"k8s.io/apimachinery/pkg/util/wait"
)
type nodePingController struct {
nodeProvider NodeProvider
pingInterval time.Duration
firstPingCompleted chan struct{}
pingTimeout *time.Duration
// "Results"
sync.Mutex
result pingResult
}
type pingResult struct {
pingTime time.Time
error error
}
func newNodePingController(node NodeProvider, pingInterval time.Duration, timeout *time.Duration) *nodePingController {
return &nodePingController{
nodeProvider: node,
pingInterval: pingInterval,
firstPingCompleted: make(chan struct{}),
pingTimeout: timeout,
}
}
func (npc *nodePingController) run(ctx context.Context) {
const key = "key"
sf := &singleflight.Group{}
// 1. If the node is "stuck" and not responding to pings, we want to set the status
// to indicate that the node provider has timed out responding to pings
// 2. We want the per-ping context to be cancelled on timeout, so that whatever the node provider
// might be stuck on can observe the cancellation and become unstuck
// 3. We want to retry pinging the node, but we never want more than one
// ping in flight at a time.
mkContextFunc := context.WithCancel
if npc.pingTimeout != nil {
mkContextFunc = func(ctx2 context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx2, *npc.pingTimeout)
}
}
checkFunc := func(ctx context.Context) {
ctx, cancel := mkContextFunc(ctx)
defer cancel()
ctx, span := trace.StartSpan(ctx, "node.pingLoop")
defer span.End()
doChan := sf.DoChan(key, func() (interface{}, error) {
now := time.Now()
ctx, span := trace.StartSpan(ctx, "node.pingNode")
defer span.End()
err := npc.nodeProvider.Ping(ctx)
span.SetStatus(err)
return now, err
})
var pingResult pingResult
select {
case <-ctx.Done():
pingResult.error = ctx.Err()
log.G(ctx).WithError(pingResult.error).Warn("Failed to ping node due to context cancellation")
case result := <-doChan:
pingResult.error = result.Err
pingResult.pingTime = result.Val.(time.Time)
}
npc.Lock()
defer npc.Unlock()
npc.result = pingResult
span.SetStatus(pingResult.error)
}
// Run the first check manually
checkFunc(ctx)
close(npc.firstPingCompleted)
wait.UntilWithContext(ctx, checkFunc, npc.pingInterval)
}
func (npc *nodePingController) getResult(ctx context.Context) (*pingResult, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-npc.firstPingCompleted:
}
return &npc.result, nil
}
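For illustration, a consumer of the ping controller (handlePing in the previous file is the real one) might use it roughly like this. Hypothetical sketch: myProvider, ctx, and the interval/timeout values are assumptions, not values from this diff.
// Hypothetical standalone usage of nodePingController (illustration only).
timeout := 5 * time.Second
npc := newNodePingController(myProvider, 10*time.Second, &timeout)
go npc.run(ctx) // pings once immediately, then on every interval, with at most one ping in flight
res, err := npc.getResult(ctx) // blocks until the first ping has completed (or ctx is done)
if err != nil {
	return err // ctx was cancelled before the first ping finished
}
if res.error != nil {
	return res.error // the most recent ping failed or timed out
}
// res.pingTime records when the most recent completed ping attempt started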


@@ -641,6 +641,64 @@ func TestManualConditionsPreserved(t *testing.T) {
t.Log(newNode.Status.Conditions)
}
func TestNodePingSingleInflight(t *testing.T) {
testCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
const pingTimeout = 100 * time.Millisecond
c := testclient.NewSimpleClientset()
testP := &testNodeProviderPing{}
calls := newWaitableInt()
finished := newWaitableInt()
ctx, cancel := context.WithTimeout(testCtx, time.Second)
defer cancel()
// The ping callback function is meant to block during the entire lifetime of the node ping controller.
// The point is to check whether or not it allows callbacks to stack up.
testP.customPingFunction = func(context.Context) error {
calls.increment()
// This callback blocks until the context tied to the lifetime of the node ping controller is done
// (not the per-invocation context created for this particular ping). Blocking for the whole
// controller lifetime ensures that if the controller allowed pings to stack up, we would observe
// extra calls before finished is ever incremented.
<-ctx.Done()
finished.increment()
return nil
}
nodes := c.CoreV1().Nodes()
testNode := testNode(t)
node, err := NewNodeController(testP, testNode, nodes, WithNodePingInterval(10*time.Millisecond), WithNodePingTimeout(pingTimeout))
assert.NilError(t, err)
start := time.Now()
go node.nodePingController.run(ctx)
firstPing, err := node.nodePingController.getResult(ctx)
assert.NilError(t, err)
timeTakenToCompleteFirstPing := time.Since(start)
assert.Assert(t, timeTakenToCompleteFirstPing < pingTimeout*5, "Time taken to complete first ping: %v", timeTakenToCompleteFirstPing)
assert.Assert(t, cmp.Error(firstPing.error, context.DeadlineExceeded.Error()))
assert.Assert(t, is.Equal(1, calls.read()))
assert.Assert(t, is.Equal(0, finished.read()))
// Wait until the blocked ping callback returns (after the controller's context expires)
assert.NilError(t, finished.until(testCtx, func(i int) bool { return i > 0 }))
// Assert we didn't stack up goroutines, and that the one goroutine in flight finished
assert.Assert(t, is.Equal(1, calls.read()))
assert.Assert(t, is.Equal(1, finished.read()))
}
func testNode(t *testing.T) *corev1.Node {
n := &corev1.Node{}
n.Name = strings.ToLower(t.Name())
@@ -668,11 +726,16 @@ func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
// testNodeProviderPing tracks the maximum time interval between calls to Ping
type testNodeProviderPing struct {
testNodeProvider
lastPingTime time.Time
maxPingInterval time.Duration
customPingFunction func(context.Context) error
lastPingTime time.Time
maxPingInterval time.Duration
}
func (tnp *testNodeProviderPing) Ping(ctx context.Context) error {
if tnp.customPingFunction != nil {
return tnp.customPingFunction(ctx)
}
now := time.Now()
if tnp.lastPingTime.IsZero() {
tnp.lastPingTime = now

node/waitable_int_test.go (new file, 51 additions)

@@ -0,0 +1,51 @@
package node
import (
"context"
"sync"
)
type waitableInt struct {
cond *sync.Cond
val int
}
func newWaitableInt() *waitableInt {
return &waitableInt{
cond: sync.NewCond(&sync.Mutex{}),
}
}
func (w *waitableInt) read() int {
defer w.cond.L.Unlock()
w.cond.L.Lock()
return w.val
}
func (w *waitableInt) until(ctx context.Context, f func(int) bool) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
<-ctx.Done()
w.cond.Broadcast()
}()
w.cond.L.Lock()
defer w.cond.L.Unlock()
for !f(w.val) {
if err := ctx.Err(); err != nil {
return err
}
w.cond.Wait()
}
return nil
}
func (w *waitableInt) increment() {
w.cond.L.Lock()
defer w.cond.L.Unlock()
w.val++
w.cond.Broadcast()
}
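For illustration, a test might use this helper as follows. Hypothetical sketch: ctx and t are assumed to exist in the enclosing test.
// Hypothetical usage of waitableInt (illustration only).
w := newWaitableInt()
go func() {
	for i := 0; i < 3; i++ {
		w.increment()
	}
}()
if err := w.until(ctx, func(v int) bool { return v >= 3 }); err != nil {
	t.Fatal(err) // context expired before the counter reached 3
}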