Add v2 node provider for accepting status updates

This allows the use of a built-in provider to do things  like mark a node
as ready once all the controllers are spun up.

The e2e tests now use this instead of waiting on the pod that the vk
provider is deployed in to be marked ready (this was waiting on
/stats/summary to be serving, which is racey).
This commit is contained in:
Brian Goff
2020-07-27 14:44:40 -07:00
parent 83f8cd1a58
commit 0c64171e85
6 changed files with 97 additions and 13 deletions

View File

@@ -144,8 +144,9 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
} }
pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version) pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version)
np := node.NewNaiveNodeProvider()
nodeRunner, err := node.NewNodeController( nodeRunner, err := node.NewNodeController(
node.NaiveNodeProvider{}, np,
pNode, pNode,
client.CoreV1().Nodes(), client.CoreV1().Nodes(),
node.WithNodeEnableLeaseV1Beta1(leaseClient, nil), node.WithNodeEnableLeaseV1Beta1(leaseClient, nil),
@@ -224,8 +225,28 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
} }
}() }()
setNodeReady(pNode)
if err := np.UpdateStatus(ctx, pNode); err != nil {
return errors.Wrap(err, "error marking the node as ready")
}
log.G(ctx).Info("Initialized") log.G(ctx).Info("Initialized")
<-ctx.Done() <-ctx.Done()
return nil return nil
} }
func setNodeReady(n *corev1.Node) {
for i, c := range n.Status.Conditions {
if c.Type != "Ready" {
continue
}
c.Message = "Kubelet is ready"
c.Reason = "KubeletReady"
c.Status = corev1.ConditionTrue
c.LastHeartbeatTime = metav1.Now()
c.LastTransitionTime = metav1.Now()
n.Status.Conditions[i] = c
return
}
}

View File

@@ -362,11 +362,11 @@ func (p *MockProvider) nodeConditions() []v1.NodeCondition {
return []v1.NodeCondition{ return []v1.NodeCondition{
{ {
Type: "Ready", Type: "Ready",
Status: v1.ConditionTrue, Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Now(), LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(), LastTransitionTime: metav1.Now(),
Reason: "KubeletReady", Reason: "KubeletPending",
Message: "kubelet is ready.", Message: "kubelet is pending.",
}, },
{ {
Type: "OutOfDisk", Type: "OutOfDisk",

View File

@@ -21,7 +21,7 @@ type ShouldSkipTestFunc func(string) bool
// TestSuite contains methods that defines the lifecycle of a test suite // TestSuite contains methods that defines the lifecycle of a test suite
type TestSuite interface { type TestSuite interface {
Setup() Setup(t *testing.T)
Teardown() Teardown()
} }
@@ -39,7 +39,7 @@ type testCase struct {
func Run(t *testing.T, ts TestSuite) { func Run(t *testing.T, ts TestSuite) {
defer failOnPanic(t) defer failOnPanic(t)
ts.Setup() ts.Setup(t)
defer ts.Teardown() defer ts.Teardown()
// The implementation below is based on https://github.com/stretchr/testify // The implementation below is based on https://github.com/stretchr/testify

View File

@@ -20,7 +20,7 @@ type basicTestSuite struct {
testsRan []string testsRan []string
} }
func (bts *basicTestSuite) Setup() { func (bts *basicTestSuite) Setup(t *testing.T) {
bts.setupCount++ bts.setupCount++
} }

View File

@@ -648,7 +648,52 @@ func (NaiveNodeProvider) Ping(ctx context.Context) error {
// //
// This NaiveNodeProvider does not support updating node status and so this // This NaiveNodeProvider does not support updating node status and so this
// function is a no-op. // function is a no-op.
func (NaiveNodeProvider) NotifyNodeStatus(ctx context.Context, f func(*corev1.Node)) { func (n NaiveNodeProvider) NotifyNodeStatus(_ context.Context, _ func(*corev1.Node)) {
}
// NaiveNodeProviderV2 is like NaiveNodeProvider except it supports accepting node status updates.
// It must be used with as a pointer and must be created with `NewNaiveNodeProvider`
type NaiveNodeProviderV2 struct {
notify func(*corev1.Node)
updateReady chan struct{}
}
// Ping just implements the NodeProvider interface.
// It returns the error from the passed in context only.
func (*NaiveNodeProviderV2) Ping(ctx context.Context) error {
return ctx.Err()
}
// NotifyNodeStatus implements the NodeProvider interface.
//
// NaiveNodeProvider does not support updating node status unless created with `NewNaiveNodeProvider`
// Otherwise this is a no-op
func (n *NaiveNodeProviderV2) NotifyNodeStatus(_ context.Context, f func(*corev1.Node)) {
n.notify = f
// This is a little sloppy and assumes `NotifyNodeStatus` is only called once, which is indeed currently true.
// The reason a channel is preferred here is so we can use a context in `UpdateStatus` to cancel waiting for this.
close(n.updateReady)
}
// UpdateStatus sends a node status update to the node controller
func (n *NaiveNodeProviderV2) UpdateStatus(ctx context.Context, node *corev1.Node) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-n.updateReady:
}
n.notify(node)
return nil
}
// NewNaiveNodeProvider creates a new NaiveNodeProviderV2
// You must use this to create a NaiveNodeProviderV2 if you want to be able to send node status updates to the node
// controller.
func NewNaiveNodeProvider() *NaiveNodeProviderV2 {
return &NaiveNodeProviderV2{
updateReady: make(chan struct{}),
}
} }
type taintsStringer []corev1.Taint type taintsStringer []corev1.Taint

View File

@@ -4,6 +4,9 @@ import (
"testing" "testing"
"time" "time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
"github.com/virtual-kubelet/virtual-kubelet/internal/test/e2e/framework" "github.com/virtual-kubelet/virtual-kubelet/internal/test/e2e/framework"
"github.com/virtual-kubelet/virtual-kubelet/internal/test/suite" "github.com/virtual-kubelet/virtual-kubelet/internal/test/suite"
) )
@@ -40,14 +43,29 @@ type EndToEndTestSuiteConfig struct {
// Setup runs the setup function from the provider and other // Setup runs the setup function from the provider and other
// procedures before running the test suite // procedures before running the test suite
func (ts *EndToEndTestSuite) Setup() { func (ts *EndToEndTestSuite) Setup(t *testing.T) {
if err := ts.setup(); err != nil { if err := ts.setup(); err != nil {
panic(err) t.Fatal(err)
} }
// Wait for the virtual kubelet (deployed as a pod) to become fully ready // Wait for the virtual kubelet node resource to become fully ready
if _, err := f.WaitUntilPodReady(f.Namespace, f.NodeName); err != nil { if err := f.WaitUntilNodeCondition(func(ev watch.Event) (bool, error) {
panic(err) n := ev.Object.(*corev1.Node)
if n.Name != f.NodeName {
return false, nil
}
for _, c := range n.Status.Conditions {
if c.Type != "Ready" {
continue
}
t.Log(c.Status)
return c.Status == corev1.ConditionTrue, nil
}
return false, nil
}); err != nil {
t.Fatal(err)
} }
} }