From f1cb6a7bf6405c8c5b2928a56434a3c79aca983e Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Mon, 6 May 2019 09:25:00 -0700 Subject: [PATCH] Add the concept of startup timeout (#597) This adds two concepts, where one encompasses the other. Startup timeout Startup timeout is how long to wait for the entire kubelet to get into a functional state. Right now, this only waits for the pod informer cache for the pod controllerto become in-sync with API server, but this could be extended to other informers (like secrets informer). Wait For Startup This changes the behaviour of the virtual kubelet to wait for the pod controller to start before registering the node. It is to avoid the race condition where the node is registered, but we cannot actually do any pod operations. --- cmd/virtual-kubelet/commands/root/flag.go | 2 +- cmd/virtual-kubelet/commands/root/opts.go | 3 +++ cmd/virtual-kubelet/commands/root/root.go | 29 +++++++++++++++++++++++ vkubelet/podcontroller.go | 8 +++++++ vkubelet/vkubelet.go | 26 ++++++++++++++++++-- 5 files changed, 65 insertions(+), 3 deletions(-) diff --git a/cmd/virtual-kubelet/commands/root/flag.go b/cmd/virtual-kubelet/commands/root/flag.go index 3a5bc74ee..e599fd9c3 100644 --- a/cmd/virtual-kubelet/commands/root/flag.go +++ b/cmd/virtual-kubelet/commands/root/flag.go @@ -78,7 +78,7 @@ func installFlags(flags *pflag.FlagSet, c *Opts) { flags.StringVar(&c.TraceSampleRate, "trace-sample-rate", c.TraceSampleRate, "set probability of tracing samples") flags.DurationVar(&c.InformerResyncPeriod, "full-resync-period", c.InformerResyncPeriod, "how often to perform a full resync of pods between kubernetes and the provider") - + flags.DurationVar(&c.StartupTimeout, "startup-timeout", c.StartupTimeout, "How long to wait for the virtual-kubelet to start") } func getEnv(key, defaultValue string) string { diff --git a/cmd/virtual-kubelet/commands/root/opts.go b/cmd/virtual-kubelet/commands/root/opts.go index 41038faef..845f4041e 100644 --- a/cmd/virtual-kubelet/commands/root/opts.go +++ b/cmd/virtual-kubelet/commands/root/opts.go @@ -78,6 +78,9 @@ type Opts struct { TraceExporters []string TraceSampleRate string TraceConfig opencensus.TracingExporterOptions + + // Startup Timeout is how long to wait for the kubelet to start + StartupTimeout time.Duration } // SetDefaultOpts sets default options for unset values on the passed in option struct. diff --git a/cmd/virtual-kubelet/commands/root/root.go b/cmd/virtual-kubelet/commands/root/root.go index 099e81fd9..caacdf63f 100644 --- a/cmd/virtual-kubelet/commands/root/root.go +++ b/cmd/virtual-kubelet/commands/root/root.go @@ -17,6 +17,7 @@ package root import ( "context" "os" + "time" "github.com/cpuguy83/strongerrors" "github.com/pkg/errors" @@ -54,6 +55,9 @@ This allows users to schedule kubernetes workloads on nodes that aren't running } func runRootCommand(ctx context.Context, c Opts) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + if ok := providers.ValidOperatingSystems[c.OperatingSystem]; !ok { return strongerrors.InvalidArgument(errors.Errorf("operating system %q is not supported", c.OperatingSystem)) } @@ -166,6 +170,16 @@ func runRootCommand(ctx context.Context, c Opts) error { } }() + if c.StartupTimeout > 0 { + // If there is a startup timeout, it does two things: + // 1. It causes the VK to shutdown if we haven't gotten into an operational state in a time period + // 2. It prevents node advertisement from happening until we're in an operational state + err = waitForVK(ctx, c.StartupTimeout, vk) + if err != nil { + return err + } + } + go func() { if err := node.Run(ctx); err != nil { log.G(ctx).Fatal(err) @@ -178,6 +192,21 @@ func runRootCommand(ctx context.Context, c Opts) error { return nil } +func waitForVK(ctx context.Context, time time.Duration, vk *vkubelet.Server) error { + ctx, cancel := context.WithTimeout(ctx, time) + defer cancel() + + // Wait for the VK / PC close the the ready channel, or time out and return + log.G(ctx).Info("Waiting for pod controller / VK to be ready") + + select { + case <-vk.Ready(): + return nil + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "Error while starting up VK") + } +} + func newClient(configPath string) (*kubernetes.Clientset, error) { var config *rest.Config diff --git a/vkubelet/podcontroller.go b/vkubelet/podcontroller.go index 56af3d022..e66385ce4 100644 --- a/vkubelet/podcontroller.go +++ b/vkubelet/podcontroller.go @@ -54,6 +54,10 @@ type PodController struct { workqueue workqueue.RateLimitingInterface // recorder is an event recorder for recording Event resources to the Kubernetes API. recorder record.EventRecorder + + // inSync is a channel which will be closed once the pod controller has become in-sync with apiserver + // it will never close if startup fails, or if the run context is cancelled prior to initialization completing + inSyncCh chan struct{} } // NewPodController returns a new instance of PodController. @@ -71,6 +75,7 @@ func NewPodController(server *Server) *PodController { podsLister: server.podInformer.Lister(), workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pods"), recorder: recorder, + inSyncCh: make(chan struct{}), } // Set up event handlers for when Pod resources change. @@ -123,6 +128,9 @@ func (pc *PodController) Run(ctx context.Context, threadiness int) error { if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok { return pkgerrors.New("failed to wait for caches to sync") } + log.G(ctx).Info("Pod cache in-sync") + + close(pc.inSyncCh) // Perform a reconciliation step that deletes any dangling pods from the provider. // This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis. diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index acddb8119..87e1757c9 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -27,6 +27,7 @@ type Server struct { resourceManager *manager.ResourceManager podSyncWorkers int podInformer corev1informers.PodInformer + readyCh chan struct{} } // Config is used to configure a new server. @@ -54,6 +55,7 @@ func New(cfg Config) *Server { provider: cfg.Provider, podSyncWorkers: cfg.PodSyncWorkers, podInformer: cfg.PodInformer, + readyCh: make(chan struct{}), } } @@ -64,7 +66,7 @@ func New(cfg Config) *Server { // See `AttachPodRoutes` and `AttachMetricsRoutes` to set these up. func (s *Server) Run(ctx context.Context) error { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "podStatusUpdate") - go s.runProviderSyncWorkers(ctx, q) + s.runProviderSyncWorkers(ctx, q) if pn, ok := s.provider.(providers.PodNotifier); ok { pn.NotifyPods(ctx, func(pod *corev1.Pod) { @@ -74,7 +76,27 @@ func (s *Server) Run(ctx context.Context) error { go s.providerSyncLoop(ctx, q) } - return NewPodController(s).Run(ctx, s.podSyncWorkers) + pc := NewPodController(s) + + go func() { + select { + case <-pc.inSyncCh: + case <-ctx.Done(): + } + close(s.readyCh) + }() + + return pc.Run(ctx, s.podSyncWorkers) +} + +// Ready returns a channel which will be closed once the VKubelet is running +func (s *Server) Ready() <-chan struct{} { + // TODO: right now all this waits on is the in-sync channel. Later, we might either want to expose multiple types + // of ready, for example: + // * In Sync + // * Control Loop running + // * Provider state synchronized with API Server state + return s.readyCh } // providerSyncLoop syncronizes pod states from the provider back to kubernetes