diff --git a/cmd/client.go b/cmd/client.go new file mode 100644 index 000000000..d0921572e --- /dev/null +++ b/cmd/client.go @@ -0,0 +1,35 @@ +package cmd + +import ( + "os" + + "github.com/pkg/errors" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +func newClient(configPath string) (*kubernetes.Clientset, error) { + var config *rest.Config + + // Check if the kubeConfig file exists. + if _, err := os.Stat(configPath); !os.IsNotExist(err) { + // Get the kubeconfig from the filepath. + config, err = clientcmd.BuildConfigFromFlags("", configPath) + if err != nil { + return nil, errors.Wrap(err, "error building client config") + } + } else { + // Set to in-cluster config. + config, err = rest.InClusterConfig() + if err != nil { + return nil, errors.Wrap(err, "error building in cluster config") + } + } + + if masterURI := os.Getenv("MASTER_URI"); masterURI != "" { + config.Host = masterURI + } + + return kubernetes.NewForConfig(config) +} diff --git a/cmd/root.go b/cmd/root.go index 6f2c0b31e..8e23c88ad 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -16,18 +16,29 @@ package cmd import ( "context" + "fmt" "os" "path/filepath" + "strconv" "strings" "github.com/Sirupsen/logrus" + "github.com/cpuguy83/strongerrors" homedir "github.com/mitchellh/go-homedir" + "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/register" vkubelet "github.com/virtual-kubelet/virtual-kubelet/vkubelet" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" +) + +const ( + defaultDaemonPort = "10250" ) var kubeletConfig string @@ -41,6 +52,11 @@ var taintKey string var disableTaint bool var logLevel string var metricsAddr string +var taint *corev1.Taint +var k8sClient *kubernetes.Clientset +var p providers.Provider +var rm *manager.ResourceManager +var apiConfig vkubelet.APIConfig // RootCmd represents the base command when called without any subcommands var RootCmd = &cobra.Command{ @@ -50,11 +66,21 @@ var RootCmd = &cobra.Command{ backend implementation allowing users to create kubernetes nodes without running the kubelet. This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`, Run: func(cmd *cobra.Command, args []string) { - f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint, metricsAddr) + ctx := context.Background() + f, err := vkubelet.New(ctx, vkubelet.Config{ + Client: k8sClient, + Namespace: kubeNamespace, + NodeName: nodeName, + Taint: taint, + MetricsAddr: metricsAddr, + Provider: p, + ResourceManager: rm, + APIConfig: apiConfig, + }) if err != nil { log.L.WithError(err).Fatal("Error initializing virtual kubelet") } - if err := f.Run(context.Background()); err != nil { + if err := f.Run(ctx); err != nil { log.L.Fatal(err) } }, @@ -155,4 +181,61 @@ func initConfig() { }) logger.Level = level log.L = logger + + if !disableTaint { + taint, err = getTaint(taintKey, provider) + if err != nil { + logger.WithError(err).Fatal("Error setting up desired kubernetes node taint") + } + } + + k8sClient, err = newClient(kubeConfig) + if err != nil { + logger.WithError(err).Fatal("Error creating kubernetes client") + } + + rm, err = manager.NewResourceManager(k8sClient) + if err != nil { + logger.WithError(err).Fatal("Error initializing resource manager") + } + + daemonPortEnv := getEnv("KUBELET_PORT", defaultDaemonPort) + daemonPort, err := strconv.ParseInt(daemonPortEnv, 10, 32) + if err != nil { + logger.WithError(err).WithField("value", daemonPortEnv).Fatal("Invalid value from KUBELET_PORT in environment") + } + + initConfig := register.InitConfig{ + ConfigPath: providerConfig, + NodeName: nodeName, + OperatingSystem: operatingSystem, + ResourceManager: rm, + DaemonPort: int32(daemonPort), + InternalIP: os.Getenv("VKUBELET_POD_IP"), + } + + p, err = register.GetProvider(provider, initConfig) + if err != nil { + logger.WithError(err).Fatal("Error initializing provider") + } + + apiConfig, err = getAPIConfig() + if err != nil { + logger.WithError(err).Fatal("Error reading API config") + } +} + +func getAPIConfig() (vkubelet.APIConfig, error) { + config := vkubelet.APIConfig{ + CertPath: os.Getenv("APISERVER_CERT_LOCATION"), + KeyPath: os.Getenv("APISERVER_KEY_LOCATION"), + } + + port, err := strconv.Atoi(os.Getenv("KUBELET_PORT")) + if err != nil { + return vkubelet.APIConfig{}, strongerrors.InvalidArgument(errors.Wrap(err, "error parsing KUBELET_PORT variable")) + } + config.Addr = fmt.Sprintf(":%d", port) + + return config, nil } diff --git a/cmd/taint.go b/cmd/taint.go new file mode 100644 index 000000000..279dd4d98 --- /dev/null +++ b/cmd/taint.go @@ -0,0 +1,56 @@ +package cmd + +import ( + "os" + + "github.com/cpuguy83/strongerrors" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" +) + +// Default taint values +const ( + DefaultTaintEffect = corev1.TaintEffectPreferNoSchedule + DefaultTaintKey = "virtual-kubelet.io/provider" +) + +func getEnv(key, defaultValue string) string { + value, found := os.LookupEnv(key) + if found { + return value + } + return defaultValue +} + +// getTaint creates a taint using the provided key/value. +// Taint effect is read from the environment +// The taint key/value may be overwritten by the environment. +func getTaint(key, value string) (*corev1.Taint, error) { + if key == "" { + key = DefaultTaintKey + value = provider + } + + key = getEnv("VKUBELET_TAINT_KEY", key) + value = getEnv("VKUBELET_TAINT_VALUE", value) + effectEnv := getEnv("VKUBELET_TAINT_EFFECT", string(DefaultTaintEffect)) + + var effect corev1.TaintEffect + switch effectEnv { + case "NoSchedule": + effect = corev1.TaintEffectNoSchedule + case "NoExecute": + effect = corev1.TaintEffectNoExecute + case "PreferNoSchedule": + effect = corev1.TaintEffectPreferNoSchedule + default: + return nil, strongerrors.InvalidArgument(errors.Errorf("taint effect %q is not supported", effectEnv)) + } + + return &corev1.Taint{ + Key: key, + Value: value, + Effect: effect, + }, nil +} diff --git a/vkubelet/provider.go b/providers/provider.go similarity index 99% rename from vkubelet/provider.go rename to providers/provider.go index 873ba84a1..0d47e69ed 100644 --- a/vkubelet/provider.go +++ b/providers/provider.go @@ -1,4 +1,4 @@ -package vkubelet +package providers import ( "context" diff --git a/providers/register/provider_alicloud.go b/providers/register/provider_alicloud.go new file mode 100644 index 000000000..73315d9c2 --- /dev/null +++ b/providers/register/provider_alicloud.go @@ -0,0 +1,23 @@ +// +build !no_alicooud_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/alicloud" +) + +func init() { + register("alicloud", aliCloudInit) +} + +func aliCloudInit(cfg InitConfig) (providers.Provider, error) { + return alicloud.NewECIProvider( + cfg.ConfigPath, + cfg.ResourceManager, + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_aws.go b/providers/register/provider_aws.go new file mode 100644 index 000000000..dead12c78 --- /dev/null +++ b/providers/register/provider_aws.go @@ -0,0 +1,16 @@ +// +build !no_aws_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/aws" +) + +func init() { + register("aws", initAWS) +} + +func initAWS(cfg InitConfig) (providers.Provider, error) { + return aws.NewFargateProvider(cfg.ConfigPath, cfg.ResourceManager, cfg.NodeName, cfg.OperatingSystem, cfg.InternalIP, cfg.DaemonPort) +} diff --git a/providers/register/provider_azure.go b/providers/register/provider_azure.go new file mode 100644 index 000000000..5023afdf5 --- /dev/null +++ b/providers/register/provider_azure.go @@ -0,0 +1,23 @@ +// +build !no_azure_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/azure" +) + +func init() { + register("azure", initAzure) +} + +func initAzure(cfg InitConfig) (providers.Provider, error) { + return azure.NewACIProvider( + cfg.ConfigPath, + cfg.ResourceManager, + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_azurebatch.go b/providers/register/provider_azurebatch.go new file mode 100644 index 000000000..b8fab0435 --- /dev/null +++ b/providers/register/provider_azurebatch.go @@ -0,0 +1,23 @@ +// +build !no_azurebatch_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/azurebatch" +) + +func init() { + register("azurebatch", initAzureBatch) +} + +func initAzureBatch(cfg InitConfig) (providers.Provider, error) { + return azurebatch.NewBatchProvider( + cfg.ConfigPath, + cfg.ResourceManager, + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_cri.go b/providers/register/provider_cri.go new file mode 100644 index 000000000..a9d4ee7d7 --- /dev/null +++ b/providers/register/provider_cri.go @@ -0,0 +1,22 @@ +// +build linux,!no_cri_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/cri" +) + +func init() { + register("cri", criInit) +} + +func criInit(cfg InitConfig) (providers.Provider, error) { + return cri.NewCRIProvider( + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.ResourceManager, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_huawei.go b/providers/register/provider_huawei.go new file mode 100644 index 000000000..847a8975d --- /dev/null +++ b/providers/register/provider_huawei.go @@ -0,0 +1,23 @@ +// +build !no_huawei_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/huawei" +) + +func init() { + register("huawei", initHuawei) +} + +func initHuawei(cfg InitConfig) (providers.Provider, error) { + return huawei.NewCCIProvider( + cfg.ConfigPath, + cfg.ResourceManager, + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_hyper.go b/providers/register/provider_hyper.go new file mode 100644 index 000000000..ac43af5d5 --- /dev/null +++ b/providers/register/provider_hyper.go @@ -0,0 +1,16 @@ +// +build !no_hyper_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/hypersh" +) + +func init() { + register("hyper", initHyper) +} + +func initHyper(cfg InitConfig) (providers.Provider, error) { + return hypersh.NewHyperProvider(cfg.ConfigPath, cfg.ResourceManager, cfg.NodeName, cfg.OperatingSystem) +} diff --git a/providers/register/provider_mock.go b/providers/register/provider_mock.go new file mode 100644 index 000000000..f9a48df38 --- /dev/null +++ b/providers/register/provider_mock.go @@ -0,0 +1,22 @@ +// +build !no_mock_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/mock" +) + +func init() { + register("mock", initMock) +} + +func initMock(cfg InitConfig) (providers.Provider, error) { + return mock.NewMockProvider( + cfg.ConfigPath, + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_sfmesh.go b/providers/register/provider_sfmesh.go new file mode 100644 index 000000000..9e4788b6b --- /dev/null +++ b/providers/register/provider_sfmesh.go @@ -0,0 +1,22 @@ +// +build !no_sfmesh_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/sfmesh" +) + +func init() { + register("sfmesh", sfmeshInit) +} + +func sfmeshInit(cfg InitConfig) (providers.Provider, error) { + return sfmesh.NewSFMeshProvider( + cfg.ResourceManager, + cfg.NodeName, + cfg.OperatingSystem, + cfg.InternalIP, + cfg.DaemonPort, + ) +} diff --git a/providers/register/provider_vic.go b/providers/register/provider_vic.go new file mode 100644 index 000000000..6e7582793 --- /dev/null +++ b/providers/register/provider_vic.go @@ -0,0 +1,16 @@ +// +build linux,!no_vic_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/vic" +) + +func init() { + register("vic", initVic) +} + +func initVic(cfg InitConfig) (providers.Provider, error) { + return vic.NewVicProvider(cfg.ConfigPath, cfg.ResourceManager, cfg.NodeName, cfg.OperatingSystem) +} diff --git a/providers/register/provider_web.go b/providers/register/provider_web.go new file mode 100644 index 000000000..f10b2034c --- /dev/null +++ b/providers/register/provider_web.go @@ -0,0 +1,16 @@ +// +build !no_web_provider + +package register + +import ( + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/web" +) + +func init() { + register("web", initWeb) +} + +func initWeb(cfg InitConfig) (providers.Provider, error) { + return web.NewBrokerProvider(cfg.NodeName, cfg.OperatingSystem, cfg.DaemonPort) +} diff --git a/providers/register/register.go b/providers/register/register.go new file mode 100644 index 000000000..11b853ebd --- /dev/null +++ b/providers/register/register.go @@ -0,0 +1,35 @@ +package register + +import ( + "github.com/cpuguy83/strongerrors" + "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers" +) + +var providerInits = make(map[string]initFunc) + +// InitConfig is the config passed to initialize a registered provider. +type InitConfig struct { + ConfigPath string + NodeName string + OperatingSystem string + InternalIP string + DaemonPort int32 + ResourceManager *manager.ResourceManager +} + +type initFunc func(InitConfig) (providers.Provider, error) + +// GetProvider gets the provider specified by the given name +func GetProvider(name string, cfg InitConfig) (providers.Provider, error) { + f, ok := providerInits[name] + if !ok { + return nil, strongerrors.NotFound(errors.Errorf("provider not found: %s", name)) + } + return f(cfg) +} + +func register(name string, f initFunc) { + providerInits[name] = f +} diff --git a/vkubelet/apiserver.go b/vkubelet/apiserver.go index 539f837d2..c1671f96e 100644 --- a/vkubelet/apiserver.go +++ b/vkubelet/apiserver.go @@ -2,39 +2,33 @@ package vkubelet import ( "context" - "fmt" + "net" "net/http" - "os" "github.com/Sirupsen/logrus" "github.com/gorilla/mux" "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" ) -// KubeletServerStart starts the virtual kubelet HTTP server. -func KubeletServerStart(p Provider) { - certFilePath := os.Getenv("APISERVER_CERT_LOCATION") - keyFilePath := os.Getenv("APISERVER_KEY_LOCATION") - port := os.Getenv("KUBELET_PORT") - addr := fmt.Sprintf(":%s", port) - +// PodHandler creates an http handler for interacting with pods/containers. +func PodHandler(p providers.Provider) http.Handler { r := mux.NewRouter() r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", api.PodLogsHandlerFunc(p)).Methods("GET") r.HandleFunc("/exec/{namespace}/{pod}/{container}", api.PodExecHandlerFunc(p)).Methods("POST") r.NotFoundHandler = http.HandlerFunc(NotFound) - - if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, InstrumentHandler(r)); err != nil { - log.G(context.TODO()).WithError(err).Error("error setting up http server") - } + return r } -// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelset summary stats API. -// TLS is never enabled on this endpoint. -func MetricsServerStart(p Provider, addr string) { +// MetricsSummaryHandler creates an http handler for serving pod metrics. +// +// If the passed in provider does not implement providers.PodMetricsProvider, +// it will create handlers that just serves http.StatusNotImplemented +func MetricsSummaryHandler(p providers.Provider) http.Handler { r := mux.NewRouter() - mp, ok := p.(PodMetricsProvider) + mp, ok := p.(providers.PodMetricsProvider) if !ok { r.HandleFunc("/stats/summary", NotImplemented).Methods("GET") r.HandleFunc("/stats/summary/", NotImplemented).Methods("GET") @@ -43,7 +37,19 @@ func MetricsServerStart(p Provider, addr string) { r.HandleFunc("/stats/summary/", api.PodMetricsHandlerFunc(mp)).Methods("GET") } r.NotFoundHandler = http.HandlerFunc(NotFound) - if err := http.ListenAndServe(addr, InstrumentHandler(r)); err != nil { + return r +} + +// KubeletServerStart starts the virtual kubelet HTTP server. +func KubeletServerStart(p providers.Provider, l net.Listener, cert, key string) { + if err := http.ServeTLS(l, InstrumentHandler(PodHandler(p)), cert, key); err != nil { + log.G(context.TODO()).WithError(err).Error("error setting up http server") + } +} + +// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelset summary stats API. +func MetricsServerStart(p providers.Provider, l net.Listener) { + if err := http.Serve(l, InstrumentHandler(MetricsSummaryHandler(p))); err != nil { log.G(context.TODO()).WithError(err).Error("Error starting http server") } } diff --git a/vkubelet/lookup.go b/vkubelet/lookup.go deleted file mode 100644 index b93bdee89..000000000 --- a/vkubelet/lookup.go +++ /dev/null @@ -1,64 +0,0 @@ -// +build !windows,!darwin - -package vkubelet - -import ( - "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers/alicloud" - "github.com/virtual-kubelet/virtual-kubelet/providers/aws" - "github.com/virtual-kubelet/virtual-kubelet/providers/azure" - "github.com/virtual-kubelet/virtual-kubelet/providers/azurebatch" - "github.com/virtual-kubelet/virtual-kubelet/providers/cri" - "github.com/virtual-kubelet/virtual-kubelet/providers/huawei" - "github.com/virtual-kubelet/virtual-kubelet/providers/hypersh" - "github.com/virtual-kubelet/virtual-kubelet/providers/mock" - "github.com/virtual-kubelet/virtual-kubelet/providers/sfmesh" - "github.com/virtual-kubelet/virtual-kubelet/providers/vic" - "github.com/virtual-kubelet/virtual-kubelet/providers/web" -) - -// Compile time proof that our implementations meet the Provider interface. -var _ Provider = (*alicloud.ECIProvider)(nil) -var _ Provider = (*aws.FargateProvider)(nil) -var _ Provider = (*azure.ACIProvider)(nil) -var _ Provider = (*hypersh.HyperProvider)(nil) -var _ Provider = (*web.BrokerProvider)(nil) -var _ Provider = (*mock.MockProvider)(nil) -var _ Provider = (*huawei.CCIProvider)(nil) -var _ Provider = (*azurebatch.Provider)(nil) -var _ Provider = (*sfmesh.SFMeshProvider)(nil) - -// start of providers not supported on windows -var _ Provider = (*cri.CRIProvider)(nil) -var _ Provider = (*vic.VicProvider)(nil) - -func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager, nodeName, operatingSystem, internalIP string, daemonEndpointPort int32) (Provider, error) { - switch provider { - case "alicloud": - return alicloud.NewECIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "aws": - return aws.NewFargateProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "azure": - return azure.NewACIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "azurebatch": - return azurebatch.NewBatchProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "hyper": - return hypersh.NewHyperProvider(providerConfig, rm, nodeName, operatingSystem) - case "web": - return web.NewBrokerProvider(nodeName, operatingSystem, daemonEndpointPort) - case "mock": - return mock.NewMockProvider(providerConfig, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "huawei": - return huawei.NewCCIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "sfmesh": - return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - // start of providers not supported on windows - case "cri": - return cri.NewCRIProvider(nodeName, operatingSystem, internalIP, rm, daemonEndpointPort) - case "vic": - return vic.NewVicProvider(providerConfig, rm, nodeName, operatingSystem) - default: - return nil, errors.New("provider not supported") - } -} diff --git a/vkubelet/lookup_darwin.go b/vkubelet/lookup_darwin.go deleted file mode 100644 index 7c20311a0..000000000 --- a/vkubelet/lookup_darwin.go +++ /dev/null @@ -1,50 +0,0 @@ -package vkubelet - -import ( - "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers/aws" - "github.com/virtual-kubelet/virtual-kubelet/providers/azure" - "github.com/virtual-kubelet/virtual-kubelet/providers/azurebatch" - "github.com/virtual-kubelet/virtual-kubelet/providers/huawei" - "github.com/virtual-kubelet/virtual-kubelet/providers/hypersh" - "github.com/virtual-kubelet/virtual-kubelet/providers/mock" - "github.com/virtual-kubelet/virtual-kubelet/providers/sfmesh" - "github.com/virtual-kubelet/virtual-kubelet/providers/web" -) - -// Compile time proof that our implementations meet the Provider interface. -var _ Provider = (*alicloud.ECIProvider)(nil) -var _ Provider = (*aws.FargateProvider)(nil) -var _ Provider = (*azure.ACIProvider)(nil) -var _ Provider = (*hypersh.HyperProvider)(nil) -var _ Provider = (*web.BrokerProvider)(nil) -var _ Provider = (*mock.MockProvider)(nil) -var _ Provider = (*huawei.CCIProvider)(nil) -var _ Provider = (*azurebatch.Provider)(nil) -var _ Provider = (*sfmesh.SFMeshProvider)(nil) - -func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager, nodeName, operatingSystem, internalIP string, daemonEndpointPort int32) (Provider, error) { - switch provider { - case "alicloud": - return alicloud.NewECIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "aws": - return aws.NewFargateProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "azure": - return azure.NewACIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "azurebatch": - return azurebatch.NewBatchProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "hyper": - return hypersh.NewHyperProvider(providerConfig, rm, nodeName, operatingSystem) - case "web": - return web.NewBrokerProvider(nodeName, operatingSystem, daemonEndpointPort) - case "mock": - return mock.NewMockProvider(providerConfig, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "huawei": - return huawei.NewCCIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "sfmesh": - return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - default: - return nil, errors.New("provider is not supported") - } -} diff --git a/vkubelet/lookup_windows.go b/vkubelet/lookup_windows.go deleted file mode 100644 index 1a09aec54..000000000 --- a/vkubelet/lookup_windows.go +++ /dev/null @@ -1,47 +0,0 @@ -package vkubelet - -import ( - "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers/aws" - "github.com/virtual-kubelet/virtual-kubelet/providers/azure" - "github.com/virtual-kubelet/virtual-kubelet/providers/azurebatch" - "github.com/virtual-kubelet/virtual-kubelet/providers/huawei" - "github.com/virtual-kubelet/virtual-kubelet/providers/hypersh" - "github.com/virtual-kubelet/virtual-kubelet/providers/mock" - "github.com/virtual-kubelet/virtual-kubelet/providers/sfmesh" - "github.com/virtual-kubelet/virtual-kubelet/providers/web" -) - -// Compile time proof that our implementations meet the Provider interface. -var _ Provider = (*aws.FargateProvider)(nil) -var _ Provider = (*azure.ACIProvider)(nil) -var _ Provider = (*hypersh.HyperProvider)(nil) -var _ Provider = (*web.BrokerProvider)(nil) -var _ Provider = (*mock.MockProvider)(nil) -var _ Provider = (*huawei.CCIProvider)(nil) -var _ Provider = (*azurebatch.Provider)(nil) -var _ Provider = (*sfmesh.SFMeshProvider)(nil) - -func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager, nodeName, operatingSystem, internalIP string, daemonEndpointPort int32) (Provider, error) { - switch provider { - case "aws": - return aws.NewFargateProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "azure": - return azure.NewACIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "azurebatch": - return azurebatch.NewBatchProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "hyper": - return hypersh.NewHyperProvider(providerConfig, rm, nodeName, operatingSystem) - case "web": - return web.NewBrokerProvider(nodeName, operatingSystem, daemonEndpointPort) - case "mock": - return mock.NewMockProvider(providerConfig, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "huawei": - return huawei.NewCCIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - case "sfmesh": - return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - default: - return nil, errors.New("provider not supported") - } -} diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index 8bed4fd4b..f400e817f 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -3,9 +3,9 @@ package vkubelet import ( "context" "fmt" + "net" "os" "os/signal" - "strconv" "strings" "syscall" "time" @@ -13,14 +13,13 @@ import ( pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" ) const ( @@ -32,123 +31,73 @@ type Server struct { nodeName string namespace string k8sClient *kubernetes.Clientset - taint corev1.Taint - disableTaint bool - provider Provider + taint *corev1.Taint + provider providers.Provider podWatcher watch.Interface resourceManager *manager.ResourceManager } -func getEnv(key, defaultValue string) string { - value, found := os.LookupEnv(key) - if found { - return value - } - return defaultValue +// Config is used to configure a new server. +type Config struct { + APIConfig APIConfig + Client *kubernetes.Clientset + MetricsAddr string + Namespace string + NodeName string + Provider providers.Provider + ResourceManager *manager.ResourceManager + Taint *corev1.Taint +} + +type APIConfig struct { + CertPath string + KeyPath string + Addr string } // New creates a new virtual-kubelet server. -func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerConfig, taintKey string, disableTaint bool, metricsAddr string) (*Server, error) { - var config *rest.Config - - // Check if the kubeConfig file exists. - if _, err := os.Stat(kubeConfig); !os.IsNotExist(err) { - // Get the kubeconfig from the filepath. - config, err = clientcmd.BuildConfigFromFlags("", kubeConfig) - if err != nil { - return nil, err - } - } else { - // Set to in-cluster config. - config, err = rest.InClusterConfig() - if err != nil { - return nil, err - } +func New(ctx context.Context, cfg Config) (s *Server, retErr error) { + s = &Server{ + namespace: cfg.Namespace, + nodeName: cfg.NodeName, + taint: cfg.Taint, + k8sClient: cfg.Client, + resourceManager: cfg.ResourceManager, + provider: cfg.Provider, } - if masterURI := os.Getenv("MASTER_URI"); masterURI != "" { - config.Host = masterURI - } - - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - - rm, err := manager.NewResourceManager(clientset) - if err != nil { - return nil, pkgerrors.Wrap(err, "error creating resource manager") - } - - daemonEndpointPortEnv := os.Getenv("KUBELET_PORT") - if daemonEndpointPortEnv == "" { - daemonEndpointPortEnv = "10250" - } - i64value, err := strconv.ParseInt(daemonEndpointPortEnv, 10, 32) - daemonEndpointPort := int32(i64value) - - internalIP := os.Getenv("VKUBELET_POD_IP") - - var defaultTaintKey string - var defaultTaintValue string - if taintKey != "" { - defaultTaintKey = taintKey - defaultTaintValue = "" - } else { - defaultTaintKey = "virtual-kubelet.io/provider" - defaultTaintValue = provider - } - vkTaintKey := getEnv("VKUBELET_TAINT_KEY", defaultTaintKey) - vkTaintValue := getEnv("VKUBELET_TAINT_VALUE", defaultTaintValue) - vkTaintEffectEnv := getEnv("VKUBELET_TAINT_EFFECT", "NoSchedule") - var vkTaintEffect corev1.TaintEffect - switch vkTaintEffectEnv { - case "NoSchedule": - vkTaintEffect = corev1.TaintEffectNoSchedule - case "NoExecute": - vkTaintEffect = corev1.TaintEffectNoExecute - case "PreferNoSchedule": - vkTaintEffect = corev1.TaintEffectPreferNoSchedule - default: - return nil, pkgerrors.Errorf("taint effect %q is not supported", vkTaintEffectEnv) - } - - taint := corev1.Taint{ - Key: vkTaintKey, - Value: vkTaintValue, - Effect: vkTaintEffect, - } - - p, err := lookupProvider(provider, providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) - if err != nil { - return nil, err - } - - s := &Server{ - namespace: namespace, - nodeName: nodeName, - taint: taint, - disableTaint: disableTaint, - k8sClient: clientset, - resourceManager: rm, - provider: p, - } - - ctx := context.TODO() ctx = log.WithLogger(ctx, log.G(ctx)) - if err = s.registerNode(ctx); err != nil { - return s, err + apiL, err := net.Listen("tcp", cfg.APIConfig.Addr) + if err != nil { + return nil, pkgerrors.Wrap(err, "error setting up API listener") } + defer func() { + if retErr != nil { + apiL.Close() + } + }() + go KubeletServerStart(cfg.Provider, apiL, cfg.APIConfig.CertPath, cfg.APIConfig.KeyPath) - go KubeletServerStart(p) - - if metricsAddr != "" { - go MetricsServerStart(p, metricsAddr) + if cfg.MetricsAddr != "" { + metricsL, err := net.Listen("tcp", cfg.MetricsAddr) + if err != nil { + return nil, pkgerrors.Wrap(err, "error setting up metrics listener") + } + defer func() { + if retErr != nil { + metricsL.Close() + } + }() + go MetricsServerStart(cfg.Provider, metricsL) } else { log.G(ctx).Info("Skipping metrics server startup since no address was provided") } + if err := s.registerNode(ctx); err != nil { + return s, err + } + tick := time.Tick(5 * time.Second) go func() { @@ -165,8 +114,8 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon func (s *Server) registerNode(ctx context.Context) error { taints := make([]corev1.Taint, 0) - if !s.disableTaint { - taints = append(taints, s.taint) + if s.taint != nil { + taints = append(taints, *s.taint) } node := &corev1.Node{ @@ -350,10 +299,10 @@ func (s *Server) reconcile(ctx context.Context) { } if providerPod == nil && - pod.DeletionTimestamp == nil && - pod.Status.Phase != corev1.PodSucceeded && - pod.Status.Phase != corev1.PodFailed && - pod.Status.Reason != PodStatusReason_ProviderFailed { + pod.DeletionTimestamp == nil && + pod.Status.Phase != corev1.PodSucceeded && + pod.Status.Phase != corev1.PodFailed && + pod.Status.Reason != PodStatusReason_ProviderFailed { logger.Debug("Creating pod") if err := s.createPod(ctx, pod); err != nil { logger.WithError(err).Error("Error creating pod") @@ -436,8 +385,8 @@ func (s *Server) updatePodStatuses(ctx context.Context) { pods := s.resourceManager.GetPods() for _, pod := range pods { if pod.Status.Phase == corev1.PodSucceeded || - pod.Status.Phase == corev1.PodFailed || - pod.Status.Reason == PodStatusReason_ProviderFailed { + pod.Status.Phase == corev1.PodFailed || + pod.Status.Reason == PodStatusReason_ProviderFailed { continue }