Add support for tracing via OpenCensus
This adds a few flags for configuring the tracer. It also includes support for Jaeger tracing (built into OpenCensus).
This commit is contained in:
@@ -10,6 +10,8 @@ import (
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/vkubelet/api"
|
||||
"go.opencensus.io/plugin/ochttp"
|
||||
"go.opencensus.io/plugin/ochttp/propagation/b3"
|
||||
)
|
||||
|
||||
// PodHandler creates an http handler for interacting with pods/containers.
|
||||
@@ -28,14 +30,19 @@ func PodHandler(p providers.Provider) http.Handler {
|
||||
func MetricsSummaryHandler(p providers.Provider) http.Handler {
|
||||
r := mux.NewRouter()
|
||||
|
||||
const summaryRoute = "/stats/summary"
|
||||
var h http.HandlerFunc
|
||||
|
||||
mp, ok := p.(providers.PodMetricsProvider)
|
||||
if !ok {
|
||||
r.HandleFunc("/stats/summary", NotImplemented).Methods("GET")
|
||||
r.HandleFunc("/stats/summary/", NotImplemented).Methods("GET")
|
||||
h = NotImplemented
|
||||
} else {
|
||||
r.HandleFunc("/stats/summary", api.PodMetricsHandlerFunc(mp)).Methods("GET")
|
||||
r.HandleFunc("/stats/summary/", api.PodMetricsHandlerFunc(mp)).Methods("GET")
|
||||
h = api.PodMetricsHandlerFunc(mp)
|
||||
}
|
||||
|
||||
r.Handle(summaryRoute, ochttp.WithRouteTag(h, "PodStatsSummaryHandler")).Methods("GET")
|
||||
r.Handle(summaryRoute+"/", ochttp.WithRouteTag(h, "PodStatsSummaryHandler")).Methods("GET")
|
||||
|
||||
r.NotFoundHandler = http.HandlerFunc(NotFound)
|
||||
return r
|
||||
}
|
||||
@@ -54,22 +61,27 @@ func MetricsServerStart(p providers.Provider, l net.Listener) {
|
||||
}
|
||||
}
|
||||
|
||||
func instrumentRequest(r *http.Request) context.Context {
|
||||
func instrumentRequest(r *http.Request) *http.Request {
|
||||
ctx := r.Context()
|
||||
logger := log.G(ctx).WithFields(logrus.Fields{
|
||||
"uri": r.RequestURI,
|
||||
"vars": mux.Vars(r),
|
||||
})
|
||||
return log.WithLogger(ctx, logger)
|
||||
ctx = log.WithLogger(ctx, logger)
|
||||
|
||||
return r.WithContext(ctx)
|
||||
}
|
||||
|
||||
// InstrumentHandler wraps an http.Handler and injects instrumentation into the request context.
|
||||
func InstrumentHandler(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
ctx := instrumentRequest(req)
|
||||
req = req.WithContext(ctx)
|
||||
instrumented := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
req = instrumentRequest(req)
|
||||
h.ServeHTTP(w, req)
|
||||
})
|
||||
return &ochttp.Handler{
|
||||
Handler: instrumented,
|
||||
Propagation: &b3.HTTPFormat{},
|
||||
}
|
||||
}
|
||||
|
||||
// NotFound provides a handler for cases where the requested endpoint doesn't exist
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/manager"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers"
|
||||
"go.opencensus.io/trace"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -102,8 +103,10 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
|
||||
|
||||
go func() {
|
||||
for range tick {
|
||||
ctx, span := trace.StartSpan(ctx, "reconciliationTick")
|
||||
s.updateNode(ctx)
|
||||
s.updatePodStatuses(ctx)
|
||||
span.End()
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -112,6 +115,9 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
|
||||
|
||||
// registerNode registers this virtual node with the Kubernetes API.
|
||||
func (s *Server) registerNode(ctx context.Context) error {
|
||||
ctx, span := trace.StartSpan(ctx, "registerNode")
|
||||
defer span.End()
|
||||
|
||||
taints := make([]corev1.Taint, 0)
|
||||
|
||||
if s.taint != nil {
|
||||
@@ -145,10 +151,12 @@ func (s *Server) registerNode(ctx context.Context) error {
|
||||
DaemonEndpoints: *s.provider.NodeDaemonEndpoints(ctx),
|
||||
},
|
||||
}
|
||||
|
||||
addNodeAttributes(span, node)
|
||||
if _, err := s.k8sClient.CoreV1().Nodes().Create(node); err != nil && !errors.IsAlreadyExists(err) {
|
||||
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
|
||||
return err
|
||||
}
|
||||
span.Annotate(nil, "Registered node with k8s")
|
||||
|
||||
log.G(ctx).Info("Registered node")
|
||||
|
||||
@@ -219,6 +227,7 @@ func (s *Server) Run(ctx context.Context) error {
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Stop shuts down the server.
|
||||
@@ -229,18 +238,52 @@ func (s *Server) Stop() {
|
||||
}
|
||||
}
|
||||
|
||||
type taintsStringer []corev1.Taint
|
||||
|
||||
func (t taintsStringer) String() string {
|
||||
var s string
|
||||
for _, taint := range t {
|
||||
if s == "" {
|
||||
s = taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
|
||||
} else {
|
||||
s += ", " + taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
|
||||
}
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func addNodeAttributes(span *trace.Span, n *corev1.Node) {
|
||||
span.AddAttributes(
|
||||
trace.StringAttribute("UID", string(n.UID)),
|
||||
trace.StringAttribute("name", n.Name),
|
||||
trace.StringAttribute("cluster", n.ClusterName),
|
||||
)
|
||||
if span.IsRecordingEvents() {
|
||||
span.AddAttributes(trace.StringAttribute("taints", taintsStringer(n.Spec.Taints).String()))
|
||||
}
|
||||
}
|
||||
|
||||
// updateNode updates the node status within Kubernetes with updated NodeConditions.
|
||||
func (s *Server) updateNode(ctx context.Context) {
|
||||
ctx, span := trace.StartSpan(ctx, "updateNode")
|
||||
defer span.End()
|
||||
|
||||
opts := metav1.GetOptions{}
|
||||
n, err := s.k8sClient.CoreV1().Nodes().Get(s.nodeName, opts)
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
log.G(ctx).WithError(err).Error("Failed to retrieve node")
|
||||
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
|
||||
return
|
||||
}
|
||||
addNodeAttributes(span, n)
|
||||
span.Annotate(nil, "Fetched node details from k8s")
|
||||
|
||||
if errors.IsNotFound(err) {
|
||||
if err = s.registerNode(ctx); err != nil {
|
||||
log.G(ctx).WithError(err).Error("Failed to register node")
|
||||
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
|
||||
} else {
|
||||
span.Annotate(nil, "Registered node in k8s")
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -257,6 +300,7 @@ func (s *Server) updateNode(ctx context.Context) {
|
||||
n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("Failed to update node")
|
||||
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -264,6 +308,9 @@ func (s *Server) updateNode(ctx context.Context) {
|
||||
// reconcile is the main reconciliation loop that compares differences between Kubernetes and
|
||||
// the active provider and reconciles the differences.
|
||||
func (s *Server) reconcile(ctx context.Context) {
|
||||
ctx, span := trace.StartSpan(ctx, "reconcile")
|
||||
defer span.End()
|
||||
|
||||
logger := log.G(ctx)
|
||||
logger.Debug("Start reconcile")
|
||||
defer logger.Debug("End reconcile")
|
||||
@@ -274,22 +321,39 @@ func (s *Server) reconcile(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
var deletePods []*corev1.Pod
|
||||
for _, pod := range providerPods {
|
||||
// Delete pods that don't exist in Kubernetes
|
||||
if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil || p.DeletionTimestamp != nil {
|
||||
logger := logger.WithField("pod", pod.Name)
|
||||
logger.Debug("Deleting pod '%s'\n", pod.Name)
|
||||
if err := s.deletePod(ctx, pod); err != nil {
|
||||
logger.WithError(err).Error("Error deleting pod")
|
||||
continue
|
||||
}
|
||||
deletePods = append(deletePods, pod)
|
||||
}
|
||||
}
|
||||
span.Annotate(nil, "Got provider pods")
|
||||
|
||||
// Create any pods for k8s pods that don't exist in the provider
|
||||
pods := s.resourceManager.GetPods()
|
||||
for _, pod := range pods {
|
||||
var failedDeleteCount int64
|
||||
for _, pod := range deletePods {
|
||||
logger := logger.WithField("pod", pod.Name)
|
||||
logger.Debug("Deleting pod '%s'\n", pod.Name)
|
||||
if err := s.deletePod(ctx, pod); err != nil {
|
||||
logger.WithError(err).Error("Error deleting pod")
|
||||
failedDeleteCount++
|
||||
continue
|
||||
}
|
||||
}
|
||||
span.Annotate(
|
||||
[]trace.Attribute{
|
||||
trace.Int64Attribute("expected_delete_pods_count", int64(len(deletePods))),
|
||||
trace.Int64Attribute("failed_delete_pods_count", failedDeleteCount),
|
||||
},
|
||||
"Cleaned up stale provider pods",
|
||||
)
|
||||
|
||||
pods := s.resourceManager.GetPods()
|
||||
|
||||
var createPods []*corev1.Pod
|
||||
cleanupPods := deletePods[:0]
|
||||
|
||||
for _, pod := range pods {
|
||||
var providerPod *corev1.Pod
|
||||
for _, p := range providerPods {
|
||||
if p.Namespace == pod.Namespace && p.Name == pod.Name {
|
||||
@@ -298,33 +362,76 @@ func (s *Server) reconcile(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// Delete pod if DeletionTimestamp is set
|
||||
if pod.DeletionTimestamp != nil {
|
||||
cleanupPods = append(cleanupPods, pod)
|
||||
continue
|
||||
}
|
||||
|
||||
if providerPod == nil &&
|
||||
pod.DeletionTimestamp == nil &&
|
||||
pod.Status.Phase != corev1.PodSucceeded &&
|
||||
pod.Status.Phase != corev1.PodFailed &&
|
||||
pod.Status.Reason != PodStatusReason_ProviderFailed {
|
||||
logger.Debug("Creating pod")
|
||||
if err := s.createPod(ctx, pod); err != nil {
|
||||
logger.WithError(err).Error("Error creating pod")
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Delete pod if DeletionTimestamp is set
|
||||
if pod.DeletionTimestamp != nil {
|
||||
log.Trace(logger, "Pod pending deletion")
|
||||
var err error
|
||||
if err = s.deletePod(ctx, pod); err != nil {
|
||||
logger.WithError(err).Error("Error deleting pod")
|
||||
continue
|
||||
}
|
||||
log.Trace(logger, "Pod deletion complete")
|
||||
createPods = append(createPods, pod)
|
||||
}
|
||||
}
|
||||
|
||||
var failedCreateCount int64
|
||||
for _, pod := range createPods {
|
||||
logger := logger.WithField("pod", pod.Name)
|
||||
logger.Debug("Creating pod")
|
||||
if err := s.createPod(ctx, pod); err != nil {
|
||||
failedCreateCount++
|
||||
logger.WithError(err).Error("Error creating pod")
|
||||
continue
|
||||
}
|
||||
}
|
||||
span.Annotate(
|
||||
[]trace.Attribute{
|
||||
trace.Int64Attribute("expected_created_pods", int64(len(createPods))),
|
||||
trace.Int64Attribute("failed_pod_creates", failedCreateCount),
|
||||
},
|
||||
"Created pods in provider",
|
||||
)
|
||||
|
||||
var failedCleanupCount int64
|
||||
for _, pod := range cleanupPods {
|
||||
logger := logger.WithField("pod", pod.Name)
|
||||
log.Trace(logger, "Pod pending deletion")
|
||||
var err error
|
||||
if err = s.deletePod(ctx, pod); err != nil {
|
||||
logger.WithError(err).Error("Error deleting pod")
|
||||
failedCleanupCount++
|
||||
continue
|
||||
}
|
||||
log.Trace(logger, "Pod deletion complete")
|
||||
}
|
||||
|
||||
span.Annotate(
|
||||
[]trace.Attribute{
|
||||
trace.Int64Attribute("expected_cleaned_up_pods", int64(len(cleanupPods))),
|
||||
trace.Int64Attribute("cleaned_up_pod_failures", failedCleanupCount),
|
||||
},
|
||||
"Cleaned up provider pods marked for deletion",
|
||||
)
|
||||
}
|
||||
|
||||
func addPodAttributes(span *trace.Span, pod *corev1.Pod) {
|
||||
span.AddAttributes(
|
||||
trace.StringAttribute("uid", string(pod.UID)),
|
||||
trace.StringAttribute("namespace", pod.Namespace),
|
||||
trace.StringAttribute("name", pod.Name),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
|
||||
ctx, span := trace.StartSpan(ctx, "createPod")
|
||||
defer span.End()
|
||||
addPodAttributes(span, pod)
|
||||
|
||||
if err := s.populateSecretsAndConfigMapsInEnv(pod); err != nil {
|
||||
span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: err.Error()})
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -344,10 +451,14 @@ func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
|
||||
_, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
|
||||
if err != nil {
|
||||
logger.WithError(err).Warn("Failed to update pod status")
|
||||
} else {
|
||||
span.Annotate(nil, "Updated k8s pod status")
|
||||
}
|
||||
|
||||
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: origErr.Error()})
|
||||
return origErr
|
||||
}
|
||||
span.Annotate(nil, "Created pod in provider")
|
||||
|
||||
logger.Info("Pod created")
|
||||
|
||||
@@ -355,24 +466,33 @@ func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
|
||||
}
|
||||
|
||||
func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error {
|
||||
ctx, span := trace.StartSpan(ctx, "deletePod")
|
||||
defer span.End()
|
||||
addPodAttributes(span, pod)
|
||||
|
||||
var delErr error
|
||||
if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) {
|
||||
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: delErr.Error()})
|
||||
return delErr
|
||||
}
|
||||
span.Annotate(nil, "Deleted pod from provider")
|
||||
|
||||
logger := log.G(ctx).WithField("pod", pod.Name)
|
||||
if !errors.IsNotFound(delErr) {
|
||||
var grace int64
|
||||
if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) {
|
||||
if errors.IsNotFound(err) {
|
||||
logger.Error("Pod doesn't exist")
|
||||
span.Annotate(nil, "Pod does not exist in k8s, nothing to delete")
|
||||
return nil
|
||||
}
|
||||
|
||||
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
|
||||
return fmt.Errorf("Failed to delete kubernetes pod: %s", err)
|
||||
}
|
||||
span.Annotate(nil, "Deleted pod from k8s")
|
||||
|
||||
s.resourceManager.DeletePod(pod)
|
||||
span.Annotate(nil, "Deleted pod from internal state")
|
||||
logger.Info("Pod deleted")
|
||||
}
|
||||
|
||||
@@ -381,8 +501,13 @@ func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error {
|
||||
|
||||
// updatePodStatuses syncs the providers pod status with the kubernetes pod status.
|
||||
func (s *Server) updatePodStatuses(ctx context.Context) {
|
||||
ctx, span := trace.StartSpan(ctx, "updatePodStatuses")
|
||||
defer span.End()
|
||||
|
||||
// Update all the pods with the provider status.
|
||||
pods := s.resourceManager.GetPods()
|
||||
span.AddAttributes(trace.Int64Attribute("nPods", int64(len(pods))))
|
||||
|
||||
for _, pod := range pods {
|
||||
if pod.Status.Phase == corev1.PodSucceeded ||
|
||||
pod.Status.Phase == corev1.PodFailed ||
|
||||
|
||||
Reference in New Issue
Block a user