package vkubelet

import (
	"context"
	"fmt"
	"net"
	"os"
	"os/signal"
	"strings"
	"sync/atomic"
	"syscall"
	"time"

	pkgerrors "github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/log"
	"github.com/virtual-kubelet/virtual-kubelet/manager"
	"github.com/virtual-kubelet/virtual-kubelet/providers"
	"go.opencensus.io/trace"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
)

const (
	// PodStatusReason_ProviderFailed is the pod status reason set when the
	// provider fails to create a pod.
	PodStatusReason_ProviderFailed = "ProviderFailed"
)

// Server masquerades itself as a kubelet and allows for the virtual node to be backed by non-vm/node providers.
type Server struct {
	nodeName        string
	namespace       string
	k8sClient       *kubernetes.Clientset
	taint           *corev1.Taint
	provider        providers.Provider
	podWatcher      watch.Interface
	resourceManager *manager.ResourceManager
}

// Config is used to configure a new server.
type Config struct {
	APIConfig       APIConfig
	Client          *kubernetes.Clientset
	MetricsAddr     string
	Namespace       string
	NodeName        string
	Provider        providers.Provider
	ResourceManager *manager.ResourceManager
	Taint           *corev1.Taint
}

// APIConfig holds the listen address and TLS certificate/key paths for the
// kubelet API server.
type APIConfig struct {
	CertPath string
	KeyPath  string
	Addr     string
}

// New creates a new virtual-kubelet server.
func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
	s = &Server{
		namespace:       cfg.Namespace,
		nodeName:        cfg.NodeName,
		taint:           cfg.Taint,
		k8sClient:       cfg.Client,
		resourceManager: cfg.ResourceManager,
		provider:        cfg.Provider,
	}

	ctx = log.WithLogger(ctx, log.G(ctx))

	apiL, err := net.Listen("tcp", cfg.APIConfig.Addr)
	if err != nil {
		return nil, pkgerrors.Wrap(err, "error setting up API listener")
	}
	defer func() {
		if retErr != nil {
			apiL.Close()
		}
	}()
	go KubeletServerStart(cfg.Provider, apiL, cfg.APIConfig.CertPath, cfg.APIConfig.KeyPath)

	if cfg.MetricsAddr != "" {
		metricsL, err := net.Listen("tcp", cfg.MetricsAddr)
		if err != nil {
			return nil, pkgerrors.Wrap(err, "error setting up metrics listener")
		}
		defer func() {
			if retErr != nil {
				metricsL.Close()
			}
		}()
		go MetricsServerStart(cfg.Provider, metricsL)
	} else {
		log.G(ctx).Info("Skipping metrics server startup since no address was provided")
	}

	if err := s.registerNode(ctx); err != nil {
		return s, err
	}

	tick := time.Tick(5 * time.Second)
	go func() {
		for range tick {
			ctx, span := trace.StartSpan(ctx, "reconciliationTick")
			s.updateNode(ctx)
			s.updatePodStatuses(ctx)
			span.End()
		}
	}()

	return s, nil
}
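// A minimal usage sketch (the rest-config handling, provider value, and
// resource manager below are hypothetical; any providers.Provider
// implementation and a matching *manager.ResourceManager can be used):
//
//	client, err := kubernetes.NewForConfig(restConfig) // restConfig from clientcmd or in-cluster config
//	if err != nil {
//		// handle error
//	}
//	s, err := vkubelet.New(ctx, vkubelet.Config{
//		Client:          client,
//		NodeName:        "virtual-kubelet",
//		Namespace:       corev1.NamespaceAll,
//		Provider:        provider, // hypothetical providers.Provider instance
//		ResourceManager: rm,       // hypothetical *manager.ResourceManager
//		APIConfig:       vkubelet.APIConfig{Addr: ":10250", CertPath: cert, KeyPath: key},
//	})
//	if err != nil {
//		// handle error
//	}
//	if err := s.Run(ctx); err != nil {
//		// handle error
//	}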
// registerNode registers this virtual node with the Kubernetes API.
func (s *Server) registerNode(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "registerNode")
	defer span.End()

	taints := make([]corev1.Taint, 0)
	if s.taint != nil {
		taints = append(taints, *s.taint)
	}

	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: s.nodeName,
			Labels: map[string]string{
				"type":                   "virtual-kubelet",
				"kubernetes.io/role":     "agent",
				"beta.kubernetes.io/os":  strings.ToLower(s.provider.OperatingSystem()),
				"kubernetes.io/hostname": s.nodeName,
				"alpha.service-controller.kubernetes.io/exclude-balancer": "true",
			},
		},
		Spec: corev1.NodeSpec{
			Taints: taints,
		},
		Status: corev1.NodeStatus{
			NodeInfo: corev1.NodeSystemInfo{
				OperatingSystem: s.provider.OperatingSystem(),
				Architecture:    "amd64",
				KubeletVersion:  "v1.11.2",
			},
			Capacity:        s.provider.Capacity(ctx),
			Allocatable:     s.provider.Capacity(ctx),
			Conditions:      s.provider.NodeConditions(ctx),
			Addresses:       s.provider.NodeAddresses(ctx),
			DaemonEndpoints: *s.provider.NodeDaemonEndpoints(ctx),
		},
	}

	addNodeAttributes(span, node)

	if _, err := s.k8sClient.CoreV1().Nodes().Create(node); err != nil && !errors.IsAlreadyExists(err) {
		span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
		return err
	}
	span.Annotate(nil, "Registered node with k8s")

	log.G(ctx).Info("Registered node")
	return nil
}

// Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster.
// Run will block until Stop is called or a SIGINT or SIGTERM signal is received.
func (s *Server) Run(ctx context.Context) error {
	// shouldStop is read by the watch loop and written by the signal-handler
	// goroutine, so access it atomically to avoid a data race.
	var shouldStop int32

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sig
		atomic.StoreInt32(&shouldStop, 1)
		s.Stop()
	}()

	for {
		opts := metav1.ListOptions{
			FieldSelector: fields.OneTermEqualSelector("spec.nodeName", s.nodeName).String(),
		}

		pods, err := s.k8sClient.CoreV1().Pods(s.namespace).List(opts)
		if err != nil {
			return pkgerrors.Wrap(err, "error getting pod list")
		}
		s.resourceManager.SetPods(pods)
		s.reconcile(ctx)

		opts.ResourceVersion = pods.ResourceVersion
		s.podWatcher, err = s.k8sClient.CoreV1().Pods(s.namespace).Watch(opts)
		if err != nil {
			return pkgerrors.Wrap(err, "failed to watch pods")
		}

	loop:
		for {
			select {
			case ev, ok := <-s.podWatcher.ResultChan():
				if !ok {
					if atomic.LoadInt32(&shouldStop) == 1 {
						log.G(ctx).Info("Pod watcher is stopped")
						return nil
					}

					log.G(ctx).Error("Pod watcher connection closed unexpectedly")
					break loop
				}

				log.G(ctx).WithField("type", ev.Type).Debug("Received pod watcher event")

				reconcile := false
				switch ev.Type {
				case watch.Added:
					reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
				case watch.Modified:
					reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
				case watch.Deleted:
					reconcile = s.resourceManager.DeletePod(ev.Object.(*corev1.Pod))
				}

				if reconcile {
					s.reconcile(ctx)
				}
			}
		}

		time.Sleep(5 * time.Second)
	}
}
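// Run's loop above follows the standard Kubernetes list-then-watch pattern:
// the initial List returns a ResourceVersion, and starting the Watch from
// that version ensures no pod events are missed between the two calls. If the
// watch connection closes without Stop having been requested, the loop waits
// five seconds and then re-lists and re-watches from scratch.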
// Stop shuts down the server.
// It does not shut down pods assigned to the virtual node.
func (s *Server) Stop() {
	if s.podWatcher != nil {
		s.podWatcher.Stop()
	}
}

type taintsStringer []corev1.Taint

// String renders the taints as a comma-separated list of key=value:effect
// entries, e.g. "virtual-kubelet.io/provider=mock:NoSchedule".
func (t taintsStringer) String() string {
	var s string
	for _, taint := range t {
		if s == "" {
			s = taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
		} else {
			s += ", " + taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
		}
	}
	return s
}

func addNodeAttributes(span *trace.Span, n *corev1.Node) {
	span.AddAttributes(
		trace.StringAttribute("UID", string(n.UID)),
		trace.StringAttribute("name", n.Name),
		trace.StringAttribute("cluster", n.ClusterName),
	)
	if span.IsRecordingEvents() {
		span.AddAttributes(trace.StringAttribute("taints", taintsStringer(n.Spec.Taints).String()))
	}
}

// updateNode updates the node status within Kubernetes with updated NodeConditions.
func (s *Server) updateNode(ctx context.Context) {
	ctx, span := trace.StartSpan(ctx, "updateNode")
	defer span.End()

	opts := metav1.GetOptions{}
	n, err := s.k8sClient.CoreV1().Nodes().Get(s.nodeName, opts)
	if err != nil && !errors.IsNotFound(err) {
		log.G(ctx).WithError(err).Error("Failed to retrieve node")
		span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
		return
	}

	if errors.IsNotFound(err) {
		// The node was deleted out from under us; re-register it.
		if err = s.registerNode(ctx); err != nil {
			log.G(ctx).WithError(err).Error("Failed to register node")
			span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
		} else {
			span.Annotate(nil, "Registered node in k8s")
		}
		return
	}

	addNodeAttributes(span, n)
	span.Annotate(nil, "Fetched node details from k8s")

	// Blank out the resource version to avoid "the object has been modified"
	// conflicts on update.
	n.ResourceVersion = ""
	n.Status.Conditions = s.provider.NodeConditions(ctx)

	capacity := s.provider.Capacity(ctx)
	n.Status.Capacity = capacity
	n.Status.Allocatable = capacity

	n.Status.Addresses = s.provider.NodeAddresses(ctx)

	n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n)
	if err != nil {
		log.G(ctx).WithError(err).Error("Failed to update node")
		span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
		return
	}
}
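// The reconciliation code below exercises the subset of the
// providers.Provider interface used in this file: GetPods, CreatePod,
// DeletePod, and GetPodStatus for the pod lifecycle, plus OperatingSystem,
// Capacity, NodeConditions, NodeAddresses, and NodeDaemonEndpoints for node
// registration and status updates (see registerNode and updateNode above).
// For example, if the provider reports pods {A, B} while Kubernetes has
// scheduled {B, C} onto this node, reconcile deletes A from the provider and
// creates C in it.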
// reconcile is the main reconciliation loop that compares the differences between Kubernetes and
// the active provider and reconciles them.
func (s *Server) reconcile(ctx context.Context) {
	ctx, span := trace.StartSpan(ctx, "reconcile")
	defer span.End()

	logger := log.G(ctx)
	logger.Debug("Start reconcile")
	defer logger.Debug("End reconcile")

	providerPods, err := s.provider.GetPods(ctx)
	if err != nil {
		logger.WithError(err).Error("Error getting pod list from provider")
		return
	}

	var deletePods []*corev1.Pod
	for _, pod := range providerPods {
		// Delete pods that don't exist in Kubernetes or are marked for deletion.
		if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil || p.DeletionTimestamp != nil {
			deletePods = append(deletePods, pod)
		}
	}
	span.Annotate(nil, "Got provider pods")

	var failedDeleteCount int64
	for _, pod := range deletePods {
		logger := logger.WithField("pod", pod.Name)
		logger.Debug("Deleting pod")
		if err := s.deletePod(ctx, pod); err != nil {
			logger.WithError(err).Error("Error deleting pod")
			failedDeleteCount++
			continue
		}
	}
	span.Annotate(
		[]trace.Attribute{
			trace.Int64Attribute("expected_delete_pods_count", int64(len(deletePods))),
			trace.Int64Attribute("failed_delete_pods_count", failedDeleteCount),
		},
		"Cleaned up stale provider pods",
	)

	pods := s.resourceManager.GetPods()

	var createPods []*corev1.Pod
	// Reuse the deletePods backing array; its contents have already been
	// processed above and are no longer needed.
	cleanupPods := deletePods[:0]
	for _, pod := range pods {
		var providerPod *corev1.Pod
		for _, p := range providerPods {
			if p.Namespace == pod.Namespace && p.Name == pod.Name {
				providerPod = p
				break
			}
		}

		// Delete pod if DeletionTimestamp is set.
		if pod.DeletionTimestamp != nil {
			cleanupPods = append(cleanupPods, pod)
			continue
		}

		if providerPod == nil &&
			pod.DeletionTimestamp == nil &&
			pod.Status.Phase != corev1.PodSucceeded &&
			pod.Status.Phase != corev1.PodFailed &&
			pod.Status.Reason != PodStatusReason_ProviderFailed {
			createPods = append(createPods, pod)
		}
	}

	var failedCreateCount int64
	for _, pod := range createPods {
		logger := logger.WithField("pod", pod.Name)
		logger.Debug("Creating pod")
		if err := s.createPod(ctx, pod); err != nil {
			failedCreateCount++
			logger.WithError(err).Error("Error creating pod")
			continue
		}
	}
	span.Annotate(
		[]trace.Attribute{
			trace.Int64Attribute("expected_created_pods", int64(len(createPods))),
			trace.Int64Attribute("failed_pod_creates", failedCreateCount),
		},
		"Created pods in provider",
	)

	var failedCleanupCount int64
	for _, pod := range cleanupPods {
		logger := logger.WithField("pod", pod.Name)
		log.Trace(logger, "Pod pending deletion")

		var err error
		if err = s.deletePod(ctx, pod); err != nil {
			logger.WithError(err).Error("Error deleting pod")
			failedCleanupCount++
			continue
		}
		log.Trace(logger, "Pod deletion complete")
	}
	span.Annotate(
		[]trace.Attribute{
			trace.Int64Attribute("expected_cleaned_up_pods", int64(len(cleanupPods))),
			trace.Int64Attribute("cleaned_up_pod_failures", failedCleanupCount),
		},
		"Cleaned up provider pods marked for deletion",
	)
}

func addPodAttributes(span *trace.Span, pod *corev1.Pod) {
	span.AddAttributes(
		trace.StringAttribute("uid", string(pod.UID)),
		trace.StringAttribute("namespace", pod.Namespace),
		trace.StringAttribute("name", pod.Name),
	)
}
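// createPod resolves the pod's Secret and ConfigMap environment references and
// hands the pod to the provider. If the provider rejects the pod, the pod's
// status reason is set to PodStatusReason_ProviderFailed so that reconcile
// does not retry the pod indefinitely.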
func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
	ctx, span := trace.StartSpan(ctx, "createPod")
	defer span.End()
	addPodAttributes(span, pod)

	if err := s.populateSecretsAndConfigMapsInEnv(pod); err != nil {
		span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: err.Error()})
		return err
	}

	logger := log.G(ctx).WithField("pod", pod.Name)

	if origErr := s.provider.CreatePod(ctx, pod); origErr != nil {
		podPhase := corev1.PodPending
		if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
			podPhase = corev1.PodFailed
		}

		// Blank out the resource version to avoid "the object has been
		// modified" conflicts on update.
		pod.ResourceVersion = ""
		pod.Status.Phase = podPhase
		pod.Status.Reason = PodStatusReason_ProviderFailed
		pod.Status.Message = origErr.Error()

		_, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
		if err != nil {
			logger.WithError(err).Warn("Failed to update pod status")
		} else {
			span.Annotate(nil, "Updated k8s pod status")
		}

		span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: origErr.Error()})
		return origErr
	}
	span.Annotate(nil, "Created pod in provider")

	logger.Info("Pod created")
	return nil
}

func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error {
	ctx, span := trace.StartSpan(ctx, "deletePod")
	defer span.End()
	addPodAttributes(span, pod)

	// Only a real provider failure aborts the deletion; a not-found error
	// means the provider has already forgotten the pod.
	var delErr error
	if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && !errors.IsNotFound(delErr) {
		span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: delErr.Error()})
		return delErr
	}
	span.Annotate(nil, "Deleted pod from provider")

	logger := log.G(ctx).WithField("pod", pod.Name)
	if !errors.IsNotFound(delErr) {
		// The provider has removed the workload, so force-delete the pod from
		// the API server (grace period 0); there is nothing left to drain.
		var grace int64
		if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil {
			if errors.IsNotFound(err) {
				span.Annotate(nil, "Pod does not exist in k8s, nothing to delete")
				return nil
			}

			span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
			return pkgerrors.Wrap(err, "failed to delete kubernetes pod")
		}
		span.Annotate(nil, "Deleted pod from k8s")

		s.resourceManager.DeletePod(pod)
		span.Annotate(nil, "Deleted pod from internal state")
		logger.Info("Pod deleted")
	}

	return nil
}
// updatePodStatuses syncs the provider's pod status with the kubernetes pod status.
func (s *Server) updatePodStatuses(ctx context.Context) {
	ctx, span := trace.StartSpan(ctx, "updatePodStatuses")
	defer span.End()

	// Update all the pods with the provider status.
	pods := s.resourceManager.GetPods()
	span.AddAttributes(trace.Int64Attribute("nPods", int64(len(pods))))

	for _, pod := range pods {
		if pod.Status.Phase == corev1.PodSucceeded ||
			pod.Status.Phase == corev1.PodFailed ||
			pod.Status.Reason == PodStatusReason_ProviderFailed {
			continue
		}

		status, err := s.provider.GetPodStatus(ctx, pod.Namespace, pod.Name)
		if err != nil {
			log.G(ctx).WithField("pod", pod.Name).WithError(err).Error("Error retrieving pod status")
			return
		}

		// Update the pod's status.
		if status != nil {
			pod.Status = *status
			s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
		}
	}
}

// populateSecretsAndConfigMapsInEnv populates Secrets and ConfigMaps into environment variables.
func (s *Server) populateSecretsAndConfigMapsInEnv(pod *corev1.Pod) error {
	for _, c := range pod.Spec.Containers {
		for i, e := range c.Env {
			if e.ValueFrom == nil {
				continue
			}

			// Populate ConfigMaps to Env
			if e.ValueFrom.ConfigMapKeyRef != nil {
				vf := e.ValueFrom.ConfigMapKeyRef
				cm, err := s.resourceManager.GetConfigMap(vf.Name, pod.Namespace)
				if vf.Optional != nil && !*vf.Optional && errors.IsNotFound(err) {
					return fmt.Errorf("ConfigMap %s is required by Pod %s and does not exist", vf.Name, pod.Name)
				}
				if err != nil {
					return fmt.Errorf("Error retrieving ConfigMap %s required by Pod %s: %s", vf.Name, pod.Name, err)
				}

				var ok bool
				if c.Env[i].Value, ok = cm.Data[vf.Key]; !ok {
					return fmt.Errorf("ConfigMap %s key %s is required by Pod %s and does not exist", vf.Name, vf.Key, pod.Name)
				}
				continue
			}

			// Populate Secrets to Env
			if e.ValueFrom.SecretKeyRef != nil {
				vf := e.ValueFrom.SecretKeyRef
				sec, err := s.resourceManager.GetSecret(vf.Name, pod.Namespace)
				if vf.Optional != nil && !*vf.Optional && errors.IsNotFound(err) {
					return fmt.Errorf("Secret %s is required by Pod %s and does not exist", vf.Name, pod.Name)
				}
				if err != nil {
					return fmt.Errorf("Error retrieving Secret %s required by Pod %s: %s", vf.Name, pod.Name, err)
				}

				v, ok := sec.Data[vf.Key]
				if !ok {
					return fmt.Errorf("Secret %s key %s is required by Pod %s and does not exist", vf.Name, vf.Key, pod.Name)
				}
				c.Env[i].Value = string(v)
				continue
			}

			// TODO: Populate Downward API to Env
			if e.ValueFrom.FieldRef != nil {
				continue
			}

			// TODO: Populate resource requests
			if e.ValueFrom.ResourceFieldRef != nil {
				continue
			}
		}
	}

	return nil
}
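// For example (a sketch, with hypothetical names), given a container env entry
//
//	env:
//	- name: DB_PASSWORD
//	  valueFrom:
//	    secretKeyRef:
//	      name: db-creds
//	      key: password
//
// populateSecretsAndConfigMapsInEnv replaces the entry's value with the
// contents of the "password" key from the "db-creds" Secret before the pod is
// handed to the provider, so the provider receives literal values instead of
// references it would otherwise have to resolve itself.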