alibabacloud-eci/virtual-kubelet/commands/root/root.go
Sargun Dhillon 764cb5a76e Add the concept of startup timeout (#597)
This adds two concepts, one of which encompasses the other.

Startup timeout
Startup timeout is how long to wait for the entire kubelet
to reach a functional state. Right now, this only waits
for the pod informer cache of the pod controller to become
in sync with the API server, but it could be extended to other
informers (such as the secrets informer).

Wait For Startup
This changes the behaviour of the virtual kubelet to wait
for the pod controller to start before registering the node.

It avoids the race condition where the node is registered
but no pod operations can actually be performed yet.
2019-05-06 09:25:00 -07:00
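
The gating pattern described above boils down to this: the pod controller closes a "ready" channel once its informer caches are in sync, and startup blocks on that channel under a context deadline before the node is registered. The sketch below is a minimal, self-contained illustration of that pattern; the Controller type and its methods are hypothetical stand-ins, not the actual virtual-kubelet API (the real call site is waitForVK in the file below).

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Controller stands in for the pod controller. Ready() is closed once its
// informer caches are in sync with the API server.
type Controller struct{ ready chan struct{} }

func (c *Controller) Ready() <-chan struct{} { return c.ready }

func (c *Controller) Run(ctx context.Context) {
	time.Sleep(100 * time.Millisecond) // pretend cache sync takes a moment
	close(c.ready)
}

// waitForReady gates node registration: it returns nil once the controller
// is ready, or an error if the startup timeout elapses first.
func waitForReady(ctx context.Context, timeout time.Duration, c *Controller) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	select {
	case <-c.Ready():
		return nil
	case <-ctx.Done():
		return errors.New("controller not ready before startup timeout")
	}
}

func main() {
	c := &Controller{ready: make(chan struct{})}
	go c.Run(context.Background())
	if err := waitForReady(context.Background(), time.Second, c); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("ready: safe to register the node")
}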


// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package root

import (
	"context"
	"os"
	"time"

	"github.com/cpuguy83/strongerrors"
	"github.com/pkg/errors"
	"github.com/spf13/cobra"
	"github.com/virtual-kubelet/virtual-kubelet/log"
	"github.com/virtual-kubelet/virtual-kubelet/manager"
	"github.com/virtual-kubelet/virtual-kubelet/providers"
	"github.com/virtual-kubelet/virtual-kubelet/providers/register"
	"github.com/virtual-kubelet/virtual-kubelet/vkubelet"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// NewCommand creates a new top-level command.
// This command is used to start the virtual-kubelet daemon.
func NewCommand(ctx context.Context, name string, c Opts) *cobra.Command {
	cmd := &cobra.Command{
		Use:   name,
		Short: name + " provides a virtual kubelet interface for your kubernetes cluster.",
		Long: name + ` implements the Kubelet interface with a pluggable
backend implementation allowing users to create kubernetes nodes without running the kubelet.
This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`,
		RunE: func(cmd *cobra.Command, args []string) error {
			return runRootCommand(ctx, c)
		},
	}
	installFlags(cmd.Flags(), &c)
	return cmd
}
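
// runRootCommand validates the options, builds the Kubernetes client and
// informers, starts the pod controller and, if a startup timeout is
// configured, waits for the controller to become ready before registering
// the node. It then blocks until the context is cancelled.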
func runRootCommand(ctx context.Context, c Opts) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if ok := providers.ValidOperatingSystems[c.OperatingSystem]; !ok {
		return strongerrors.InvalidArgument(errors.Errorf("operating system %q is not supported", c.OperatingSystem))
	}

	if c.PodSyncWorkers == 0 {
		return strongerrors.InvalidArgument(errors.New("pod sync workers must be greater than 0"))
	}

	var taint *corev1.Taint
	if !c.DisableTaint {
		var err error
		taint, err = getTaint(c)
		if err != nil {
			return err
		}
	}

	client, err := newClient(c.KubeConfigPath)
	if err != nil {
		return err
	}

	// Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node.
	podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
		client,
		c.InformerResyncPeriod,
		kubeinformers.WithNamespace(c.KubeNamespace),
		kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String()
		}))
	// Create a pod informer so we can pass its lister to the resource manager.
	podInformer := podInformerFactory.Core().V1().Pods()

	// Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors).
	scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, c.InformerResyncPeriod)
	// Create secret, config map, and service informers so we can pass their listers to the resource manager.
	secretInformer := scmInformerFactory.Core().V1().Secrets()
	configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
	serviceInformer := scmInformerFactory.Core().V1().Services()

	go podInformerFactory.Start(ctx.Done())
	go scmInformerFactory.Start(ctx.Done())

	rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister())
	if err != nil {
		return errors.Wrap(err, "could not create resource manager")
	}

	apiConfig, err := getAPIConfig(c)
	if err != nil {
		return err
	}

	if err := setupTracing(ctx, c); err != nil {
		return err
	}

	initConfig := register.InitConfig{
		ConfigPath:      c.ProviderConfigPath,
		NodeName:        c.NodeName,
		OperatingSystem: c.OperatingSystem,
		ResourceManager: rm,
		DaemonPort:      int32(c.ListenPort),
		InternalIP:      os.Getenv("VKUBELET_POD_IP"),
	}

	p, err := register.GetProvider(c.Provider, initConfig)
	if err != nil {
		return err
	}

	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(log.Fields{
		"provider":         c.Provider,
		"operatingSystem":  c.OperatingSystem,
		"node":             c.NodeName,
		"watchedNamespace": c.KubeNamespace,
	}))

	pNode := NodeFromProvider(ctx, c.NodeName, taint, p)
	node, err := vkubelet.NewNode(
		vkubelet.NaiveNodeProvider{},
		pNode,
		client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease),
		client.CoreV1().Nodes(),
		vkubelet.WithNodeDisableLease(!c.EnableNodeLease),
	)
	if err != nil {
		log.G(ctx).Fatal(err)
	}

	vk := vkubelet.New(vkubelet.Config{
		Client:          client,
		Namespace:       c.KubeNamespace,
		NodeName:        pNode.Name,
		Provider:        p,
		ResourceManager: rm,
		PodSyncWorkers:  c.PodSyncWorkers,
		PodInformer:     podInformer,
	})

	cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig)
	if err != nil {
		return err
	}
	defer cancelHTTP()

	go func() {
		if err := vk.Run(ctx); err != nil && errors.Cause(err) != context.Canceled {
			log.G(ctx).Fatal(err)
		}
	}()

	if c.StartupTimeout > 0 {
		// If there is a startup timeout, it does two things:
		// 1. It causes the VK to shut down if we haven't reached an operational state within the timeout.
		// 2. It prevents node advertisement from happening until we're in an operational state.
		err = waitForVK(ctx, c.StartupTimeout, vk)
		if err != nil {
			return err
		}
	}

	go func() {
		if err := node.Run(ctx); err != nil {
			log.G(ctx).Fatal(err)
		}
	}()

	log.G(ctx).Info("Initialized")

	<-ctx.Done()
	return nil
}
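
// waitForVK blocks until the pod controller signals readiness by closing
// its ready channel, or until the given timeout elapses, whichever comes
// first.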
func waitForVK(ctx context.Context, timeout time.Duration, vk *vkubelet.Server) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Wait for the VK / pod controller to close the ready channel, or time out and return.
	log.G(ctx).Info("Waiting for pod controller / VK to be ready")
	select {
	case <-vk.Ready():
		return nil
	case <-ctx.Done():
		return errors.Wrap(ctx.Err(), "error while starting up VK")
	}
}
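
// newClient builds a Kubernetes clientset from the kubeconfig at configPath
// if that file exists, falling back to the in-cluster config otherwise. The
// MASTER_URI environment variable, if set, overrides the API server host.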
func newClient(configPath string) (*kubernetes.Clientset, error) {
	var config *rest.Config

	// Check if the kubeconfig file exists.
	if _, err := os.Stat(configPath); !os.IsNotExist(err) {
		// Get the kubeconfig from the filepath.
		config, err = clientcmd.BuildConfigFromFlags("", configPath)
		if err != nil {
			return nil, errors.Wrap(err, "error building client config")
		}
	} else {
		// Set to in-cluster config.
		config, err = rest.InClusterConfig()
		if err != nil {
			return nil, errors.Wrap(err, "error building in cluster config")
		}
	}

	if masterURI := os.Getenv("MASTER_URI"); masterURI != "" {
		config.Host = masterURI
	}

	return kubernetes.NewForConfig(config)
}
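
For context, a minimal entry point that wires NewCommand into a binary might look like the sketch below. The import path and the zero-value Opts are assumptions for illustration; the real binary populates Opts defaults through installFlags and its surrounding setup.

package main

import (
	"context"
	"os"

	// Assumed import path; adjust to your module layout.
	"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/commands/root"
)

func main() {
	ctx := context.Background()
	// root.Opts{} is illustrative; installFlags wires the real defaults and flags.
	cmd := root.NewCommand(ctx, "virtual-kubelet", root.Opts{})
	if err := cmd.Execute(); err != nil {
		os.Exit(1)
	}
}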