Minor refactorings (#368)

* Split vkubelet funcitons into separate files.

* Minor re-org for cmd/census*

* refactor run loop
This commit is contained in:
Brian Goff
2018-10-12 17:36:37 -07:00
committed by Robbie Zhang
parent d710e0391c
commit c1fe923131
7 changed files with 387 additions and 330 deletions

View File

@@ -14,6 +14,7 @@ func init() {
RegisterTracingExporter("jaeger", NewJaegerExporter) RegisterTracingExporter("jaeger", NewJaegerExporter)
} }
// NewJaegerExporter creates a new opencensus tracing exporter.
func NewJaegerExporter(opts TracingExporterOptions) (trace.Exporter, error) { func NewJaegerExporter(opts TracingExporterOptions) (trace.Exporter, error) {
jOpts := jaeger.Options{ jOpts := jaeger.Options{
Endpoint: os.Getenv("JAEGER_ENDPOINT"), Endpoint: os.Getenv("JAEGER_ENDPOINT"),

View File

@@ -18,9 +18,11 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"os/signal"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"syscall"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/cpuguy83/strongerrors" "github.com/cpuguy83/strongerrors"
@@ -71,7 +73,8 @@ var RootCmd = &cobra.Command{
backend implementation allowing users to create kubernetes nodes without running the kubelet. backend implementation allowing users to create kubernetes nodes without running the kubelet.
This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`, This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
ctx := context.Background() ctx, cancel := context.WithCancel(context.Background())
f, err := vkubelet.New(ctx, vkubelet.Config{ f, err := vkubelet.New(ctx, vkubelet.Config{
Client: k8sClient, Client: k8sClient,
Namespace: kubeNamespace, Namespace: kubeNamespace,
@@ -85,7 +88,16 @@ This allows users to schedule kubernetes workloads on nodes that aren't running
if err != nil { if err != nil {
log.L.WithError(err).Fatal("Error initializing virtual kubelet") log.L.WithError(err).Fatal("Error initializing virtual kubelet")
} }
if err := f.Run(ctx); err != nil {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sig
cancel()
f.Stop()
}()
if err := f.Run(ctx); err != nil && errors.Cause(err) != context.Canceled {
log.L.Fatal(err) log.L.Fatal(err)
} }
}, },

63
vkubelet/env.go Normal file
View File

@@ -0,0 +1,63 @@
package vkubelet
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
)
// populateEnvironmentVariables populates Secrets and ConfigMap into environment variables
func (s *Server) populateEnvironmentVariables(pod *corev1.Pod) error {
for _, c := range pod.Spec.Containers {
for i, e := range c.Env {
if e.ValueFrom != nil {
// Populate ConfigMaps to Env
if e.ValueFrom.ConfigMapKeyRef != nil {
vf := e.ValueFrom.ConfigMapKeyRef
cm, err := s.resourceManager.GetConfigMap(vf.Name, pod.Namespace)
if vf.Optional != nil && !*vf.Optional && errors.IsNotFound(err) {
return fmt.Errorf("ConfigMap %s is required by Pod %s and does not exist", vf.Name, pod.Name)
}
if err != nil {
return fmt.Errorf("Error retrieving ConfigMap %s required by Pod %s: %s", vf.Name, pod.Name, err)
}
var ok bool
if c.Env[i].Value, ok = cm.Data[vf.Key]; !ok {
return fmt.Errorf("ConfigMap %s key %s is required by Pod %s and does not exist", vf.Name, vf.Key, pod.Name)
}
continue
}
// Populate Secrets to Env
if e.ValueFrom.SecretKeyRef != nil {
vf := e.ValueFrom.SecretKeyRef
sec, err := s.resourceManager.GetSecret(vf.Name, pod.Namespace)
if vf.Optional != nil && !*vf.Optional && errors.IsNotFound(err) {
return fmt.Errorf("Secret %s is required by Pod %s and does not exist", vf.Name, pod.Name)
}
v, ok := sec.Data[vf.Key]
if !ok {
return fmt.Errorf("Secret %s key %s is required by Pod %s and does not exist", vf.Name, vf.Key, pod.Name)
}
c.Env[i].Value = string(v)
continue
}
// TODO: Populate Downward API to Env
if e.ValueFrom.FieldRef != nil {
continue
}
// TODO: Populate resource requests
if e.ValueFrom.ResourceFieldRef != nil {
continue
}
}
}
}
return nil
}

129
vkubelet/node.go Normal file
View File

@@ -0,0 +1,129 @@
package vkubelet
import (
"context"
"strings"
"github.com/virtual-kubelet/virtual-kubelet/log"
"go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// registerNode registers this virtual node with the Kubernetes API.
func (s *Server) registerNode(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "registerNode")
defer span.End()
taints := make([]corev1.Taint, 0)
if s.taint != nil {
taints = append(taints, *s.taint)
}
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: s.nodeName,
Labels: map[string]string{
"type": "virtual-kubelet",
"kubernetes.io/role": "agent",
"beta.kubernetes.io/os": strings.ToLower(s.provider.OperatingSystem()),
"kubernetes.io/hostname": s.nodeName,
"alpha.service-controller.kubernetes.io/exclude-balancer": "true",
},
},
Spec: corev1.NodeSpec{
Taints: taints,
},
Status: corev1.NodeStatus{
NodeInfo: corev1.NodeSystemInfo{
OperatingSystem: s.provider.OperatingSystem(),
Architecture: "amd64",
KubeletVersion: "v1.11.2",
},
Capacity: s.provider.Capacity(ctx),
Allocatable: s.provider.Capacity(ctx),
Conditions: s.provider.NodeConditions(ctx),
Addresses: s.provider.NodeAddresses(ctx),
DaemonEndpoints: *s.provider.NodeDaemonEndpoints(ctx),
},
}
addNodeAttributes(span, node)
if _, err := s.k8sClient.CoreV1().Nodes().Create(node); err != nil && !errors.IsAlreadyExists(err) {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
return err
}
span.Annotate(nil, "Registered node with k8s")
log.G(ctx).Info("Registered node")
return nil
}
// updateNode updates the node status within Kubernetes with updated NodeConditions.
func (s *Server) updateNode(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "updateNode")
defer span.End()
opts := metav1.GetOptions{}
n, err := s.k8sClient.CoreV1().Nodes().Get(s.nodeName, opts)
if err != nil && !errors.IsNotFound(err) {
log.G(ctx).WithError(err).Error("Failed to retrieve node")
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
return
}
addNodeAttributes(span, n)
span.Annotate(nil, "Fetched node details from k8s")
if errors.IsNotFound(err) {
if err = s.registerNode(ctx); err != nil {
log.G(ctx).WithError(err).Error("Failed to register node")
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
} else {
span.Annotate(nil, "Registered node in k8s")
}
return
}
n.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
n.Status.Conditions = s.provider.NodeConditions(ctx)
capacity := s.provider.Capacity(ctx)
n.Status.Capacity = capacity
n.Status.Allocatable = capacity
n.Status.Addresses = s.provider.NodeAddresses(ctx)
n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n)
if err != nil {
log.G(ctx).WithError(err).Error("Failed to update node")
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
return
}
}
type taintsStringer []corev1.Taint
func (t taintsStringer) String() string {
var s string
for _, taint := range t {
if s == "" {
s = taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
} else {
s += ", " + taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
}
}
return s
}
func addNodeAttributes(span *trace.Span, n *corev1.Node) {
span.AddAttributes(
trace.StringAttribute("UID", string(n.UID)),
trace.StringAttribute("name", n.Name),
trace.StringAttribute("cluster", n.ClusterName),
)
if span.IsRecordingEvents() {
span.AddAttributes(trace.StringAttribute("taints", taintsStringer(n.Spec.Taints).String()))
}
}

166
vkubelet/pod.go Normal file
View File

@@ -0,0 +1,166 @@
package vkubelet
import (
"context"
"fmt"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)
func addPodAttributes(span *trace.Span, pod *corev1.Pod) {
span.AddAttributes(
trace.StringAttribute("uid", string(pod.UID)),
trace.StringAttribute("namespace", pod.Namespace),
trace.StringAttribute("name", pod.Name),
)
}
func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "createPod")
defer span.End()
addPodAttributes(span, pod)
if err := s.populateEnvironmentVariables(pod); err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: err.Error()})
return err
}
logger := log.G(ctx).WithField("pod", pod.Name)
if origErr := s.provider.CreatePod(ctx, pod); origErr != nil {
podPhase := corev1.PodPending
if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
podPhase = corev1.PodFailed
}
pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
pod.Status.Phase = podPhase
pod.Status.Reason = PodStatusReason_ProviderFailed
pod.Status.Message = origErr.Error()
_, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
if err != nil {
logger.WithError(err).Warn("Failed to update pod status")
} else {
span.Annotate(nil, "Updated k8s pod status")
}
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: origErr.Error()})
return origErr
}
span.Annotate(nil, "Created pod in provider")
logger.Info("Pod created")
return nil
}
func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "deletePod")
defer span.End()
addPodAttributes(span, pod)
var delErr error
if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: delErr.Error()})
return delErr
}
span.Annotate(nil, "Deleted pod from provider")
logger := log.G(ctx).WithField("pod", pod.Name).WithField("namespace", pod.Namespace)
if !errors.IsNotFound(delErr) {
var grace int64
if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) {
if errors.IsNotFound(err) {
span.Annotate(nil, "Pod does not exist in k8s, nothing to delete")
return nil
}
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
return fmt.Errorf("Failed to delete kubernetes pod: %s", err)
}
span.Annotate(nil, "Deleted pod from k8s")
s.resourceManager.DeletePod(pod)
span.Annotate(nil, "Deleted pod from internal state")
logger.Info("Pod deleted")
}
return nil
}
// updatePodStatuses syncs the providers pod status with the kubernetes pod status.
func (s *Server) updatePodStatuses(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "updatePodStatuses")
defer span.End()
// Update all the pods with the provider status.
pods := s.resourceManager.GetPods()
span.AddAttributes(trace.Int64Attribute("nPods", int64(len(pods))))
for _, pod := range pods {
if pod.Status.Phase == corev1.PodSucceeded ||
pod.Status.Phase == corev1.PodFailed ||
pod.Status.Reason == PodStatusReason_ProviderFailed {
continue
}
status, err := s.provider.GetPodStatus(ctx, pod.Namespace, pod.Name)
if err != nil {
log.G(ctx).WithField("pod", pod.Name).WithField("namespace", pod.Namespace).Error("Error retrieving pod status")
return
}
// Update the pod's status
if status != nil {
pod.Status = *status
s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
}
}
}
// watchForPodEvent waits for pod changes from kubernetes and updates the details accordingly in the local state.
// This returns after a single pod event.
func (s *Server) watchForPodEvent(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case ev, ok := <-s.podWatcher.ResultChan():
if !ok {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return pkgerrors.New("pod watcher connection is closed unexpectedly")
}
ctx, span := trace.StartSpan(ctx, "updateLocalPod")
defer span.End()
span.AddAttributes(trace.StringAttribute("PodEventType", string(ev.Type)))
log.G(ctx).WithField("type", ev.Type).Debug("Pod watcher event is received")
reconcile := false
switch ev.Type {
case watch.Added:
reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
case watch.Modified:
reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
case watch.Deleted:
reconcile = s.resourceManager.DeletePod(ev.Object.(*corev1.Pod))
}
if reconcile {
span.Annotate(nil, "reconciling")
s.reconcile(ctx)
}
}
}
}

View File

@@ -2,12 +2,7 @@ package vkubelet
import ( import (
"context" "context"
"fmt"
"net" "net"
"os"
"os/signal"
"strings"
"syscall"
"time" "time"
pkgerrors "github.com/pkg/errors" pkgerrors "github.com/pkg/errors"
@@ -16,7 +11,6 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers"
"go.opencensus.io/trace" "go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
@@ -78,6 +72,7 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
apiL.Close() apiL.Close()
} }
}() }()
go KubeletServerStart(cfg.Provider, apiL, cfg.APIConfig.CertPath, cfg.APIConfig.KeyPath) go KubeletServerStart(cfg.Provider, apiL, cfg.APIConfig.CertPath, cfg.APIConfig.KeyPath)
if cfg.MetricsAddr != "" { if cfg.MetricsAddr != "" {
@@ -103,7 +98,7 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
go func() { go func() {
for range tick { for range tick {
ctx, span := trace.StartSpan(ctx, "reconciliationTick") ctx, span := trace.StartSpan(ctx, "syncActualState")
s.updateNode(ctx) s.updateNode(ctx)
s.updatePodStatuses(ctx) s.updatePodStatuses(ctx)
span.End() span.End()
@@ -113,70 +108,16 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
return s, nil return s, nil
} }
// registerNode registers this virtual node with the Kubernetes API.
func (s *Server) registerNode(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "registerNode")
defer span.End()
taints := make([]corev1.Taint, 0)
if s.taint != nil {
taints = append(taints, *s.taint)
}
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: s.nodeName,
Labels: map[string]string{
"type": "virtual-kubelet",
"kubernetes.io/role": "agent",
"beta.kubernetes.io/os": strings.ToLower(s.provider.OperatingSystem()),
"kubernetes.io/hostname": s.nodeName,
"alpha.service-controller.kubernetes.io/exclude-balancer": "true",
},
},
Spec: corev1.NodeSpec{
Taints: taints,
},
Status: corev1.NodeStatus{
NodeInfo: corev1.NodeSystemInfo{
OperatingSystem: s.provider.OperatingSystem(),
Architecture: "amd64",
KubeletVersion: "v1.11.2",
},
Capacity: s.provider.Capacity(ctx),
Allocatable: s.provider.Capacity(ctx),
Conditions: s.provider.NodeConditions(ctx),
Addresses: s.provider.NodeAddresses(ctx),
DaemonEndpoints: *s.provider.NodeDaemonEndpoints(ctx),
},
}
addNodeAttributes(span, node)
if _, err := s.k8sClient.CoreV1().Nodes().Create(node); err != nil && !errors.IsAlreadyExists(err) {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
return err
}
span.Annotate(nil, "Registered node with k8s")
log.G(ctx).Info("Registered node")
return nil
}
// Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster. // Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster.
// Run will block until Stop is called or a SIGINT or SIGTERM signal is received. // Run will block until Stop is called or a SIGINT or SIGTERM signal is received.
func (s *Server) Run(ctx context.Context) error { func (s *Server) Run(ctx context.Context) error {
shouldStop := false
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sig
shouldStop = true
s.Stop()
}()
for { for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
opts := metav1.ListOptions{ opts := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", s.nodeName).String(), FieldSelector: fields.OneTermEqualSelector("spec.nodeName", s.nodeName).String(),
} }
@@ -194,40 +135,19 @@ func (s *Server) Run(ctx context.Context) error {
return pkgerrors.Wrap(err, "failed to watch pods") return pkgerrors.Wrap(err, "failed to watch pods")
} }
loop:
for {
select {
case ev, ok := <-s.podWatcher.ResultChan():
if !ok {
if shouldStop {
log.G(ctx).Info("Pod watcher is stopped")
return nil
}
log.G(ctx).Error("Pod watcher connection is closed unexpectedly") if err := s.watchForPodEvent(ctx); err != nil {
break loop if pkgerrors.Cause(err) == context.Canceled {
} return err
log.G(ctx).WithField("type", ev.Type).Debug("Pod watcher event is received")
reconcile := false
switch ev.Type {
case watch.Added:
reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
case watch.Modified:
reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
case watch.Deleted:
reconcile = s.resourceManager.DeletePod(ev.Object.(*corev1.Pod))
}
if reconcile {
s.reconcile(ctx)
}
} }
log.G(ctx).Error(err)
break
} }
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
} }
return nil
} }
// Stop shutsdown the server. // Stop shutsdown the server.
@@ -238,73 +158,6 @@ func (s *Server) Stop() {
} }
} }
type taintsStringer []corev1.Taint
func (t taintsStringer) String() string {
var s string
for _, taint := range t {
if s == "" {
s = taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
} else {
s += ", " + taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
}
}
return s
}
func addNodeAttributes(span *trace.Span, n *corev1.Node) {
span.AddAttributes(
trace.StringAttribute("UID", string(n.UID)),
trace.StringAttribute("name", n.Name),
trace.StringAttribute("cluster", n.ClusterName),
)
if span.IsRecordingEvents() {
span.AddAttributes(trace.StringAttribute("taints", taintsStringer(n.Spec.Taints).String()))
}
}
// updateNode updates the node status within Kubernetes with updated NodeConditions.
func (s *Server) updateNode(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "updateNode")
defer span.End()
opts := metav1.GetOptions{}
n, err := s.k8sClient.CoreV1().Nodes().Get(s.nodeName, opts)
if err != nil && !errors.IsNotFound(err) {
log.G(ctx).WithError(err).Error("Failed to retrieve node")
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
return
}
addNodeAttributes(span, n)
span.Annotate(nil, "Fetched node details from k8s")
if errors.IsNotFound(err) {
if err = s.registerNode(ctx); err != nil {
log.G(ctx).WithError(err).Error("Failed to register node")
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
} else {
span.Annotate(nil, "Registered node in k8s")
}
return
}
n.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
n.Status.Conditions = s.provider.NodeConditions(ctx)
capacity := s.provider.Capacity(ctx)
n.Status.Capacity = capacity
n.Status.Allocatable = capacity
n.Status.Addresses = s.provider.NodeAddresses(ctx)
n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n)
if err != nil {
log.G(ctx).WithError(err).Error("Failed to update node")
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
return
}
}
// reconcile is the main reconciliation loop that compares differences between Kubernetes and // reconcile is the main reconciliation loop that compares differences between Kubernetes and
// the active provider and reconciles the differences. // the active provider and reconciles the differences.
func (s *Server) reconcile(ctx context.Context) { func (s *Server) reconcile(ctx context.Context) {
@@ -416,170 +269,3 @@ func (s *Server) reconcile(ctx context.Context) {
"Cleaned up provider pods marked for deletion", "Cleaned up provider pods marked for deletion",
) )
} }
func addPodAttributes(span *trace.Span, pod *corev1.Pod) {
span.AddAttributes(
trace.StringAttribute("uid", string(pod.UID)),
trace.StringAttribute("namespace", pod.Namespace),
trace.StringAttribute("name", pod.Name),
)
}
func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "createPod")
defer span.End()
addPodAttributes(span, pod)
if err := s.populateSecretsAndConfigMapsInEnv(pod); err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: err.Error()})
return err
}
logger := log.G(ctx).WithField("pod", pod.Name)
if origErr := s.provider.CreatePod(ctx, pod); origErr != nil {
podPhase := corev1.PodPending
if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
podPhase = corev1.PodFailed
}
pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
pod.Status.Phase = podPhase
pod.Status.Reason = PodStatusReason_ProviderFailed
pod.Status.Message = origErr.Error()
_, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
if err != nil {
logger.WithError(err).Warn("Failed to update pod status")
} else {
span.Annotate(nil, "Updated k8s pod status")
}
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: origErr.Error()})
return origErr
}
span.Annotate(nil, "Created pod in provider")
logger.Info("Pod created")
return nil
}
func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "deletePod")
defer span.End()
addPodAttributes(span, pod)
var delErr error
if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: delErr.Error()})
return delErr
}
span.Annotate(nil, "Deleted pod from provider")
logger := log.G(ctx).WithField("pod", pod.Name)
if !errors.IsNotFound(delErr) {
var grace int64
if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) {
if errors.IsNotFound(err) {
span.Annotate(nil, "Pod does not exist in k8s, nothing to delete")
return nil
}
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
return fmt.Errorf("Failed to delete kubernetes pod: %s", err)
}
span.Annotate(nil, "Deleted pod from k8s")
s.resourceManager.DeletePod(pod)
span.Annotate(nil, "Deleted pod from internal state")
logger.Info("Pod deleted")
}
return nil
}
// updatePodStatuses syncs the providers pod status with the kubernetes pod status.
func (s *Server) updatePodStatuses(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "updatePodStatuses")
defer span.End()
// Update all the pods with the provider status.
pods := s.resourceManager.GetPods()
span.AddAttributes(trace.Int64Attribute("nPods", int64(len(pods))))
for _, pod := range pods {
if pod.Status.Phase == corev1.PodSucceeded ||
pod.Status.Phase == corev1.PodFailed ||
pod.Status.Reason == PodStatusReason_ProviderFailed {
continue
}
status, err := s.provider.GetPodStatus(ctx, pod.Namespace, pod.Name)
if err != nil {
log.G(ctx).WithField("pod", pod.Name).Error("Error retrieving pod status")
return
}
// Update the pod's status
if status != nil {
pod.Status = *status
s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
}
}
}
// populateSecretsAndConfigMapsInEnv populates Secrets and ConfigMap into environment variables
func (s *Server) populateSecretsAndConfigMapsInEnv(pod *corev1.Pod) error {
for _, c := range pod.Spec.Containers {
for i, e := range c.Env {
if e.ValueFrom != nil {
// Populate ConfigMaps to Env
if e.ValueFrom.ConfigMapKeyRef != nil {
vf := e.ValueFrom.ConfigMapKeyRef
cm, err := s.resourceManager.GetConfigMap(vf.Name, pod.Namespace)
if vf.Optional != nil && !*vf.Optional && errors.IsNotFound(err) {
return fmt.Errorf("ConfigMap %s is required by Pod %s and does not exist", vf.Name, pod.Name)
}
if err != nil {
return fmt.Errorf("Error retrieving ConfigMap %s required by Pod %s: %s", vf.Name, pod.Name, err)
}
var ok bool
if c.Env[i].Value, ok = cm.Data[vf.Key]; !ok {
return fmt.Errorf("ConfigMap %s key %s is required by Pod %s and does not exist", vf.Name, vf.Key, pod.Name)
}
continue
}
// Populate Secrets to Env
if e.ValueFrom.SecretKeyRef != nil {
vf := e.ValueFrom.SecretKeyRef
sec, err := s.resourceManager.GetSecret(vf.Name, pod.Namespace)
if vf.Optional != nil && !*vf.Optional && errors.IsNotFound(err) {
return fmt.Errorf("Secret %s is required by Pod %s and does not exist", vf.Name, pod.Name)
}
v, ok := sec.Data[vf.Key]
if !ok {
return fmt.Errorf("Secret %s key %s is required by Pod %s and does not exist", vf.Name, vf.Key, pod.Name)
}
c.Env[i].Value = string(v)
continue
}
// TODO: Populate Downward API to Env
if e.ValueFrom.FieldRef != nil {
continue
}
// TODO: Populate resource requests
if e.ValueFrom.ResourceFieldRef != nil {
continue
}
}
}
}
return nil
}