From 71546a908fb92eb85dee612ed6b4766809b04090 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Sat, 1 Jun 2019 09:36:38 -0700 Subject: [PATCH] Remove `Server` object (#629) This had some weird shared responsibility with the PodController. Instead just move the functionality to the PodController. --- cmd/virtual-kubelet/commands/root/root.go | 30 +++-- vkubelet/pod.go | 67 +++++----- vkubelet/pod_test.go | 32 ++--- vkubelet/podcontroller.go | 146 ++++++++++++--------- vkubelet/queue.go | 68 ++++++++++ vkubelet/vkubelet.go | 150 ---------------------- 6 files changed, 226 insertions(+), 267 deletions(-) delete mode 100644 vkubelet/vkubelet.go diff --git a/cmd/virtual-kubelet/commands/root/root.go b/cmd/virtual-kubelet/commands/root/root.go index caacdf63f..4ba7b9d2c 100644 --- a/cmd/virtual-kubelet/commands/root/root.go +++ b/cmd/virtual-kubelet/commands/root/root.go @@ -17,6 +17,7 @@ package root import ( "context" "os" + "path" "time" "github.com/cpuguy83/strongerrors" @@ -32,8 +33,11 @@ import ( "k8s.io/apimachinery/pkg/fields" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/record" ) // NewCommand creates a new top-level command. @@ -88,7 +92,6 @@ func runRootCommand(ctx context.Context, c Opts) error { kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) { options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String() })) - // Create a pod informer so we can pass its lister to the resource manager. podInformer := podInformerFactory.Core().V1().Pods() // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors). @@ -148,15 +151,20 @@ func runRootCommand(ctx context.Context, c Opts) error { log.G(ctx).Fatal(err) } - vk := vkubelet.New(vkubelet.Config{ - Client: client, - Namespace: c.KubeNamespace, - NodeName: pNode.Name, + eb := record.NewBroadcaster() + eb.StartLogging(log.G(ctx).Infof) + eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(c.KubeNamespace)}) + + pc, err := vkubelet.NewPodController(vkubelet.PodControllerConfig{ + PodClient: client.CoreV1(), + PodInformer: podInformer, + EventRecorder: eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(pNode.Name, "pod-controller")}), Provider: p, ResourceManager: rm, - PodSyncWorkers: c.PodSyncWorkers, - PodInformer: podInformer, }) + if err != nil { + return errors.Wrap(err, "error setting up pod controller") + } cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig) if err != nil { @@ -165,7 +173,7 @@ func runRootCommand(ctx context.Context, c Opts) error { defer cancelHTTP() go func() { - if err := vk.Run(ctx); err != nil && errors.Cause(err) != context.Canceled { + if err := pc.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled { log.G(ctx).Fatal(err) } }() @@ -174,7 +182,7 @@ func runRootCommand(ctx context.Context, c Opts) error { // If there is a startup timeout, it does two things: // 1. It causes the VK to shutdown if we haven't gotten into an operational state in a time period // 2. 
It prevents node advertisement from happening until we're in an operational state - err = waitForVK(ctx, c.StartupTimeout, vk) + err = waitFor(ctx, c.StartupTimeout, pc.Ready()) if err != nil { return err } @@ -192,7 +200,7 @@ func runRootCommand(ctx context.Context, c Opts) error { return nil } -func waitForVK(ctx context.Context, time time.Duration, vk *vkubelet.Server) error { +func waitFor(ctx context.Context, time time.Duration, ready <-chan struct{}) error { ctx, cancel := context.WithTimeout(ctx, time) defer cancel() @@ -200,7 +208,7 @@ func waitForVK(ctx context.Context, time time.Duration, vk *vkubelet.Server) err log.G(ctx).Info("Waiting for pod controller / VK to be ready") select { - case <-vk.Ready(): + case <-ready: return nil case <-ctx.Done(): return errors.Wrap(ctx.Err(), "Error while starting up VK") diff --git a/vkubelet/pod.go b/vkubelet/pod.go index 3338a52fc..b0a31c010 100644 --- a/vkubelet/pod.go +++ b/vkubelet/pod.go @@ -15,10 +15,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ) +const ( + podStatusReasonProviderFailed = "ProviderFailed" +) + func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context { return span.WithFields(ctx, log.Fields{ "uid": string(pod.GetUID()), @@ -29,7 +32,7 @@ func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) con }) } -func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorder record.EventRecorder) error { +func (pc *PodController) createOrUpdatePod(ctx context.Context, pod *corev1.Pod) error { ctx, span := trace.StartSpan(ctx, "createOrUpdatePod") defer span.End() @@ -40,7 +43,7 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde "namespace": pod.GetNamespace(), }) - if err := populateEnvironmentVariables(ctx, pod, s.resourceManager, recorder); err != nil { + if err := populateEnvironmentVariables(ctx, pod, pc.resourceManager, pc.recorder); err != nil { span.SetStatus(ocstatus.FromError(err)) return err } @@ -48,7 +51,7 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde // Check if the pod is already known by the provider. // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't. // Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod). 
- if pp, _ := s.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil { + if pp, _ := pc.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil { // Pod Update Only Permits update of: // - `spec.containers[*].image` // - `spec.initContainers[*].image` @@ -58,15 +61,15 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde expected := hashPodSpec(pp.Spec) if actual := hashPodSpec(pod.Spec); actual != expected { log.G(ctx).Debugf("Pod %s exists, updating pod in provider", pp.Name) - if origErr := s.provider.UpdatePod(ctx, pod); origErr != nil { - s.handleProviderError(ctx, span, origErr, pod) + if origErr := pc.provider.UpdatePod(ctx, pod); origErr != nil { + pc.handleProviderError(ctx, span, origErr, pod) return origErr } log.G(ctx).Info("Updated pod in provider") } } else { - if origErr := s.provider.CreatePod(ctx, pod); origErr != nil { - s.handleProviderError(ctx, span, origErr, pod) + if origErr := pc.provider.CreatePod(ctx, pod); origErr != nil { + pc.handleProviderError(ctx, span, origErr, pod) return origErr } log.G(ctx).Info("Created pod in provider") @@ -88,7 +91,7 @@ func hashPodSpec(spec corev1.PodSpec) uint64 { return uint64(hash.Sum32()) } -func (s *Server) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) { +func (pc *PodController) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) { podPhase := corev1.PodPending if pod.Spec.RestartPolicy == corev1.RestartPolicyNever { podPhase = corev1.PodFailed @@ -104,7 +107,7 @@ func (s *Server) handleProviderError(ctx context.Context, span trace.Span, origE "reason": pod.Status.Reason, }) - _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) + _, err := pc.client.Pods(pod.Namespace).UpdateStatus(pod) if err != nil { logger.WithError(err).Warn("Failed to update pod status") } else { @@ -113,14 +116,14 @@ func (s *Server) handleProviderError(ctx context.Context, span trace.Span, origE span.SetStatus(ocstatus.FromError(origErr)) } -func (s *Server) deletePod(ctx context.Context, namespace, name string) error { +func (pc *PodController) deletePod(ctx context.Context, namespace, name string) error { // Grab the pod as known by the provider. // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't. // Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod). - pod, _ := s.provider.GetPod(ctx, namespace, name) + pod, _ := pc.provider.GetPod(ctx, namespace, name) if pod == nil { // The provider is not aware of the pod, but we must still delete the Kubernetes API resource. 
- return s.forceDeletePodResource(ctx, namespace, name) + return pc.forceDeletePodResource(ctx, namespace, name) } ctx, span := trace.StartSpan(ctx, "deletePod") @@ -128,7 +131,7 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error { ctx = addPodAttributes(ctx, span, pod) var delErr error - if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) { + if delErr = pc.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) { span.SetStatus(ocstatus.FromError(delErr)) return delErr } @@ -136,7 +139,7 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error { log.G(ctx).Debug("Deleted pod from provider") if !errors.IsNotFound(delErr) { - if err := s.forceDeletePodResource(ctx, namespace, name); err != nil { + if err := pc.forceDeletePodResource(ctx, namespace, name); err != nil { span.SetStatus(ocstatus.FromError(err)) return err } @@ -146,7 +149,7 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error { return nil } -func (s *Server) forceDeletePodResource(ctx context.Context, namespace, name string) error { +func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace, name string) error { ctx, span := trace.StartSpan(ctx, "forceDeletePodResource") defer span.End() ctx = span.WithFields(ctx, log.Fields{ @@ -155,7 +158,7 @@ func (s *Server) forceDeletePodResource(ctx context.Context, namespace, name str }) var grace int64 - if err := s.k8sClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil { + if err := pc.client.Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil { if errors.IsNotFound(err) { log.G(ctx).Debug("Pod does not exist in Kubernetes, nothing to delete") return nil @@ -167,12 +170,12 @@ func (s *Server) forceDeletePodResource(ctx context.Context, namespace, name str } // updatePodStatuses syncs the providers pod status with the kubernetes pod status. -func (s *Server) updatePodStatuses(ctx context.Context, q workqueue.RateLimitingInterface) { +func (pc *PodController) updatePodStatuses(ctx context.Context, q workqueue.RateLimitingInterface) { ctx, span := trace.StartSpan(ctx, "updatePodStatuses") defer span.End() // Update all the pods with the provider status. 
- pods, err := s.podInformer.Lister().List(labels.Everything()) + pods, err := pc.podsLister.List(labels.Everything()) if err != nil { err = pkgerrors.Wrap(err, "error getting pod list") span.SetStatus(ocstatus.FromError(err)) @@ -183,7 +186,7 @@ func (s *Server) updatePodStatuses(ctx context.Context, q workqueue.RateLimiting for _, pod := range pods { if !shouldSkipPodStatusUpdate(pod) { - s.enqueuePodStatusUpdate(ctx, q, pod) + enqueuePodStatusUpdate(ctx, q, pod) } } } @@ -194,7 +197,7 @@ func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool { pod.Status.Reason == podStatusReasonProviderFailed } -func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { +func (pc *PodController) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { if shouldSkipPodStatusUpdate(pod) { return nil } @@ -203,7 +206,7 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { defer span.End() ctx = addPodAttributes(ctx, span, pod) - status, err := s.provider.GetPodStatus(ctx, pod.Namespace, pod.Name) + status, err := pc.provider.GetPodStatus(ctx, pod.Namespace, pod.Name) if err != nil { span.SetStatus(ocstatus.FromError(err)) return pkgerrors.Wrap(err, "error retreiving pod status") @@ -234,7 +237,7 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { } } - if _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod); err != nil { + if _, err := pc.client.Pods(pod.Namespace).UpdateStatus(pod); err != nil { span.SetStatus(ocstatus.FromError(err)) return pkgerrors.Wrap(err, "error while updating pod status in kubernetes") } @@ -247,7 +250,7 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { return nil } -func (s *Server) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) { +func enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) { if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil { log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key") } else { @@ -255,24 +258,28 @@ func (s *Server) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLim } } -func (s *Server) podStatusHandler(ctx context.Context, key string) (retErr error) { +func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) { ctx, span := trace.StartSpan(ctx, "podStatusHandler") defer span.End() - defer func() { - span.SetStatus(ocstatus.FromError(retErr)) - }() ctx = span.WithField(ctx, "key", key) + log.G(ctx).Debug("processing pod status update") + defer func() { + span.SetStatus(ocstatus.FromError(retErr)) + if retErr != nil { + log.G(ctx).WithError(retErr).Error("Error processing pod status update") + } + }() namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return pkgerrors.Wrap(err, "error spliting cache key") } - pod, err := s.podInformer.Lister().Pods(namespace).Get(name) + pod, err := pc.podsLister.Pods(namespace).Get(name) if err != nil { return pkgerrors.Wrap(err, "error looking up pod") } - return s.updatePodStatus(ctx, pod) + return pc.updatePodStatus(ctx, pod) } diff --git a/vkubelet/pod_test.go b/vkubelet/pod_test.go index aa1773bb8..2396164ca 100644 --- a/vkubelet/pod_test.go +++ b/vkubelet/pod_test.go @@ -64,8 +64,8 @@ func (m *mockProvider) GetPods(_ context.Context) ([]*corev1.Pod, error) { return ls, nil } -type TestServer struct { - *Server +type TestController struct { + *PodController 
mock *mockProvider client *fake.Clientset } @@ -74,24 +74,22 @@ func newMockProvider() *mockProvider { return &mockProvider{pods: make(map[string]*corev1.Pod)} } -func newTestServer() *TestServer { +func newTestController() *TestController { fk8s := fake.NewSimpleClientset() rm := testutil.FakeResourceManager() p := newMockProvider() - tsvr := &TestServer{ - Server: &Server{ - namespace: "default", - nodeName: "vk123", + return &TestController{ + PodController: &PodController{ + client: fk8s.CoreV1(), provider: p, resourceManager: rm, - k8sClient: fk8s, + recorder: testutil.FakeEventRecorder(5), }, mock: p, client: fk8s, } - return tsvr } func TestPodHashingEqual(t *testing.T) { @@ -169,7 +167,7 @@ func TestPodHashingDifferent(t *testing.T) { } func TestPodCreateNewPod(t *testing.T) { - svr := newTestServer() + svr := newTestController() pod := &corev1.Pod{} pod.ObjectMeta.Namespace = "default" @@ -189,8 +187,8 @@ func TestPodCreateNewPod(t *testing.T) { }, } - er := testutil.FakeEventRecorder(5) - err := svr.createOrUpdatePod(context.Background(), pod, er) + err := svr.createOrUpdatePod(context.Background(), pod) + assert.Check(t, is.Nil(err)) // createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist assert.Check(t, is.Equal(svr.mock.creates, 1)) @@ -198,7 +196,7 @@ func TestPodCreateNewPod(t *testing.T) { } func TestPodUpdateExisting(t *testing.T) { - svr := newTestServer() + svr := newTestController() pod := &corev1.Pod{} pod.ObjectMeta.Namespace = "default" @@ -241,8 +239,7 @@ func TestPodUpdateExisting(t *testing.T) { }, } - er := testutil.FakeEventRecorder(5) - err = svr.createOrUpdatePod(context.Background(), pod2, er) + err = svr.createOrUpdatePod(context.Background(), pod2) assert.Check(t, is.Nil(err)) // createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed @@ -251,7 +248,7 @@ func TestPodUpdateExisting(t *testing.T) { } func TestPodNoSpecChange(t *testing.T) { - svr := newTestServer() + svr := newTestController() pod := &corev1.Pod{} pod.ObjectMeta.Namespace = "default" @@ -276,8 +273,7 @@ func TestPodNoSpecChange(t *testing.T) { assert.Check(t, is.Equal(svr.mock.creates, 1)) assert.Check(t, is.Equal(svr.mock.updates, 0)) - er := testutil.FakeEventRecorder(5) - err = svr.createOrUpdatePod(context.Background(), pod, er) + err = svr.createOrUpdatePod(context.Background(), pod) assert.Check(t, is.Nil(err)) // createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change diff --git a/vkubelet/podcontroller.go b/vkubelet/podcontroller.go index dbebcbeaa..262f9382f 100644 --- a/vkubelet/podcontroller.go +++ b/vkubelet/podcontroller.go @@ -22,21 +22,21 @@ import ( "sync" "time" + "github.com/cpuguy83/strongerrors" "github.com/cpuguy83/strongerrors/status/ocstatus" pkgerrors "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/trace" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/wait" - v1 "k8s.io/client-go/informers/core/v1" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + corev1informers "k8s.io/client-go/informers/core/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" - - "github.com/virtual-kubelet/virtual-kubelet/log" ) // 
PodLifecycleHandler defines the interface used by the PodController to react @@ -74,42 +74,75 @@ type PodNotifier interface { // PodController is the controller implementation for Pod resources. type PodController struct { - // server is the instance to which this controller belongs. - server *Server + provider PodLifecycleHandler + // podsInformer is an informer for Pod resources. - podsInformer v1.PodInformer + podsInformer corev1informers.PodInformer // podsLister is able to list/get Pod resources from a shared informer's store. podsLister corev1listers.PodLister - // workqueue is a rate limited work queue. - // This is used to queue work to be processed instead of performing it as soon as a change happens. - // This means we can ensure we only process a fixed amount of resources at a time, and makes it easy to ensure we are never processing the same item simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface // recorder is an event recorder for recording Event resources to the Kubernetes API. recorder record.EventRecorder - // inSync is a channel which will be closed once the pod controller has become in-sync with apiserver - // it will never close if startup fails, or if the run context is cancelled prior to initialization completing - inSyncCh chan struct{} + // ready is a channel which will be closed once the pod controller is fully up and running. + // this channel will never be closed if there is an error on startup. + ready chan struct{} + + client corev1client.PodsGetter + + resourceManager *manager.ResourceManager // TODO: can we eliminate this? } -// NewPodController returns a new instance of PodController. -func NewPodController(server *Server) *PodController { - // Create an event broadcaster. - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(log.L.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: server.k8sClient.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: fmt.Sprintf("%s/pod-controller", server.nodeName)}) +// PodControllerConfig is used to configure a new PodController. +type PodControllerConfig struct { + // PodClient is used to perform actions on the k8s API, such as updating pod status + // This field is required + PodClient corev1client.PodsGetter - // Create an instance of PodController having a work queue that uses the rate limiter created above. 
- pc := &PodController{ - server: server, - podsInformer: server.podInformer, - podsLister: server.podInformer.Lister(), - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pods"), - recorder: recorder, - inSyncCh: make(chan struct{}), + // PodInformer is used as a local cache for pods + // This should be configured to only look at pods scheduled to the node which the controller will be managing + PodInformer corev1informers.PodInformer + + EventRecorder record.EventRecorder + + Provider PodLifecycleHandler + + // TODO: get rid of this + ResourceManager *manager.ResourceManager +} + +func NewPodController(cfg PodControllerConfig) (*PodController, error) { + if cfg.PodClient == nil { + return nil, strongerrors.InvalidArgument(pkgerrors.New("must set core client")) } + if cfg.EventRecorder == nil { + return nil, strongerrors.InvalidArgument(pkgerrors.New("must set event recorder")) + } + if cfg.PodInformer == nil { + return nil, strongerrors.InvalidArgument(pkgerrors.New("must set informer")) + } + + return &PodController{ + client: cfg.PodClient, + podsInformer: cfg.PodInformer, + podsLister: cfg.PodInformer.Lister(), + provider: cfg.Provider, + resourceManager: cfg.ResourceManager, + ready: make(chan struct{}), + recorder: cfg.EventRecorder, + }, nil +} + +// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers. +// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items. +func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error { + k8sQ := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes") + defer k8sQ.ShutDown() + + podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider") + pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers) + pc.runSyncFromProvider(ctx, podStatusQueue) + defer podStatusQueue.ShutDown() // Set up event handlers for when Pod resources change. pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -117,7 +150,7 @@ func NewPodController(server *Server) *PodController { if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil { log.L.Error(err) } else { - pc.workqueue.AddRateLimited(key) + k8sQ.AddRateLimited(key) } }, UpdateFunc: func(oldObj, newObj interface{}) { @@ -136,49 +169,39 @@ func NewPodController(server *Server) *PodController { if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil { log.L.Error(err) } else { - pc.workqueue.AddRateLimited(key) + k8sQ.AddRateLimited(key) } }, DeleteFunc: func(pod interface{}) { if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil { log.L.Error(err) } else { - pc.workqueue.AddRateLimited(key) + k8sQ.AddRateLimited(key) } }, }) - // Return the instance of PodController back to the caller. - return pc -} - -// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers. -// It will block until stopCh is closed, at which point it will shutdown the work queue and wait for workers to finish processing their current work items. -func (pc *PodController) Run(ctx context.Context, threadiness int) error { - defer pc.workqueue.ShutDown() - - // Wait for the caches to be synced before starting workers. + // Wait for the caches to be synced *before* starting workers. 
 	if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
 		return pkgerrors.New("failed to wait for caches to sync")
 	}
 	log.G(ctx).Info("Pod cache in-sync")
 
-	close(pc.inSyncCh)
-
 	// Perform a reconciliation step that deletes any dangling pods from the provider.
 	// This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
 	// If by any reason the provider fails to delete a dangling pod, it will stay in the provider and deletion won't be retried.
-	pc.deleteDanglingPods(ctx, threadiness)
+	pc.deleteDanglingPods(ctx, podSyncWorkers)
 
-	// Launch "threadiness" workers to process Pod resources.
 	log.G(ctx).Info("starting workers")
-	for id := 0; id < threadiness; id++ {
+	for id := 0; id < podSyncWorkers; id++ {
 		go wait.Until(func() {
 			// Use the worker's "index" as its ID so we can use it for tracing.
-			pc.runWorker(ctx, strconv.Itoa(id))
+			pc.runWorker(ctx, strconv.Itoa(id), k8sQ)
 		}, time.Second, ctx.Done())
 	}
 
+	close(pc.ready)
+
 	log.G(ctx).Info("started workers")
 	<-ctx.Done()
 	log.G(ctx).Info("shutting down workers")
@@ -186,14 +209,21 @@ func (pc *PodController) Run(ctx context.Context, threadiness int) error {
 	return nil
 }
 
+// Ready returns a channel which gets closed once the PodController is ready to handle scheduled pods.
+// This channel will never close if there is an error on startup.
+// The status of this channel after shutdown is indeterminate.
+func (pc *PodController) Ready() <-chan struct{} {
+	return pc.ready
+}
+
 // runWorker is a long-running function that will continually call the processNextWorkItem function in order to read and process an item on the work queue.
-func (pc *PodController) runWorker(ctx context.Context, workerId string) {
-	for pc.processNextWorkItem(ctx, workerId) {
+func (pc *PodController) runWorker(ctx context.Context, workerId string, q workqueue.RateLimitingInterface) {
+	for pc.processNextWorkItem(ctx, workerId, q) {
 	}
 }
 
 // processNextWorkItem will read a single work item off the work queue and attempt to process it,by calling the syncHandler.
-func (pc *PodController) processNextWorkItem(ctx context.Context, workerId string) bool {
+func (pc *PodController) processNextWorkItem(ctx context.Context, workerId string, q workqueue.RateLimitingInterface) bool {
 
 	// We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
 	ctx, span := trace.StartSpan(ctx, "processNextWorkItem")
@@ -201,7 +231,7 @@ func (pc *PodController) processNextWorkItem(ctx context.Context, workerId strin
 	// Add the ID of the current worker as an attribute to the current span.
 	ctx = span.WithField(ctx, "workerId", workerId)
 
-	return handleQueueItem(ctx, pc.workqueue, pc.syncHandler)
+	return handleQueueItem(ctx, q, pc.syncHandler)
 }
 
 // syncHandler compares the actual state with the desired, and attempts to converge the two.
@@ -232,7 +262,7 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
 	}
 	// At this point we know the Pod resource doesn't exist, which most probably means it was deleted.
 	// Hence, we must delete it from the provider if it still exists there. 
- if err := pc.server.deletePod(ctx, namespace, name); err != nil { + if err := pc.deletePod(ctx, namespace, name); err != nil { err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name)) span.SetStatus(ocstatus.FromError(err)) return err @@ -254,7 +284,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) // Check whether the pod has been marked for deletion. // If it does, guarantee it is deleted in the provider and Kubernetes. if pod.DeletionTimestamp != nil { - if err := pc.server.deletePod(ctx, pod.Namespace, pod.Name); err != nil { + if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil { err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod)) span.SetStatus(ocstatus.FromError(err)) return err @@ -269,7 +299,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) } // Create or update the pod in the provider. - if err := pc.server.createOrUpdatePod(ctx, pod, pc.recorder); err != nil { + if err := pc.createOrUpdatePod(ctx, pod); err != nil { err := pkgerrors.Wrapf(err, "failed to sync pod %q in the provider", loggablePodName(pod)) span.SetStatus(ocstatus.FromError(err)) return err @@ -283,7 +313,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int defer span.End() // Grab the list of pods known to the provider. - pps, err := pc.server.provider.GetPods(ctx) + pps, err := pc.provider.GetPods(ctx) if err != nil { err := pkgerrors.Wrap(err, "failed to fetch the list of pods from the provider") span.SetStatus(ocstatus.FromError(err)) @@ -332,7 +362,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int // Add the pod's attributes to the current span. ctx = addPodAttributes(ctx, span, pod) // Actually delete the pod. - if err := pc.server.deletePod(ctx, pod.Namespace, pod.Name); err != nil { + if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil { span.SetStatus(ocstatus.FromError(err)) log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod)) } else { diff --git a/vkubelet/queue.go b/vkubelet/queue.go index 880cf1bc2..790f07a50 100644 --- a/vkubelet/queue.go +++ b/vkubelet/queue.go @@ -2,11 +2,14 @@ package vkubelet import ( "context" + "strconv" + "time" "github.com/cpuguy83/strongerrors/status/ocstatus" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/util/workqueue" ) @@ -26,7 +29,10 @@ func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, han return false } + log.G(ctx).Debug("Got queue object") + err := func(obj interface{}) error { + defer log.G(ctx).Debug("Processed queue item") // We call Done here so the work queue knows we have finished processing this item. // We also must remember to call Forget if we do not want this work item being re-queued. // For example, we do not call Forget if a transient error occurs. @@ -43,6 +49,7 @@ func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, han log.G(ctx).Warnf("expected string in work queue item but got %#v", obj) return nil } + // Add the current key as an attribute to the current span. ctx = span.WithField(ctx, "key", key) // Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced. 
@@ -71,3 +78,64 @@ func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, han
 
 	return true
 }
+
+func (pc *PodController) runProviderSyncWorkers(ctx context.Context, q workqueue.RateLimitingInterface, numWorkers int) {
+	for i := 0; i < numWorkers; i++ {
+		go func(index int) {
+			workerID := strconv.Itoa(index)
+			pc.runProviderSyncWorker(ctx, workerID, q)
+		}(i)
+	}
+}
+
+func (pc *PodController) runProviderSyncWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
+	for pc.processPodStatusUpdate(ctx, workerID, q) {
+	}
+}
+
+func (pc *PodController) processPodStatusUpdate(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
+	ctx, span := trace.StartSpan(ctx, "processPodStatusUpdate")
+	defer span.End()
+
+	// Add the ID of the current worker as an attribute to the current span.
+	ctx = span.WithField(ctx, "workerID", workerID)
+
+	return handleQueueItem(ctx, q, pc.podStatusHandler)
+}
+
+// providerSyncLoop synchronizes pod states from the provider back to kubernetes
+// Deprecated: This is only used when the provider does not support async updates
+// Providers should implement async update support, even if it just means copying
+// something like this in.
+func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) {
+	const sleepTime = 5 * time.Second
+
+	t := time.NewTimer(sleepTime)
+	defer t.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			t.Stop()
+
+			ctx, span := trace.StartSpan(ctx, "syncActualState")
+			pc.updatePodStatuses(ctx, q)
+			span.End()
+
+			// restart the timer
+			t.Reset(sleepTime)
+		}
+	}
+}
+
+func (pc *PodController) runSyncFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
+	if pn, ok := pc.provider.(PodNotifier); ok {
+		pn.NotifyPods(ctx, func(pod *corev1.Pod) {
+			enqueuePodStatusUpdate(ctx, q, pod)
+		})
+	} else {
+		go pc.providerSyncLoop(ctx, q)
+	}
+}
diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go
deleted file mode 100644
index 2805cb7ad..000000000
--- a/vkubelet/vkubelet.go
+++ /dev/null
@@ -1,150 +0,0 @@
-package vkubelet
-
-import (
-	"context"
-	"strconv"
-	"time"
-
-	"github.com/virtual-kubelet/virtual-kubelet/manager"
-	"github.com/virtual-kubelet/virtual-kubelet/trace"
-	corev1 "k8s.io/api/core/v1"
-	corev1informers "k8s.io/client-go/informers/core/v1"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/util/workqueue"
-)
-
-const (
-	podStatusReasonProviderFailed = "ProviderFailed"
-)
-
-// Server masquarades itself as a kubelet and allows for the virtual node to be backed by non-vm/node providers.
-type Server struct {
-	namespace       string
-	nodeName        string
-	k8sClient       kubernetes.Interface
-	provider        PodLifecycleHandler
-	resourceManager *manager.ResourceManager
-	podSyncWorkers  int
-	podInformer     corev1informers.PodInformer
-	readyCh         chan struct{}
-}
-
-// Config is used to configure a new server.
-type Config struct {
-	Client          *kubernetes.Clientset
-	Namespace       string
-	NodeName        string
-	Provider        PodLifecycleHandler
-	ResourceManager *manager.ResourceManager
-	PodSyncWorkers  int
-	PodInformer     corev1informers.PodInformer
-}
-
-// New creates a new virtual-kubelet server.
-// This is the entrypoint to this package.
-//
-// This creates but does not start the server.
-// You must call `Run` on the returned object to start the server. 
-func New(cfg Config) *Server { - return &Server{ - nodeName: cfg.NodeName, - namespace: cfg.Namespace, - k8sClient: cfg.Client, - resourceManager: cfg.ResourceManager, - provider: cfg.Provider, - podSyncWorkers: cfg.PodSyncWorkers, - podInformer: cfg.PodInformer, - readyCh: make(chan struct{}), - } -} - -// Run creates and starts an instance of the pod controller, blocking until it stops. -// -// Note that this does not setup the HTTP routes that are used to expose pod -// info to the Kubernetes API Server, such as logs, metrics, exec, etc. -// See `AttachPodRoutes` and `AttachMetricsRoutes` to set these up. -func (s *Server) Run(ctx context.Context) error { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "podStatusUpdate") - s.runProviderSyncWorkers(ctx, q) - - if pn, ok := s.provider.(PodNotifier); ok { - pn.NotifyPods(ctx, func(pod *corev1.Pod) { - s.enqueuePodStatusUpdate(ctx, q, pod) - }) - } else { - go s.providerSyncLoop(ctx, q) - } - - pc := NewPodController(s) - - go func() { - select { - case <-pc.inSyncCh: - case <-ctx.Done(): - } - close(s.readyCh) - }() - - return pc.Run(ctx, s.podSyncWorkers) -} - -// Ready returns a channel which will be closed once the VKubelet is running -func (s *Server) Ready() <-chan struct{} { - // TODO: right now all this waits on is the in-sync channel. Later, we might either want to expose multiple types - // of ready, for example: - // * In Sync - // * Control Loop running - // * Provider state synchronized with API Server state - return s.readyCh -} - -// providerSyncLoop syncronizes pod states from the provider back to kubernetes -// Deprecated: This is only used when the provider does not support async updates -// Providers should implement async update support, even if it just means copying -// something like this in. -func (s *Server) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) { - const sleepTime = 5 * time.Second - - t := time.NewTimer(sleepTime) - defer t.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-t.C: - t.Stop() - - ctx, span := trace.StartSpan(ctx, "syncActualState") - s.updatePodStatuses(ctx, q) - span.End() - - // restart the timer - t.Reset(sleepTime) - } - } -} - -func (s *Server) runProviderSyncWorkers(ctx context.Context, q workqueue.RateLimitingInterface) { - for i := 0; i < s.podSyncWorkers; i++ { - go func(index int) { - workerID := strconv.Itoa(index) - s.runProviderSyncWorker(ctx, workerID, q) - }(i) - } -} - -func (s *Server) runProviderSyncWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) { - for s.processPodStatusUpdate(ctx, workerID, q) { - } -} - -func (s *Server) processPodStatusUpdate(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool { - ctx, span := trace.StartSpan(ctx, "processPodStatusUpdate") - defer span.End() - - // Add the ID of the current worker as an attribute to the current span. - ctx = span.WithField(ctx, "workerID", workerID) - - return handleQueueItem(ctx, q, s.podStatusHandler) -}
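
Usage note: with `Server` gone, callers now construct and drive the `PodController` directly. The Go sketch below mirrors the wiring this patch adds to cmd/virtual-kubelet/commands/root/root.go; it is a minimal sketch, not part of the commit, and the helper name `runPodController` and its parameters are illustrative only.

package example

import (
	"context"
	"path"

	"github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/log"
	"github.com/virtual-kubelet/virtual-kubelet/manager"
	"github.com/virtual-kubelet/virtual-kubelet/vkubelet"
	corev1 "k8s.io/api/core/v1"
	corev1informers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
)

// runPodController shows the post-#629 wiring: build an event recorder, pass the
// pieces that used to live on Server into PodControllerConfig, then call Run with
// the worker count (formerly the PodSyncWorkers field on Server).
func runPodController(ctx context.Context, client kubernetes.Interface, podInformer corev1informers.PodInformer,
	provider vkubelet.PodLifecycleHandler, rm *manager.ResourceManager, nodeName, namespace string, workers int) error {
	// Events recorded by the controller are logged and sent to the API server,
	// exactly as root.go now does.
	eb := record.NewBroadcaster()
	eb.StartLogging(log.G(ctx).Infof)
	eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(namespace)})

	pc, err := vkubelet.NewPodController(vkubelet.PodControllerConfig{
		PodClient:       client.CoreV1(),
		PodInformer:     podInformer,
		EventRecorder:   eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(nodeName, "pod-controller")}),
		Provider:        provider,
		ResourceManager: rm,
	})
	if err != nil {
		return errors.Wrap(err, "error setting up pod controller")
	}

	// Run blocks until ctx is cancelled. Ready() is closed once the informer
	// cache is synced and the workers have started.
	return pc.Run(ctx, workers)
}

The worker count passed to `Run` takes over the role of the old `PodSyncWorkers` field, and `Ready()` serves the same purpose as the old `Server.Ready()` for gating node advertisement, which root.go now does via `waitFor(ctx, c.StartupTimeout, pc.Ready())`.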