Files
virtual-kubelet/vkubelet/vkubelet.go
Paulo Pires 28a757f4da use shared informers and workqueue (#425)
* vendor: add vendored code

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* controller: use shared informers and a work queue

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* errors: use cpuguy83/strongerrors

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* aci: fix test that uses resource manager

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* readme: clarify skaffold run before e2e

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* cmd: use root context everywhere

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: refactor pod lifecycle management

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* e2e: fix race in test when observing deletions

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* e2e: test pod forced deletion

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* cmd: fix root context potential leak

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: rename metaKey

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: remove calls to HandleError

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* Revert "errors: use cpuguy83/strongerrors"

This reverts commit f031fc6d.

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* manager: remove redundant lister constraint

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: rename the pod event recorder

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: amend misleading comment

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* mock: add tracing

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: add tracing

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* test: observe timeouts

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* trace: remove unnecessary comments

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: limit concurrency in deleteDanglingPods

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: never store context, always pass in calls

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: remove HandleCrash and just panic

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: don't sync succeeded pods

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: ensure pod deletion from kubernetes

Signed-off-by: Paulo Pires <pjpires@gmail.com>
2018-11-30 15:53:58 -08:00

132 lines
3.2 KiB
Go

package vkubelet
import (
"context"
"net"
"time"
pkgerrors "github.com/pkg/errors"
"go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
)
const (
// podStatusReasonProviderFailed is the reason string "ProviderFailed".
// NOTE(review): presumably set as a pod status reason when the provider
// fails to handle a pod; its usage is not visible in this part of the file.
podStatusReasonProviderFailed = "ProviderFailed"
)
// Server masquerades itself as a kubelet and allows for the virtual node to be backed by non-vm/node providers.
//
// All fields are populated by New from the corresponding Config fields.
type Server struct {
// nodeName is taken from Config.NodeName.
nodeName string
// namespace is taken from Config.Namespace.
namespace string
// k8sClient is the Kubernetes clientset taken from Config.Client.
k8sClient *kubernetes.Clientset
// taint is taken from Config.Taint.
taint *corev1.Taint
// provider is the backing provider taken from Config.Provider.
provider providers.Provider
// resourceManager is taken from Config.ResourceManager.
resourceManager *manager.ResourceManager
// podSyncWorkers is handed to the pod controller's Run method by Server.Run.
podSyncWorkers int
// podCh is a buffered channel whose capacity equals Config.PodSyncWorkers.
podCh chan *podNotification
// podInformer is taken from Config.PodInformer.
podInformer corev1informers.PodInformer
}
// Config is used to configure a new server.
// It is consumed by New, which copies most fields onto the returned Server.
type Config struct {
// APIConfig holds the listen address and certificate/key paths used when
// New starts the kubelet API server.
APIConfig APIConfig
// Client is the Kubernetes clientset stored on the Server.
Client *kubernetes.Clientset
// MetricsAddr is the TCP address for the metrics listener. When empty,
// New skips starting the metrics server.
MetricsAddr string
// Namespace is stored on the Server as its namespace.
Namespace string
// NodeName is stored on the Server as its node name.
NodeName string
// Provider is stored on the Server and also passed to the kubelet API
// server and the metrics server started by New.
Provider providers.Provider
// ResourceManager is stored on the Server.
ResourceManager *manager.ResourceManager
// Taint is stored on the Server.
Taint *corev1.Taint
// PodSyncWorkers is stored on the Server and is also used as the buffer
// size of the Server's pod notification channel.
PodSyncWorkers int
// PodInformer is stored on the Server.
PodInformer corev1informers.PodInformer
}
// APIConfig is used to configure the API server of the virtual kubelet.
type APIConfig struct {
// CertPath is passed to KubeletServerStart together with KeyPath.
// NOTE(review): presumably the path to the TLS certificate file; confirm
// against KubeletServerStart.
CertPath string
// KeyPath is passed to KubeletServerStart together with CertPath.
// NOTE(review): presumably the path to the TLS private key file; confirm
// against KubeletServerStart.
KeyPath string
// Addr is the TCP address New listens on for the API server.
Addr string
}
// podNotification pairs a pod with a context. Values of this type are carried
// on Server.podCh; the code that sends and receives them is outside this part
// of the file.
type podNotification struct {
// pod is the pod the notification refers to.
pod *corev1.Pod
// ctx travels with the pod.
// NOTE(review): Go guidance discourages storing a context.Context in a
// struct; confirm whether this is still needed.
ctx context.Context
}
// New creates a new virtual-kubelet server.
//
// It performs the following steps in order:
//  1. Builds the Server from cfg.
//  2. Opens a TCP listener on cfg.APIConfig.Addr and starts the kubelet API
//     server on it in a goroutine.
//  3. If cfg.MetricsAddr is non-empty, opens a TCP listener on it and starts
//     the metrics server in a goroutine; otherwise logs that it was skipped.
//  4. Registers the node via registerNode.
//  5. Starts a background goroutine that, every 5 seconds until ctx is done,
//     calls updateNode and updatePodStatuses inside a "syncActualState" span.
//
// If New returns a non-nil error, every listener it opened is closed. When a
// listener cannot be created the returned *Server is nil; when registerNode
// fails the constructed *Server is returned alongside the error (callers
// should not use it).
func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
	// Interval between two consecutive syncs of node and pod statuses.
	const syncInterval = 5 * time.Second

	s = &Server{
		namespace:       cfg.Namespace,
		nodeName:        cfg.NodeName,
		taint:           cfg.Taint,
		k8sClient:       cfg.Client,
		resourceManager: cfg.ResourceManager,
		provider:        cfg.Provider,
		podSyncWorkers:  cfg.PodSyncWorkers,
		podCh:           make(chan *podNotification, cfg.PodSyncWorkers),
		podInformer:     cfg.PodInformer,
	}

	ctx = log.WithLogger(ctx, log.G(ctx))

	apiL, err := net.Listen("tcp", cfg.APIConfig.Addr)
	if err != nil {
		return nil, pkgerrors.Wrap(err, "error setting up API listener")
	}
	// retErr is the named result, so this sees the final error value and
	// releases the listener on any failure path below.
	defer func() {
		if retErr != nil {
			apiL.Close()
		}
	}()
	go KubeletServerStart(cfg.Provider, apiL, cfg.APIConfig.CertPath, cfg.APIConfig.KeyPath)

	if cfg.MetricsAddr != "" {
		metricsL, err := net.Listen("tcp", cfg.MetricsAddr)
		if err != nil {
			return nil, pkgerrors.Wrap(err, "error setting up metrics listener")
		}
		defer func() {
			if retErr != nil {
				metricsL.Close()
			}
		}()
		go MetricsServerStart(cfg.Provider, metricsL)
	} else {
		log.G(ctx).Info("Skipping metrics server startup since no address was provided")
	}

	if err := s.registerNode(ctx); err != nil {
		return s, err
	}

	// Use an explicit Ticker rather than time.Tick so that it can be stopped
	// when the goroutine exits; a time.Tick ticker can never be released.
	ticker := time.NewTicker(syncInterval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// Each span derives from the long-lived ctx, not from the
				// previous iteration's span, so spans are siblings.
				spanCtx, span := trace.StartSpan(ctx, "syncActualState")
				s.updateNode(spanCtx)
				s.updatePodStatuses(spanCtx)
				span.End()
			}
		}
	}()

	return s, nil
}
// Run builds a pod controller for this server and runs it with the server's
// configured number of pod sync workers. The call blocks until the
// controller's Run method returns, and that method's error is returned as-is.
func (s *Server) Run(ctx context.Context) error {
	controller := NewPodController(s)
	workers := s.podSyncWorkers
	return controller.Run(ctx, workers)
}