Files
virtual-kubelet/vkubelet/vkubelet.go
Brian Goff aee1fde504 Fix a case where provider pod status is not found
Updates the pod status in Kubernetes to "Failed" when the pod status is
not found from the provider.

Note that currently thet most providers return `nil, nil` when a pod is
not found. This works but should possibly return a typed error so we can
determine if the error means not found or something else... but this
works as is so I haven't changed it.
2018-11-06 16:11:42 -08:00

247 lines
6.3 KiB
Go

package vkubelet
import (
"context"
"net"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)
const (
podStatusReasonProviderFailed = "ProviderFailed"
)
// Server masquarades itself as a kubelet and allows for the virtual node to be backed by non-vm/node providers.
type Server struct {
nodeName string
namespace string
k8sClient *kubernetes.Clientset
taint *corev1.Taint
provider providers.Provider
resourceManager *manager.ResourceManager
podSyncWorkers int
podCh chan *podNotification
}
// Config is used to configure a new server.
type Config struct {
APIConfig APIConfig
Client *kubernetes.Clientset
MetricsAddr string
Namespace string
NodeName string
Provider providers.Provider
ResourceManager *manager.ResourceManager
Taint *corev1.Taint
PodSyncWorkers int
}
// APIConfig is used to configure the API server of the virtual kubelet.
type APIConfig struct {
CertPath string
KeyPath string
Addr string
}
type podNotification struct {
pod *corev1.Pod
ctx context.Context
}
// New creates a new virtual-kubelet server.
func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
s = &Server{
namespace: cfg.Namespace,
nodeName: cfg.NodeName,
taint: cfg.Taint,
k8sClient: cfg.Client,
resourceManager: cfg.ResourceManager,
provider: cfg.Provider,
podSyncWorkers: cfg.PodSyncWorkers,
podCh: make(chan *podNotification, cfg.PodSyncWorkers),
}
ctx = log.WithLogger(ctx, log.G(ctx))
apiL, err := net.Listen("tcp", cfg.APIConfig.Addr)
if err != nil {
return nil, pkgerrors.Wrap(err, "error setting up API listener")
}
defer func() {
if retErr != nil {
apiL.Close()
}
}()
go KubeletServerStart(cfg.Provider, apiL, cfg.APIConfig.CertPath, cfg.APIConfig.KeyPath)
if cfg.MetricsAddr != "" {
metricsL, err := net.Listen("tcp", cfg.MetricsAddr)
if err != nil {
return nil, pkgerrors.Wrap(err, "error setting up metrics listener")
}
defer func() {
if retErr != nil {
metricsL.Close()
}
}()
go MetricsServerStart(cfg.Provider, metricsL)
} else {
log.G(ctx).Info("Skipping metrics server startup since no address was provided")
}
if err := s.registerNode(ctx); err != nil {
return s, err
}
tick := time.Tick(5 * time.Second)
go func() {
for {
select {
case <-ctx.Done():
return
case <-tick:
ctx, span := trace.StartSpan(ctx, "syncActualState")
s.updateNode(ctx)
s.updatePodStatuses(ctx)
span.End()
}
}
}()
return s, nil
}
// Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster.
// Run will block until Stop is called or a SIGINT or SIGTERM signal is received.
func (s *Server) Run(ctx context.Context) error {
if err := s.watchForPodEvent(ctx); err != nil {
if pkgerrors.Cause(err) == context.Canceled {
return err
}
log.G(ctx).Error(err)
}
return nil
}
// reconcile is the main reconciliation loop that compares differences between Kubernetes and
// the active provider and reconciles the differences.
func (s *Server) reconcile(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "reconcile")
defer span.End()
logger := log.G(ctx)
logger.Debug("Start reconcile")
defer logger.Debug("End reconcile")
providerPods, err := s.provider.GetPods(ctx)
if err != nil {
logger.WithError(err).Error("Error getting pod list from provider")
return
}
var deletePods []*corev1.Pod
for _, pod := range providerPods {
// Delete pods that don't exist in Kubernetes
if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil || p.DeletionTimestamp != nil {
deletePods = append(deletePods, pod)
}
}
span.Annotate(nil, "Got provider pods")
var failedDeleteCount int64
for _, pod := range deletePods {
logger := logger.WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
logger.Debug("Deleting pod")
if err := s.deletePod(ctx, pod); err != nil {
logger.WithError(err).Error("Error deleting pod")
failedDeleteCount++
continue
}
}
span.Annotate(
[]trace.Attribute{
trace.Int64Attribute("expected_delete_pods_count", int64(len(deletePods))),
trace.Int64Attribute("failed_delete_pods_count", failedDeleteCount),
},
"Cleaned up stale provider pods",
)
pods := s.resourceManager.GetPods()
var createPods []*corev1.Pod
cleanupPods := deletePods[:0]
for _, pod := range pods {
var providerPod *corev1.Pod
for _, p := range providerPods {
if p.Namespace == pod.Namespace && p.Name == pod.Name {
providerPod = p
break
}
}
// Delete pod if DeletionTimestamp is set
if pod.DeletionTimestamp != nil {
cleanupPods = append(cleanupPods, pod)
continue
}
if providerPod == nil &&
pod.DeletionTimestamp == nil &&
pod.Status.Phase != corev1.PodSucceeded &&
pod.Status.Phase != corev1.PodFailed &&
pod.Status.Reason != podStatusReasonProviderFailed {
createPods = append(createPods, pod)
}
}
var failedCreateCount int64
for _, pod := range createPods {
logger := logger.WithField("pod", pod.Name)
logger.Debug("Creating pod")
if err := s.createPod(ctx, pod); err != nil {
failedCreateCount++
logger.WithError(err).Error("Error creating pod")
continue
}
}
span.Annotate(
[]trace.Attribute{
trace.Int64Attribute("expected_created_pods", int64(len(createPods))),
trace.Int64Attribute("failed_pod_creates", failedCreateCount),
},
"Created pods in provider",
)
var failedCleanupCount int64
for _, pod := range cleanupPods {
logger := logger.WithField("pod", pod.Name)
log.Trace(logger, "Pod pending deletion")
var err error
if err = s.deletePod(ctx, pod); err != nil {
logger.WithError(err).Error("Error deleting pod")
failedCleanupCount++
continue
}
log.Trace(logger, "Pod deletion complete")
}
span.Annotate(
[]trace.Attribute{
trace.Int64Attribute("expected_cleaned_up_pods", int64(len(cleanupPods))),
trace.Int64Attribute("cleaned_up_pod_failures", failedCleanupCount),
},
"Cleaned up provider pods marked for deletion",
)
}