diff --git a/cmd/virtual-kubelet/commands/root/flag.go b/cmd/virtual-kubelet/commands/root/flag.go
index 3a5bc74ee..e599fd9c3 100644
--- a/cmd/virtual-kubelet/commands/root/flag.go
+++ b/cmd/virtual-kubelet/commands/root/flag.go
@@ -78,7 +78,7 @@ func installFlags(flags *pflag.FlagSet, c *Opts) {
 	flags.StringVar(&c.TraceSampleRate, "trace-sample-rate", c.TraceSampleRate, "set probability of tracing samples")
 
 	flags.DurationVar(&c.InformerResyncPeriod, "full-resync-period", c.InformerResyncPeriod, "how often to perform a full resync of pods between kubernetes and the provider")
-
+	flags.DurationVar(&c.StartupTimeout, "startup-timeout", c.StartupTimeout, "how long to wait for the virtual-kubelet to start")
 }
 
 func getEnv(key, defaultValue string) string {
diff --git a/cmd/virtual-kubelet/commands/root/opts.go b/cmd/virtual-kubelet/commands/root/opts.go
index 41038faef..845f4041e 100644
--- a/cmd/virtual-kubelet/commands/root/opts.go
+++ b/cmd/virtual-kubelet/commands/root/opts.go
@@ -78,6 +78,9 @@ type Opts struct {
 	TraceExporters  []string
 	TraceSampleRate string
 	TraceConfig     opencensus.TracingExporterOptions
+
+	// StartupTimeout is how long to wait for the virtual-kubelet to start
+	StartupTimeout time.Duration
 }
 
 // SetDefaultOpts sets default options for unset values on the passed in option struct.
diff --git a/cmd/virtual-kubelet/commands/root/root.go b/cmd/virtual-kubelet/commands/root/root.go
index 099e81fd9..caacdf63f 100644
--- a/cmd/virtual-kubelet/commands/root/root.go
+++ b/cmd/virtual-kubelet/commands/root/root.go
@@ -17,6 +17,7 @@ package root
 import (
 	"context"
 	"os"
+	"time"
 
 	"github.com/cpuguy83/strongerrors"
 	"github.com/pkg/errors"
@@ -54,6 +55,9 @@ This allows users to schedule kubernetes workloads on nodes that aren't running
 }
 
 func runRootCommand(ctx context.Context, c Opts) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	if ok := providers.ValidOperatingSystems[c.OperatingSystem]; !ok {
 		return strongerrors.InvalidArgument(errors.Errorf("operating system %q is not supported", c.OperatingSystem))
 	}
@@ -166,6 +170,16 @@ func runRootCommand(ctx context.Context, c Opts) error {
 		}
 	}()
 
+	if c.StartupTimeout > 0 {
+		// If there is a startup timeout, it does two things:
+		// 1. It causes the VK to shut down if we haven't reached an operational state within the given period.
+		// 2. It prevents node advertisement from happening until we're in an operational state.
+		err = waitForVK(ctx, c.StartupTimeout, vk)
+		if err != nil {
+			return err
+		}
+	}
+
 	go func() {
 		if err := node.Run(ctx); err != nil {
 			log.G(ctx).Fatal(err)
@@ -178,6 +192,21 @@ func runRootCommand(ctx context.Context, c Opts) error {
 	return nil
 }
 
+func waitForVK(ctx context.Context, timeout time.Duration, vk *vkubelet.Server) error {
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	// Wait for the VK / PC to close the ready channel, or time out and return.
+	log.G(ctx).Info("Waiting for pod controller / VK to be ready")
+
+	select {
+	case <-vk.Ready():
+		return nil
+	case <-ctx.Done():
+		return errors.Wrap(ctx.Err(), "error while starting up VK")
+	}
+}
+
 func newClient(configPath string) (*kubernetes.Clientset, error) {
 	var config *rest.Config
 
diff --git a/vkubelet/podcontroller.go b/vkubelet/podcontroller.go
index 56af3d022..e66385ce4 100644
--- a/vkubelet/podcontroller.go
+++ b/vkubelet/podcontroller.go
@@ -54,6 +54,10 @@ type PodController struct {
 	workqueue workqueue.RateLimitingInterface
 	// recorder is an event recorder for recording Event resources to the Kubernetes API.
 	recorder record.EventRecorder
+
+	// inSyncCh is a channel that is closed once the pod controller is in sync with the apiserver.
+	// It is never closed if startup fails, or if the run context is cancelled before initialization completes.
+	inSyncCh chan struct{}
 }
 
 // NewPodController returns a new instance of PodController.
@@ -71,6 +75,7 @@ func NewPodController(server *Server) *PodController {
 		podsLister: server.podInformer.Lister(),
 		workqueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pods"),
 		recorder:   recorder,
+		inSyncCh:   make(chan struct{}),
 	}
 
 	// Set up event handlers for when Pod resources change.
@@ -123,6 +128,9 @@ func (pc *PodController) Run(ctx context.Context, threadiness int) error {
 	if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
 		return pkgerrors.New("failed to wait for caches to sync")
 	}
+	log.G(ctx).Info("Pod cache in-sync")
+
+	close(pc.inSyncCh)
 
 	// Perform a reconciliation step that deletes any dangling pods from the provider.
 	// This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go
index acddb8119..87e1757c9 100644
--- a/vkubelet/vkubelet.go
+++ b/vkubelet/vkubelet.go
@@ -27,6 +27,7 @@ type Server struct {
 	resourceManager *manager.ResourceManager
 	podSyncWorkers  int
 	podInformer     corev1informers.PodInformer
+	readyCh         chan struct{}
 }
 
 // Config is used to configure a new server.
@@ -54,6 +55,7 @@ func New(cfg Config) *Server {
 		provider:       cfg.Provider,
 		podSyncWorkers: cfg.PodSyncWorkers,
 		podInformer:    cfg.PodInformer,
+		readyCh:        make(chan struct{}),
 	}
 }
 
@@ -64,7 +66,7 @@ func New(cfg Config) *Server {
 // See `AttachPodRoutes` and `AttachMetricsRoutes` to set these up.
 func (s *Server) Run(ctx context.Context) error {
 	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "podStatusUpdate")
-	go s.runProviderSyncWorkers(ctx, q)
+	s.runProviderSyncWorkers(ctx, q)
 
 	if pn, ok := s.provider.(providers.PodNotifier); ok {
 		pn.NotifyPods(ctx, func(pod *corev1.Pod) {
@@ -74,7 +76,27 @@ func (s *Server) Run(ctx context.Context) error {
 		go s.providerSyncLoop(ctx, q)
 	}
 
-	return NewPodController(s).Run(ctx, s.podSyncWorkers)
+	pc := NewPodController(s)
+
+	go func() {
+		select {
+		case <-pc.inSyncCh:
+		case <-ctx.Done():
+		}
+		close(s.readyCh)
+	}()
+
+	return pc.Run(ctx, s.podSyncWorkers)
+}
+
+// Ready returns a channel that will be closed once the VKubelet is running.
+func (s *Server) Ready() <-chan struct{} {
+	// TODO: right now all this waits on is the in-sync channel. Later, we might want to expose multiple
+	// kinds of readiness, for example:
+	//   * in sync
+	//   * control loop running
+	//   * provider state synchronized with the API server state
+	return s.readyCh
 }
 
 // providerSyncLoop synchronizes pod states from the provider back to kubernetes
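
For reviewers who want to see the readiness handshake in isolation: the pattern in this diff is a channel that is closed exactly once to broadcast readiness (close never blocks and wakes every waiter), raced against a context.WithTimeout deadline in a select. Below is a minimal, self-contained sketch of that pattern under those assumptions; the server type, waitForReady, and the simulated sync delay are illustrative stand-ins, not code from this repository.

package main

import (
	"context"
	"fmt"
	"time"
)

// server stands in for vkubelet.Server: Run closes readyCh once its
// (simulated) cache sync completes, and Ready exposes the channel.
type server struct {
	readyCh chan struct{}
}

func newServer() *server {
	return &server{readyCh: make(chan struct{})}
}

// Run simulates startup work, then broadcasts readiness. Closing the
// channel happens exactly once and unblocks every receiver at once.
func (s *server) Run() {
	time.Sleep(50 * time.Millisecond) // stand-in for cache.WaitForCacheSync
	close(s.readyCh)
}

// Ready mirrors Server.Ready: a receive-only view of the readiness channel.
func (s *server) Ready() <-chan struct{} {
	return s.readyCh
}

// waitForReady mirrors waitForVK: block until readiness is signalled,
// or fail once the timeout elapses (or the parent ctx is cancelled).
func waitForReady(ctx context.Context, timeout time.Duration, s *server) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	select {
	case <-s.Ready():
		return nil
	case <-ctx.Done():
		return fmt.Errorf("error while starting up: %w", ctx.Err())
	}
}

func main() {
	s := newServer()
	go s.Run()

	if err := waitForReady(context.Background(), time.Second, s); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("ready; safe to start advertising the node")
}

One design note: closing a channel (rather than sending on it) is what makes this a broadcast, so both the goroutine in Server.Run and any external caller of waitForVK can observe readiness without coordinating with each other.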