Remove Server object (#629)

The Server object had some awkward shared responsibility with the PodController. Instead, move that functionality into the PodController itself.
Brian Goff authored 2019-06-01 09:36:38 -07:00, committed by GitHub
parent ae69cc749f
commit ddee513763


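At the call site, the change swaps the Server for a PodController. A minimal before/after sketch of the API, reconstructed from the diff below (setup of client, podInformer, the provider p, and the resource manager rm elided; recorder stands in for the event recorder wired up further down):

// Before (removed): a single Server object carried the client, config, and run loop.
vk := vkubelet.New(vkubelet.Config{
	Client:          client,
	Namespace:       c.KubeNamespace,
	NodeName:        pNode.Name,
	Provider:        p,
	ResourceManager: rm,
	PodSyncWorkers:  c.PodSyncWorkers,
	PodInformer:     podInformer,
})
err := vk.Run(ctx)

// After (added): construct the PodController directly. Construction can now
// fail, and the worker count moves from the config struct to Run.
pc, err := vkubelet.NewPodController(vkubelet.PodControllerConfig{
	PodClient:       client.CoreV1(),
	PodInformer:     podInformer,
	EventRecorder:   recorder,
	Provider:        p,
	ResourceManager: rm,
})
if err != nil {
	return errors.Wrap(err, "error setting up pod controller")
}
err = pc.Run(ctx, c.PodSyncWorkers)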
@@ -17,6 +17,7 @@ package root
 import (
 	"context"
 	"os"
+	"path"
 	"time"
 
 	"github.com/cpuguy83/strongerrors"
@@ -32,8 +33,11 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	kubeinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/tools/record"
 )
 
 // NewCommand creates a new top-level command.
@@ -88,7 +92,6 @@ func runRootCommand(ctx context.Context, c Opts) error {
 		kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
 			options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String()
 		}))
-	// Create a pod informer so we can pass its lister to the resource manager.
 	podInformer := podInformerFactory.Core().V1().Pods()
 
 	// Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors).
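For context, the podInformer kept here comes from a shared informer factory scoped to pods scheduled onto this virtual node. The factory call itself sits just above this hunk and is not part of the diff, so the sketch below is an assumption about its shape (resyncPeriod is a stand-in for the real option value):

podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
	client,
	resyncPeriod,
	kubeinformers.WithNamespace(c.KubeNamespace),
	kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
		// Only list/watch pods assigned to this virtual-kubelet node.
		options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String()
	}),
)
podInformer := podInformerFactory.Core().V1().Pods()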
@@ -148,15 +151,20 @@ func runRootCommand(ctx context.Context, c Opts) error {
 		log.G(ctx).Fatal(err)
 	}
 
-	vk := vkubelet.New(vkubelet.Config{
-		Client:          client,
-		Namespace:       c.KubeNamespace,
-		NodeName:        pNode.Name,
+	eb := record.NewBroadcaster()
+	eb.StartLogging(log.G(ctx).Infof)
+	eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(c.KubeNamespace)})
+
+	pc, err := vkubelet.NewPodController(vkubelet.PodControllerConfig{
+		PodClient:       client.CoreV1(),
+		PodInformer:     podInformer,
+		EventRecorder:   eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(pNode.Name, "pod-controller")}),
 		Provider:        p,
 		ResourceManager: rm,
-		PodSyncWorkers:  c.PodSyncWorkers,
-		PodInformer:     podInformer,
 	})
+	if err != nil {
+		return errors.Wrap(err, "error setting up pod controller")
+	}
 
 	cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig)
 	if err != nil {
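The genuinely new machinery in this hunk is the event recorder. Below is a self-contained sketch of the same client-go wiring, assuming only a kubernetes.Interface; the helper name newRecorder and the use of the standard library logger are illustrative, not from the commit:

package main

import (
	"log"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
)

// newRecorder builds an EventRecorder whose events are both written to the
// process log and posted to the Kubernetes API as v1.Event objects.
func newRecorder(client kubernetes.Interface, namespace, component string) record.EventRecorder {
	eb := record.NewBroadcaster()
	eb.StartLogging(log.Printf) // mirror every event into the log
	eb.StartRecordingToSink(&corev1client.EventSinkImpl{
		Interface: client.CoreV1().Events(namespace), // persist events in this namespace
	})
	return eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: component})
}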
@@ -165,7 +173,7 @@ func runRootCommand(ctx context.Context, c Opts) error {
 	defer cancelHTTP()
 
 	go func() {
-		if err := vk.Run(ctx); err != nil && errors.Cause(err) != context.Canceled {
+		if err := pc.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled {
 			log.G(ctx).Fatal(err)
 		}
 	}()
@@ -174,7 +182,7 @@ func runRootCommand(ctx context.Context, c Opts) error {
 	// If there is a startup timeout, it does two things:
 	// 1. It causes the VK to shutdown if we haven't gotten into an operational state in a time period
 	// 2. It prevents node advertisement from happening until we're in an operational state
-	err = waitForVK(ctx, c.StartupTimeout, vk)
+	err = waitFor(ctx, c.StartupTimeout, pc.Ready())
 	if err != nil {
 		return err
 	}
@@ -192,7 +200,7 @@ func runRootCommand(ctx context.Context, c Opts) error {
 	return nil
 }
 
-func waitForVK(ctx context.Context, time time.Duration, vk *vkubelet.Server) error {
+func waitFor(ctx context.Context, time time.Duration, ready <-chan struct{}) error {
 	ctx, cancel := context.WithTimeout(ctx, time)
 	defer cancel()
 
@@ -200,7 +208,7 @@ func waitForVK(ctx context.Context, time time.Duration, vk *vkubelet.Server) error {
 	log.G(ctx).Info("Waiting for pod controller / VK to be ready")
 	select {
-	case <-vk.Ready():
+	case <-ready:
 		return nil
 	case <-ctx.Done():
 		return errors.Wrap(ctx.Err(), "Error while starting up VK")
 	}
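Pieced together from the last two hunks, the renamed helper reads in full as below. This is a reconstruction from the diff, not the verbatim file; log.G is the project's context logger and errors is the wrapping errors package already used in this file:

// waitFor blocks until ready is closed, or fails once the startup timeout
// (or the parent context) expires.
func waitFor(ctx context.Context, time time.Duration, ready <-chan struct{}) error {
	ctx, cancel := context.WithTimeout(ctx, time)
	defer cancel()

	log.G(ctx).Info("Waiting for pod controller / VK to be ready")
	select {
	case <-ready:
		return nil
	case <-ctx.Done():
		return errors.Wrap(ctx.Err(), "Error while starting up VK")
	}
}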