Refactor CLI initialization (#562)

This cleans up the CLI code significantly.
Also makes some of this re-usable for providers who want to do so.

This also removes the main.go from the top of the tree of the repro,
instead moving it into cmd/virtual-kubelet.
This allows us to better utilize the package namespace (and e.g. mv the
`vkubelet` package to the top of the tree).
This commit is contained in:
Brian Goff
2019-04-19 17:02:39 -07:00
committed by GitHub
parent 40160ad36f
commit 57e8ee4b51
18 changed files with 771 additions and 796 deletions

View File

@@ -0,0 +1,204 @@
// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package root
import (
"context"
"os"
"github.com/cpuguy83/strongerrors"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"github.com/virtual-kubelet/virtual-kubelet/providers/register"
"github.com/virtual-kubelet/virtual-kubelet/vkubelet"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// NewCommand creates a new top-level command.
// This command is used to start the virtual-kubelet daemon
func NewCommand(ctx context.Context, name string, c Opts) *cobra.Command {
cmd := &cobra.Command{
Use: name,
Short: name + " provides a virtual kubelet interface for your kubernetes cluster.",
Long: name + ` implements the Kubelet interface with a pluggable
backend implementation allowing users to create kubernetes nodes without running the kubelet.
This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`,
RunE: func(cmd *cobra.Command, args []string) error {
return runRootCommand(ctx, c)
},
}
installFlags(cmd.Flags(), &c)
return cmd
}
func runRootCommand(ctx context.Context, c Opts) error {
if ok := providers.ValidOperatingSystems[c.OperatingSystem]; !ok {
return strongerrors.InvalidArgument(errors.Errorf("operating system %q is not supported", c.OperatingSystem))
}
if c.PodSyncWorkers == 0 {
return strongerrors.InvalidArgument(errors.New("pod sync workers must be greater than 0"))
}
var taint *corev1.Taint
if !c.DisableTaint {
var err error
taint, err = getTaint(c)
if err != nil {
return err
}
}
client, err := newClient(c.KubeConfigPath)
if err != nil {
return err
}
// Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node.
podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
client,
c.InformerResyncPeriod,
kubeinformers.WithNamespace(c.KubeNamespace),
kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String()
}))
// Create a pod informer so we can pass its lister to the resource manager.
podInformer := podInformerFactory.Core().V1().Pods()
// Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors).
scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, c.InformerResyncPeriod)
// Create a secret informer and a config map informer so we can pass their listers to the resource manager.
secretInformer := scmInformerFactory.Core().V1().Secrets()
configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
serviceInformer := scmInformerFactory.Core().V1().Services()
go podInformerFactory.Start(ctx.Done())
go scmInformerFactory.Start(ctx.Done())
rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister())
if err != nil {
return errors.Wrap(err, "could not create resource manager")
}
apiConfig, err := getAPIConfig(c)
if err != nil {
return err
}
if err := setupTracing(ctx, c); err != nil {
return err
}
initConfig := register.InitConfig{
ConfigPath: c.ProviderConfigPath,
NodeName: c.NodeName,
OperatingSystem: c.OperatingSystem,
ResourceManager: rm,
DaemonPort: int32(c.ListenPort),
InternalIP: os.Getenv("VKUBELET_POD_IP"),
}
p, err := register.GetProvider(c.Provider, initConfig)
if err != nil {
return err
}
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(log.Fields{
"provider": c.Provider,
"operatingSystem": c.OperatingSystem,
"node": c.NodeName,
"watchedNamespace": c.KubeNamespace,
}))
pNode := NodeFromProvider(ctx, c.NodeName, taint, p)
node, err := vkubelet.NewNode(
vkubelet.NaiveNodeProvider{},
pNode,
client.Coordination().Leases(corev1.NamespaceNodeLease),
client.CoreV1().Nodes(),
vkubelet.WithNodeDisableLease(!c.EnableNodeLease),
)
if err != nil {
log.G(ctx).Fatal(err)
}
vk := vkubelet.New(vkubelet.Config{
Client: client,
Namespace: c.KubeNamespace,
NodeName: pNode.Name,
Provider: p,
ResourceManager: rm,
PodSyncWorkers: c.PodSyncWorkers,
PodInformer: podInformer,
})
cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig)
if err != nil {
return err
}
defer cancelHTTP()
go func() {
if err := vk.Run(ctx); err != nil && errors.Cause(err) != context.Canceled {
log.G(ctx).Fatal(err)
}
}()
go func() {
if err := node.Run(ctx); err != nil {
log.G(ctx).Fatal(err)
}
}()
log.G(ctx).Info("Initialized")
<-ctx.Done()
return nil
}
func newClient(configPath string) (*kubernetes.Clientset, error) {
var config *rest.Config
// Check if the kubeConfig file exists.
if _, err := os.Stat(configPath); !os.IsNotExist(err) {
// Get the kubeconfig from the filepath.
config, err = clientcmd.BuildConfigFromFlags("", configPath)
if err != nil {
return nil, errors.Wrap(err, "error building client config")
}
} else {
// Set to in-cluster config.
config, err = rest.InClusterConfig()
if err != nil {
return nil, errors.Wrap(err, "error building in cluster config")
}
}
if masterURI := os.Getenv("MASTER_URI"); masterURI != "" {
config.Host = masterURI
}
return kubernetes.NewForConfig(config)
}