use shared informers and workqueue (#425)

* vendor: add vendored code

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* controller: use shared informers and a work queue

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* errors: use cpuguy83/strongerrors

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* aci: fix test that uses resource manager

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* readme: clarify skaffold run before e2e

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* cmd: use root context everywhere

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: refactor pod lifecycle management

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* e2e: fix race in test when observing deletions

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* e2e: test pod forced deletion

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* cmd: fix root context potential leak

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: rename metaKey

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: remove calls to HandleError

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* Revert "errors: use cpuguy83/strongerrors"

This reverts commit f031fc6d.

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* manager: remove redundant lister constraint

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: rename the pod event recorder

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: amend misleading comment

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* mock: add tracing

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: add tracing

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* test: observe timeouts

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* trace: remove unnecessary comments

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: limit concurrency in deleteDanglingPods

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: never store context, always pass in calls

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: remove HandleCrash and just panic

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: don't sync succeeded pods

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: ensure pod deletion from kubernetes

Signed-off-by: Paulo Pires <pjpires@gmail.com>
This commit is contained in:
Paulo Pires
2018-11-30 23:53:58 +00:00
committed by Robbie Zhang
parent 4c0c44b9e4
commit b4c42a01c1

57
root.go
View File

@@ -23,25 +23,35 @@ import (
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
"time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/cpuguy83/strongerrors" "github.com/cpuguy83/strongerrors"
homedir "github.com/mitchellh/go-homedir" "github.com/mitchellh/go-homedir"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kubeinformers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers"
"github.com/virtual-kubelet/virtual-kubelet/providers/register" "github.com/virtual-kubelet/virtual-kubelet/providers/register"
vkubelet "github.com/virtual-kubelet/virtual-kubelet/vkubelet" "github.com/virtual-kubelet/virtual-kubelet/vkubelet"
"go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
) )
const ( const (
defaultDaemonPort = "10250" defaultDaemonPort = "10250"
// kubeSharedInformerFactoryDefaultResync is the default resync period used by the shared informer factories for Kubernetes resources.
// It is set to the same value used by the Kubelet, and can be overridden via the "--full-resync-period" flag.
// https://github.com/kubernetes/kubernetes/blob/v1.12.2/pkg/kubelet/apis/config/v1beta1/defaults.go#L51
kubeSharedInformerFactoryDefaultResync = 1 * time.Minute
) )
var kubeletConfig string var kubeletConfig string
@@ -60,12 +70,17 @@ var k8sClient *kubernetes.Clientset
var p providers.Provider var p providers.Provider
var rm *manager.ResourceManager var rm *manager.ResourceManager
var apiConfig vkubelet.APIConfig var apiConfig vkubelet.APIConfig
var podInformer corev1informers.PodInformer
var kubeSharedInformerFactoryResync time.Duration
var podSyncWorkers int var podSyncWorkers int
var userTraceExporters []string var userTraceExporters []string
var userTraceConfig = TracingExporterOptions{Tags: make(map[string]string)} var userTraceConfig = TracingExporterOptions{Tags: make(map[string]string)}
var traceSampler string var traceSampler string
// Create a root context to be used by the pod controller and by the shared informer factories.
var rootContext, rootContextCancel = context.WithCancel(context.Background())
// RootCmd represents the base command when called without any subcommands // RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{ var RootCmd = &cobra.Command{
Use: "virtual-kubelet", Use: "virtual-kubelet",
@@ -74,9 +89,9 @@ var RootCmd = &cobra.Command{
backend implementation allowing users to create kubernetes nodes without running the kubelet. backend implementation allowing users to create kubernetes nodes without running the kubelet.
This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`, This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
ctx, cancel := context.WithCancel(context.Background()) defer rootContextCancel()
f, err := vkubelet.New(ctx, vkubelet.Config{ f, err := vkubelet.New(rootContext, vkubelet.Config{
Client: k8sClient, Client: k8sClient,
Namespace: kubeNamespace, Namespace: kubeNamespace,
NodeName: nodeName, NodeName: nodeName,
@@ -86,6 +101,7 @@ This allows users to schedule kubernetes workloads on nodes that aren't running
ResourceManager: rm, ResourceManager: rm,
APIConfig: apiConfig, APIConfig: apiConfig,
PodSyncWorkers: podSyncWorkers, PodSyncWorkers: podSyncWorkers,
PodInformer: podInformer,
}) })
if err != nil { if err != nil {
log.L.WithError(err).Fatal("Error initializing virtual kubelet") log.L.WithError(err).Fatal("Error initializing virtual kubelet")
@@ -95,10 +111,10 @@ This allows users to schedule kubernetes workloads on nodes that aren't running
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
go func() { go func() {
<-sig <-sig
cancel() rootContextCancel()
}() }()
if err := f.Run(ctx); err != nil && errors.Cause(err) != context.Canceled { if err := f.Run(rootContext); err != nil && errors.Cause(err) != context.Canceled {
log.L.Fatal(err) log.L.Fatal(err)
} }
}, },
@@ -176,6 +192,8 @@ func init() {
RootCmd.PersistentFlags().Var(mapVar(userTraceConfig.Tags), "trace-tag", "add tags to include with traces in key=value form") RootCmd.PersistentFlags().Var(mapVar(userTraceConfig.Tags), "trace-tag", "add tags to include with traces in key=value form")
RootCmd.PersistentFlags().StringVar(&traceSampler, "trace-sample-rate", "", "set probability of tracing samples") RootCmd.PersistentFlags().StringVar(&traceSampler, "trace-sample-rate", "", "set probability of tracing samples")
RootCmd.PersistentFlags().DurationVar(&kubeSharedInformerFactoryResync, "full-resync-period", kubeSharedInformerFactoryDefaultResync, "how often to perform a full resync of pods between kubernetes and the provider")
// Cobra also supports local flags, which will only run // Cobra also supports local flags, which will only run
// when this action is called directly. // when this action is called directly.
// RootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") // RootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
@@ -251,11 +269,30 @@ func initConfig() {
logger.WithError(err).Fatal("Error creating kubernetes client") logger.WithError(err).Fatal("Error creating kubernetes client")
} }
rm, err = manager.NewResourceManager(k8sClient) // Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node.
podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(k8sClient, kubeSharedInformerFactoryResync, kubeinformers.WithNamespace(kubeNamespace), kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String()
}))
// Create a pod informer so we can pass its lister to the resource manager.
podInformer = podInformerFactory.Core().V1().Pods()
// Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors).
scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(k8sClient, kubeSharedInformerFactoryResync)
// Create a secret informer and a config map informer so we can pass their listers to the resource manager.
secretInformer := scmInformerFactory.Core().V1().Secrets()
configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
// Create a new instance of the resource manager that uses the listers above for pods, secrets and config maps.
rm, err = manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister())
if err != nil { if err != nil {
logger.WithError(err).Fatal("Error initializing resource manager") logger.WithError(err).Fatal("Error initializing resource manager")
} }
// Start the shared informer factory for pods.
go podInformerFactory.Start(rootContext.Done())
// Start the shared informer factory for secrets and configmaps.
go scmInformerFactory.Start(rootContext.Done())
daemonPortEnv := getEnv("KUBELET_PORT", defaultDaemonPort) daemonPortEnv := getEnv("KUBELET_PORT", defaultDaemonPort)
daemonPort, err := strconv.ParseInt(daemonPortEnv, 10, 32) daemonPort, err := strconv.ParseInt(daemonPortEnv, 10, 32)
if err != nil { if err != nil {