Make ControllerManager more useful

This changes `ControllerManager` to `Node`.

`Node` is created from a client where the VK lib is responsible for
creating all the things except the client (unless client is nil, then we
use the env client).

This should be a good replacement for node-cli.  It offers a simpler
API.  *It only works with leases enabled* since this seems always
desired, however an option could be added to disable if needed.

The intent of this is to provide a simpler way to get a vk node up and
running while also being extensible. We can slowly add options, but
they should be focussed on a use-case rather than trying to support
every possible scenario... in which case the user can just use the
controllers directly.
This commit is contained in:
Brian Goff
2021-06-02 23:58:25 +00:00
parent a9a0ee50cf
commit 597e7dc281
8 changed files with 329 additions and 225 deletions

View File

@@ -59,7 +59,11 @@ func (mv mapVar) Type() string {
func installFlags(flags *pflag.FlagSet, c *Opts) {
flags.StringVar(&c.KubeConfigPath, "kubeconfig", c.KubeConfigPath, "kube config file to use for connecting to the Kubernetes API server")
flags.StringVar(&c.KubeNamespace, "namespace", c.KubeNamespace, "kubernetes namespace (default is 'all')")
flags.MarkDeprecated("namespace", "Nodes must watch for pods in all namespaces. This option is now ignored.") //nolint:errcheck
flags.MarkHidden("namespace") //nolint:errcheck
flags.StringVar(&c.KubeClusterDomain, "cluster-domain", c.KubeClusterDomain, "kubernetes cluster-domain (default is 'cluster.local')")
flags.StringVar(&c.NodeName, "nodename", c.NodeName, "kubernetes node name")
flags.StringVar(&c.OperatingSystem, "os", c.OperatingSystem, "Operating System (Linux/Windows)")
@@ -68,11 +72,15 @@ func installFlags(flags *pflag.FlagSet, c *Opts) {
flags.StringVar(&c.MetricsAddr, "metrics-addr", c.MetricsAddr, "address to listen for metrics/stats requests")
flags.StringVar(&c.TaintKey, "taint", c.TaintKey, "Set node taint key")
flags.BoolVar(&c.DisableTaint, "disable-taint", c.DisableTaint, "disable the virtual-kubelet node taint")
flags.MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable") //nolint:errcheck
flags.IntVar(&c.PodSyncWorkers, "pod-sync-workers", c.PodSyncWorkers, `set the number of pod synchronization workers`)
flags.BoolVar(&c.EnableNodeLease, "enable-node-lease", c.EnableNodeLease, `use node leases (1.13) for node heartbeats`)
flags.MarkDeprecated("enable-node-lease", "leases are always enabled") //nolint:errcheck
flags.MarkHidden("enable-node-lease") //nolint:errcheck
flags.StringSliceVar(&c.TraceExporters, "trace-exporter", c.TraceExporters, fmt.Sprintf("sets the tracing exporter to use, available exporters: %s", AvailableTraceExporters()))
flags.StringVar(&c.TraceConfig.ServiceName, "trace-service-name", c.TraceConfig.ServiceName, "sets the name of the service used to register with the trace exporter")

View File

@@ -15,54 +15,10 @@
package root
import (
"context"
"strings"
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const osLabel = "beta.kubernetes.io/os"
// NodeFromProvider builds a kubernetes node object from a provider
// This is a temporary solution until node stuff actually split off from the provider interface itself.
func NodeFromProvider(ctx context.Context, name string, taint *v1.Taint, p provider.Provider, version string) *v1.Node {
taints := make([]v1.Taint, 0)
if taint != nil {
taints = append(taints, *taint)
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{
"type": "virtual-kubelet",
"kubernetes.io/role": "agent",
"kubernetes.io/hostname": name,
},
},
Spec: v1.NodeSpec{
Taints: taints,
},
Status: v1.NodeStatus{
NodeInfo: v1.NodeSystemInfo{
Architecture: "amd64",
KubeletVersion: version,
},
},
}
p.ConfigureNode(ctx, node)
if _, ok := node.ObjectMeta.Labels[osLabel]; !ok {
node.ObjectMeta.Labels[osLabel] = strings.ToLower(node.Status.NodeInfo.OperatingSystem)
}
return node
}
// getTaint creates a taint using the provided key/value.
// Taint effect is read from the environment
// The taint key/value may be overwritten by the environment.

View File

@@ -28,7 +28,7 @@ import (
// Defaults for root command options
const (
DefaultNodeName = "virtual-kubelet"
DefaultOperatingSystem = "Linux"
DefaultOperatingSystem = "linux"
DefaultInformerResyncPeriod = 1 * time.Minute
DefaultMetricsAddr = ":10255"
DefaultListenPort = 10250 // TODO(cpuguy83)(VK1.0): Change this to an addr instead of just a port.. we should not be listening on all interfaces.

View File

@@ -17,7 +17,7 @@ package root
import (
"context"
"os"
"path"
"runtime"
"github.com/pkg/errors"
"github.com/spf13/cobra"
@@ -28,12 +28,6 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/node"
"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
)
// NewCommand creates a new top-level command.
@@ -80,28 +74,65 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
return err
}
// Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node.
podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
client,
c.InformerResyncPeriod,
kubeinformers.WithNamespace(c.KubeNamespace),
nodeutil.PodInformerFilter(c.NodeName),
)
podInformer := podInformerFactory.Core().V1().Pods()
cancelHTTP := func() {}
defer func() {
// note: this is purposefully using a closure so that when this is actually set the correct function will be called
if cancelHTTP != nil {
cancelHTTP()
}
}()
newProvider := func(cfg nodeutil.ProviderConfig) (node.PodLifecycleHandler, node.NodeProvider, error) {
var err error
rm, err := manager.NewResourceManager(cfg.Pods, cfg.Secrets, cfg.ConfigMaps, cfg.Services)
if err != nil {
return nil, nil, errors.Wrap(err, "could not create resource manager")
}
initConfig := provider.InitConfig{
ConfigPath: c.ProviderConfigPath,
NodeName: c.NodeName,
OperatingSystem: c.OperatingSystem,
ResourceManager: rm,
DaemonPort: c.ListenPort,
InternalIP: os.Getenv("VKUBELET_POD_IP"),
KubeClusterDomain: c.KubeClusterDomain,
}
pInit := s.Get(c.Provider)
if pInit == nil {
return nil, nil, errors.Errorf("provider %q not found", c.Provider)
}
// Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors).
scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, c.InformerResyncPeriod)
// Create a secret informer and a config map informer so we can pass their listers to the resource manager.
secretInformer := scmInformerFactory.Core().V1().Secrets()
configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
serviceInformer := scmInformerFactory.Core().V1().Services()
p, err := pInit(initConfig)
if err != nil {
return nil, nil, errors.Wrapf(err, "error initializing provider %s", c.Provider)
}
p.ConfigureNode(ctx, cfg.Node)
rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister())
if err != nil {
return errors.Wrap(err, "could not create resource manager")
apiConfig, err := getAPIConfig(c)
if err != nil {
return nil, nil, err
}
cancelHTTP, err = setupHTTPServer(ctx, p, apiConfig, func(context.Context) ([]*corev1.Pod, error) {
return rm.GetPods(), nil
})
if err != nil {
return nil, nil, err
}
return p, nil, nil
}
apiConfig, err := getAPIConfig(c)
cm, err := nodeutil.NewNodeFromClient(client, c.NodeName, newProvider, func(cfg *nodeutil.NodeConfig) error {
cfg.InformerResyncPeriod = c.InformerResyncPeriod
if taint != nil {
cfg.NodeSpec.Spec.Taints = append(cfg.NodeSpec.Spec.Taints, *taint)
}
cfg.NodeSpec.Status.NodeInfo.Architecture = runtime.GOARCH
cfg.NodeSpec.Status.NodeInfo.OperatingSystem = c.OperatingSystem
return nil
})
if err != nil {
return err
}
@@ -110,26 +141,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
return err
}
initConfig := provider.InitConfig{
ConfigPath: c.ProviderConfigPath,
NodeName: c.NodeName,
OperatingSystem: c.OperatingSystem,
ResourceManager: rm,
DaemonPort: c.ListenPort,
InternalIP: os.Getenv("VKUBELET_POD_IP"),
KubeClusterDomain: c.KubeClusterDomain,
}
pInit := s.Get(c.Provider)
if pInit == nil {
return errors.Errorf("provider %q not found", c.Provider)
}
p, err := pInit(initConfig)
if err != nil {
return errors.Wrapf(err, "error initializing provider %s", c.Provider)
}
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(log.Fields{
"provider": c.Provider,
"operatingSystem": c.OperatingSystem,
@@ -137,69 +148,8 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
"watchedNamespace": c.KubeNamespace,
}))
pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version)
np := node.NewNaiveNodeProvider()
additionalOptions := []node.NodeControllerOpt{
node.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error {
if !k8serrors.IsNotFound(err) {
return err
}
log.G(ctx).Debug("node not found")
newNode := pNode.DeepCopy()
newNode.ResourceVersion = ""
_, err = client.CoreV1().Nodes().Create(ctx, newNode, metav1.CreateOptions{})
if err != nil {
return err
}
log.G(ctx).Debug("created new node")
return nil
}),
}
if c.EnableNodeLease {
leaseClient := nodeutil.NodeLeaseV1Client(client)
// 40 seconds is the default lease time in upstream kubelet
additionalOptions = append(additionalOptions, node.WithNodeEnableLeaseV1(leaseClient, 40))
}
nodeRunner, err := node.NewNodeController(
np,
pNode,
client.CoreV1().Nodes(),
additionalOptions...,
)
if err != nil {
log.G(ctx).Fatal(err)
}
eb := record.NewBroadcaster()
eb.StartLogging(log.G(ctx).Infof)
eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(c.KubeNamespace)})
pc, err := node.NewPodController(node.PodControllerConfig{
PodClient: client.CoreV1(),
PodInformer: podInformer,
EventRecorder: eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(pNode.Name, "pod-controller")}),
Provider: p,
SecretInformer: secretInformer,
ConfigMapInformer: configMapInformer,
ServiceInformer: serviceInformer,
})
if err != nil {
return errors.Wrap(err, "error setting up pod controller")
}
go podInformerFactory.Start(ctx.Done())
go scmInformerFactory.Start(ctx.Done())
cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig, func(context.Context) ([]*corev1.Pod, error) {
return rm.GetPods(), nil
})
if err != nil {
return err
}
defer cancelHTTP()
cm := nodeutil.NewControllerManager(nodeRunner, pc)
go cm.Run(ctx, c.PodSyncWorkers) // nolint:errcheck
defer func() {
@@ -213,12 +163,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
return err
}
setNodeReady(pNode)
if err := np.UpdateStatus(ctx, pNode); err != nil {
return errors.Wrap(err, "error marking the node as ready")
}
log.G(ctx).Info("Initialized")
select {
case <-ctx.Done():
case <-cm.Done():
@@ -226,19 +170,3 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
}
return nil
}
func setNodeReady(n *corev1.Node) {
for i, c := range n.Status.Conditions {
if c.Type != "Ready" {
continue
}
c.Message = "Kubelet is ready"
c.Reason = "KubeletReady"
c.Status = corev1.ConditionTrue
c.LastHeartbeatTime = metav1.Now()
c.LastTransitionTime = metav1.Now()
n.Status.Conditions[i] = c
return
}
}

View File

@@ -339,7 +339,7 @@ func (p *MockProvider) ConfigureNode(ctx context.Context, n *v1.Node) { // nolin
n.Status.DaemonEndpoints = p.nodeDaemonEndpoints()
os := p.operatingSystem
if os == "" {
os = "Linux"
os = "linux"
}
n.Status.NodeInfo.OperatingSystem = os
n.Status.NodeInfo.Architecture = "amd64"

View File

@@ -2,9 +2,9 @@ package provider
const (
// OperatingSystemLinux is the configuration value for defining Linux.
OperatingSystemLinux = "Linux"
OperatingSystemLinux = "linux"
// OperatingSystemWindows is the configuration value for defining Windows.
OperatingSystemWindows = "Windows"
OperatingSystemWindows = "windows"
)
type OperatingSystems map[string]bool // nolint:golint