From 1bfffa975ecde4ac264cd9024baee0461ae518e1 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Fri, 22 Feb 2019 11:36:03 -0800 Subject: [PATCH] Make tracing interface to coalesce logging/tracing (#519) * Define and use an interface for logging. This allows alternative implementations to use whatever logging package they want. Currently the interface just mimicks what logrus already implements, with minor modifications to not rely on logrus itself. I think the interface is pretty solid in terms of logging implementations being able to do what they need to. * Make tracing interface to coalesce logging/tracing Allows us to share data between the tracer and the logger so we can simplify log/trace handling wher we generally want data to go both places. --- cmd/root.go | 44 +++--- log/log.go | 76 +++++---- log/logrus/logrus.go | 34 ++++ log/logrus/logrus_test.go | 16 ++ log/nop.go | 18 +++ providers/azure/aci.go | 30 ++-- providers/azure/acsCredential.go | 4 +- providers/azure/metrics.go | 39 +++-- providers/mock/mock.go | 47 +++--- trace/nop.go | 23 +++ trace/opencensus/opencensus.go | 236 ++++++++++++++++++++++++++++ trace/opencensus/opencensus_test.go | 13 ++ trace/trace.go | 58 +++++++ vkubelet/api/helpers.go | 2 +- vkubelet/apiserver.go | 9 +- vkubelet/node.go | 40 +++-- vkubelet/pod.go | 90 ++++++----- vkubelet/podcontroller.go | 14 +- 18 files changed, 605 insertions(+), 188 deletions(-) create mode 100644 log/logrus/logrus.go create mode 100644 log/logrus/logrus_test.go create mode 100644 log/nop.go create mode 100644 trace/nop.go create mode 100644 trace/opencensus/opencensus.go create mode 100644 trace/opencensus/opencensus_test.go create mode 100644 trace/trace.go diff --git a/cmd/root.go b/cmd/root.go index 82f70d780..826963c7c 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -30,19 +30,21 @@ import ( "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/viper" - "go.opencensus.io/trace" + "github.com/virtual-kubelet/virtual-kubelet/log" + logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus" + "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/register" + "github.com/virtual-kubelet/virtual-kubelet/trace" + "github.com/virtual-kubelet/virtual-kubelet/trace/opencensus" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet" + octrace "go.opencensus.io/trace" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" kubeinformers "k8s.io/client-go/informers" corev1informers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" - - "github.com/virtual-kubelet/virtual-kubelet/log" - "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers" - "github.com/virtual-kubelet/virtual-kubelet/providers/register" - "github.com/virtual-kubelet/virtual-kubelet/vkubelet" ) const ( @@ -161,6 +163,10 @@ func (mv mapVar) Type() string { } func init() { + // make sure the default logger/tracer is initialized + log.L = logruslogger.FromLogrus(logrus.NewEntry(logrus.StandardLogger())) + trace.T = opencensus.Adapter{} + cobra.OnInitialize(initConfig) // read default node name from environment variable. @@ -242,19 +248,21 @@ func initConfig() { log.G(context.TODO()).WithField("OperatingSystem", operatingSystem).Fatalf("Operating system not supported. Valid options are: %s", strings.Join(providers.ValidOperatingSystems.Names(), " | ")) } - level, err := log.ParseLevel(logLevel) + level, err := logrus.ParseLevel(logLevel) if err != nil { log.G(context.TODO()).WithField("logLevel", logLevel).Fatal("log level is not supported") } logrus.SetLevel(level) - - logger := log.L.WithFields(logrus.Fields{ + logger := logruslogger.FromLogrus(logrus.WithFields(logrus.Fields{ "provider": provider, "operatingSystem": operatingSystem, "node": nodeName, "namespace": kubeNamespace, - }) + })) + + rootContext = log.WithLogger(rootContext, logger) + log.L = logger if !disableTaint { @@ -339,16 +347,16 @@ func initConfig() { if err != nil { log.L.WithError(err).WithField("exporter", e).Fatal("Cannot initialize exporter") } - trace.RegisterExporter(exporter) + octrace.RegisterExporter(exporter) } if len(userTraceExporters) > 0 { - var s trace.Sampler + var s octrace.Sampler switch strings.ToLower(traceSampler) { case "": case "always": - s = trace.AlwaysSample() + s = octrace.AlwaysSample() case "never": - s = trace.NeverSample() + s = octrace.NeverSample() default: rate, err := strconv.Atoi(traceSampler) if err != nil { @@ -357,12 +365,12 @@ func initConfig() { if rate < 0 || rate > 100 { logger.WithField("rate", traceSampler).Fatal("trace sample rate must not be less than zero or greater than 100") } - s = trace.ProbabilitySampler(float64(rate) / 100) + s = octrace.ProbabilitySampler(float64(rate) / 100) } if s != nil { - trace.ApplyConfig( - trace.Config{ + octrace.ApplyConfig( + octrace.Config{ DefaultSampler: s, }, ) diff --git a/log/log.go b/log/log.go index 2bb026bd0..fe3e4178b 100644 --- a/log/log.go +++ b/log/log.go @@ -14,77 +14,71 @@ limitations under the License. */ +// Package log defines the interfaces used for logging in virtual-kubelet. +// It uses a context.Context to store logger details. Additionally you can set +// the default logger to use by setting log.L. This is used when no logger is +// stored in the passed in context. package log import ( "context" - "sync/atomic" - - "github.com/Sirupsen/logrus" ) var ( // G is an alias for GetLogger. - // - // We may want to define this locally to a package to get package tagged log - // messages. G = GetLogger - // L is an alias for the the standard logger. - L = logrus.NewEntry(logrus.StandardLogger()) + // L is the default logger. It should be initialized before using `G` or `GetLogger` + // If L is unitialized and no logger is available in a provided context, a + // panic will occur. + L Logger = nopLogger{} ) type ( loggerKey struct{} ) -// TraceLevel is the log level for tracing. Trace level is lower than debug level, -// and is usually used to trace detailed behavior of the program. -const TraceLevel = logrus.Level(uint32(logrus.DebugLevel + 1)) +// Logger is the interface used for logging in virtual-kubelet +// +// virtual-kubelet will access the logger via context using `GetLogger` (or its alias, `G`) +// You can set the default logger to use by setting the `L` variable. +type Logger interface { + Debug(...interface{}) + Debugf(string, ...interface{}) + Info(...interface{}) + Infof(string, ...interface{}) + Warn(...interface{}) + Warnf(string, ...interface{}) + Error(...interface{}) + Errorf(string, ...interface{}) + Fatal(...interface{}) + Fatalf(string, ...interface{}) -// RFC3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to -// ensure the formatted time is always the same number of characters. -const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" - -// ParseLevel takes a string level and returns the Logrus log level constant. -// It supports trace level. -func ParseLevel(lvl string) (logrus.Level, error) { - if lvl == "trace" { - return TraceLevel, nil - } - return logrus.ParseLevel(lvl) + WithField(string, interface{}) Logger + WithFields(Fields) Logger + WithError(error) Logger } +// Fields allows setting multiple fields on a logger at one time. +type Fields map[string]interface{} + // WithLogger returns a new context with the provided logger. Use in // combination with logger.WithField(s) for great effect. -func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context { +func WithLogger(ctx context.Context, logger Logger) context.Context { return context.WithValue(ctx, loggerKey{}, logger) } // GetLogger retrieves the current logger from the context. If no logger is // available, the default logger is returned. -func GetLogger(ctx context.Context) *logrus.Entry { +func GetLogger(ctx context.Context) Logger { logger := ctx.Value(loggerKey{}) if logger == nil { + if L == nil { + panic("default logger not initialized") + } return L } - return logger.(*logrus.Entry) -} - -// Trace logs a message at level Trace with the log entry passed-in. -func Trace(e *logrus.Entry, args ...interface{}) { - level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level))) - if level >= TraceLevel { - e.Debug(args...) - } -} - -// Tracef logs a message at level Trace with the log entry passed-in. -func Tracef(e *logrus.Entry, format string, args ...interface{}) { - level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level))) - if level >= TraceLevel { - e.Debugf(format, args...) - } + return logger.(Logger) } diff --git a/log/logrus/logrus.go b/log/logrus/logrus.go new file mode 100644 index 000000000..39319f397 --- /dev/null +++ b/log/logrus/logrus.go @@ -0,0 +1,34 @@ +// Package logrus implements a github.com/virtual-kubelet/virtual-kubelet/log.Logger using Logrus as a backend +// You can use this by creating a logrus logger and calling `FromLogrus(entry)`. +// If you want this to be the default logger for virtual-kubelet, set `log.L` to the value returned by `FromLogrus` +package logrus + +import ( + "github.com/Sirupsen/logrus" + "github.com/virtual-kubelet/virtual-kubelet/log" +) + +// Adapter implements the `log.Logger` interface for logrus +type Adapter struct { + *logrus.Entry +} + +// FromLogrus creates a new `log.Logger` from the provided entry +func FromLogrus(entry *logrus.Entry) log.Logger { + return &Adapter{entry} +} + +// WithField adds a field to the log entry. +func (l *Adapter) WithField(key string, val interface{}) log.Logger { + return FromLogrus(l.Entry.WithField(key, val)) +} + +// WithFields adds multiple fields to a log entry. +func (l *Adapter) WithFields(f log.Fields) log.Logger { + return FromLogrus(l.Entry.WithFields(logrus.Fields(f))) +} + +// WithError adds an error to the log entry +func (l *Adapter) WithError(err error) log.Logger { + return FromLogrus(l.Entry.WithError(err)) +} diff --git a/log/logrus/logrus_test.go b/log/logrus/logrus_test.go new file mode 100644 index 000000000..647d515a1 --- /dev/null +++ b/log/logrus/logrus_test.go @@ -0,0 +1,16 @@ +package logrus + +import ( + "testing" + + "github.com/Sirupsen/logrus" + "github.com/virtual-kubelet/virtual-kubelet/log" +) + +func TestImplementsLoggerInterface(t *testing.T) { + l := FromLogrus(&logrus.Entry{}) + + if _, ok := l.(log.Logger); !ok { + t.Fatal("does not implement log.Logger interface") + } +} diff --git a/log/nop.go b/log/nop.go new file mode 100644 index 000000000..4e5c90f04 --- /dev/null +++ b/log/nop.go @@ -0,0 +1,18 @@ +package log + +type nopLogger struct{} + +func (nopLogger) Debug(...interface{}) {} +func (nopLogger) Debugf(string, ...interface{}) {} +func (nopLogger) Info(...interface{}) {} +func (nopLogger) Infof(string, ...interface{}) {} +func (nopLogger) Warn(...interface{}) {} +func (nopLogger) Warnf(string, ...interface{}) {} +func (nopLogger) Error(...interface{}) {} +func (nopLogger) Errorf(string, ...interface{}) {} +func (nopLogger) Fatal(...interface{}) {} +func (nopLogger) Fatalf(string, ...interface{}) {} + +func (l nopLogger) WithField(string, interface{}) Logger { return l } +func (l nopLogger) WithFields(Fields) Logger { return l } +func (l nopLogger) WithError(error) Logger { return l } diff --git a/providers/azure/aci.go b/providers/azure/aci.go index bd1f2d7bc..88374cfb0 100644 --- a/providers/azure/aci.go +++ b/providers/azure/aci.go @@ -19,15 +19,14 @@ import ( "sync" "time" - "github.com/Sirupsen/logrus" "github.com/gorilla/websocket" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" client "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client" "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci" "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/network" - "go.opencensus.io/trace" - "k8s.io/api/core/v1" + "github.com/virtual-kubelet/virtual-kubelet/trace" + v1 "k8s.io/api/core/v1" k8serr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -480,11 +479,11 @@ func getKubeProxyExtension(secretPath, masterURI, clusterCIDR string) (*aci.Exte return &extension, nil } -func addAzureAttributes(span *trace.Span, p *ACIProvider) { - span.AddAttributes( - trace.StringAttribute("azure.resourceGroup", p.resourceGroup), - trace.StringAttribute("azure.region", p.region), - ) +func addAzureAttributes(ctx context.Context, span trace.Span, p *ACIProvider) context.Context { + return span.WithFields(ctx, log.Fields{ + "azure.resourceGroup": p.resourceGroup, + "azure.region": p.region, + }) } // CreatePod accepts a Pod definition and creates @@ -492,7 +491,7 @@ func addAzureAttributes(span *trace.Span, p *ACIProvider) { func (p *ACIProvider) CreatePod(ctx context.Context, pod *v1.Pod) error { ctx, span := trace.StartSpan(ctx, "aci.CreatePod") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) var containerGroup aci.ContainerGroup containerGroup.Location = p.region @@ -694,7 +693,7 @@ func (p *ACIProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error { func (p *ACIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error { ctx, span := trace.StartSpan(ctx, "aci.DeletePod") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) err := p.aciClient.DeleteContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", pod.Namespace, pod.Name)) return wrapError(err) @@ -705,7 +704,7 @@ func (p *ACIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error { func (p *ACIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) { ctx, span := trace.StartSpan(ctx, "aci.GetPod") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) cg, err, status := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, name)) if err != nil { @@ -726,7 +725,7 @@ func (p *ACIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { ctx, span := trace.StartSpan(ctx, "aci.GetContainerLogs") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) logContent := "" cg, err, _ := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName)) @@ -744,7 +743,6 @@ func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, cLogs, err := p.aciClient.GetContainerLogs(ctx, p.resourceGroup, cg.Name, containerName, tail) if err != nil { log.G(ctx).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying") - span.Annotate(nil, "Error getting container logs, retrying") time.Sleep(5000 * time.Millisecond) } else { logContent = cLogs.Content @@ -841,7 +839,7 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri func (p *ACIProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { ctx, span := trace.StartSpan(ctx, "aci.GetPodStatus") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) pod, err := p.GetPod(ctx, namespace, name) if err != nil { @@ -859,7 +857,7 @@ func (p *ACIProvider) GetPodStatus(ctx context.Context, namespace, name string) func (p *ACIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { ctx, span := trace.StartSpan(ctx, "aci.GetPods") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) cgs, err := p.aciClient.ListContainerGroups(ctx, p.resourceGroup) if err != nil { @@ -875,7 +873,7 @@ func (p *ACIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { p, err := containerGroupToPod(&c) if err != nil { - log.G(context.TODO()).WithFields(logrus.Fields{ + log.G(ctx).WithFields(log.Fields{ "name": c.Name, "id": c.ID, }).WithError(err).Error("error converting container group to pod") diff --git a/providers/azure/acsCredential.go b/providers/azure/acsCredential.go index b8dde860e..1b1e21c9e 100644 --- a/providers/azure/acsCredential.go +++ b/providers/azure/acsCredential.go @@ -25,7 +25,7 @@ type AcsCredential struct { // NewAcsCredential returns an AcsCredential struct from file path func NewAcsCredential(p string) (*AcsCredential, error) { logger := log.G(context.TODO()).WithField("method", "NewAcsCredential").WithField("file", p) - log.Trace(logger, "Reading ACS credential file") + logger.Debug("Reading ACS credential file") b, err := ioutil.ReadFile(p) if err != nil { @@ -38,6 +38,6 @@ func NewAcsCredential(p string) (*AcsCredential, error) { return nil, err } - log.Trace(logger, "Load ACS credential file successfully") + logger.Debug("Load ACS credential file successfully") return &cred, nil } diff --git a/providers/azure/metrics.go b/providers/azure/metrics.go index a6fc83473..e88e4f0f1 100644 --- a/providers/azure/metrics.go +++ b/providers/azure/metrics.go @@ -7,10 +7,11 @@ import ( "github.com/cpuguy83/strongerrors/status/ocstatus" "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci" - "go.opencensus.io/trace" + "github.com/virtual-kubelet/virtual-kubelet/trace" "golang.org/x/sync/errgroup" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" ) @@ -19,17 +20,24 @@ import ( func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summary, err error) { ctx, span := trace.StartSpan(ctx, "GetSummaryStats") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) p.metricsSync.Lock() defer p.metricsSync.Unlock() - span.Annotate(nil, "acquired metrics mutex") + + log.G(ctx).Debug("acquired metrics mutex") if time.Now().Sub(p.metricsSyncTime) < time.Minute { - span.AddAttributes(trace.BoolAttribute("preCachedResult", true), trace.StringAttribute("cachedResultSampleTime", p.metricsSyncTime.String())) + span.WithFields(ctx, log.Fields{ + "preCachedResult": true, + "cachedResultSampleTime": p.metricsSyncTime.String(), + }) return p.lastMetric, nil } - span.AddAttributes(trace.BoolAttribute("preCachedResult", false), trace.StringAttribute("cachedResultSampleTime", p.metricsSyncTime.String())) + ctx = span.WithFields(ctx, log.Fields{ + "preCachedResult": false, + "cachedResultSampleTime": p.metricsSyncTime.String(), + }) select { case <-ctx.Done(): @@ -62,11 +70,11 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa errGroup.Go(func() error { ctx, span := trace.StartSpan(ctx, "getPodMetrics") defer span.End() - span.AddAttributes( - trace.StringAttribute("UID", string(pod.UID)), - trace.StringAttribute("Name", pod.Name), - trace.StringAttribute("Namespace", pod.Namespace), - ) + logger := log.G(ctx).WithFields(log.Fields{ + "UID": string(pod.UID), + "Name": pod.Name, + "Namespace": pod.Namespace, + }) select { case <-ctx.Done(): @@ -77,7 +85,7 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa <-sema }() - span.Annotate(nil, "Acquired semaphore") + logger.Debug("Acquired semaphore") cgName := containerGroupName(pod) // cpu/mem and net stats are split because net stats do not support container level detail @@ -92,7 +100,7 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa span.SetStatus(ocstatus.FromError(err)) return errors.Wrapf(err, "error fetching cpu/mem stats for container group %s", cgName) } - span.Annotate(nil, "Got system stats") + logger.Debug("Got system stats") netStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{ Start: start, @@ -104,7 +112,7 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa span.SetStatus(ocstatus.FromError(err)) return errors.Wrapf(err, "error fetching network stats for container group %s", cgName) } - span.Annotate(nil, "Got network stats") + logger.Debug("Got network stats") chResult <- collectMetrics(pod, systemStats, netStats) return nil @@ -112,10 +120,11 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa } if err := errGroup.Wait(); err != nil { + span.SetStatus(ocstatus.FromError(err)) return nil, errors.Wrap(err, "error in request to fetch container group metrics") } close(chResult) - span.Annotate([]trace.Attribute{trace.Int64Attribute("nPods", int64(len(pods)))}, "Collected stats from Azure") + log.G(ctx).Debugf("Collected status from azure for %d pods", len(pods)) var s stats.Summary s.Node = stats.NodeStats{ diff --git a/providers/mock/mock.go b/providers/mock/mock.go index b48208c01..83fe86364 100644 --- a/providers/mock/mock.go +++ b/providers/mock/mock.go @@ -6,13 +6,14 @@ import ( "fmt" "io" "io/ioutil" - "log" "math/rand" "time" "github.com/cpuguy83/strongerrors" - "go.opencensus.io/trace" - "k8s.io/api/core/v1" + "github.com/cpuguy83/strongerrors/status/ocstatus" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/trace" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -114,9 +115,9 @@ func (p *MockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error { defer span.End() // Add the pod's coordinates to the current span. - addAttributes(span, namespaceKey, pod.Namespace, nameKey, pod.Name) + ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name) - log.Printf("receive CreatePod %q\n", pod.Name) + log.G(ctx).Info("receive CreatePod %q", pod.Name) key, err := buildKey(pod) if err != nil { @@ -134,9 +135,9 @@ func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error { defer span.End() // Add the pod's coordinates to the current span. - addAttributes(span, namespaceKey, pod.Namespace, nameKey, pod.Name) + ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name) - log.Printf("receive UpdatePod %q\n", pod.Name) + log.G(ctx).Info("receive UpdatePod %q", pod.Name) key, err := buildKey(pod) if err != nil { @@ -154,9 +155,9 @@ func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { defer span.End() // Add the pod's coordinates to the current span. - addAttributes(span, namespaceKey, pod.Namespace, nameKey, pod.Name) + ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name) - log.Printf("receive DeletePod %q\n", pod.Name) + log.G(ctx).Info("receive DeletePod %q", pod.Name) key, err := buildKey(pod) if err != nil { @@ -175,12 +176,15 @@ func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { // GetPod returns a pod by name that is stored in memory. func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) { ctx, span := trace.StartSpan(ctx, "GetPod") - defer span.End() + defer func() { + span.SetStatus(ocstatus.FromError(err)) + span.End() + }() // Add the pod's coordinates to the current span. - addAttributes(span, namespaceKey, namespace, nameKey, name) + ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, name) - log.Printf("receive GetPod %q\n", name) + log.G(ctx).Info("receive GetPod %q", name) key, err := buildKeyFromNames(namespace, name) if err != nil { @@ -199,9 +203,9 @@ func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, defer span.End() // Add pod and container attributes to the current span. - addAttributes(span, namespaceKey, namespace, nameKey, podName, containerNameKey, containerName) + ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, podName, containerNameKey, containerName) - log.Printf("receive GetContainerLogs %q\n", podName) + log.G(ctx).Info("receive GetContainerLogs %q", podName) return "", nil } @@ -214,7 +218,7 @@ func (p *MockProvider) GetPodFullName(namespace string, pod string) string { // ExecInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. func (p *MockProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { - log.Printf("receive ExecInContainer %q\n", container) + log.G(context.TODO()).Info("receive ExecInContainer %q", container) return nil } @@ -225,9 +229,9 @@ func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string) defer span.End() // Add namespace and name as attributes to the current span. - addAttributes(span, namespaceKey, namespace, nameKey, name) + ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, name) - log.Printf("receive GetPodStatus %q\n", name) + log.G(ctx).Info("receive GetPodStatus %q", name) now := metav1.NewTime(time.Now()) @@ -279,7 +283,7 @@ func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { ctx, span := trace.StartSpan(ctx, "GetPods") defer span.End() - log.Printf("receive GetPods\n") + log.G(ctx).Info("receive GetPods") var pods []*v1.Pod @@ -485,11 +489,12 @@ func buildKey(pod *v1.Pod) (string, error) { // attrs must be an even-sized list of string arguments. // Otherwise, the span won't be modified. // TODO: Refactor and move to a "tracing utilities" package. -func addAttributes(span *trace.Span, attrs ...string) { +func addAttributes(ctx context.Context, span trace.Span, attrs ...string) context.Context { if len(attrs)%2 == 1 { - return + return ctx } for i := 0; i < len(attrs); i += 2 { - span.AddAttributes(trace.StringAttribute(attrs[i], attrs[i+1])) + ctx = span.WithField(ctx, attrs[i], attrs[i+1]) } + return ctx } diff --git a/trace/nop.go b/trace/nop.go new file mode 100644 index 000000000..2954ec63c --- /dev/null +++ b/trace/nop.go @@ -0,0 +1,23 @@ +package trace + +import ( + "context" + + "github.com/virtual-kubelet/virtual-kubelet/log" + "go.opencensus.io/trace" +) + +type nopTracer struct{} + +func (nopTracer) StartSpan(ctx context.Context, _ string) (context.Context, Span) { + return ctx, &nopSpan{} +} + +type nopSpan struct{} + +func (nopSpan) End() {} +func (nopSpan) SetStatus(trace.Status) {} +func (nopSpan) Logger() log.Logger { return nil } + +func (nopSpan) WithField(ctx context.Context, _ string, _ interface{}) context.Context { return ctx } +func (nopSpan) WithFields(ctx context.Context, _ log.Fields) context.Context { return ctx } diff --git a/trace/opencensus/opencensus.go b/trace/opencensus/opencensus.go new file mode 100644 index 000000000..afaef4f4d --- /dev/null +++ b/trace/opencensus/opencensus.go @@ -0,0 +1,236 @@ +// Package opencensus implements a github.com/virtual-kubelet/virtual-kubelet/trace.Tracer +// using opencensus as a backend. +// +// Use this by setting `trace.T = Adapter{}` +package opencensus + +import ( + "context" + "fmt" + "sync" + + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/trace" + octrace "go.opencensus.io/trace" +) + +const ( + lDebug = "DEBUG" + lInfo = "INFO" + lWarn = "WARN" + lErr = "ERROR" + lFatal = "FATAL" +) + +// Adapter implements the trace.Tracer interface for OpenCensus +type Adapter struct{} + +// StartSpan creates a new span from opencensus using the given name. +func (Adapter) StartSpan(ctx context.Context, name string) (context.Context, trace.Span) { + ctx, ocs := octrace.StartSpan(ctx, name) + l := log.G(ctx).WithField("method", name) + + s := &span{s: ocs, l: l} + ctx = log.WithLogger(ctx, s.Logger()) + + return ctx, s +} + +type span struct { + mu sync.Mutex + s *octrace.Span + l log.Logger +} + +func (s *span) End() { + s.s.End() +} + +func (s *span) SetStatus(status trace.Status) { + s.s.SetStatus(status) +} + +func (s *span) WithField(ctx context.Context, key string, val interface{}) context.Context { + s.mu.Lock() + s.l = s.l.WithField(key, val) + ctx = log.WithLogger(ctx, &logger{s: s.s, l: s.l}) + s.mu.Unlock() + + if s.s.IsRecordingEvents() { + s.s.AddAttributes(makeAttribute(key, val)) + } + + return ctx +} + +func (s *span) WithFields(ctx context.Context, f log.Fields) context.Context { + s.mu.Lock() + s.l = s.l.WithFields(f) + ctx = log.WithLogger(ctx, &logger{s: s.s, l: s.l}) + s.mu.Unlock() + + if s.s.IsRecordingEvents() { + attrs := make([]octrace.Attribute, 0, len(f)) + for k, v := range f { + attrs = append(attrs, makeAttribute(k, v)) + } + s.s.AddAttributes(attrs...) + } + + return ctx +} + +func (s *span) Logger() log.Logger { + return &logger{s: s.s, l: s.l} +} + +type logger struct { + s *octrace.Span + l log.Logger + a []octrace.Attribute +} + +func (l *logger) Debug(args ...interface{}) { + if !l.s.IsRecordingEvents() { + l.l.Debug(args...) + return + } + + msg := fmt.Sprint(args...) + l.l.Debug(msg) + l.s.Annotate(withLevel(lDebug, l.a), msg) +} + +func (l *logger) Debugf(f string, args ...interface{}) { + l.l.Debugf(f, args) + l.s.Annotatef(withLevel(lDebug, l.a), f, args...) +} + +func (l *logger) Info(args ...interface{}) { + if !l.s.IsRecordingEvents() { + l.l.Info(args...) + return + } + + msg := fmt.Sprint(args...) + l.l.Info(msg) + l.s.Annotate(withLevel(lInfo, l.a), msg) +} + +func (l *logger) Infof(f string, args ...interface{}) { + l.l.Infof(f, args) + l.s.Annotatef(withLevel(lInfo, l.a), f, args...) +} + +func (l *logger) Warn(args ...interface{}) { + if !l.s.IsRecordingEvents() { + l.l.Warn(args...) + return + } + + msg := fmt.Sprint(args...) + l.l.Warn(msg) + l.s.Annotate(withLevel(lWarn, l.a), msg) +} + +func (l *logger) Warnf(f string, args ...interface{}) { + l.l.Warnf(f, args) + l.s.Annotatef(withLevel(lWarn, l.a), f, args...) +} + +func (l *logger) Error(args ...interface{}) { + if !l.s.IsRecordingEvents() { + l.l.Error(args...) + return + } + + msg := fmt.Sprint(args...) + l.l.Error(msg) + l.s.Annotate(withLevel(lErr, l.a), msg) +} + +func (l *logger) Errorf(f string, args ...interface{}) { + l.l.Errorf(f, args) + l.s.Annotatef(withLevel(lErr, l.a), f, args...) +} + +func (l *logger) Fatal(args ...interface{}) { + if !l.s.IsRecordingEvents() { + l.l.Fatal(args...) + return + } + + msg := fmt.Sprint(args...) + l.s.Annotate(withLevel(lFatal, l.a), msg) + l.l.Fatal(msg) +} + +func (l *logger) Fatalf(f string, args ...interface{}) { + l.s.Annotatef(withLevel(lFatal, l.a), f, args...) + l.l.Fatalf(f, args) +} + +func (l *logger) WithError(err error) log.Logger { + log := l.l.WithError(err) + + var a []octrace.Attribute + if l.s.IsRecordingEvents() { + a = make([]octrace.Attribute, len(l.a), len(l.a)+1) + copy(a, l.a) + a = append(l.a, makeAttribute("err", err)) + } + + return &logger{s: l.s, l: log, a: a} +} + +func (l *logger) WithField(k string, value interface{}) log.Logger { + log := l.l.WithField(k, value) + + var a []octrace.Attribute + + if l.s.IsRecordingEvents() { + a = make([]octrace.Attribute, len(l.a), len(l.a)+1) + copy(a, l.a) + a = append(a, makeAttribute(k, value)) + } + + return &logger{s: l.s, a: a, l: log} +} + +func (l *logger) WithFields(fields log.Fields) log.Logger { + log := l.l.WithFields(fields) + + var a []octrace.Attribute + if l.s.IsRecordingEvents() { + a = make([]octrace.Attribute, len(l.a), len(l.a)+len(fields)) + copy(a, l.a) + for k, v := range fields { + a = append(a, makeAttribute(k, v)) + } + } + + return &logger{s: l.s, a: a, l: log} +} + +func makeAttribute(key string, val interface{}) octrace.Attribute { + var attr octrace.Attribute + + switch v := val.(type) { + case string: + attr = octrace.StringAttribute(key, v) + case int64: + attr = octrace.Int64Attribute(key, v) + case bool: + attr = octrace.BoolAttribute(key, v) + case error: + attr = octrace.StringAttribute(key, v.Error()) + default: + attr = octrace.StringAttribute(key, fmt.Sprintf("%+v", val)) + } + + return attr +} + +func withLevel(l string, attrs []octrace.Attribute) []octrace.Attribute { + return append(attrs, octrace.StringAttribute("level", l)) +} diff --git a/trace/opencensus/opencensus_test.go b/trace/opencensus/opencensus_test.go new file mode 100644 index 000000000..ed3919341 --- /dev/null +++ b/trace/opencensus/opencensus_test.go @@ -0,0 +1,13 @@ +package opencensus + +import ( + "testing" + + "github.com/virtual-kubelet/virtual-kubelet/trace" +) + +func TestTracerImplementsTracer(t *testing.T) { + // ensure that Adapter implements trace.Tracer + if tt := trace.Tracer(Adapter{}); tt == nil { + } +} diff --git a/trace/trace.go b/trace/trace.go new file mode 100644 index 000000000..752432d4c --- /dev/null +++ b/trace/trace.go @@ -0,0 +1,58 @@ +// Package trace abstracts virtual-kubelet's tracing capabilties into a set of +// interfaces. +// While this does allow consumers to use whatever tracing library they want, +// the primary goal is to share logging data between the configured logger and +// tracing spans instead of duplicating calls. +package trace + +import ( + "context" + + "github.com/virtual-kubelet/virtual-kubelet/log" + "go.opencensus.io/trace" +) + +// Status is an alias to opencensus's trace status. +// The main reason we use this instead of implementing our own is library re-use, +// namely for converting an error to a tracing status. +// In the future this may be defined completely in this package. +type Status = trace.Status + +// Tracer is the interface used for creating a tracing span +type Tracer interface { + // StartSpan starts a new span. The span details are emebedded into the returned + // context + StartSpan(context.Context, string) (context.Context, Span) +} + +var ( + // T is the Tracer to use this should be initialized before starting up + // virtual-kubelet + T Tracer = nopTracer{} +) + +// StartSpan starts a span from the configured default tracer +func StartSpan(ctx context.Context, name string) (context.Context, Span) { + ctx, span := T.StartSpan(ctx, name) + ctx = log.WithLogger(ctx, span.Logger()) + return ctx, span +} + +// Span encapsulates a tracing event +type Span interface { + End() + SetStatus(Status) + + // WithField and WithFields adds attributes to an entire span + // + // This interface is a bit weird, but allows us to manage loggers in the context + // It is expected that implementations set `log.WithLogger` so the logger stored + // in the context is updated with the new fields. + WithField(context.Context, string, interface{}) context.Context + WithFields(context.Context, log.Fields) context.Context + + // Logger is used to log individual entries. + // Calls to functions like `WithField` and `WithFields` on the logger should + // not affect the rest of the span but rather individual entries. + Logger() log.Logger +} diff --git a/vkubelet/api/helpers.go b/vkubelet/api/helpers.go index 95e651b56..a9022aba7 100644 --- a/vkubelet/api/helpers.go +++ b/vkubelet/api/helpers.go @@ -25,7 +25,7 @@ func handleError(f handlerFunc) http.HandlerFunc { if code >= 500 { logger.Error("Internal server error on request") } else { - log.Trace(logger, "Error on request") + logger.Debug("Error on request") } } } diff --git a/vkubelet/apiserver.go b/vkubelet/apiserver.go index 39ca4d86c..b4f7251fc 100644 --- a/vkubelet/apiserver.go +++ b/vkubelet/apiserver.go @@ -3,7 +3,6 @@ package vkubelet import ( "net/http" - "github.com/Sirupsen/logrus" "github.com/gorilla/mux" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/providers" @@ -73,7 +72,7 @@ func AttachMetricsRoutes(p providers.Provider, mux ServeMux) { func instrumentRequest(r *http.Request) *http.Request { ctx := r.Context() - logger := log.G(ctx).WithFields(logrus.Fields{ + logger := log.G(ctx).WithFields(log.Fields{ "uri": r.RequestURI, "vars": mux.Vars(r), }) @@ -96,14 +95,12 @@ func InstrumentHandler(h http.Handler) http.Handler { // NotFound provides a handler for cases where the requested endpoint doesn't exist func NotFound(w http.ResponseWriter, r *http.Request) { - logger := log.G(r.Context()) - log.Trace(logger, "404 request not found") + log.G(r.Context()).Debug("404 request not found") http.Error(w, "404 request not found", http.StatusNotFound) } // NotImplemented provides a handler for cases where a provider does not implement a given API func NotImplemented(w http.ResponseWriter, r *http.Request) { - logger := log.G(r.Context()) - log.Trace(logger, "501 not implemented") + log.G(r.Context()).Debug("501 not implemented") http.Error(w, "501 not implemented", http.StatusNotImplemented) } diff --git a/vkubelet/node.go b/vkubelet/node.go index a6c990a9d..ce0b0bb5d 100644 --- a/vkubelet/node.go +++ b/vkubelet/node.go @@ -4,10 +4,10 @@ import ( "context" "strings" + "github.com/cpuguy83/strongerrors/status/ocstatus" "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/trace" "github.com/virtual-kubelet/virtual-kubelet/version" - - "go.opencensus.io/trace" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -57,12 +57,11 @@ func (s *Server) registerNode(ctx context.Context) error { DaemonEndpoints: *s.provider.NodeDaemonEndpoints(ctx), }, } - addNodeAttributes(span, node) + ctx = addNodeAttributes(ctx, span, node) if _, err := s.k8sClient.CoreV1().Nodes().Create(node); err != nil && !errors.IsAlreadyExists(err) { - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + span.SetStatus(ocstatus.FromError(err)) return err } - span.Annotate(nil, "Registered node with k8s") log.G(ctx).Info("Registered node") @@ -77,19 +76,20 @@ func (s *Server) updateNode(ctx context.Context) { opts := metav1.GetOptions{} n, err := s.k8sClient.CoreV1().Nodes().Get(s.nodeName, opts) if err != nil && !errors.IsNotFound(err) { - log.G(ctx).WithError(err).Error("Failed to retrieve node") - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + log.G(ctx).WithError(err).Error("Failed to retrive node") + span.SetStatus(ocstatus.FromError(err)) return } - addNodeAttributes(span, n) - span.Annotate(nil, "Fetched node details from k8s") + + ctx = addNodeAttributes(ctx, span, n) + log.G(ctx).Debug("Fetched node details from k8s") if errors.IsNotFound(err) { if err = s.registerNode(ctx); err != nil { log.G(ctx).WithError(err).Error("Failed to register node") - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + span.SetStatus(ocstatus.FromError(err)) } else { - span.Annotate(nil, "Registered node in k8s") + log.G(ctx).Debug("Registered node in k8s") } return } @@ -106,7 +106,7 @@ func (s *Server) updateNode(ctx context.Context) { n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n) if err != nil { log.G(ctx).WithError(err).Error("Failed to update node") - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + span.SetStatus(ocstatus.FromError(err)) return } } @@ -125,13 +125,11 @@ func (t taintsStringer) String() string { return s } -func addNodeAttributes(span *trace.Span, n *corev1.Node) { - span.AddAttributes( - trace.StringAttribute("UID", string(n.UID)), - trace.StringAttribute("name", n.Name), - trace.StringAttribute("cluster", n.ClusterName), - ) - if span.IsRecordingEvents() { - span.AddAttributes(trace.StringAttribute("taints", taintsStringer(n.Spec.Taints).String())) - } +func addNodeAttributes(ctx context.Context, span trace.Span, n *corev1.Node) context.Context { + return span.WithFields(ctx, log.Fields{ + "UID": string(n.UID), + "name": n.Name, + "cluster": n.ClusterName, + "tains": taintsStringer(n.Spec.Taints), + }) } diff --git a/vkubelet/pod.go b/vkubelet/pod.go index 9dff71d3c..90cf34fed 100644 --- a/vkubelet/pod.go +++ b/vkubelet/pod.go @@ -2,29 +2,27 @@ package vkubelet import ( "context" - "fmt" "sync" "time" "github.com/cpuguy83/strongerrors/status/ocstatus" pkgerrors "github.com/pkg/errors" - "go.opencensus.io/trace" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/trace" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" - - "github.com/virtual-kubelet/virtual-kubelet/log" ) -func addPodAttributes(span *trace.Span, pod *corev1.Pod) { - span.AddAttributes( - trace.StringAttribute("uid", string(pod.GetUID())), - trace.StringAttribute("namespace", pod.GetNamespace()), - trace.StringAttribute("name", pod.GetName()), - trace.StringAttribute("phase", string(pod.Status.Phase)), - trace.StringAttribute("reason", pod.Status.Reason), - ) +func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context { + return span.WithFields(ctx, log.Fields{ + "uid": string(pod.GetUID()), + "namespace": pod.GetNamespace(), + "name": pod.GetName(), + "phase": string(pod.Status.Phase), + "reason": pod.Status.Reason, + }) } func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorder record.EventRecorder) error { @@ -40,14 +38,17 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde ctx, span := trace.StartSpan(ctx, "createOrUpdatePod") defer span.End() - addPodAttributes(span, pod) + addPodAttributes(ctx, span, pod) if err := populateEnvironmentVariables(ctx, pod, s.resourceManager, recorder); err != nil { - span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: err.Error()}) + span.SetStatus(ocstatus.FromError(err)) return err } - logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()) + ctx = span.WithFields(ctx, log.Fields{ + "pod": pod.GetName(), + "namespace": pod.GetNamespace(), + }) if origErr := s.provider.CreatePod(ctx, pod); origErr != nil { podPhase := corev1.PodPending @@ -60,19 +61,23 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde pod.Status.Reason = podStatusReasonProviderFailed pod.Status.Message = origErr.Error() + logger := log.G(ctx).WithFields(log.Fields{ + "podPhase": podPhase, + "reason": pod.Status.Reason, + }) + _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) if err != nil { logger.WithError(err).Warn("Failed to update pod status") } else { - span.Annotate(nil, "Updated k8s pod status") + logger.Info("Updated k8s pod status") } - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: origErr.Error()}) + span.SetStatus(ocstatus.FromError(origErr)) return origErr } - span.Annotate(nil, "Created pod in provider") - logger.Info("Pod created") + log.G(ctx).Info("Created pod in provider") return nil } @@ -89,23 +94,22 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error { ctx, span := trace.StartSpan(ctx, "deletePod") defer span.End() - addPodAttributes(span, pod) + ctx = addPodAttributes(ctx, span, pod) var delErr error if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) { - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: delErr.Error()}) + span.SetStatus(ocstatus.FromError(delErr)) return delErr } - span.Annotate(nil, "Deleted pod from provider") - logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()) + log.G(ctx).Debug("Deleted pod from provider") + if !errors.IsNotFound(delErr) { if err := s.forceDeletePodResource(ctx, namespace, name); err != nil { span.SetStatus(ocstatus.FromError(err)) return err } - span.Annotate(nil, "Deleted pod from k8s") - logger.Info("Pod deleted") + log.G(ctx).Info("Deleted pod from Kubernetes") } return nil @@ -114,19 +118,19 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error { func (s *Server) forceDeletePodResource(ctx context.Context, namespace, name string) error { ctx, span := trace.StartSpan(ctx, "forceDeletePodResource") defer span.End() - span.AddAttributes( - trace.StringAttribute("namespace", namespace), - trace.StringAttribute("name", name), - ) + ctx = span.WithFields(ctx, log.Fields{ + "namespace": namespace, + "name": name, + }) var grace int64 if err := s.k8sClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil { if errors.IsNotFound(err) { - span.Annotate(nil, "Pod does not exist in Kubernetes, nothing to delete") + log.G(ctx).Debug("Pod does not exist in Kubernetes, nothing to delete") return nil } - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) - return fmt.Errorf("Failed to delete Kubernetes pod: %s", err) + span.SetStatus(ocstatus.FromError(err)) + return pkgerrors.Wrap(err, "Failed to delete Kubernetes pod") } return nil } @@ -138,7 +142,8 @@ func (s *Server) updatePodStatuses(ctx context.Context) { // Update all the pods with the provider status. pods := s.resourceManager.GetPods() - span.AddAttributes(trace.Int64Attribute("nPods", int64(len(pods)))) + + ctx = span.WithField(ctx, "nPods", int64(len(pods))) sema := make(chan struct{}, s.podSyncWorkers) var wg sync.WaitGroup @@ -157,8 +162,12 @@ func (s *Server) updatePodStatuses(ctx context.Context) { defer func() { <-sema }() if err := s.updatePodStatus(ctx, pod); err != nil { - logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()).WithField("status", pod.Status.Phase).WithField("reason", pod.Status.Reason) - logger.Error(err) + log.G(ctx).WithFields(log.Fields{ + "pod": pod.GetName(), + "namespace": pod.GetNamespace(), + "status": pod.Status.Phase, + "reason": pod.Status.Reason, + }).Error(err) } }(pod) @@ -170,7 +179,7 @@ func (s *Server) updatePodStatuses(ctx context.Context) { func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { ctx, span := trace.StartSpan(ctx, "updatePodStatus") defer span.End() - addPodAttributes(span, pod) + ctx = addPodAttributes(ctx, span, pod) if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed || @@ -214,9 +223,10 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { return pkgerrors.Wrap(err, "error while updating pod status in kubernetes") } - span.Annotate([]trace.Attribute{ - trace.StringAttribute("new phase", string(pod.Status.Phase)), - trace.StringAttribute("new reason", pod.Status.Reason), - }, "updated pod status in kubernetes") + log.G(ctx).WithFields(log.Fields{ + "new phase": string(pod.Status.Phase), + "new reason": pod.Status.Reason, + }).Debug("Updated pod status in kubernetes") + return nil } diff --git a/vkubelet/podcontroller.go b/vkubelet/podcontroller.go index 6b69e5e7d..d6a795dec 100644 --- a/vkubelet/podcontroller.go +++ b/vkubelet/podcontroller.go @@ -24,11 +24,11 @@ import ( "github.com/cpuguy83/strongerrors/status/ocstatus" pkgerrors "github.com/pkg/errors" - "go.opencensus.io/trace" + "github.com/virtual-kubelet/virtual-kubelet/trace" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/informers/core/v1" + v1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" corev1listers "k8s.io/client-go/listers/core/v1" @@ -169,7 +169,7 @@ func (pc *PodController) processNextWorkItem(ctx context.Context, workerId strin defer span.End() // Add the ID of the current worker as an attribute to the current span. - span.AddAttributes(trace.StringAttribute("workerId", workerId)) + ctx = span.WithField(ctx, "workerId", workerId) // We wrap this block in a func so we can defer pc.workqueue.Done. err := func(obj interface{}) error { @@ -190,7 +190,7 @@ func (pc *PodController) processNextWorkItem(ctx context.Context, workerId strin return nil } // Add the current key as an attribute to the current span. - span.AddAttributes(trace.StringAttribute("key", key)) + ctx = span.WithField(ctx, "key", key) // Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced. if err := pc.syncHandler(ctx, key); err != nil { if pc.workqueue.NumRequeues(key) < maxRetries { @@ -224,7 +224,7 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error { defer span.End() // Add the current key as an attribute to the current span. - span.AddAttributes(trace.StringAttribute("key", key)) + ctx = span.WithField(ctx, "key", key) // Convert the namespace/name string into a distinct namespace and name. namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -263,7 +263,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) defer span.End() // Add the pod's attributes to the current span. - addPodAttributes(span, pod) + ctx = addPodAttributes(ctx, span, pod) // Check whether the pod has been marked for deletion. // If it does, guarantee it is deleted in the provider and Kubernetes. @@ -344,7 +344,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int }() // Add the pod's attributes to the current span. - addPodAttributes(span, pod) + ctx = addPodAttributes(ctx, span, pod) // Actually delete the pod. if err := pc.server.deletePod(ctx, pod.Namespace, pod.Name); err != nil { span.SetStatus(ocstatus.FromError(err))