Add helpers for common setup code
Create a clientset, setup pod informer filters, and setup node lease client.
This commit is contained in:
@@ -26,17 +26,14 @@ import (
|
|||||||
"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
|
"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
|
||||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||||
"github.com/virtual-kubelet/virtual-kubelet/node"
|
"github.com/virtual-kubelet/virtual-kubelet/node"
|
||||||
|
"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
|
||||||
kubeinformers "k8s.io/client-go/informers"
|
kubeinformers "k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes"
|
|
||||||
"k8s.io/client-go/kubernetes/scheme"
|
"k8s.io/client-go/kubernetes/scheme"
|
||||||
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
|
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
|
||||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
"k8s.io/client-go/rest"
|
|
||||||
"k8s.io/client-go/tools/clientcmd"
|
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -79,7 +76,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
client, err := newClient(c.KubeConfigPath)
|
client, err := nodeutil.ClientsetFromEnv(c.KubeConfigPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -89,9 +86,8 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
|
|||||||
client,
|
client,
|
||||||
c.InformerResyncPeriod,
|
c.InformerResyncPeriod,
|
||||||
kubeinformers.WithNamespace(c.KubeNamespace),
|
kubeinformers.WithNamespace(c.KubeNamespace),
|
||||||
kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
|
nodeutil.PodInformerFilter(c.NodeName),
|
||||||
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String()
|
)
|
||||||
}))
|
|
||||||
podInformer := podInformerFactory.Core().V1().Pods()
|
podInformer := podInformerFactory.Core().V1().Pods()
|
||||||
|
|
||||||
// Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors).
|
// Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors).
|
||||||
@@ -120,7 +116,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
|
|||||||
NodeName: c.NodeName,
|
NodeName: c.NodeName,
|
||||||
OperatingSystem: c.OperatingSystem,
|
OperatingSystem: c.OperatingSystem,
|
||||||
ResourceManager: rm,
|
ResourceManager: rm,
|
||||||
DaemonPort: int32(c.ListenPort),
|
DaemonPort: c.ListenPort,
|
||||||
InternalIP: os.Getenv("VKUBELET_POD_IP"),
|
InternalIP: os.Getenv("VKUBELET_POD_IP"),
|
||||||
KubeClusterDomain: c.KubeClusterDomain,
|
KubeClusterDomain: c.KubeClusterDomain,
|
||||||
}
|
}
|
||||||
@@ -144,7 +140,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
|
|||||||
|
|
||||||
var leaseClient v1beta1.LeaseInterface
|
var leaseClient v1beta1.LeaseInterface
|
||||||
if c.EnableNodeLease {
|
if c.EnableNodeLease {
|
||||||
leaseClient = client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
|
leaseClient = nodeutil.NodeLeaseV1Beta1Client(client)
|
||||||
}
|
}
|
||||||
|
|
||||||
pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version)
|
pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version)
|
||||||
@@ -233,28 +229,3 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
|
|||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClient(configPath string) (*kubernetes.Clientset, error) {
|
|
||||||
var config *rest.Config
|
|
||||||
|
|
||||||
// Check if the kubeConfig file exists.
|
|
||||||
if _, err := os.Stat(configPath); !os.IsNotExist(err) {
|
|
||||||
// Get the kubeconfig from the filepath.
|
|
||||||
config, err = clientcmd.BuildConfigFromFlags("", configPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "error building client config")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Set to in-cluster config.
|
|
||||||
config, err = rest.InClusterConfig()
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "error building in cluster config")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if masterURI := os.Getenv("MASTER_URI"); masterURI != "" {
|
|
||||||
config.Host = masterURI
|
|
||||||
}
|
|
||||||
|
|
||||||
return kubernetes.NewForConfig(config)
|
|
||||||
}
|
|
||||||
|
|||||||
58
node/nodeutil/client.go
Normal file
58
node/nodeutil/client.go
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
package nodeutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
|
kubeinformers "k8s.io/client-go/informers"
|
||||||
|
"k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
|
||||||
|
"k8s.io/client-go/rest"
|
||||||
|
"k8s.io/client-go/tools/clientcmd"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ClientsetFromEnv returns a kuberentes client set from:
|
||||||
|
// 1. the passed in kubeconfig path
|
||||||
|
// 2. If the kubeconfig path is empty or non-existent, then the in-cluster config is used.
|
||||||
|
func ClientsetFromEnv(kubeConfigPath string) (*kubernetes.Clientset, error) {
|
||||||
|
var (
|
||||||
|
config *rest.Config
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
if kubeConfigPath != "" {
|
||||||
|
if _, err := os.Stat(kubeConfigPath); err != nil {
|
||||||
|
config, err = rest.InClusterConfig()
|
||||||
|
} else {
|
||||||
|
config, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
|
||||||
|
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeConfigPath},
|
||||||
|
nil,
|
||||||
|
).ClientConfig()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
config, err = rest.InClusterConfig()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "error getting rest client config")
|
||||||
|
}
|
||||||
|
|
||||||
|
return kubernetes.NewForConfig(config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PodInformerFilter is a filter that you should use when creating a pod informer for use with the pod controller.
|
||||||
|
func PodInformerFilter(node string) kubeinformers.SharedInformerOption {
|
||||||
|
return kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
|
||||||
|
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", node).String()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// NodeLeaseV1Beta1Client creates a v1beta1 Lease client for use with node leases from the passed in client.
|
||||||
|
//
|
||||||
|
// Use this with node.WithNodeEnableLeaseV1Beta1 when creating a node controller.
|
||||||
|
func NodeLeaseV1Beta1Client(client kubernetes.Interface) v1beta1.LeaseInterface {
|
||||||
|
return client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user