From c1fe92313178d011aa8db61dd45142761e1f453e Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Fri, 12 Oct 2018 17:36:37 -0700 Subject: [PATCH] Minor refactorings (#368) * Split vkubelet funcitons into separate files. * Minor re-org for cmd/census* * refactor run loop --- cmd/census_jaeger.go | 1 + cmd/{census.go => census_tracing.go} | 0 cmd/root.go | 16 +- vkubelet/env.go | 63 +++++ vkubelet/node.go | 129 ++++++++++ vkubelet/pod.go | 166 +++++++++++++ vkubelet/vkubelet.go | 342 ++------------------------- 7 files changed, 387 insertions(+), 330 deletions(-) rename cmd/{census.go => census_tracing.go} (100%) create mode 100644 vkubelet/env.go create mode 100644 vkubelet/node.go create mode 100644 vkubelet/pod.go diff --git a/cmd/census_jaeger.go b/cmd/census_jaeger.go index 1e1609bb0..d4d388206 100644 --- a/cmd/census_jaeger.go +++ b/cmd/census_jaeger.go @@ -14,6 +14,7 @@ func init() { RegisterTracingExporter("jaeger", NewJaegerExporter) } +// NewJaegerExporter creates a new opencensus tracing exporter. func NewJaegerExporter(opts TracingExporterOptions) (trace.Exporter, error) { jOpts := jaeger.Options{ Endpoint: os.Getenv("JAEGER_ENDPOINT"), diff --git a/cmd/census.go b/cmd/census_tracing.go similarity index 100% rename from cmd/census.go rename to cmd/census_tracing.go diff --git a/cmd/root.go b/cmd/root.go index 724fc9e06..bf87b5b39 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -18,9 +18,11 @@ import ( "context" "fmt" "os" + "os/signal" "path/filepath" "strconv" "strings" + "syscall" "github.com/Sirupsen/logrus" "github.com/cpuguy83/strongerrors" @@ -71,7 +73,8 @@ var RootCmd = &cobra.Command{ backend implementation allowing users to create kubernetes nodes without running the kubelet. This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`, Run: func(cmd *cobra.Command, args []string) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + f, err := vkubelet.New(ctx, vkubelet.Config{ Client: k8sClient, Namespace: kubeNamespace, @@ -85,7 +88,16 @@ This allows users to schedule kubernetes workloads on nodes that aren't running if err != nil { log.L.WithError(err).Fatal("Error initializing virtual kubelet") } - if err := f.Run(ctx); err != nil { + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sig + cancel() + f.Stop() + }() + + if err := f.Run(ctx); err != nil && errors.Cause(err) != context.Canceled { log.L.Fatal(err) } }, diff --git a/vkubelet/env.go b/vkubelet/env.go new file mode 100644 index 000000000..f63d458f3 --- /dev/null +++ b/vkubelet/env.go @@ -0,0 +1,63 @@ +package vkubelet + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" +) + +// populateEnvironmentVariables populates Secrets and ConfigMap into environment variables +func (s *Server) populateEnvironmentVariables(pod *corev1.Pod) error { + for _, c := range pod.Spec.Containers { + for i, e := range c.Env { + if e.ValueFrom != nil { + // Populate ConfigMaps to Env + if e.ValueFrom.ConfigMapKeyRef != nil { + vf := e.ValueFrom.ConfigMapKeyRef + cm, err := s.resourceManager.GetConfigMap(vf.Name, pod.Namespace) + if vf.Optional != nil && !*vf.Optional && errors.IsNotFound(err) { + return fmt.Errorf("ConfigMap %s is required by Pod %s and does not exist", vf.Name, pod.Name) + } + + if err != nil { + return fmt.Errorf("Error retrieving ConfigMap %s required by Pod %s: %s", vf.Name, pod.Name, err) + } + + var ok bool + if c.Env[i].Value, ok = cm.Data[vf.Key]; !ok { + return fmt.Errorf("ConfigMap %s key %s is required by Pod %s and does not exist", vf.Name, vf.Key, pod.Name) + } + continue + } + + // Populate Secrets to Env + if e.ValueFrom.SecretKeyRef != nil { + vf := e.ValueFrom.SecretKeyRef + sec, err := s.resourceManager.GetSecret(vf.Name, pod.Namespace) + if vf.Optional != nil && !*vf.Optional && errors.IsNotFound(err) { + return fmt.Errorf("Secret %s is required by Pod %s and does not exist", vf.Name, pod.Name) + } + v, ok := sec.Data[vf.Key] + if !ok { + return fmt.Errorf("Secret %s key %s is required by Pod %s and does not exist", vf.Name, vf.Key, pod.Name) + } + c.Env[i].Value = string(v) + continue + } + + // TODO: Populate Downward API to Env + if e.ValueFrom.FieldRef != nil { + continue + } + + // TODO: Populate resource requests + if e.ValueFrom.ResourceFieldRef != nil { + continue + } + } + } + } + + return nil +} diff --git a/vkubelet/node.go b/vkubelet/node.go new file mode 100644 index 000000000..5f2221eec --- /dev/null +++ b/vkubelet/node.go @@ -0,0 +1,129 @@ +package vkubelet + +import ( + "context" + "strings" + + "github.com/virtual-kubelet/virtual-kubelet/log" + "go.opencensus.io/trace" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// registerNode registers this virtual node with the Kubernetes API. +func (s *Server) registerNode(ctx context.Context) error { + ctx, span := trace.StartSpan(ctx, "registerNode") + defer span.End() + + taints := make([]corev1.Taint, 0) + + if s.taint != nil { + taints = append(taints, *s.taint) + } + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.nodeName, + Labels: map[string]string{ + "type": "virtual-kubelet", + "kubernetes.io/role": "agent", + "beta.kubernetes.io/os": strings.ToLower(s.provider.OperatingSystem()), + "kubernetes.io/hostname": s.nodeName, + "alpha.service-controller.kubernetes.io/exclude-balancer": "true", + }, + }, + Spec: corev1.NodeSpec{ + Taints: taints, + }, + Status: corev1.NodeStatus{ + NodeInfo: corev1.NodeSystemInfo{ + OperatingSystem: s.provider.OperatingSystem(), + Architecture: "amd64", + KubeletVersion: "v1.11.2", + }, + Capacity: s.provider.Capacity(ctx), + Allocatable: s.provider.Capacity(ctx), + Conditions: s.provider.NodeConditions(ctx), + Addresses: s.provider.NodeAddresses(ctx), + DaemonEndpoints: *s.provider.NodeDaemonEndpoints(ctx), + }, + } + addNodeAttributes(span, node) + if _, err := s.k8sClient.CoreV1().Nodes().Create(node); err != nil && !errors.IsAlreadyExists(err) { + span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + return err + } + span.Annotate(nil, "Registered node with k8s") + + log.G(ctx).Info("Registered node") + + return nil +} + +// updateNode updates the node status within Kubernetes with updated NodeConditions. +func (s *Server) updateNode(ctx context.Context) { + ctx, span := trace.StartSpan(ctx, "updateNode") + defer span.End() + + opts := metav1.GetOptions{} + n, err := s.k8sClient.CoreV1().Nodes().Get(s.nodeName, opts) + if err != nil && !errors.IsNotFound(err) { + log.G(ctx).WithError(err).Error("Failed to retrieve node") + span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + return + } + addNodeAttributes(span, n) + span.Annotate(nil, "Fetched node details from k8s") + + if errors.IsNotFound(err) { + if err = s.registerNode(ctx); err != nil { + log.G(ctx).WithError(err).Error("Failed to register node") + span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + } else { + span.Annotate(nil, "Registered node in k8s") + } + return + } + + n.ResourceVersion = "" // Blank out resource version to prevent object has been modified error + n.Status.Conditions = s.provider.NodeConditions(ctx) + + capacity := s.provider.Capacity(ctx) + n.Status.Capacity = capacity + n.Status.Allocatable = capacity + + n.Status.Addresses = s.provider.NodeAddresses(ctx) + + n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n) + if err != nil { + log.G(ctx).WithError(err).Error("Failed to update node") + span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + return + } +} + +type taintsStringer []corev1.Taint + +func (t taintsStringer) String() string { + var s string + for _, taint := range t { + if s == "" { + s = taint.Key + "=" + taint.Value + ":" + string(taint.Effect) + } else { + s += ", " + taint.Key + "=" + taint.Value + ":" + string(taint.Effect) + } + } + return s +} + +func addNodeAttributes(span *trace.Span, n *corev1.Node) { + span.AddAttributes( + trace.StringAttribute("UID", string(n.UID)), + trace.StringAttribute("name", n.Name), + trace.StringAttribute("cluster", n.ClusterName), + ) + if span.IsRecordingEvents() { + span.AddAttributes(trace.StringAttribute("taints", taintsStringer(n.Spec.Taints).String())) + } +} diff --git a/vkubelet/pod.go b/vkubelet/pod.go new file mode 100644 index 000000000..b1bc4e552 --- /dev/null +++ b/vkubelet/pod.go @@ -0,0 +1,166 @@ +package vkubelet + +import ( + "context" + "fmt" + + pkgerrors "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/log" + "go.opencensus.io/trace" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" +) + +func addPodAttributes(span *trace.Span, pod *corev1.Pod) { + span.AddAttributes( + trace.StringAttribute("uid", string(pod.UID)), + trace.StringAttribute("namespace", pod.Namespace), + trace.StringAttribute("name", pod.Name), + ) +} + +func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error { + ctx, span := trace.StartSpan(ctx, "createPod") + defer span.End() + addPodAttributes(span, pod) + + if err := s.populateEnvironmentVariables(pod); err != nil { + span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: err.Error()}) + return err + } + + logger := log.G(ctx).WithField("pod", pod.Name) + + if origErr := s.provider.CreatePod(ctx, pod); origErr != nil { + podPhase := corev1.PodPending + if pod.Spec.RestartPolicy == corev1.RestartPolicyNever { + podPhase = corev1.PodFailed + } + + pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error + pod.Status.Phase = podPhase + pod.Status.Reason = PodStatusReason_ProviderFailed + pod.Status.Message = origErr.Error() + + _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) + if err != nil { + logger.WithError(err).Warn("Failed to update pod status") + } else { + span.Annotate(nil, "Updated k8s pod status") + } + + span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: origErr.Error()}) + return origErr + } + span.Annotate(nil, "Created pod in provider") + + logger.Info("Pod created") + + return nil +} + +func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error { + ctx, span := trace.StartSpan(ctx, "deletePod") + defer span.End() + addPodAttributes(span, pod) + + var delErr error + if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) { + span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: delErr.Error()}) + return delErr + } + span.Annotate(nil, "Deleted pod from provider") + + logger := log.G(ctx).WithField("pod", pod.Name).WithField("namespace", pod.Namespace) + if !errors.IsNotFound(delErr) { + var grace int64 + if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) { + if errors.IsNotFound(err) { + span.Annotate(nil, "Pod does not exist in k8s, nothing to delete") + return nil + } + + span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + return fmt.Errorf("Failed to delete kubernetes pod: %s", err) + } + span.Annotate(nil, "Deleted pod from k8s") + + s.resourceManager.DeletePod(pod) + span.Annotate(nil, "Deleted pod from internal state") + logger.Info("Pod deleted") + } + + return nil +} + +// updatePodStatuses syncs the providers pod status with the kubernetes pod status. +func (s *Server) updatePodStatuses(ctx context.Context) { + ctx, span := trace.StartSpan(ctx, "updatePodStatuses") + defer span.End() + + // Update all the pods with the provider status. + pods := s.resourceManager.GetPods() + span.AddAttributes(trace.Int64Attribute("nPods", int64(len(pods)))) + + for _, pod := range pods { + if pod.Status.Phase == corev1.PodSucceeded || + pod.Status.Phase == corev1.PodFailed || + pod.Status.Reason == PodStatusReason_ProviderFailed { + continue + } + + status, err := s.provider.GetPodStatus(ctx, pod.Namespace, pod.Name) + if err != nil { + log.G(ctx).WithField("pod", pod.Name).WithField("namespace", pod.Namespace).Error("Error retrieving pod status") + return + } + + // Update the pod's status + if status != nil { + pod.Status = *status + s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) + } + } +} + +// watchForPodEvent waits for pod changes from kubernetes and updates the details accordingly in the local state. +// This returns after a single pod event. +func (s *Server) watchForPodEvent(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case ev, ok := <-s.podWatcher.ResultChan(): + if !ok { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return pkgerrors.New("pod watcher connection is closed unexpectedly") + } + + ctx, span := trace.StartSpan(ctx, "updateLocalPod") + defer span.End() + span.AddAttributes(trace.StringAttribute("PodEventType", string(ev.Type))) + + log.G(ctx).WithField("type", ev.Type).Debug("Pod watcher event is received") + reconcile := false + switch ev.Type { + case watch.Added: + reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod)) + case watch.Modified: + reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod)) + case watch.Deleted: + reconcile = s.resourceManager.DeletePod(ev.Object.(*corev1.Pod)) + } + + if reconcile { + span.Annotate(nil, "reconciling") + s.reconcile(ctx) + } + } + } +} diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index f9e5f0dc1..1086d3496 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -2,12 +2,7 @@ package vkubelet import ( "context" - "fmt" "net" - "os" - "os/signal" - "strings" - "syscall" "time" pkgerrors "github.com/pkg/errors" @@ -16,7 +11,6 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/providers" "go.opencensus.io/trace" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/watch" @@ -78,6 +72,7 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) { apiL.Close() } }() + go KubeletServerStart(cfg.Provider, apiL, cfg.APIConfig.CertPath, cfg.APIConfig.KeyPath) if cfg.MetricsAddr != "" { @@ -103,7 +98,7 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) { go func() { for range tick { - ctx, span := trace.StartSpan(ctx, "reconciliationTick") + ctx, span := trace.StartSpan(ctx, "syncActualState") s.updateNode(ctx) s.updatePodStatuses(ctx) span.End() @@ -113,70 +108,16 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) { return s, nil } -// registerNode registers this virtual node with the Kubernetes API. -func (s *Server) registerNode(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "registerNode") - defer span.End() - - taints := make([]corev1.Taint, 0) - - if s.taint != nil { - taints = append(taints, *s.taint) - } - - node := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: s.nodeName, - Labels: map[string]string{ - "type": "virtual-kubelet", - "kubernetes.io/role": "agent", - "beta.kubernetes.io/os": strings.ToLower(s.provider.OperatingSystem()), - "kubernetes.io/hostname": s.nodeName, - "alpha.service-controller.kubernetes.io/exclude-balancer": "true", - }, - }, - Spec: corev1.NodeSpec{ - Taints: taints, - }, - Status: corev1.NodeStatus{ - NodeInfo: corev1.NodeSystemInfo{ - OperatingSystem: s.provider.OperatingSystem(), - Architecture: "amd64", - KubeletVersion: "v1.11.2", - }, - Capacity: s.provider.Capacity(ctx), - Allocatable: s.provider.Capacity(ctx), - Conditions: s.provider.NodeConditions(ctx), - Addresses: s.provider.NodeAddresses(ctx), - DaemonEndpoints: *s.provider.NodeDaemonEndpoints(ctx), - }, - } - addNodeAttributes(span, node) - if _, err := s.k8sClient.CoreV1().Nodes().Create(node); err != nil && !errors.IsAlreadyExists(err) { - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) - return err - } - span.Annotate(nil, "Registered node with k8s") - - log.G(ctx).Info("Registered node") - - return nil -} - // Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster. // Run will block until Stop is called or a SIGINT or SIGTERM signal is received. func (s *Server) Run(ctx context.Context) error { - shouldStop := false - - sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-sig - shouldStop = true - s.Stop() - }() - for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + opts := metav1.ListOptions{ FieldSelector: fields.OneTermEqualSelector("spec.nodeName", s.nodeName).String(), } @@ -194,40 +135,19 @@ func (s *Server) Run(ctx context.Context) error { return pkgerrors.Wrap(err, "failed to watch pods") } - loop: - for { - select { - case ev, ok := <-s.podWatcher.ResultChan(): - if !ok { - if shouldStop { - log.G(ctx).Info("Pod watcher is stopped") - return nil - } - log.G(ctx).Error("Pod watcher connection is closed unexpectedly") - break loop - } - - log.G(ctx).WithField("type", ev.Type).Debug("Pod watcher event is received") - reconcile := false - switch ev.Type { - case watch.Added: - reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod)) - case watch.Modified: - reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod)) - case watch.Deleted: - reconcile = s.resourceManager.DeletePod(ev.Object.(*corev1.Pod)) - } - - if reconcile { - s.reconcile(ctx) - } + if err := s.watchForPodEvent(ctx); err != nil { + if pkgerrors.Cause(err) == context.Canceled { + return err } + log.G(ctx).Error(err) + break } time.Sleep(5 * time.Second) } + return nil } // Stop shutsdown the server. @@ -238,73 +158,6 @@ func (s *Server) Stop() { } } -type taintsStringer []corev1.Taint - -func (t taintsStringer) String() string { - var s string - for _, taint := range t { - if s == "" { - s = taint.Key + "=" + taint.Value + ":" + string(taint.Effect) - } else { - s += ", " + taint.Key + "=" + taint.Value + ":" + string(taint.Effect) - } - } - return s -} - -func addNodeAttributes(span *trace.Span, n *corev1.Node) { - span.AddAttributes( - trace.StringAttribute("UID", string(n.UID)), - trace.StringAttribute("name", n.Name), - trace.StringAttribute("cluster", n.ClusterName), - ) - if span.IsRecordingEvents() { - span.AddAttributes(trace.StringAttribute("taints", taintsStringer(n.Spec.Taints).String())) - } -} - -// updateNode updates the node status within Kubernetes with updated NodeConditions. -func (s *Server) updateNode(ctx context.Context) { - ctx, span := trace.StartSpan(ctx, "updateNode") - defer span.End() - - opts := metav1.GetOptions{} - n, err := s.k8sClient.CoreV1().Nodes().Get(s.nodeName, opts) - if err != nil && !errors.IsNotFound(err) { - log.G(ctx).WithError(err).Error("Failed to retrieve node") - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) - return - } - addNodeAttributes(span, n) - span.Annotate(nil, "Fetched node details from k8s") - - if errors.IsNotFound(err) { - if err = s.registerNode(ctx); err != nil { - log.G(ctx).WithError(err).Error("Failed to register node") - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) - } else { - span.Annotate(nil, "Registered node in k8s") - } - return - } - - n.ResourceVersion = "" // Blank out resource version to prevent object has been modified error - n.Status.Conditions = s.provider.NodeConditions(ctx) - - capacity := s.provider.Capacity(ctx) - n.Status.Capacity = capacity - n.Status.Allocatable = capacity - - n.Status.Addresses = s.provider.NodeAddresses(ctx) - - n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n) - if err != nil { - log.G(ctx).WithError(err).Error("Failed to update node") - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) - return - } -} - // reconcile is the main reconciliation loop that compares differences between Kubernetes and // the active provider and reconciles the differences. func (s *Server) reconcile(ctx context.Context) { @@ -416,170 +269,3 @@ func (s *Server) reconcile(ctx context.Context) { "Cleaned up provider pods marked for deletion", ) } - -func addPodAttributes(span *trace.Span, pod *corev1.Pod) { - span.AddAttributes( - trace.StringAttribute("uid", string(pod.UID)), - trace.StringAttribute("namespace", pod.Namespace), - trace.StringAttribute("name", pod.Name), - ) -} - -func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error { - ctx, span := trace.StartSpan(ctx, "createPod") - defer span.End() - addPodAttributes(span, pod) - - if err := s.populateSecretsAndConfigMapsInEnv(pod); err != nil { - span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: err.Error()}) - return err - } - - logger := log.G(ctx).WithField("pod", pod.Name) - - if origErr := s.provider.CreatePod(ctx, pod); origErr != nil { - podPhase := corev1.PodPending - if pod.Spec.RestartPolicy == corev1.RestartPolicyNever { - podPhase = corev1.PodFailed - } - - pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error - pod.Status.Phase = podPhase - pod.Status.Reason = PodStatusReason_ProviderFailed - pod.Status.Message = origErr.Error() - - _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) - if err != nil { - logger.WithError(err).Warn("Failed to update pod status") - } else { - span.Annotate(nil, "Updated k8s pod status") - } - - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: origErr.Error()}) - return origErr - } - span.Annotate(nil, "Created pod in provider") - - logger.Info("Pod created") - - return nil -} - -func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error { - ctx, span := trace.StartSpan(ctx, "deletePod") - defer span.End() - addPodAttributes(span, pod) - - var delErr error - if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) { - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: delErr.Error()}) - return delErr - } - span.Annotate(nil, "Deleted pod from provider") - - logger := log.G(ctx).WithField("pod", pod.Name) - if !errors.IsNotFound(delErr) { - var grace int64 - if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) { - if errors.IsNotFound(err) { - span.Annotate(nil, "Pod does not exist in k8s, nothing to delete") - return nil - } - - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) - return fmt.Errorf("Failed to delete kubernetes pod: %s", err) - } - span.Annotate(nil, "Deleted pod from k8s") - - s.resourceManager.DeletePod(pod) - span.Annotate(nil, "Deleted pod from internal state") - logger.Info("Pod deleted") - } - - return nil -} - -// updatePodStatuses syncs the providers pod status with the kubernetes pod status. -func (s *Server) updatePodStatuses(ctx context.Context) { - ctx, span := trace.StartSpan(ctx, "updatePodStatuses") - defer span.End() - - // Update all the pods with the provider status. - pods := s.resourceManager.GetPods() - span.AddAttributes(trace.Int64Attribute("nPods", int64(len(pods)))) - - for _, pod := range pods { - if pod.Status.Phase == corev1.PodSucceeded || - pod.Status.Phase == corev1.PodFailed || - pod.Status.Reason == PodStatusReason_ProviderFailed { - continue - } - - status, err := s.provider.GetPodStatus(ctx, pod.Namespace, pod.Name) - if err != nil { - log.G(ctx).WithField("pod", pod.Name).Error("Error retrieving pod status") - return - } - - // Update the pod's status - if status != nil { - pod.Status = *status - s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) - } - } -} - -// populateSecretsAndConfigMapsInEnv populates Secrets and ConfigMap into environment variables -func (s *Server) populateSecretsAndConfigMapsInEnv(pod *corev1.Pod) error { - for _, c := range pod.Spec.Containers { - for i, e := range c.Env { - if e.ValueFrom != nil { - // Populate ConfigMaps to Env - if e.ValueFrom.ConfigMapKeyRef != nil { - vf := e.ValueFrom.ConfigMapKeyRef - cm, err := s.resourceManager.GetConfigMap(vf.Name, pod.Namespace) - if vf.Optional != nil && !*vf.Optional && errors.IsNotFound(err) { - return fmt.Errorf("ConfigMap %s is required by Pod %s and does not exist", vf.Name, pod.Name) - } - - if err != nil { - return fmt.Errorf("Error retrieving ConfigMap %s required by Pod %s: %s", vf.Name, pod.Name, err) - } - - var ok bool - if c.Env[i].Value, ok = cm.Data[vf.Key]; !ok { - return fmt.Errorf("ConfigMap %s key %s is required by Pod %s and does not exist", vf.Name, vf.Key, pod.Name) - } - continue - } - - // Populate Secrets to Env - if e.ValueFrom.SecretKeyRef != nil { - vf := e.ValueFrom.SecretKeyRef - sec, err := s.resourceManager.GetSecret(vf.Name, pod.Namespace) - if vf.Optional != nil && !*vf.Optional && errors.IsNotFound(err) { - return fmt.Errorf("Secret %s is required by Pod %s and does not exist", vf.Name, pod.Name) - } - v, ok := sec.Data[vf.Key] - if !ok { - return fmt.Errorf("Secret %s key %s is required by Pod %s and does not exist", vf.Name, vf.Key, pod.Name) - } - c.Env[i].Value = string(v) - continue - } - - // TODO: Populate Downward API to Env - if e.ValueFrom.FieldRef != nil { - continue - } - - // TODO: Populate resource requests - if e.ValueFrom.ResourceFieldRef != nil { - continue - } - } - } - } - - return nil -}