diff --git a/virtual-kubelet/commands/root/root.go b/virtual-kubelet/commands/root/root.go index caacdf6..4ba7b9d 100644 --- a/virtual-kubelet/commands/root/root.go +++ b/virtual-kubelet/commands/root/root.go @@ -17,6 +17,7 @@ package root import ( "context" "os" + "path" "time" "github.com/cpuguy83/strongerrors" @@ -32,8 +33,11 @@ import ( "k8s.io/apimachinery/pkg/fields" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/record" ) // NewCommand creates a new top-level command. @@ -88,7 +92,6 @@ func runRootCommand(ctx context.Context, c Opts) error { kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) { options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String() })) - // Create a pod informer so we can pass its lister to the resource manager. podInformer := podInformerFactory.Core().V1().Pods() // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors). @@ -148,15 +151,20 @@ func runRootCommand(ctx context.Context, c Opts) error { log.G(ctx).Fatal(err) } - vk := vkubelet.New(vkubelet.Config{ - Client: client, - Namespace: c.KubeNamespace, - NodeName: pNode.Name, + eb := record.NewBroadcaster() + eb.StartLogging(log.G(ctx).Infof) + eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(c.KubeNamespace)}) + + pc, err := vkubelet.NewPodController(vkubelet.PodControllerConfig{ + PodClient: client.CoreV1(), + PodInformer: podInformer, + EventRecorder: eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(pNode.Name, "pod-controller")}), Provider: p, ResourceManager: rm, - PodSyncWorkers: c.PodSyncWorkers, - PodInformer: podInformer, }) + if err != nil { + return errors.Wrap(err, "error setting up pod controller") + } cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig) if err != nil { @@ -165,7 +173,7 @@ func runRootCommand(ctx context.Context, c Opts) error { defer cancelHTTP() go func() { - if err := vk.Run(ctx); err != nil && errors.Cause(err) != context.Canceled { + if err := pc.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled { log.G(ctx).Fatal(err) } }() @@ -174,7 +182,7 @@ func runRootCommand(ctx context.Context, c Opts) error { // If there is a startup timeout, it does two things: // 1. It causes the VK to shutdown if we haven't gotten into an operational state in a time period // 2. It prevents node advertisement from happening until we're in an operational state - err = waitForVK(ctx, c.StartupTimeout, vk) + err = waitFor(ctx, c.StartupTimeout, pc.Ready()) if err != nil { return err } @@ -192,7 +200,7 @@ func runRootCommand(ctx context.Context, c Opts) error { return nil } -func waitForVK(ctx context.Context, time time.Duration, vk *vkubelet.Server) error { +func waitFor(ctx context.Context, time time.Duration, ready <-chan struct{}) error { ctx, cancel := context.WithTimeout(ctx, time) defer cancel() @@ -200,7 +208,7 @@ func waitForVK(ctx context.Context, time time.Duration, vk *vkubelet.Server) err log.G(ctx).Info("Waiting for pod controller / VK to be ready") select { - case <-vk.Ready(): + case <-ready: return nil case <-ctx.Done(): return errors.Wrap(ctx.Err(), "Error while starting up VK")