Files
virtual-kubelet/vkubelet/vkubelet.go
Brian Goff bec818bf3c Do not close pod sync, use context cancel instead. (#402)
Closing the channel is racy and can lead to a panic on exit.
Instead rely on context cancellation to know if workers should exit.
2018-11-05 11:37:00 -08:00

247 lines
6.2 KiB
Go

package vkubelet
import (
"context"
"net"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)
const (
	// podStatusReasonProviderFailed is the pod status reason set when the
	// backing provider failed to run the pod; reconcile skips re-creating
	// pods carrying this reason.
	podStatusReasonProviderFailed = "ProviderFailed"
)
// Server masquerades itself as a kubelet and allows for the virtual node to be backed by non-vm/node providers.
type Server struct {
	nodeName        string                   // name of the virtual node registered with Kubernetes
	namespace       string                   // namespace whose pods this server watches
	k8sClient       *kubernetes.Clientset    // client used to talk to the Kubernetes API server
	taint           *corev1.Taint            // optional taint applied to the virtual node
	provider        providers.Provider       // backend that actually runs the pods
	resourceManager *manager.ResourceManager // local cache of Kubernetes resources (pods, etc.)
	podSyncWorkers  int                      // number of workers consuming podCh
	podCh           chan *podNotification    // buffered queue of pod events awaiting sync
}
// Config is used to configure a new server.
type Config struct {
	APIConfig       APIConfig                // kubelet API server settings (address, TLS)
	Client          *kubernetes.Clientset    // Kubernetes client for node/pod operations
	MetricsAddr     string                   // metrics server listen address; empty disables metrics
	Namespace       string                   // namespace to watch for pods
	NodeName        string                   // name under which the virtual node registers
	Provider        providers.Provider       // backing provider implementation
	ResourceManager *manager.ResourceManager // shared resource cache
	Taint           *corev1.Taint            // optional taint for the virtual node
	PodSyncWorkers  int                      // worker count and podCh buffer size
}
// APIConfig is used to configure the API server of the virtual kubelet.
type APIConfig struct {
	CertPath string // path to the TLS certificate file
	KeyPath  string // path to the TLS private key file
	Addr     string // listen address for the kubelet API server
}
// podNotification pairs a pod event with the context it was observed
// under, so sync workers can honor cancellation and tracing.
type podNotification struct {
	pod *corev1.Pod
	ctx context.Context
}
// New creates a new virtual-kubelet server.
//
// It starts the kubelet API server (and, if configured, the metrics
// server), registers the virtual node with Kubernetes, and launches a
// background loop that periodically syncs node and pod statuses until
// ctx is canceled. On a setup error any listeners already opened are
// closed before returning.
func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
	s = &Server{
		namespace:       cfg.Namespace,
		nodeName:        cfg.NodeName,
		taint:           cfg.Taint,
		k8sClient:       cfg.Client,
		resourceManager: cfg.ResourceManager,
		provider:        cfg.Provider,
		podSyncWorkers:  cfg.PodSyncWorkers,
		podCh:           make(chan *podNotification, cfg.PodSyncWorkers),
	}

	ctx = log.WithLogger(ctx, log.G(ctx))

	apiL, err := net.Listen("tcp", cfg.APIConfig.Addr)
	if err != nil {
		return nil, pkgerrors.Wrap(err, "error setting up API listener")
	}
	// Close the API listener if any later setup step fails.
	defer func() {
		if retErr != nil {
			apiL.Close()
		}
	}()
	go KubeletServerStart(cfg.Provider, apiL, cfg.APIConfig.CertPath, cfg.APIConfig.KeyPath)

	if cfg.MetricsAddr != "" {
		metricsL, err := net.Listen("tcp", cfg.MetricsAddr)
		if err != nil {
			return nil, pkgerrors.Wrap(err, "error setting up metrics listener")
		}
		// Close the metrics listener if registration below fails.
		defer func() {
			if retErr != nil {
				metricsL.Close()
			}
		}()
		go MetricsServerStart(cfg.Provider, metricsL)
	} else {
		log.G(ctx).Info("Skipping metrics server startup since no address was provided")
	}

	if err := s.registerNode(ctx); err != nil {
		return s, err
	}

	go func() {
		// Use an explicit ticker instead of time.Tick so it can be
		// stopped when ctx is canceled; time.Tick's ticker can never be
		// released and would leak after this goroutine exits.
		t := time.NewTicker(5 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				ctx, span := trace.StartSpan(ctx, "syncActualState")
				s.updateNode(ctx)
				s.updatePodStatuses(ctx)
				span.End()
			}
		}
	}()

	return s, nil
}
// Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster.
// Run will block until Stop is called or a SIGINT or SIGTERM signal is received.
func (s *Server) Run(ctx context.Context) error {
	err := s.watchForPodEvent(ctx)
	if err == nil {
		return nil
	}
	// Cancellation is propagated to the caller; anything else is logged
	// and swallowed so the process can shut down normally.
	if pkgerrors.Cause(err) == context.Canceled {
		return err
	}
	log.G(ctx).Error(err)
	return nil
}
// reconcile is the main reconciliation loop that compares differences between Kubernetes and
// the active provider and reconciles the differences.
//
// It performs three passes:
//  1. delete provider pods that no longer exist (or are terminating) in Kubernetes;
//  2. create pods known to Kubernetes but missing from the provider;
//  3. delete pods whose Kubernetes DeletionTimestamp is set.
func (s *Server) reconcile(ctx context.Context) {
	ctx, span := trace.StartSpan(ctx, "reconcile")
	defer span.End()

	logger := log.G(ctx)
	logger.Debug("Start reconcile")
	defer logger.Debug("End reconcile")

	providerPods, err := s.provider.GetPods(ctx)
	if err != nil {
		logger.WithError(err).Error("Error getting pod list from provider")
		return
	}

	// Index provider pods by namespace/name so the create pass below is
	// a map lookup instead of a nested linear scan.
	providerPodIndex := make(map[string]*corev1.Pod, len(providerPods))
	for _, pod := range providerPods {
		providerPodIndex[pod.Namespace+"/"+pod.Name] = pod
	}

	var deletePods []*corev1.Pod
	for _, pod := range providerPods {
		// Delete pods that don't exist in Kubernetes
		if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil || p.DeletionTimestamp != nil {
			deletePods = append(deletePods, pod)
		}
	}
	span.Annotate(nil, "Got provider pods")

	var failedDeleteCount int64
	for _, pod := range deletePods {
		logger := logger.WithField("pod", pod.Name)
		// Debug is not printf-style (the original passed a "%s" verb that
		// was never formatted); the pod name is already attached as a field.
		logger.Debug("Deleting pod")
		if err := s.deletePod(ctx, pod); err != nil {
			logger.WithError(err).Error("Error deleting pod")
			failedDeleteCount++
			continue
		}
	}
	span.Annotate(
		[]trace.Attribute{
			trace.Int64Attribute("expected_delete_pods_count", int64(len(deletePods))),
			trace.Int64Attribute("failed_delete_pods_count", failedDeleteCount),
		},
		"Cleaned up stale provider pods",
	)

	pods := s.resourceManager.GetPods()
	var createPods []*corev1.Pod
	var cleanupPods []*corev1.Pod
	for _, pod := range pods {
		providerPod := providerPodIndex[pod.Namespace+"/"+pod.Name]

		// Delete pod if DeletionTimestamp is set
		if pod.DeletionTimestamp != nil {
			cleanupPods = append(cleanupPods, pod)
			continue
		}

		// Only (re-)create pods that are not terminating, not already
		// finished, and not previously failed by the provider itself.
		if providerPod == nil &&
			pod.DeletionTimestamp == nil &&
			pod.Status.Phase != corev1.PodSucceeded &&
			pod.Status.Phase != corev1.PodFailed &&
			pod.Status.Reason != podStatusReasonProviderFailed {
			createPods = append(createPods, pod)
		}
	}

	var failedCreateCount int64
	for _, pod := range createPods {
		logger := logger.WithField("pod", pod.Name)
		logger.Debug("Creating pod")
		if err := s.createPod(ctx, pod); err != nil {
			failedCreateCount++
			logger.WithError(err).Error("Error creating pod")
			continue
		}
	}
	span.Annotate(
		[]trace.Attribute{
			trace.Int64Attribute("expected_created_pods", int64(len(createPods))),
			trace.Int64Attribute("failed_pod_creates", failedCreateCount),
		},
		"Created pods in provider",
	)

	var failedCleanupCount int64
	for _, pod := range cleanupPods {
		logger := logger.WithField("pod", pod.Name)
		log.Trace(logger, "Pod pending deletion")
		var err error
		if err = s.deletePod(ctx, pod); err != nil {
			logger.WithError(err).Error("Error deleting pod")
			failedCleanupCount++
			continue
		}
		log.Trace(logger, "Pod deletion complete")
	}
	span.Annotate(
		[]trace.Attribute{
			trace.Int64Attribute("expected_cleaned_up_pods", int64(len(cleanupPods))),
			trace.Int64Attribute("cleaned_up_pod_failures", failedCleanupCount),
		},
		"Cleaned up provider pods marked for deletion",
	)
}