Merge pull request #862 from cpuguy83/node_helpers

Brian Goff
2020-10-26 15:00:45 -07:00, committed by GitHub
7 changed files with 156 additions and 43 deletions

View File

@@ -26,17 +26,14 @@ import (
 	"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
 	"github.com/virtual-kubelet/virtual-kubelet/log"
 	"github.com/virtual-kubelet/virtual-kubelet/node"
+	"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
 	corev1 "k8s.io/api/core/v1"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/fields"
 	kubeinformers "k8s.io/client-go/informers"
-	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
 	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
-	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/client-go/tools/record"
 )
@@ -79,7 +76,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 		}
 	}

-	client, err := newClient(c.KubeConfigPath)
+	client, err := nodeutil.ClientsetFromEnv(c.KubeConfigPath)
 	if err != nil {
 		return err
 	}
@@ -89,9 +86,8 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 		client,
 		c.InformerResyncPeriod,
 		kubeinformers.WithNamespace(c.KubeNamespace),
-		kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
-			options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String()
-		}))
+		nodeutil.PodInformerFilter(c.NodeName),
+	)
 	podInformer := podInformerFactory.Core().V1().Pods()

 	// Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors).
@@ -120,7 +116,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 		NodeName:          c.NodeName,
 		OperatingSystem:   c.OperatingSystem,
 		ResourceManager:   rm,
-		DaemonPort:        int32(c.ListenPort),
+		DaemonPort:        c.ListenPort,
 		InternalIP:        os.Getenv("VKUBELET_POD_IP"),
 		KubeClusterDomain: c.KubeClusterDomain,
 	}
@@ -144,12 +140,13 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 	var leaseClient v1beta1.LeaseInterface
 	if c.EnableNodeLease {
-		leaseClient = client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
+		leaseClient = nodeutil.NodeLeaseV1Beta1Client(client)
 	}

 	pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version)
+	np := node.NewNaiveNodeProvider()
 	nodeRunner, err := node.NewNodeController(
-		node.NaiveNodeProvider{},
+		np,
 		pNode,
 		client.CoreV1().Nodes(),
 		node.WithNodeEnableLeaseV1Beta1(leaseClient, nil),
@@ -228,33 +225,28 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
 		}
 	}()

+	setNodeReady(pNode)
+	if err := np.UpdateStatus(ctx, pNode); err != nil {
+		return errors.Wrap(err, "error marking the node as ready")
+	}
+
 	log.G(ctx).Info("Initialized")

 	<-ctx.Done()
 	return nil
 }

-func newClient(configPath string) (*kubernetes.Clientset, error) {
-	var config *rest.Config
-
-	// Check if the kubeConfig file exists.
-	if _, err := os.Stat(configPath); !os.IsNotExist(err) {
-		// Get the kubeconfig from the filepath.
-		config, err = clientcmd.BuildConfigFromFlags("", configPath)
-		if err != nil {
-			return nil, errors.Wrap(err, "error building client config")
-		}
-	} else {
-		// Set to in-cluster config.
-		config, err = rest.InClusterConfig()
-		if err != nil {
-			return nil, errors.Wrap(err, "error building in cluster config")
-		}
-	}
-
-	if masterURI := os.Getenv("MASTER_URI"); masterURI != "" {
-		config.Host = masterURI
-	}
-
-	return kubernetes.NewForConfig(config)
-}
+func setNodeReady(n *corev1.Node) {
+	for i, c := range n.Status.Conditions {
+		if c.Type != "Ready" {
+			continue
+		}
+
+		c.Message = "Kubelet is ready"
+		c.Reason = "KubeletReady"
+		c.Status = corev1.ConditionTrue
+		c.LastHeartbeatTime = metav1.Now()
+		c.LastTransitionTime = metav1.Now()
+		n.Status.Conditions[i] = c
+		return
+	}
+}
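Taken together, these hunks replace the old always-ready startup with an explicit readiness handshake: the node is registered NotReady and only flipped to Ready once the controllers are running. A minimal sketch of the sequence, assuming the imports and the setNodeReady helper from this file and the controller's Run(ctx) method; error handling is trimmed and this is not a drop-in replacement:

    // Sketch only: condenses the flow added in the hunks above.
    np := node.NewNaiveNodeProvider()
    nodeRunner, err := node.NewNodeController(np, pNode, client.CoreV1().Nodes())
    if err != nil {
        return err
    }
    go func() {
        // Run blocks until ctx is cancelled; real code should surface this error.
        if err := nodeRunner.Run(ctx); err != nil {
            log.G(ctx).WithError(err).Error("node controller exited")
        }
    }()

    // The node object starts NotReady (see the mock provider change below);
    // mark it Ready and push the update through the provider.
    setNodeReady(pNode)
    if err := np.UpdateStatus(ctx, pNode); err != nil {
        return errors.Wrap(err, "error marking the node as ready")
    }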

View File

@@ -362,11 +362,11 @@ func (p *MockProvider) nodeConditions() []v1.NodeCondition {
 	return []v1.NodeCondition{
 		{
 			Type:               "Ready",
-			Status:             v1.ConditionTrue,
+			Status:             v1.ConditionFalse,
 			LastHeartbeatTime:  metav1.Now(),
 			LastTransitionTime: metav1.Now(),
-			Reason:             "KubeletReady",
-			Message:            "kubelet is ready.",
+			Reason:             "KubeletPending",
+			Message:            "kubelet is pending.",
 		},
 		{
 			Type:               "OutOfDisk",

View File

@@ -21,7 +21,7 @@ type ShouldSkipTestFunc func(string) bool

 // TestSuite contains methods that define the lifecycle of a test suite
 type TestSuite interface {
-	Setup()
+	Setup(t *testing.T)
 	Teardown()
 }
@@ -39,7 +39,7 @@ type testCase struct {
 func Run(t *testing.T, ts TestSuite) {
 	defer failOnPanic(t)

-	ts.Setup()
+	ts.Setup(t)
 	defer ts.Teardown()

 	// The implementation below is based on https://github.com/stretchr/testify

View File

@@ -20,7 +20,7 @@ type basicTestSuite struct {
 	testsRan   []string
 }

-func (bts *basicTestSuite) Setup() {
+func (bts *basicTestSuite) Setup(t *testing.T) {
 	bts.setupCount++
 }
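For illustration, a hedged sketch of what the new Setup(t *testing.T) signature buys a suite implementor: setup failures can fail the test cleanly instead of panicking. The mySuite type and its use of nodeutil below are hypothetical, not part of this PR:

    // Hypothetical suite exercising the new signature.
    type mySuite struct{ client kubernetes.Interface }

    func (s *mySuite) Setup(t *testing.T) {
        c, err := nodeutil.ClientsetFromEnv(os.Getenv("KUBECONFIG"))
        if err != nil {
            t.Fatal(err) // fails the test; the old zero-arg Setup could only panic
        }
        s.client = c
    }

    func (s *mySuite) Teardown() {}

    func TestMySuite(t *testing.T) {
        suite.Run(t, &mySuite{})
    }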

View File

@@ -734,7 +734,52 @@ func (NaiveNodeProvider) Ping(ctx context.Context) error {
 //
 // This NaiveNodeProvider does not support updating node status and so this
 // function is a no-op.
-func (NaiveNodeProvider) NotifyNodeStatus(ctx context.Context, f func(*corev1.Node)) {
+func (n NaiveNodeProvider) NotifyNodeStatus(_ context.Context, _ func(*corev1.Node)) {
 }

+// NaiveNodeProviderV2 is like NaiveNodeProvider except it supports accepting node status updates.
+// It must be used as a pointer and must be created with `NewNaiveNodeProvider`.
+type NaiveNodeProviderV2 struct {
+	notify      func(*corev1.Node)
+	updateReady chan struct{}
+}
+
+// Ping just implements the NodeProvider interface.
+// It returns the error from the passed in context only.
+func (*NaiveNodeProviderV2) Ping(ctx context.Context) error {
+	return ctx.Err()
+}
+
+// NotifyNodeStatus implements the NodeProvider interface.
+//
+// NaiveNodeProviderV2 does not support updating node status unless created with `NewNaiveNodeProvider`.
+// Otherwise this is a no-op.
+func (n *NaiveNodeProviderV2) NotifyNodeStatus(_ context.Context, f func(*corev1.Node)) {
+	n.notify = f
+	// This is a little sloppy and assumes `NotifyNodeStatus` is only called once, which is indeed currently true.
+	// The reason a channel is preferred here is so we can use a context in `UpdateStatus` to cancel waiting for this.
+	close(n.updateReady)
+}
+
+// UpdateStatus sends a node status update to the node controller.
+func (n *NaiveNodeProviderV2) UpdateStatus(ctx context.Context, node *corev1.Node) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-n.updateReady:
+	}
+
+	n.notify(node)
+	return nil
+}
+
+// NewNaiveNodeProvider creates a new NaiveNodeProviderV2.
+// You must use this to create a NaiveNodeProviderV2 if you want to be able to send node status updates to the node
+// controller.
+func NewNaiveNodeProvider() *NaiveNodeProviderV2 {
+	return &NaiveNodeProviderV2{
+		updateReady: make(chan struct{}),
+	}
+}

 type taintsStringer []corev1.Taint
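The updateReady channel plus the select on ctx.Done() is what makes UpdateStatus cancellable. For example, a caller could bound how long it waits for the controller to register its callback; the 30-second timeout below is an arbitrary illustration, with np and pNode as in the root command changes above:

    // Bound the wait for the controller to call NotifyNodeStatus.
    updateCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()
    if err := np.UpdateStatus(updateCtx, pNode); err != nil {
        // Either the controller never started or the deadline passed.
        return errors.Wrap(err, "failed to publish node status")
    }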

node/nodeutil/client.go (new file, 58 lines)
View File

@@ -0,0 +1,58 @@
+package nodeutil
+
+import (
+	"os"
+
+	"github.com/pkg/errors"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	kubeinformers "k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+)
+
+// ClientsetFromEnv returns a kubernetes client set from:
+// 1. the passed in kubeconfig path
+// 2. If the kubeconfig path is empty or non-existent, then the in-cluster config is used.
+func ClientsetFromEnv(kubeConfigPath string) (*kubernetes.Clientset, error) {
+	var (
+		config *rest.Config
+		err    error
+	)
+
+	if kubeConfigPath != "" {
+		if _, err := os.Stat(kubeConfigPath); err != nil {
+			config, err = rest.InClusterConfig()
+		} else {
+			config, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
+				&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeConfigPath},
+				nil,
+			).ClientConfig()
+		}
+	} else {
+		config, err = rest.InClusterConfig()
+	}
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting rest client config")
+	}
+
+	return kubernetes.NewForConfig(config)
+}
+
+// PodInformerFilter is a filter that you should use when creating a pod informer for use with the pod controller.
+func PodInformerFilter(node string) kubeinformers.SharedInformerOption {
+	return kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
+		options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", node).String()
+	})
+}
+
+// NodeLeaseV1Beta1Client creates a v1beta1 Lease client for use with node leases from the passed in client.
+//
+// Use this with node.WithNodeEnableLeaseV1Beta1 when creating a node controller.
+func NodeLeaseV1Beta1Client(client kubernetes.Interface) v1beta1.LeaseInterface {
+	return client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
+}
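A hedged sketch of how the three helpers are meant to compose, mirroring the root command changes above. The namespace, node name, and resync period are illustrative placeholders, and the imports match this file plus time:

    // Sketch: wire up a client, a node-scoped pod informer, and a lease client.
    func buildClients(nodeName string) (kubernetes.Interface, kubeinformers.SharedInformerFactory, v1beta1.LeaseInterface, error) {
        client, err := nodeutil.ClientsetFromEnv(os.Getenv("KUBECONFIG"))
        if err != nil {
            return nil, nil, nil, err
        }

        // Informer that only sees pods scheduled onto this virtual node.
        factory := kubeinformers.NewSharedInformerFactoryWithOptions(
            client,
            time.Minute, // illustrative resync period
            kubeinformers.WithNamespace("default"),
            nodeutil.PodInformerFilter(nodeName),
        )

        // Lease client for node heartbeats; pass to node.WithNodeEnableLeaseV1Beta1.
        return client, factory, nodeutil.NodeLeaseV1Beta1Client(client), nil
    }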

View File

@@ -4,6 +4,9 @@ import (
 	"testing"
 	"time"

+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/watch"
+
 	"github.com/virtual-kubelet/virtual-kubelet/internal/test/e2e/framework"
 	"github.com/virtual-kubelet/virtual-kubelet/internal/test/suite"
 )
@@ -40,14 +43,29 @@ type EndToEndTestSuiteConfig struct {
 // Setup runs the setup function from the provider and other
 // procedures before running the test suite
-func (ts *EndToEndTestSuite) Setup() {
+func (ts *EndToEndTestSuite) Setup(t *testing.T) {
 	if err := ts.setup(); err != nil {
-		panic(err)
+		t.Fatal(err)
 	}

-	// Wait for the virtual kubelet (deployed as a pod) to become fully ready
-	if _, err := f.WaitUntilPodReady(f.Namespace, f.NodeName); err != nil {
-		panic(err)
+	// Wait for the virtual kubelet node resource to become fully ready
+	if err := f.WaitUntilNodeCondition(func(ev watch.Event) (bool, error) {
+		n := ev.Object.(*corev1.Node)
+		if n.Name != f.NodeName {
+			return false, nil
+		}
+
+		for _, c := range n.Status.Conditions {
+			if c.Type != "Ready" {
+				continue
+			}
+			t.Log(c.Status)
+			return c.Status == corev1.ConditionTrue, nil
+		}
+
+		return false, nil
+	}); err != nil {
+		t.Fatal(err)
 	}
 }