controller: use shared informers and a work queue
Signed-off-by: Paulo Pires <pjpires@gmail.com>
This commit is contained in:
46
cmd/root.go
46
cmd/root.go
@@ -23,25 +23,35 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/cpuguy83/strongerrors"
|
||||
homedir "github.com/mitchellh/go-homedir"
|
||||
"github.com/mitchellh/go-homedir"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"go.opencensus.io/trace"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
corev1informers "k8s.io/client-go/informers/core/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/manager"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/register"
|
||||
vkubelet "github.com/virtual-kubelet/virtual-kubelet/vkubelet"
|
||||
"go.opencensus.io/trace"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/vkubelet"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultDaemonPort = "10250"
|
||||
// kubeSharedInformerFactoryDefaultResync is the default resync period used by the shared informer factories for Kubernetes resources.
|
||||
// It is set to the same value used by the Kubelet, and can be overridden via the "--full-resync-period" flag.
|
||||
// https://github.com/kubernetes/kubernetes/blob/v1.12.2/pkg/kubelet/apis/config/v1beta1/defaults.go#L51
|
||||
kubeSharedInformerFactoryDefaultResync = 1 * time.Minute
|
||||
)
|
||||
|
||||
var kubeletConfig string
|
||||
@@ -60,6 +70,8 @@ var k8sClient *kubernetes.Clientset
|
||||
var p providers.Provider
|
||||
var rm *manager.ResourceManager
|
||||
var apiConfig vkubelet.APIConfig
|
||||
var podInformer corev1informers.PodInformer
|
||||
var kubeSharedInformerFactoryResync time.Duration
|
||||
var podSyncWorkers int
|
||||
|
||||
var userTraceExporters []string
|
||||
@@ -86,6 +98,7 @@ This allows users to schedule kubernetes workloads on nodes that aren't running
|
||||
ResourceManager: rm,
|
||||
APIConfig: apiConfig,
|
||||
PodSyncWorkers: podSyncWorkers,
|
||||
PodInformer: podInformer,
|
||||
})
|
||||
if err != nil {
|
||||
log.L.WithError(err).Fatal("Error initializing virtual kubelet")
|
||||
@@ -176,6 +189,8 @@ func init() {
|
||||
RootCmd.PersistentFlags().Var(mapVar(userTraceConfig.Tags), "trace-tag", "add tags to include with traces in key=value form")
|
||||
RootCmd.PersistentFlags().StringVar(&traceSampler, "trace-sample-rate", "", "set probability of tracing samples")
|
||||
|
||||
RootCmd.PersistentFlags().DurationVar(&kubeSharedInformerFactoryResync, "full-resync-period", kubeSharedInformerFactoryDefaultResync, "how often to perform a full resync of pods between kubernetes and the provider")
|
||||
|
||||
// Cobra also supports local flags, which will only run
|
||||
// when this action is called directly.
|
||||
// RootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
|
||||
@@ -251,11 +266,30 @@ func initConfig() {
|
||||
logger.WithError(err).Fatal("Error creating kubernetes client")
|
||||
}
|
||||
|
||||
rm, err = manager.NewResourceManager(k8sClient)
|
||||
// Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node.
|
||||
podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(k8sClient, kubeSharedInformerFactoryResync, kubeinformers.WithNamespace(kubeNamespace), kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
|
||||
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String()
|
||||
}))
|
||||
// Create a pod informer so we can pass its lister to the resource manager.
|
||||
podInformer = podInformerFactory.Core().V1().Pods()
|
||||
|
||||
// Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors).
|
||||
scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(k8sClient, kubeSharedInformerFactoryResync)
|
||||
// Create a secret informer and a config map informer so we can pass their listers to the resource manager.
|
||||
secretInformer := scmInformerFactory.Core().V1().Secrets()
|
||||
configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
|
||||
|
||||
// Create a new instance of the resource manager that uses the listers above for pods, secrets and config maps.
|
||||
rm, err = manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister())
|
||||
if err != nil {
|
||||
logger.WithError(err).Fatal("Error initializing resource manager")
|
||||
}
|
||||
|
||||
// Start the shared informer factory for pods.
|
||||
go podInformerFactory.Start(context.Background().Done())
|
||||
// Start the shared informer factory for secrets and configmaps.
|
||||
go scmInformerFactory.Start(context.Background().Done())
|
||||
|
||||
daemonPortEnv := getEnv("KUBELET_PORT", defaultDaemonPort)
|
||||
daemonPort, err := strconv.ParseInt(daemonPortEnv, 10, 32)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user