From 083f6dee05515d718bbb7103aea970966bb66afd Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Wed, 26 Sep 2018 13:18:02 -0700 Subject: [PATCH] Refactor provider init (#360) * Refactor provider init This moves provider init out of vkubelet setup, instead preferring to initialize vkubelet with a provider. * Split API server configuration from setup. This makes sure that configuration (which is done primarily through env vars) is separate from actually standing up the servers. This also makes sure to abort daemon initialization if the API servers are not able to start. --- cmd/client.go | 35 +++++ cmd/root.go | 87 ++++++++++- cmd/taint.go | 56 +++++++ {vkubelet => providers}/provider.go | 2 +- providers/register/provider_alicloud.go | 23 +++ providers/register/provider_aws.go | 16 ++ providers/register/provider_azure.go | 23 +++ providers/register/provider_azurebatch.go | 23 +++ providers/register/provider_cri.go | 22 +++ providers/register/provider_huawei.go | 23 +++ providers/register/provider_hyper.go | 16 ++ providers/register/provider_mock.go | 22 +++ providers/register/provider_sfmesh.go | 22 +++ providers/register/provider_vic.go | 16 ++ providers/register/provider_web.go | 16 ++ providers/register/register.go | 35 +++++ vkubelet/apiserver.go | 42 +++--- vkubelet/lookup.go | 64 -------- vkubelet/lookup_darwin.go | 50 ------- vkubelet/lookup_windows.go | 47 ------ vkubelet/vkubelet.go | 171 ++++++++-------------- 21 files changed, 518 insertions(+), 293 deletions(-) create mode 100644 cmd/client.go create mode 100644 cmd/taint.go rename {vkubelet => providers}/provider.go (99%) create mode 100644 providers/register/provider_alicloud.go create mode 100644 providers/register/provider_aws.go create mode 100644 providers/register/provider_azure.go create mode 100644 providers/register/provider_azurebatch.go create mode 100644 providers/register/provider_cri.go create mode 100644 providers/register/provider_huawei.go create mode 100644 providers/register/provider_hyper.go create mode 100644 providers/register/provider_mock.go create mode 100644 providers/register/provider_sfmesh.go create mode 100644 providers/register/provider_vic.go create mode 100644 providers/register/provider_web.go create mode 100644 providers/register/register.go delete mode 100644 vkubelet/lookup.go delete mode 100644 vkubelet/lookup_darwin.go delete mode 100644 vkubelet/lookup_windows.go diff --git a/cmd/client.go b/cmd/client.go new file mode 100644 index 000000000..d0921572e --- /dev/null +++ b/cmd/client.go @@ -0,0 +1,35 @@ +package cmd + +import ( + "os" + + "github.com/pkg/errors" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +func newClient(configPath string) (*kubernetes.Clientset, error) { + var config *rest.Config + + // Check if the kubeConfig file exists. + if _, err := os.Stat(configPath); !os.IsNotExist(err) { + // Get the kubeconfig from the filepath. + config, err = clientcmd.BuildConfigFromFlags("", configPath) + if err != nil { + return nil, errors.Wrap(err, "error building client config") + } + } else { + // Set to in-cluster config. + config, err = rest.InClusterConfig() + if err != nil { + return nil, errors.Wrap(err, "error building in cluster config") + } + } + + if masterURI := os.Getenv("MASTER_URI"); masterURI != "" { + config.Host = masterURI + } + + return kubernetes.NewForConfig(config) +} diff --git a/cmd/root.go b/cmd/root.go index 6f2c0b31e..8e23c88ad 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -16,18 +16,29 @@ package cmd import ( "context" + "fmt" "os" "path/filepath" + "strconv" "strings" "github.com/Sirupsen/logrus" + "github.com/cpuguy83/strongerrors" homedir "github.com/mitchellh/go-homedir" + "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/register" vkubelet "github.com/virtual-kubelet/virtual-kubelet/vkubelet" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" +) + +const ( + defaultDaemonPort = "10250" ) var kubeletConfig string @@ -41,6 +52,11 @@ var taintKey string var disableTaint bool var logLevel string var metricsAddr string +var taint *corev1.Taint +var k8sClient *kubernetes.Clientset +var p providers.Provider +var rm *manager.ResourceManager +var apiConfig vkubelet.APIConfig // RootCmd represents the base command when called without any subcommands var RootCmd = &cobra.Command{ @@ -50,11 +66,21 @@ var RootCmd = &cobra.Command{ backend implementation allowing users to create kubernetes nodes without running the kubelet. This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`, Run: func(cmd *cobra.Command, args []string) { - f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint, metricsAddr) + ctx := context.Background() + f, err := vkubelet.New(ctx, vkubelet.Config{ + Client: k8sClient, + Namespace: kubeNamespace, + NodeName: nodeName, + Taint: taint, + MetricsAddr: metricsAddr, + Provider: p, + ResourceManager: rm, + APIConfig: apiConfig, + }) if err != nil { log.L.WithError(err).Fatal("Error initializing virtual kubelet") } - if err := f.Run(context.Background()); err != nil { + if err := f.Run(ctx); err != nil { log.L.Fatal(err) } }, @@ -155,4 +181,61 @@ func initConfig() { }) logger.Level = level log.L = logger + + if !disableTaint { + taint, err = getTaint(taintKey, provider) + if err != nil { + logger.WithError(err).Fatal("Error setting up desired kubernetes node taint") + } + } + + k8sClient, err = newClient(kubeConfig) + if err != nil { + logger.WithError(err).Fatal("Error creating kubernetes client") + } + + rm, err = manager.NewResourceManager(k8sClient) + if err != nil { + logger.WithError(err).Fatal("Error initializing resource manager") + } + + daemonPortEnv := getEnv("KUBELET_PORT", defaultDaemonPort) + daemonPort, err := strconv.ParseInt(daemonPortEnv, 10, 32) + if err != nil { + logger.WithError(err).WithField("value", daemonPortEnv).Fatal("Invalid value from KUBELET_PORT in environment") + } + + initConfig := register.InitConfig{ + ConfigPath: providerConfig, + NodeName: nodeName, + OperatingSystem: operatingSystem, + ResourceManager: rm, + DaemonPort: int32(daemonPort), + InternalIP: os.Getenv("VKUBELET_POD_IP"), + } + + p, err = register.GetProvider(provider, initConfig) + if err != nil { + logger.WithError(err).Fatal("Error initializing provider") + } + + apiConfig, err = getAPIConfig() + if err != nil { + logger.WithError(err).Fatal("Error reading API config") + } +} + +func getAPIConfig() (vkubelet.APIConfig, error) { + config := vkubelet.APIConfig{ + CertPath: os.Getenv("APISERVER_CERT_LOCATION"), + KeyPath: os.Getenv("APISERVER_KEY_LOCATION"), + } + + port, err := strconv.Atoi(os.Getenv("KUBELET_PORT")) + if err != nil { + return vkubelet.APIConfig{}, strongerrors.InvalidArgument(errors.Wrap(err, "error parsing KUBELET_PORT variable")) + } + config.Addr = fmt.Sprintf(":%d", port) + + return config, nil } diff --git a/cmd/taint.go b/cmd/taint.go new file mode 100644 index 000000000..279dd4d98 --- /dev/null +++ b/cmd/taint.go @@ -0,0 +1,56 @@ +package cmd + +import ( + "os" + + "github.com/cpuguy83/strongerrors" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" +) + +// Default taint values +const ( + DefaultTaintEffect = corev1.TaintEffectPreferNoSchedule + DefaultTaintKey = "virtual-kubelet.io/provider" +) + +func getEnv(key, defaultValue string) string { + value, found := os.LookupEnv(key) + if found { + return value + } + return defaultValue +} + +// getTaint creates a taint using the provided key/value. +// Taint effect is read from the environment +// The taint key/value may be overwritten by the environment. +func getTaint(key, value string) (*corev1.Taint, error) { + if key == "" { + key = DefaultTaintKey + value = provider + } + + key = getEnv("VKUBELET_TAINT_KEY", key) + value = getEnv("VKUBELET_TAINT_VALUE", value) + effectEnv := getEnv("VKUBELET_TAINT_EFFECT", string(DefaultTaintEffect)) + + var effect corev1.TaintEffect + switch effectEnv { + case "NoSchedule": + effect = corev1.TaintEffectNoSchedule + case "NoExecute": + effect = corev1.TaintEffectNoExecute + case "PreferNoSchedule": + effect = corev1.TaintEffectPreferNoSchedule + default: + return nil, strongerrors.InvalidArgument(errors.Errorf("taint effect %q is not supported", effectEnv)) + } + + return &corev1.Taint{ + Key: key, + Value: value, + Effect: effect, + }, nil +} diff --git a/vkubelet/provider.go b/providers/provider.go similarity index 99% rename from vkubelet/provider.go rename to providers/provider.go index 873ba84a1..0d47e69ed 100644 --- a/vkubelet/provider.go +++ b/providers/provider.go @@ -1,4 +1,4 @@ -package vkubelet +package providers import ( "context" diff --git a/providers/register/provider_alicloud.go b/providers/register/provider_alicloud.go new file mode 100644 index 000000000..73315d9c2 --- /dev/null +++ b/providers/register/provider_alicloud.go @@ -0,0 +1,23 @@ +// +build !no_alicooud_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/alicloud" +) + +func init() { + register("alicloud", aliCloudInit) +} + +func aliCloudInit(cfg InitConfig) (providers.Provider, error) { + return alicloud.NewECIProvider( + cfg.ConfigPath, + cfg.ResourceManager, + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_aws.go b/providers/register/provider_aws.go new file mode 100644 index 000000000..dead12c78 --- /dev/null +++ b/providers/register/provider_aws.go @@ -0,0 +1,16 @@ +// +build !no_aws_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/aws" +) + +func init() { + register("aws", initAWS) +} + +func initAWS(cfg InitConfig) (providers.Provider, error) { + return aws.NewFargateProvider(cfg.ConfigPath, cfg.ResourceManager, cfg.NodeName, cfg.OperatingSystem, cfg.InternalIP, cfg.DaemonPort) +} diff --git a/providers/register/provider_azure.go b/providers/register/provider_azure.go new file mode 100644 index 000000000..5023afdf5 --- /dev/null +++ b/providers/register/provider_azure.go @@ -0,0 +1,23 @@ +// +build !no_azure_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/azure" +) + +func init() { + register("azure", initAzure) +} + +func initAzure(cfg InitConfig) (providers.Provider, error) { + return azure.NewACIProvider( + cfg.ConfigPath, + cfg.ResourceManager, + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_azurebatch.go b/providers/register/provider_azurebatch.go new file mode 100644 index 000000000..b8fab0435 --- /dev/null +++ b/providers/register/provider_azurebatch.go @@ -0,0 +1,23 @@ +// +build !no_azurebatch_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/azurebatch" +) + +func init() { + register("azurebatch", initAzureBatch) +} + +func initAzureBatch(cfg InitConfig) (providers.Provider, error) { + return azurebatch.NewBatchProvider( + cfg.ConfigPath, + cfg.ResourceManager, + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_cri.go b/providers/register/provider_cri.go new file mode 100644 index 000000000..a9d4ee7d7 --- /dev/null +++ b/providers/register/provider_cri.go @@ -0,0 +1,22 @@ +// +build linux,!no_cri_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/cri" +) + +func init() { + register("cri", criInit) +} + +func criInit(cfg InitConfig) (providers.Provider, error) { + return cri.NewCRIProvider( + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.ResourceManager, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_huawei.go b/providers/register/provider_huawei.go new file mode 100644 index 000000000..847a8975d --- /dev/null +++ b/providers/register/provider_huawei.go @@ -0,0 +1,23 @@ +// +build !no_huawei_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/huawei" +) + +func init() { + register("huawei", initHuawei) +} + +func initHuawei(cfg InitConfig) (providers.Provider, error) { + return huawei.NewCCIProvider( + cfg.ConfigPath, + cfg.ResourceManager, + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_hyper.go b/providers/register/provider_hyper.go new file mode 100644 index 000000000..ac43af5d5 --- /dev/null +++ b/providers/register/provider_hyper.go @@ -0,0 +1,16 @@ +// +build !no_hyper_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/hypersh" +) + +func init() { + register("hyper", initHyper) +} + +func initHyper(cfg InitConfig) (providers.Provider, error) { + return hypersh.NewHyperProvider(cfg.ConfigPath, cfg.ResourceManager, cfg.NodeName, cfg.OperatingSystem) +} diff --git a/providers/register/provider_mock.go b/providers/register/provider_mock.go new file mode 100644 index 000000000..f9a48df38 --- /dev/null +++ b/providers/register/provider_mock.go @@ -0,0 +1,22 @@ +// +build !no_mock_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/mock" +) + +func init() { + register("mock", initMock) +} + +func initMock(cfg InitConfig) (providers.Provider, error) { + return mock.NewMockProvider( + cfg.ConfigPath, + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_sfmesh.go b/providers/register/provider_sfmesh.go new file mode 100644 index 000000000..9e4788b6b --- /dev/null +++ b/providers/register/provider_sfmesh.go @@ -0,0 +1,22 @@ +// +build !no_sfmesh_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/sfmesh" +) + +func init() { + register("sfmesh", sfmeshInit) +} + +func sfmeshInit(cfg InitConfig) (providers.Provider, error) { + return sfmesh.NewSFMeshProvider( + cfg.ResourceManager, + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_vic.go b/providers/register/provider_vic.go new file mode 100644 index 000000000..6e7582793 --- /dev/null +++ b/providers/register/provider_vic.go @@ -0,0 +1,16 @@ +// +build linux,!no_vic_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/vic" +) + +func init() { + register("vic", initVic) +} + +func initVic(cfg InitConfig) (providers.Provider, error) { + return vic.NewVicProvider(cfg.ConfigPath, cfg.ResourceManager, cfg.NodeName, cfg.OperatingSystem) +} diff --git a/providers/register/provider_web.go b/providers/register/provider_web.go new file mode 100644 index 000000000..f10b2034c --- /dev/null +++ b/providers/register/provider_web.go @@ -0,0 +1,16 @@ +// +build !no_web_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/web" +) + +func init() { + register("web", initWeb) +} + +func initWeb(cfg InitConfig) (providers.Provider, error) { + return web.NewBrokerProvider(cfg.NodeName, cfg.OperatingSystem, cfg.DaemonPort) +} diff --git a/providers/register/register.go b/providers/register/register.go new file mode 100644 index 000000000..11b853ebd --- /dev/null +++ b/providers/register/register.go @@ -0,0 +1,35 @@ +package register + +import ( + "github.com/cpuguy83/strongerrors" + "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers" +) + +var providerInits = make(map[string]initFunc) + +// InitConfig is the config passed to initialize a registered provider. +type InitConfig struct { + ConfigPath string + NodeName string + OperatingSystem string + InternalIP string + DaemonPort int32 + ResourceManager *manager.ResourceManager +} + +type initFunc func(InitConfig) (providers.Provider, error) + +// GetProvider gets the provider specified by the given name +func GetProvider(name string, cfg InitConfig) (providers.Provider, error) { + f, ok := providerInits[name] + if !ok { + return nil, strongerrors.NotFound(errors.Errorf("provider not found: %s", name)) + } + return f(cfg) +} + +func register(name string, f initFunc) { + providerInits[name] = f +} diff --git a/vkubelet/apiserver.go b/vkubelet/apiserver.go index 539f837d2..c1671f96e 100644 --- a/vkubelet/apiserver.go +++ b/vkubelet/apiserver.go @@ -2,39 +2,33 @@ package vkubelet import ( "context" - "fmt" + "net" "net/http" - "os" "github.com/Sirupsen/logrus" "github.com/gorilla/mux" "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" ) -// KubeletServerStart starts the virtual kubelet HTTP server. -func KubeletServerStart(p Provider) { - certFilePath := os.Getenv("APISERVER_CERT_LOCATION") - keyFilePath := os.Getenv("APISERVER_KEY_LOCATION") - port := os.Getenv("KUBELET_PORT") - addr := fmt.Sprintf(":%s", port) - +// PodHandler creates an http handler for interacting with pods/containers. +func PodHandler(p providers.Provider) http.Handler { r := mux.NewRouter() r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", api.PodLogsHandlerFunc(p)).Methods("GET") r.HandleFunc("/exec/{namespace}/{pod}/{container}", api.PodExecHandlerFunc(p)).Methods("POST") r.NotFoundHandler = http.HandlerFunc(NotFound) - - if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, InstrumentHandler(r)); err != nil { - log.G(context.TODO()).WithError(err).Error("error setting up http server") - } + return r } -// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelset summary stats API. -// TLS is never enabled on this endpoint. -func MetricsServerStart(p Provider, addr string) { +// MetricsSummaryHandler creates an http handler for serving pod metrics. +// +// If the passed in provider does not implement providers.PodMetricsProvider, +// it will create handlers that just serves http.StatusNotImplemented +func MetricsSummaryHandler(p providers.Provider) http.Handler { r := mux.NewRouter() - mp, ok := p.(PodMetricsProvider) + mp, ok := p.(providers.PodMetricsProvider) if !ok { r.HandleFunc("/stats/summary", NotImplemented).Methods("GET") r.HandleFunc("/stats/summary/", NotImplemented).Methods("GET") @@ -43,7 +37,19 @@ func MetricsServerStart(p Provider, addr string) { r.HandleFunc("/stats/summary/", api.PodMetricsHandlerFunc(mp)).Methods("GET") } r.NotFoundHandler = http.HandlerFunc(NotFound) - if err := http.ListenAndServe(addr, InstrumentHandler(r)); err != nil { + return r +} + +// KubeletServerStart starts the virtual kubelet HTTP server. +func KubeletServerStart(p providers.Provider, l net.Listener, cert, key string) { + if err := http.ServeTLS(l, InstrumentHandler(PodHandler(p)), cert, key); err != nil { + log.G(context.TODO()).WithError(err).Error("error setting up http server") + } +} + +// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelset summary stats API. +func MetricsServerStart(p providers.Provider, l net.Listener) { + if err := http.Serve(l, InstrumentHandler(MetricsSummaryHandler(p))); err != nil { log.G(context.TODO()).WithError(err).Error("Error starting http server") } } diff --git a/vkubelet/lookup.go b/vkubelet/lookup.go deleted file mode 100644 index b93bdee89..000000000 --- a/vkubelet/lookup.go +++ /dev/null @@ -1,64 +0,0 @@ -// +build !windows,!darwin - -package vkubelet - -import ( - "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers/alicloud" - "github.com/virtual-kubelet/virtual-kubelet/providers/aws" - "github.com/virtual-kubelet/virtual-kubelet/providers/azure" - "github.com/virtual-kubelet/virtual-kubelet/providers/azurebatch" - "github.com/virtual-kubelet/virtual-kubelet/providers/cri" - "github.com/virtual-kubelet/virtual-kubelet/providers/huawei" - "github.com/virtual-kubelet/virtual-kubelet/providers/hypersh" - "github.com/virtual-kubelet/virtual-kubelet/providers/mock" - "github.com/virtual-kubelet/virtual-kubelet/providers/sfmesh" - "github.com/virtual-kubelet/virtual-kubelet/providers/vic" - "github.com/virtual-kubelet/virtual-kubelet/providers/web" -) - -// Compile time proof that our implementations meet the Provider interface. -var _ Provider = (*alicloud.ECIProvider)(nil) -var _ Provider = (*aws.FargateProvider)(nil) -var _ Provider = (*azure.ACIProvider)(nil) -var _ Provider = (*hypersh.HyperProvider)(nil) -var _ Provider = (*web.BrokerProvider)(nil) -var _ Provider = (*mock.MockProvider)(nil) -var _ Provider = (*huawei.CCIProvider)(nil) -var _ Provider = (*azurebatch.Provider)(nil) -var _ Provider = (*sfmesh.SFMeshProvider)(nil) - -// start of providers not supported on windows -var _ Provider = (*cri.CRIProvider)(nil) -var _ Provider = (*vic.VicProvider)(nil) - -func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager, nodeName, operatingSystem, internalIP string, daemonEndpointPort int32) (Provider, error) { - switch provider { - case "alicloud": - return alicloud.NewECIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "aws": - return aws.NewFargateProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "azure": - return azure.NewACIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "azurebatch": - return azurebatch.NewBatchProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "hyper": - return hypersh.NewHyperProvider(providerConfig, rm, nodeName, operatingSystem) - case "web": - return web.NewBrokerProvider(nodeName, operatingSystem, daemonEndpointPort) - case "mock": - return mock.NewMockProvider(providerConfig, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "huawei": - return huawei.NewCCIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "sfmesh": - return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - // start of providers not supported on windows - case "cri": - return cri.NewCRIProvider(nodeName, operatingSystem, internalIP, rm, daemonEndpointPort) - case "vic": - return vic.NewVicProvider(providerConfig, rm, nodeName, operatingSystem) - default: - return nil, errors.New("provider not supported") - } -} diff --git a/vkubelet/lookup_darwin.go b/vkubelet/lookup_darwin.go deleted file mode 100644 index 7c20311a0..000000000 --- a/vkubelet/lookup_darwin.go +++ /dev/null @@ -1,50 +0,0 @@ -package vkubelet - -import ( - "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers/aws" - "github.com/virtual-kubelet/virtual-kubelet/providers/azure" - "github.com/virtual-kubelet/virtual-kubelet/providers/azurebatch" - "github.com/virtual-kubelet/virtual-kubelet/providers/huawei" - "github.com/virtual-kubelet/virtual-kubelet/providers/hypersh" - "github.com/virtual-kubelet/virtual-kubelet/providers/mock" - "github.com/virtual-kubelet/virtual-kubelet/providers/sfmesh" - "github.com/virtual-kubelet/virtual-kubelet/providers/web" -) - -// Compile time proof that our implementations meet the Provider interface. -var _ Provider = (*alicloud.ECIProvider)(nil) -var _ Provider = (*aws.FargateProvider)(nil) -var _ Provider = (*azure.ACIProvider)(nil) -var _ Provider = (*hypersh.HyperProvider)(nil) -var _ Provider = (*web.BrokerProvider)(nil) -var _ Provider = (*mock.MockProvider)(nil) -var _ Provider = (*huawei.CCIProvider)(nil) -var _ Provider = (*azurebatch.Provider)(nil) -var _ Provider = (*sfmesh.SFMeshProvider)(nil) - -func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager, nodeName, operatingSystem, internalIP string, daemonEndpointPort int32) (Provider, error) { - switch provider { - case "alicloud": - return alicloud.NewECIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "aws": - return aws.NewFargateProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "azure": - return azure.NewACIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "azurebatch": - return azurebatch.NewBatchProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "hyper": - return hypersh.NewHyperProvider(providerConfig, rm, nodeName, operatingSystem) - case "web": - return web.NewBrokerProvider(nodeName, operatingSystem, daemonEndpointPort) - case "mock": - return mock.NewMockProvider(providerConfig, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "huawei": - return huawei.NewCCIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "sfmesh": - return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - default: - return nil, errors.New("provider is not supported") - } -} diff --git a/vkubelet/lookup_windows.go b/vkubelet/lookup_windows.go deleted file mode 100644 index 1a09aec54..000000000 --- a/vkubelet/lookup_windows.go +++ /dev/null @@ -1,47 +0,0 @@ -package vkubelet - -import ( - "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers/aws" - "github.com/virtual-kubelet/virtual-kubelet/providers/azure" - "github.com/virtual-kubelet/virtual-kubelet/providers/azurebatch" - "github.com/virtual-kubelet/virtual-kubelet/providers/huawei" - "github.com/virtual-kubelet/virtual-kubelet/providers/hypersh" - "github.com/virtual-kubelet/virtual-kubelet/providers/mock" - "github.com/virtual-kubelet/virtual-kubelet/providers/sfmesh" - "github.com/virtual-kubelet/virtual-kubelet/providers/web" -) - -// Compile time proof that our implementations meet the Provider interface. -var _ Provider = (*aws.FargateProvider)(nil) -var _ Provider = (*azure.ACIProvider)(nil) -var _ Provider = (*hypersh.HyperProvider)(nil) -var _ Provider = (*web.BrokerProvider)(nil) -var _ Provider = (*mock.MockProvider)(nil) -var _ Provider = (*huawei.CCIProvider)(nil) -var _ Provider = (*azurebatch.Provider)(nil) -var _ Provider = (*sfmesh.SFMeshProvider)(nil) - -func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager, nodeName, operatingSystem, internalIP string, daemonEndpointPort int32) (Provider, error) { - switch provider { - case "aws": - return aws.NewFargateProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "azure": - return azure.NewACIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "azurebatch": - return azurebatch.NewBatchProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "hyper": - return hypersh.NewHyperProvider(providerConfig, rm, nodeName, operatingSystem) - case "web": - return web.NewBrokerProvider(nodeName, operatingSystem, daemonEndpointPort) - case "mock": - return mock.NewMockProvider(providerConfig, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "huawei": - return huawei.NewCCIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "sfmesh": - return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - default: - return nil, errors.New("provider not supported") - } -} diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index 8bed4fd4b..f400e817f 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -3,9 +3,9 @@ package vkubelet import ( "context" "fmt" + "net" "os" "os/signal" - "strconv" "strings" "syscall" "time" @@ -13,14 +13,13 @@ import ( pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" ) const ( @@ -32,123 +31,73 @@ type Server struct { nodeName string namespace string k8sClient *kubernetes.Clientset - taint corev1.Taint - disableTaint bool - provider Provider + taint *corev1.Taint + provider providers.Provider podWatcher watch.Interface resourceManager *manager.ResourceManager } -func getEnv(key, defaultValue string) string { - value, found := os.LookupEnv(key) - if found { - return value - } - return defaultValue +// Config is used to configure a new server. +type Config struct { + APIConfig APIConfig + Client *kubernetes.Clientset + MetricsAddr string + Namespace string + NodeName string + Provider providers.Provider + ResourceManager *manager.ResourceManager + Taint *corev1.Taint +} + +type APIConfig struct { + CertPath string + KeyPath string + Addr string } // New creates a new virtual-kubelet server. -func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerConfig, taintKey string, disableTaint bool, metricsAddr string) (*Server, error) { - var config *rest.Config - - // Check if the kubeConfig file exists. - if _, err := os.Stat(kubeConfig); !os.IsNotExist(err) { - // Get the kubeconfig from the filepath. - config, err = clientcmd.BuildConfigFromFlags("", kubeConfig) - if err != nil { - return nil, err - } - } else { - // Set to in-cluster config. - config, err = rest.InClusterConfig() - if err != nil { - return nil, err - } +func New(ctx context.Context, cfg Config) (s *Server, retErr error) { + s = &Server{ + namespace: cfg.Namespace, + nodeName: cfg.NodeName, + taint: cfg.Taint, + k8sClient: cfg.Client, + resourceManager: cfg.ResourceManager, + provider: cfg.Provider, } - if masterURI := os.Getenv("MASTER_URI"); masterURI != "" { - config.Host = masterURI - } - - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - - rm, err := manager.NewResourceManager(clientset) - if err != nil { - return nil, pkgerrors.Wrap(err, "error creating resource manager") - } - - daemonEndpointPortEnv := os.Getenv("KUBELET_PORT") - if daemonEndpointPortEnv == "" { - daemonEndpointPortEnv = "10250" - } - i64value, err := strconv.ParseInt(daemonEndpointPortEnv, 10, 32) - daemonEndpointPort := int32(i64value) - - internalIP := os.Getenv("VKUBELET_POD_IP") - - var defaultTaintKey string - var defaultTaintValue string - if taintKey != "" { - defaultTaintKey = taintKey - defaultTaintValue = "" - } else { - defaultTaintKey = "virtual-kubelet.io/provider" - defaultTaintValue = provider - } - vkTaintKey := getEnv("VKUBELET_TAINT_KEY", defaultTaintKey) - vkTaintValue := getEnv("VKUBELET_TAINT_VALUE", defaultTaintValue) - vkTaintEffectEnv := getEnv("VKUBELET_TAINT_EFFECT", "NoSchedule") - var vkTaintEffect corev1.TaintEffect - switch vkTaintEffectEnv { - case "NoSchedule": - vkTaintEffect = corev1.TaintEffectNoSchedule - case "NoExecute": - vkTaintEffect = corev1.TaintEffectNoExecute - case "PreferNoSchedule": - vkTaintEffect = corev1.TaintEffectPreferNoSchedule - default: - return nil, pkgerrors.Errorf("taint effect %q is not supported", vkTaintEffectEnv) - } - - taint := corev1.Taint{ - Key: vkTaintKey, - Value: vkTaintValue, - Effect: vkTaintEffect, - } - - p, err := lookupProvider(provider, providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - if err != nil { - return nil, err - } - - s := &Server{ - namespace: namespace, - nodeName: nodeName, - taint: taint, - disableTaint: disableTaint, - k8sClient: clientset, - resourceManager: rm, - provider: p, - } - - ctx := context.TODO() ctx = log.WithLogger(ctx, log.G(ctx)) - if err = s.registerNode(ctx); err != nil { - return s, err + apiL, err := net.Listen("tcp", cfg.APIConfig.Addr) + if err != nil { + return nil, pkgerrors.Wrap(err, "error setting up API listener") } + defer func() { + if retErr != nil { + apiL.Close() + } + }() + go KubeletServerStart(cfg.Provider, apiL, cfg.APIConfig.CertPath, cfg.APIConfig.KeyPath) - go KubeletServerStart(p) - - if metricsAddr != "" { - go MetricsServerStart(p, metricsAddr) + if cfg.MetricsAddr != "" { + metricsL, err := net.Listen("tcp", cfg.MetricsAddr) + if err != nil { + return nil, pkgerrors.Wrap(err, "error setting up metrics listener") + } + defer func() { + if retErr != nil { + metricsL.Close() + } + }() + go MetricsServerStart(cfg.Provider, metricsL) } else { log.G(ctx).Info("Skipping metrics server startup since no address was provided") } + if err := s.registerNode(ctx); err != nil { + return s, err + } + tick := time.Tick(5 * time.Second) go func() { @@ -165,8 +114,8 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon func (s *Server) registerNode(ctx context.Context) error { taints := make([]corev1.Taint, 0) - if !s.disableTaint { - taints = append(taints, s.taint) + if s.taint != nil { + taints = append(taints, *s.taint) } node := &corev1.Node{ @@ -350,10 +299,10 @@ func (s *Server) reconcile(ctx context.Context) { } if providerPod == nil && - pod.DeletionTimestamp == nil && - pod.Status.Phase != corev1.PodSucceeded && - pod.Status.Phase != corev1.PodFailed && - pod.Status.Reason != PodStatusReason_ProviderFailed { + pod.DeletionTimestamp == nil && + pod.Status.Phase != corev1.PodSucceeded && + pod.Status.Phase != corev1.PodFailed && + pod.Status.Reason != PodStatusReason_ProviderFailed { logger.Debug("Creating pod") if err := s.createPod(ctx, pod); err != nil { logger.WithError(err).Error("Error creating pod") @@ -436,8 +385,8 @@ func (s *Server) updatePodStatuses(ctx context.Context) { pods := s.resourceManager.GetPods() for _, pod := range pods { if pod.Status.Phase == corev1.PodSucceeded || - pod.Status.Phase == corev1.PodFailed || - pod.Status.Reason == PodStatusReason_ProviderFailed { + pod.Status.Phase == corev1.PodFailed || + pod.Status.Reason == PodStatusReason_ProviderFailed { continue }