* vendor: add vendored code
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* controller: use shared informers and a work queue
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* errors: use cpuguy83/strongerrors
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* aci: fix test that uses resource manager
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* readme: clarify skaffold run before e2e
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* cmd: use root context everywhere
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: refactor pod lifecycle management
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* e2e: fix race in test when observing deletions
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* e2e: test pod forced deletion
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* cmd: fix potential root context leak
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: rename metaKey
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: remove calls to HandleError
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* Revert "errors: use cpuguy83/strongerrors"
This reverts commit f031fc6d.
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* manager: remove redundant lister constraint
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: rename the pod event recorder
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: amend misleading comment
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* mock: add tracing
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: add tracing
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* test: observe timeouts
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* trace: remove unnecessary comments
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: limit concurrency in deleteDanglingPods
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: never store context, always pass in calls
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: remove HandleCrash and just panic
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: don't sync succeeded pods
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: ensure pod deletion from kubernetes
Signed-off-by: Paulo Pires <pjpires@gmail.com>
132 lines · 3.2 KiB · Go
package vkubelet
|
|
|
|
import (
|
|
"context"
|
|
"net"
|
|
"time"
|
|
|
|
pkgerrors "github.com/pkg/errors"
|
|
"go.opencensus.io/trace"
|
|
corev1 "k8s.io/api/core/v1"
|
|
corev1informers "k8s.io/client-go/informers/core/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
|
|
"github.com/virtual-kubelet/virtual-kubelet/log"
|
|
"github.com/virtual-kubelet/virtual-kubelet/manager"
|
|
"github.com/virtual-kubelet/virtual-kubelet/providers"
|
|
)
|
|
|
|
// podStatusReasonProviderFailed is a pod status reason string; presumably it
// is set when the provider reports a failure for a pod — confirm at call sites.
const podStatusReasonProviderFailed = "ProviderFailed"
|
|
|
|
// Server masquarades itself as a kubelet and allows for the virtual node to be backed by non-vm/node providers.
|
|
type Server struct {
|
|
nodeName string
|
|
namespace string
|
|
k8sClient *kubernetes.Clientset
|
|
taint *corev1.Taint
|
|
provider providers.Provider
|
|
resourceManager *manager.ResourceManager
|
|
podSyncWorkers int
|
|
podCh chan *podNotification
|
|
podInformer corev1informers.PodInformer
|
|
}
|
|
|
|
// Config is used to configure a new server.
|
|
type Config struct {
|
|
APIConfig APIConfig
|
|
Client *kubernetes.Clientset
|
|
MetricsAddr string
|
|
Namespace string
|
|
NodeName string
|
|
Provider providers.Provider
|
|
ResourceManager *manager.ResourceManager
|
|
Taint *corev1.Taint
|
|
PodSyncWorkers int
|
|
PodInformer corev1informers.PodInformer
|
|
}
|
|
|
|
// APIConfig describes how the virtual kubelet's API server is exposed.
type APIConfig struct {
	// CertPath and KeyPath are the certificate and key file paths passed to
	// KubeletServerStart (presumably for TLS — confirm there).
	CertPath, KeyPath string
	// Addr is the TCP address the API listener binds to.
	Addr string
}
|
|
|
|
type podNotification struct {
|
|
pod *corev1.Pod
|
|
ctx context.Context
|
|
}
|
|
|
|
// New creates a new virtual-kubelet server.
|
|
func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
|
|
s = &Server{
|
|
namespace: cfg.Namespace,
|
|
nodeName: cfg.NodeName,
|
|
taint: cfg.Taint,
|
|
k8sClient: cfg.Client,
|
|
resourceManager: cfg.ResourceManager,
|
|
provider: cfg.Provider,
|
|
podSyncWorkers: cfg.PodSyncWorkers,
|
|
podCh: make(chan *podNotification, cfg.PodSyncWorkers),
|
|
podInformer: cfg.PodInformer,
|
|
}
|
|
|
|
ctx = log.WithLogger(ctx, log.G(ctx))
|
|
|
|
apiL, err := net.Listen("tcp", cfg.APIConfig.Addr)
|
|
if err != nil {
|
|
return nil, pkgerrors.Wrap(err, "error setting up API listener")
|
|
}
|
|
defer func() {
|
|
if retErr != nil {
|
|
apiL.Close()
|
|
}
|
|
}()
|
|
|
|
go KubeletServerStart(cfg.Provider, apiL, cfg.APIConfig.CertPath, cfg.APIConfig.KeyPath)
|
|
|
|
if cfg.MetricsAddr != "" {
|
|
metricsL, err := net.Listen("tcp", cfg.MetricsAddr)
|
|
if err != nil {
|
|
return nil, pkgerrors.Wrap(err, "error setting up metrics listener")
|
|
}
|
|
defer func() {
|
|
if retErr != nil {
|
|
metricsL.Close()
|
|
}
|
|
}()
|
|
go MetricsServerStart(cfg.Provider, metricsL)
|
|
} else {
|
|
log.G(ctx).Info("Skipping metrics server startup since no address was provided")
|
|
}
|
|
|
|
if err := s.registerNode(ctx); err != nil {
|
|
return s, err
|
|
}
|
|
|
|
tick := time.Tick(5 * time.Second)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-tick:
|
|
ctx, span := trace.StartSpan(ctx, "syncActualState")
|
|
s.updateNode(ctx)
|
|
s.updatePodStatuses(ctx)
|
|
span.End()
|
|
}
|
|
}
|
|
}()
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// Run creates and starts an instance of the pod controller, blocking until it stops.
|
|
func (s *Server) Run(ctx context.Context) error {
|
|
return NewPodController(s).Run(ctx, s.podSyncWorkers)
|
|
}
|