From 83f8cd1a5856f2cdcfda8758186936988015c0d3 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Mon, 27 Jul 2020 14:31:41 -0700 Subject: [PATCH 1/2] Add helpers for common setup code Create a clientset, setup pod informer filters, and setup node lease client. --- .../internal/commands/root/root.go | 41 ++----------- node/nodeutil/client.go | 58 +++++++++++++++++++ 2 files changed, 64 insertions(+), 35 deletions(-) create mode 100644 node/nodeutil/client.go diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go index d8d5fc7de..0d6d769b7 100644 --- a/cmd/virtual-kubelet/internal/commands/root/root.go +++ b/cmd/virtual-kubelet/internal/commands/root/root.go @@ -26,17 +26,14 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/internal/manager" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/node" + "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/typed/coordination/v1beta1" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" ) @@ -79,7 +76,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { } } - client, err := newClient(c.KubeConfigPath) + client, err := nodeutil.ClientsetFromEnv(c.KubeConfigPath) if err != nil { return err } @@ -89,9 +86,8 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { client, c.InformerResyncPeriod, kubeinformers.WithNamespace(c.KubeNamespace), - kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) { - options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String() - })) + nodeutil.PodInformerFilter(c.NodeName), + ) podInformer := podInformerFactory.Core().V1().Pods() // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors). @@ -120,7 +116,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { NodeName: c.NodeName, OperatingSystem: c.OperatingSystem, ResourceManager: rm, - DaemonPort: int32(c.ListenPort), + DaemonPort: c.ListenPort, InternalIP: os.Getenv("VKUBELET_POD_IP"), KubeClusterDomain: c.KubeClusterDomain, } @@ -144,7 +140,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { var leaseClient v1beta1.LeaseInterface if c.EnableNodeLease { - leaseClient = client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease) + leaseClient = nodeutil.NodeLeaseV1Beta1Client(client) } pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version) @@ -233,28 +229,3 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { <-ctx.Done() return nil } - -func newClient(configPath string) (*kubernetes.Clientset, error) { - var config *rest.Config - - // Check if the kubeConfig file exists. - if _, err := os.Stat(configPath); !os.IsNotExist(err) { - // Get the kubeconfig from the filepath. - config, err = clientcmd.BuildConfigFromFlags("", configPath) - if err != nil { - return nil, errors.Wrap(err, "error building client config") - } - } else { - // Set to in-cluster config. 
-		config, err = rest.InClusterConfig()
-		if err != nil {
-			return nil, errors.Wrap(err, "error building in cluster config")
-		}
-	}
-
-	if masterURI := os.Getenv("MASTER_URI"); masterURI != "" {
-		config.Host = masterURI
-	}
-
-	return kubernetes.NewForConfig(config)
-}
diff --git a/node/nodeutil/client.go b/node/nodeutil/client.go
new file mode 100644
index 000000000..87614e54c
--- /dev/null
+++ b/node/nodeutil/client.go
@@ -0,0 +1,58 @@
+package nodeutil
+
+import (
+	"os"
+
+	"github.com/pkg/errors"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	kubeinformers "k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+)
+
+// ClientsetFromEnv returns a Kubernetes clientset from:
+// 1. the passed-in kubeconfig path, if it exists
+// 2. otherwise, the in-cluster config
+func ClientsetFromEnv(kubeConfigPath string) (*kubernetes.Clientset, error) {
+	var (
+		config *rest.Config
+		err    error
+	)
+
+	if kubeConfigPath != "" {
+		if _, statErr := os.Stat(kubeConfigPath); statErr != nil {
+			config, err = rest.InClusterConfig()
+		} else {
+			config, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
+				&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeConfigPath},
+				nil,
+			).ClientConfig()
+		}
+	} else {
+		config, err = rest.InClusterConfig()
+	}
+
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting rest client config")
+	}
+
+	return kubernetes.NewForConfig(config)
+}
+
+// PodInformerFilter returns an informer option that restricts the pod informer to pods scheduled on the given node; use it when creating the pod informer for the pod controller.
+func PodInformerFilter(node string) kubeinformers.SharedInformerOption {
+	return kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
+		options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", node).String()
+	})
+}
+
+// NodeLeaseV1Beta1Client creates a v1beta1 Lease client for use with node leases from the passed-in client.
+//
+// Use this with node.WithNodeEnableLeaseV1Beta1 when creating a node controller.
+func NodeLeaseV1Beta1Client(client kubernetes.Interface) v1beta1.LeaseInterface {
+	return client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
+}
From 0c64171e851a1a1e49fb6edce8a09235bc54b22f Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Mon, 27 Jul 2020 14:44:40 -0700
Subject: [PATCH 2/2] Add v2 node provider for accepting status updates

This allows the use of a built-in provider to do things like mark a node as
ready once all of the controllers are spun up.

The e2e tests now use this instead of waiting for the pod that the vk provider
is deployed in to be marked ready (that check waited for /stats/summary to be
serving, which is racy).
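
A rough usage sketch (this mirrors the root.go changes in this patch; `pNode`,
`client`, `ctx`, and the `setNodeReady` helper added below are assumed to be in
scope):

    np := node.NewNaiveNodeProvider()
    nodeRunner, err := node.NewNodeController(np, pNode, client.CoreV1().Nodes())
    if err != nil {
        return err
    }

    // ... start nodeRunner and the pod controller, wait for them to be up ...

    // Once everything is running, flip the node's Ready condition and push
    // the update through the provider.
    setNodeReady(pNode)
    if err := np.UpdateStatus(ctx, pNode); err != nil {
        return errors.Wrap(err, "error marking the node as ready")
    }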
--- .../internal/commands/root/root.go | 23 ++++++++- .../internal/provider/mock/mock.go | 6 +-- internal/test/suite/suite.go | 4 +- internal/test/suite/suite_test.go | 2 +- node/node.go | 47 ++++++++++++++++++- test/e2e/suite.go | 28 +++++++++-- 6 files changed, 97 insertions(+), 13 deletions(-) diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go index 0d6d769b7..d1f59a28c 100644 --- a/cmd/virtual-kubelet/internal/commands/root/root.go +++ b/cmd/virtual-kubelet/internal/commands/root/root.go @@ -144,8 +144,9 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { } pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version) + np := node.NewNaiveNodeProvider() nodeRunner, err := node.NewNodeController( - node.NaiveNodeProvider{}, + np, pNode, client.CoreV1().Nodes(), node.WithNodeEnableLeaseV1Beta1(leaseClient, nil), @@ -224,8 +225,28 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { } }() + setNodeReady(pNode) + if err := np.UpdateStatus(ctx, pNode); err != nil { + return errors.Wrap(err, "error marking the node as ready") + } log.G(ctx).Info("Initialized") <-ctx.Done() return nil } + +func setNodeReady(n *corev1.Node) { + for i, c := range n.Status.Conditions { + if c.Type != "Ready" { + continue + } + + c.Message = "Kubelet is ready" + c.Reason = "KubeletReady" + c.Status = corev1.ConditionTrue + c.LastHeartbeatTime = metav1.Now() + c.LastTransitionTime = metav1.Now() + n.Status.Conditions[i] = c + return + } +} diff --git a/cmd/virtual-kubelet/internal/provider/mock/mock.go b/cmd/virtual-kubelet/internal/provider/mock/mock.go index 31296e9ae..75e301251 100644 --- a/cmd/virtual-kubelet/internal/provider/mock/mock.go +++ b/cmd/virtual-kubelet/internal/provider/mock/mock.go @@ -362,11 +362,11 @@ func (p *MockProvider) nodeConditions() []v1.NodeCondition { return []v1.NodeCondition{ { Type: "Ready", - Status: v1.ConditionTrue, + Status: v1.ConditionFalse, LastHeartbeatTime: metav1.Now(), LastTransitionTime: metav1.Now(), - Reason: "KubeletReady", - Message: "kubelet is ready.", + Reason: "KubeletPending", + Message: "kubelet is pending.", }, { Type: "OutOfDisk", diff --git a/internal/test/suite/suite.go b/internal/test/suite/suite.go index f3dc7db6e..ccae5379b 100644 --- a/internal/test/suite/suite.go +++ b/internal/test/suite/suite.go @@ -21,7 +21,7 @@ type ShouldSkipTestFunc func(string) bool // TestSuite contains methods that defines the lifecycle of a test suite type TestSuite interface { - Setup() + Setup(t *testing.T) Teardown() } @@ -39,7 +39,7 @@ type testCase struct { func Run(t *testing.T, ts TestSuite) { defer failOnPanic(t) - ts.Setup() + ts.Setup(t) defer ts.Teardown() // The implementation below is based on https://github.com/stretchr/testify diff --git a/internal/test/suite/suite_test.go b/internal/test/suite/suite_test.go index e6a767c9e..84a6a46fb 100644 --- a/internal/test/suite/suite_test.go +++ b/internal/test/suite/suite_test.go @@ -20,7 +20,7 @@ type basicTestSuite struct { testsRan []string } -func (bts *basicTestSuite) Setup() { +func (bts *basicTestSuite) Setup(t *testing.T) { bts.setupCount++ } diff --git a/node/node.go b/node/node.go index 30d8aedb3..95fc54e56 100644 --- a/node/node.go +++ b/node/node.go @@ -648,7 +648,52 @@ func (NaiveNodeProvider) Ping(ctx context.Context) error { // // This NaiveNodeProvider does not support updating node status and so this // function is a no-op. 
-func (NaiveNodeProvider) NotifyNodeStatus(ctx context.Context, f func(*corev1.Node)) {
+func (n NaiveNodeProvider) NotifyNodeStatus(_ context.Context, _ func(*corev1.Node)) {
+}
+
+// NaiveNodeProviderV2 is like NaiveNodeProvider except that it supports accepting node status updates.
+// It must be used as a pointer and must be created with `NewNaiveNodeProvider`.
+type NaiveNodeProviderV2 struct {
+	notify      func(*corev1.Node)
+	updateReady chan struct{}
+}
+
+// Ping just implements the NodeProvider interface.
+// It returns the error from the passed-in context only.
+func (*NaiveNodeProviderV2) Ping(ctx context.Context) error {
+	return ctx.Err()
+}
+
+// NotifyNodeStatus implements the NodeProvider interface.
+//
+// NaiveNodeProviderV2 does not support updating node status unless it was created with `NewNaiveNodeProvider`.
+// Otherwise this is a no-op.
+func (n *NaiveNodeProviderV2) NotifyNodeStatus(_ context.Context, f func(*corev1.Node)) {
+	n.notify = f
+	// This is a little sloppy and assumes `NotifyNodeStatus` is only called once, which is indeed currently true.
+	// The reason a channel is preferred here is so we can use a context in `UpdateStatus` to cancel waiting for this.
+	close(n.updateReady)
+}
+
+// UpdateStatus sends a node status update to the node controller.
+func (n *NaiveNodeProviderV2) UpdateStatus(ctx context.Context, node *corev1.Node) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-n.updateReady:
+	}
+
+	n.notify(node)
+	return nil
+}
+
+// NewNaiveNodeProvider creates a new NaiveNodeProviderV2.
+// You must use this to create a NaiveNodeProviderV2 if you want to be able to send node status updates to the node
+// controller.
+func NewNaiveNodeProvider() *NaiveNodeProviderV2 {
+	return &NaiveNodeProviderV2{
+		updateReady: make(chan struct{}),
+	}
+}
 
 type taintsStringer []corev1.Taint
diff --git a/test/e2e/suite.go b/test/e2e/suite.go
index e91f7ab9b..3471c732b 100644
--- a/test/e2e/suite.go
+++ b/test/e2e/suite.go
@@ -4,6 +4,9 @@ import (
 	"testing"
 	"time"
 
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/watch"
+
 	"github.com/virtual-kubelet/virtual-kubelet/internal/test/e2e/framework"
 	"github.com/virtual-kubelet/virtual-kubelet/internal/test/suite"
 )
@@ -40,14 +43,29 @@ type EndToEndTestSuiteConfig struct {
 
 // Setup runs the setup function from the provider and other
 // procedures before running the test suite
-func (ts *EndToEndTestSuite) Setup() {
+func (ts *EndToEndTestSuite) Setup(t *testing.T) {
 	if err := ts.setup(); err != nil {
-		panic(err)
+		t.Fatal(err)
 	}
 
-	// Wait for the virtual kubelet (deployed as a pod) to become fully ready
-	if _, err := f.WaitUntilPodReady(f.Namespace, f.NodeName); err != nil {
-		panic(err)
+	// Wait for the virtual kubelet node resource to become fully ready
+	if err := f.WaitUntilNodeCondition(func(ev watch.Event) (bool, error) {
+		n := ev.Object.(*corev1.Node)
+		if n.Name != f.NodeName {
+			return false, nil
+		}
+
+		for _, c := range n.Status.Conditions {
+			if c.Type != "Ready" {
+				continue
+			}
+			t.Log(c.Status)
+			return c.Status == corev1.ConditionTrue, nil
+		}
+
+		return false, nil
+	}); err != nil {
+		t.Fatal(err)
 	}
 }
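
Usage sketch for the nodeutil helpers added in PATCH 1/2 (this mirrors the
root.go changes above; the `c.*` option fields are assumed to be populated from
the command-line flags):

    client, err := nodeutil.ClientsetFromEnv(c.KubeConfigPath)
    if err != nil {
        return err
    }

    podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
        client,
        c.InformerResyncPeriod,
        kubeinformers.WithNamespace(c.KubeNamespace),
        nodeutil.PodInformerFilter(c.NodeName),
    )

    // Only needed when node leases are enabled; pass the result to
    // node.WithNodeEnableLeaseV1Beta1 when creating the node controller.
    leaseClient := nodeutil.NodeLeaseV1Beta1Client(client)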