diff --git a/cmd/virtual-kubelet/internal/commands/root/root.go b/cmd/virtual-kubelet/internal/commands/root/root.go index d8d5fc7de..d1f59a28c 100644 --- a/cmd/virtual-kubelet/internal/commands/root/root.go +++ b/cmd/virtual-kubelet/internal/commands/root/root.go @@ -26,17 +26,14 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/internal/manager" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/node" + "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/typed/coordination/v1beta1" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" ) @@ -79,7 +76,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { } } - client, err := newClient(c.KubeConfigPath) + client, err := nodeutil.ClientsetFromEnv(c.KubeConfigPath) if err != nil { return err } @@ -89,9 +86,8 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { client, c.InformerResyncPeriod, kubeinformers.WithNamespace(c.KubeNamespace), - kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) { - options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String() - })) + nodeutil.PodInformerFilter(c.NodeName), + ) podInformer := podInformerFactory.Core().V1().Pods() // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors). @@ -120,7 +116,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { NodeName: c.NodeName, OperatingSystem: c.OperatingSystem, ResourceManager: rm, - DaemonPort: int32(c.ListenPort), + DaemonPort: c.ListenPort, InternalIP: os.Getenv("VKUBELET_POD_IP"), KubeClusterDomain: c.KubeClusterDomain, } @@ -144,12 +140,13 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { var leaseClient v1beta1.LeaseInterface if c.EnableNodeLease { - leaseClient = client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease) + leaseClient = nodeutil.NodeLeaseV1Beta1Client(client) } pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version) + np := node.NewNaiveNodeProvider() nodeRunner, err := node.NewNodeController( - node.NaiveNodeProvider{}, + np, pNode, client.CoreV1().Nodes(), node.WithNodeEnableLeaseV1Beta1(leaseClient, nil), @@ -228,33 +225,28 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error { } }() + setNodeReady(pNode) + if err := np.UpdateStatus(ctx, pNode); err != nil { + return errors.Wrap(err, "error marking the node as ready") + } log.G(ctx).Info("Initialized") <-ctx.Done() return nil } -func newClient(configPath string) (*kubernetes.Clientset, error) { - var config *rest.Config - - // Check if the kubeConfig file exists. - if _, err := os.Stat(configPath); !os.IsNotExist(err) { - // Get the kubeconfig from the filepath. - config, err = clientcmd.BuildConfigFromFlags("", configPath) - if err != nil { - return nil, errors.Wrap(err, "error building client config") +func setNodeReady(n *corev1.Node) { + for i, c := range n.Status.Conditions { + if c.Type != "Ready" { + continue } - } else { - // Set to in-cluster config. - config, err = rest.InClusterConfig() - if err != nil { - return nil, errors.Wrap(err, "error building in cluster config") - } - } - if masterURI := os.Getenv("MASTER_URI"); masterURI != "" { - config.Host = masterURI + c.Message = "Kubelet is ready" + c.Reason = "KubeletReady" + c.Status = corev1.ConditionTrue + c.LastHeartbeatTime = metav1.Now() + c.LastTransitionTime = metav1.Now() + n.Status.Conditions[i] = c + return } - - return kubernetes.NewForConfig(config) } diff --git a/cmd/virtual-kubelet/internal/provider/mock/mock.go b/cmd/virtual-kubelet/internal/provider/mock/mock.go index 31296e9ae..75e301251 100644 --- a/cmd/virtual-kubelet/internal/provider/mock/mock.go +++ b/cmd/virtual-kubelet/internal/provider/mock/mock.go @@ -362,11 +362,11 @@ func (p *MockProvider) nodeConditions() []v1.NodeCondition { return []v1.NodeCondition{ { Type: "Ready", - Status: v1.ConditionTrue, + Status: v1.ConditionFalse, LastHeartbeatTime: metav1.Now(), LastTransitionTime: metav1.Now(), - Reason: "KubeletReady", - Message: "kubelet is ready.", + Reason: "KubeletPending", + Message: "kubelet is pending.", }, { Type: "OutOfDisk", diff --git a/internal/test/suite/suite.go b/internal/test/suite/suite.go index f3dc7db6e..ccae5379b 100644 --- a/internal/test/suite/suite.go +++ b/internal/test/suite/suite.go @@ -21,7 +21,7 @@ type ShouldSkipTestFunc func(string) bool // TestSuite contains methods that defines the lifecycle of a test suite type TestSuite interface { - Setup() + Setup(t *testing.T) Teardown() } @@ -39,7 +39,7 @@ type testCase struct { func Run(t *testing.T, ts TestSuite) { defer failOnPanic(t) - ts.Setup() + ts.Setup(t) defer ts.Teardown() // The implementation below is based on https://github.com/stretchr/testify diff --git a/internal/test/suite/suite_test.go b/internal/test/suite/suite_test.go index e6a767c9e..84a6a46fb 100644 --- a/internal/test/suite/suite_test.go +++ b/internal/test/suite/suite_test.go @@ -20,7 +20,7 @@ type basicTestSuite struct { testsRan []string } -func (bts *basicTestSuite) Setup() { +func (bts *basicTestSuite) Setup(t *testing.T) { bts.setupCount++ } diff --git a/node/node.go b/node/node.go index 503e82a43..f307f656d 100644 --- a/node/node.go +++ b/node/node.go @@ -734,7 +734,52 @@ func (NaiveNodeProvider) Ping(ctx context.Context) error { // // This NaiveNodeProvider does not support updating node status and so this // function is a no-op. -func (NaiveNodeProvider) NotifyNodeStatus(ctx context.Context, f func(*corev1.Node)) { +func (n NaiveNodeProvider) NotifyNodeStatus(_ context.Context, _ func(*corev1.Node)) { +} + +// NaiveNodeProviderV2 is like NaiveNodeProvider except it supports accepting node status updates. +// It must be used with as a pointer and must be created with `NewNaiveNodeProvider` +type NaiveNodeProviderV2 struct { + notify func(*corev1.Node) + updateReady chan struct{} +} + +// Ping just implements the NodeProvider interface. +// It returns the error from the passed in context only. +func (*NaiveNodeProviderV2) Ping(ctx context.Context) error { + return ctx.Err() +} + +// NotifyNodeStatus implements the NodeProvider interface. +// +// NaiveNodeProvider does not support updating node status unless created with `NewNaiveNodeProvider` +// Otherwise this is a no-op +func (n *NaiveNodeProviderV2) NotifyNodeStatus(_ context.Context, f func(*corev1.Node)) { + n.notify = f + // This is a little sloppy and assumes `NotifyNodeStatus` is only called once, which is indeed currently true. + // The reason a channel is preferred here is so we can use a context in `UpdateStatus` to cancel waiting for this. + close(n.updateReady) +} + +// UpdateStatus sends a node status update to the node controller +func (n *NaiveNodeProviderV2) UpdateStatus(ctx context.Context, node *corev1.Node) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-n.updateReady: + } + + n.notify(node) + return nil +} + +// NewNaiveNodeProvider creates a new NaiveNodeProviderV2 +// You must use this to create a NaiveNodeProviderV2 if you want to be able to send node status updates to the node +// controller. +func NewNaiveNodeProvider() *NaiveNodeProviderV2 { + return &NaiveNodeProviderV2{ + updateReady: make(chan struct{}), + } } type taintsStringer []corev1.Taint diff --git a/node/nodeutil/client.go b/node/nodeutil/client.go new file mode 100644 index 000000000..87614e54c --- /dev/null +++ b/node/nodeutil/client.go @@ -0,0 +1,58 @@ +package nodeutil + +import ( + "os" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/typed/coordination/v1beta1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +// ClientsetFromEnv returns a kuberentes client set from: +// 1. the passed in kubeconfig path +// 2. If the kubeconfig path is empty or non-existent, then the in-cluster config is used. +func ClientsetFromEnv(kubeConfigPath string) (*kubernetes.Clientset, error) { + var ( + config *rest.Config + err error + ) + + if kubeConfigPath != "" { + if _, err := os.Stat(kubeConfigPath); err != nil { + config, err = rest.InClusterConfig() + } else { + config, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeConfigPath}, + nil, + ).ClientConfig() + } + } else { + config, err = rest.InClusterConfig() + } + + if err != nil { + return nil, errors.Wrap(err, "error getting rest client config") + } + + return kubernetes.NewForConfig(config) +} + +// PodInformerFilter is a filter that you should use when creating a pod informer for use with the pod controller. +func PodInformerFilter(node string) kubeinformers.SharedInformerOption { + return kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", node).String() + }) +} + +// NodeLeaseV1Beta1Client creates a v1beta1 Lease client for use with node leases from the passed in client. +// +// Use this with node.WithNodeEnableLeaseV1Beta1 when creating a node controller. +func NodeLeaseV1Beta1Client(client kubernetes.Interface) v1beta1.LeaseInterface { + return client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease) +} diff --git a/test/e2e/suite.go b/test/e2e/suite.go index e91f7ab9b..3471c732b 100644 --- a/test/e2e/suite.go +++ b/test/e2e/suite.go @@ -4,6 +4,9 @@ import ( "testing" "time" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" + "github.com/virtual-kubelet/virtual-kubelet/internal/test/e2e/framework" "github.com/virtual-kubelet/virtual-kubelet/internal/test/suite" ) @@ -40,14 +43,29 @@ type EndToEndTestSuiteConfig struct { // Setup runs the setup function from the provider and other // procedures before running the test suite -func (ts *EndToEndTestSuite) Setup() { +func (ts *EndToEndTestSuite) Setup(t *testing.T) { if err := ts.setup(); err != nil { - panic(err) + t.Fatal(err) } - // Wait for the virtual kubelet (deployed as a pod) to become fully ready - if _, err := f.WaitUntilPodReady(f.Namespace, f.NodeName); err != nil { - panic(err) + // Wait for the virtual kubelet node resource to become fully ready + if err := f.WaitUntilNodeCondition(func(ev watch.Event) (bool, error) { + n := ev.Object.(*corev1.Node) + if n.Name != f.NodeName { + return false, nil + } + + for _, c := range n.Status.Conditions { + if c.Type != "Ready" { + continue + } + t.Log(c.Status) + return c.Status == corev1.ConditionTrue, nil + } + + return false, nil + }); err != nil { + t.Fatal(err) } }