Files
virtual-kubelet/vkubelet/vkubelet.go
Sargun Dhillon f1cb6a7bf6 Add the concept of startup timeout (#597)
This adds two concepts, where one encompasses the other.

Startup timeout
Startup timeout is how long to wait for the entire kubelet
to get into a functional state. Right now, this only waits
for the pod informer cache for the pod controllerto become
in-sync with API server, but this could be extended to other
informers (like secrets informer).

Wait For Startup
This changes the behaviour of the virtual kubelet to wait
for the pod controller to start before registering the node.

It is to avoid the race condition where the node is registered,
but we cannot actually do any pod operations.
2019-05-06 09:25:00 -07:00

152 lines
4.3 KiB
Go

package vkubelet
import (
"context"
"strconv"
"time"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
)
const (
podStatusReasonProviderFailed = "ProviderFailed"
)
// Server masquarades itself as a kubelet and allows for the virtual node to be backed by non-vm/node providers.
type Server struct {
namespace string
nodeName string
k8sClient *kubernetes.Clientset
provider providers.Provider
resourceManager *manager.ResourceManager
podSyncWorkers int
podInformer corev1informers.PodInformer
readyCh chan struct{}
}
// Config is used to configure a new server.
type Config struct {
Client *kubernetes.Clientset
Namespace string
NodeName string
Provider providers.Provider
ResourceManager *manager.ResourceManager
PodSyncWorkers int
PodInformer corev1informers.PodInformer
}
// New creates a new virtual-kubelet server.
// This is the entrypoint to this package.
//
// This creates but does not start the server.
// You must call `Run` on the returned object to start the server.
func New(cfg Config) *Server {
return &Server{
nodeName: cfg.NodeName,
namespace: cfg.Namespace,
k8sClient: cfg.Client,
resourceManager: cfg.ResourceManager,
provider: cfg.Provider,
podSyncWorkers: cfg.PodSyncWorkers,
podInformer: cfg.PodInformer,
readyCh: make(chan struct{}),
}
}
// Run creates and starts an instance of the pod controller, blocking until it stops.
//
// Note that this does not setup the HTTP routes that are used to expose pod
// info to the Kubernetes API Server, such as logs, metrics, exec, etc.
// See `AttachPodRoutes` and `AttachMetricsRoutes` to set these up.
func (s *Server) Run(ctx context.Context) error {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "podStatusUpdate")
s.runProviderSyncWorkers(ctx, q)
if pn, ok := s.provider.(providers.PodNotifier); ok {
pn.NotifyPods(ctx, func(pod *corev1.Pod) {
s.enqueuePodStatusUpdate(ctx, q, pod)
})
} else {
go s.providerSyncLoop(ctx, q)
}
pc := NewPodController(s)
go func() {
select {
case <-pc.inSyncCh:
case <-ctx.Done():
}
close(s.readyCh)
}()
return pc.Run(ctx, s.podSyncWorkers)
}
// Ready returns a channel which will be closed once the VKubelet is running
func (s *Server) Ready() <-chan struct{} {
// TODO: right now all this waits on is the in-sync channel. Later, we might either want to expose multiple types
// of ready, for example:
// * In Sync
// * Control Loop running
// * Provider state synchronized with API Server state
return s.readyCh
}
// providerSyncLoop syncronizes pod states from the provider back to kubernetes
// Deprecated: This is only used when the provider does not support async updates
// Providers should implement async update support, even if it just means copying
// something like this in.
func (s *Server) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) {
const sleepTime = 5 * time.Second
t := time.NewTimer(sleepTime)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
t.Stop()
ctx, span := trace.StartSpan(ctx, "syncActualState")
s.updatePodStatuses(ctx, q)
span.End()
// restart the timer
t.Reset(sleepTime)
}
}
}
func (s *Server) runProviderSyncWorkers(ctx context.Context, q workqueue.RateLimitingInterface) {
for i := 0; i < s.podSyncWorkers; i++ {
go func(index int) {
workerID := strconv.Itoa(index)
s.runProviderSyncWorker(ctx, workerID, q)
}(i)
}
}
func (s *Server) runProviderSyncWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
for s.processPodStatusUpdate(ctx, workerID, q) {
}
}
func (s *Server) processPodStatusUpdate(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
ctx, span := trace.StartSpan(ctx, "processPodStatusUpdate")
defer span.End()
// Add the ID of the current worker as an attribute to the current span.
ctx = span.WithField(ctx, "workerID", workerID)
return handleQueueItem(ctx, q, s.podStatusHandler)
}