diff --git a/cmd/root.go b/cmd/root.go index 82f70d780..826963c7c 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -30,19 +30,21 @@ import ( "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/viper" - "go.opencensus.io/trace" + "github.com/virtual-kubelet/virtual-kubelet/log" + logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus" + "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/providers/register" + "github.com/virtual-kubelet/virtual-kubelet/trace" + "github.com/virtual-kubelet/virtual-kubelet/trace/opencensus" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet" + octrace "go.opencensus.io/trace" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" kubeinformers "k8s.io/client-go/informers" corev1informers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" - - "github.com/virtual-kubelet/virtual-kubelet/log" - "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers" - "github.com/virtual-kubelet/virtual-kubelet/providers/register" - "github.com/virtual-kubelet/virtual-kubelet/vkubelet" ) const ( @@ -161,6 +163,10 @@ func (mv mapVar) Type() string { } func init() { + // make sure the default logger/tracer is initialized + log.L = logruslogger.FromLogrus(logrus.NewEntry(logrus.StandardLogger())) + trace.T = opencensus.Adapter{} + cobra.OnInitialize(initConfig) // read default node name from environment variable. @@ -242,19 +248,21 @@ func initConfig() { log.G(context.TODO()).WithField("OperatingSystem", operatingSystem).Fatalf("Operating system not supported. Valid options are: %s", strings.Join(providers.ValidOperatingSystems.Names(), " | ")) } - level, err := log.ParseLevel(logLevel) + level, err := logrus.ParseLevel(logLevel) if err != nil { log.G(context.TODO()).WithField("logLevel", logLevel).Fatal("log level is not supported") } logrus.SetLevel(level) - - logger := log.L.WithFields(logrus.Fields{ + logger := logruslogger.FromLogrus(logrus.WithFields(logrus.Fields{ "provider": provider, "operatingSystem": operatingSystem, "node": nodeName, "namespace": kubeNamespace, - }) + })) + + rootContext = log.WithLogger(rootContext, logger) + log.L = logger if !disableTaint { @@ -339,16 +347,16 @@ func initConfig() { if err != nil { log.L.WithError(err).WithField("exporter", e).Fatal("Cannot initialize exporter") } - trace.RegisterExporter(exporter) + octrace.RegisterExporter(exporter) } if len(userTraceExporters) > 0 { - var s trace.Sampler + var s octrace.Sampler switch strings.ToLower(traceSampler) { case "": case "always": - s = trace.AlwaysSample() + s = octrace.AlwaysSample() case "never": - s = trace.NeverSample() + s = octrace.NeverSample() default: rate, err := strconv.Atoi(traceSampler) if err != nil { @@ -357,12 +365,12 @@ func initConfig() { if rate < 0 || rate > 100 { logger.WithField("rate", traceSampler).Fatal("trace sample rate must not be less than zero or greater than 100") } - s = trace.ProbabilitySampler(float64(rate) / 100) + s = octrace.ProbabilitySampler(float64(rate) / 100) } if s != nil { - trace.ApplyConfig( - trace.Config{ + octrace.ApplyConfig( + octrace.Config{ DefaultSampler: s, }, ) diff --git a/log/log.go b/log/log.go index 2bb026bd0..fe3e4178b 100644 --- a/log/log.go +++ b/log/log.go @@ -14,77 +14,71 @@ limitations under the License. */ +// Package log defines the interfaces used for logging in virtual-kubelet. +// It uses a context.Context to store logger details. Additionally you can set +// the default logger to use by setting log.L. This is used when no logger is +// stored in the passed in context. package log import ( "context" - "sync/atomic" - - "github.com/Sirupsen/logrus" ) var ( // G is an alias for GetLogger. - // - // We may want to define this locally to a package to get package tagged log - // messages. G = GetLogger - // L is an alias for the the standard logger. - L = logrus.NewEntry(logrus.StandardLogger()) + // L is the default logger. It should be initialized before using `G` or `GetLogger` + // If L is unitialized and no logger is available in a provided context, a + // panic will occur. + L Logger = nopLogger{} ) type ( loggerKey struct{} ) -// TraceLevel is the log level for tracing. Trace level is lower than debug level, -// and is usually used to trace detailed behavior of the program. -const TraceLevel = logrus.Level(uint32(logrus.DebugLevel + 1)) +// Logger is the interface used for logging in virtual-kubelet +// +// virtual-kubelet will access the logger via context using `GetLogger` (or its alias, `G`) +// You can set the default logger to use by setting the `L` variable. +type Logger interface { + Debug(...interface{}) + Debugf(string, ...interface{}) + Info(...interface{}) + Infof(string, ...interface{}) + Warn(...interface{}) + Warnf(string, ...interface{}) + Error(...interface{}) + Errorf(string, ...interface{}) + Fatal(...interface{}) + Fatalf(string, ...interface{}) -// RFC3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to -// ensure the formatted time is always the same number of characters. -const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" - -// ParseLevel takes a string level and returns the Logrus log level constant. -// It supports trace level. -func ParseLevel(lvl string) (logrus.Level, error) { - if lvl == "trace" { - return TraceLevel, nil - } - return logrus.ParseLevel(lvl) + WithField(string, interface{}) Logger + WithFields(Fields) Logger + WithError(error) Logger } +// Fields allows setting multiple fields on a logger at one time. +type Fields map[string]interface{} + // WithLogger returns a new context with the provided logger. Use in // combination with logger.WithField(s) for great effect. -func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context { +func WithLogger(ctx context.Context, logger Logger) context.Context { return context.WithValue(ctx, loggerKey{}, logger) } // GetLogger retrieves the current logger from the context. If no logger is // available, the default logger is returned. -func GetLogger(ctx context.Context) *logrus.Entry { +func GetLogger(ctx context.Context) Logger { logger := ctx.Value(loggerKey{}) if logger == nil { + if L == nil { + panic("default logger not initialized") + } return L } - return logger.(*logrus.Entry) -} - -// Trace logs a message at level Trace with the log entry passed-in. -func Trace(e *logrus.Entry, args ...interface{}) { - level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level))) - if level >= TraceLevel { - e.Debug(args...) - } -} - -// Tracef logs a message at level Trace with the log entry passed-in. -func Tracef(e *logrus.Entry, format string, args ...interface{}) { - level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level))) - if level >= TraceLevel { - e.Debugf(format, args...) - } + return logger.(Logger) } diff --git a/log/logrus/logrus.go b/log/logrus/logrus.go new file mode 100644 index 000000000..39319f397 --- /dev/null +++ b/log/logrus/logrus.go @@ -0,0 +1,34 @@ +// Package logrus implements a github.com/virtual-kubelet/virtual-kubelet/log.Logger using Logrus as a backend +// You can use this by creating a logrus logger and calling `FromLogrus(entry)`. +// If you want this to be the default logger for virtual-kubelet, set `log.L` to the value returned by `FromLogrus` +package logrus + +import ( + "github.com/Sirupsen/logrus" + "github.com/virtual-kubelet/virtual-kubelet/log" +) + +// Adapter implements the `log.Logger` interface for logrus +type Adapter struct { + *logrus.Entry +} + +// FromLogrus creates a new `log.Logger` from the provided entry +func FromLogrus(entry *logrus.Entry) log.Logger { + return &Adapter{entry} +} + +// WithField adds a field to the log entry. +func (l *Adapter) WithField(key string, val interface{}) log.Logger { + return FromLogrus(l.Entry.WithField(key, val)) +} + +// WithFields adds multiple fields to a log entry. +func (l *Adapter) WithFields(f log.Fields) log.Logger { + return FromLogrus(l.Entry.WithFields(logrus.Fields(f))) +} + +// WithError adds an error to the log entry +func (l *Adapter) WithError(err error) log.Logger { + return FromLogrus(l.Entry.WithError(err)) +} diff --git a/log/logrus/logrus_test.go b/log/logrus/logrus_test.go new file mode 100644 index 000000000..647d515a1 --- /dev/null +++ b/log/logrus/logrus_test.go @@ -0,0 +1,16 @@ +package logrus + +import ( + "testing" + + "github.com/Sirupsen/logrus" + "github.com/virtual-kubelet/virtual-kubelet/log" +) + +func TestImplementsLoggerInterface(t *testing.T) { + l := FromLogrus(&logrus.Entry{}) + + if _, ok := l.(log.Logger); !ok { + t.Fatal("does not implement log.Logger interface") + } +} diff --git a/log/nop.go b/log/nop.go new file mode 100644 index 000000000..4e5c90f04 --- /dev/null +++ b/log/nop.go @@ -0,0 +1,18 @@ +package log + +type nopLogger struct{} + +func (nopLogger) Debug(...interface{}) {} +func (nopLogger) Debugf(string, ...interface{}) {} +func (nopLogger) Info(...interface{}) {} +func (nopLogger) Infof(string, ...interface{}) {} +func (nopLogger) Warn(...interface{}) {} +func (nopLogger) Warnf(string, ...interface{}) {} +func (nopLogger) Error(...interface{}) {} +func (nopLogger) Errorf(string, ...interface{}) {} +func (nopLogger) Fatal(...interface{}) {} +func (nopLogger) Fatalf(string, ...interface{}) {} + +func (l nopLogger) WithField(string, interface{}) Logger { return l } +func (l nopLogger) WithFields(Fields) Logger { return l } +func (l nopLogger) WithError(error) Logger { return l } diff --git a/providers/azure/aci.go b/providers/azure/aci.go index bd1f2d7bc..88374cfb0 100644 --- a/providers/azure/aci.go +++ b/providers/azure/aci.go @@ -19,15 +19,14 @@ import ( "sync" "time" - "github.com/Sirupsen/logrus" "github.com/gorilla/websocket" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" client "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client" "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci" "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/network" - "go.opencensus.io/trace" - "k8s.io/api/core/v1" + "github.com/virtual-kubelet/virtual-kubelet/trace" + v1 "k8s.io/api/core/v1" k8serr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -480,11 +479,11 @@ func getKubeProxyExtension(secretPath, masterURI, clusterCIDR string) (*aci.Exte return &extension, nil } -func addAzureAttributes(span *trace.Span, p *ACIProvider) { - span.AddAttributes( - trace.StringAttribute("azure.resourceGroup", p.resourceGroup), - trace.StringAttribute("azure.region", p.region), - ) +func addAzureAttributes(ctx context.Context, span trace.Span, p *ACIProvider) context.Context { + return span.WithFields(ctx, log.Fields{ + "azure.resourceGroup": p.resourceGroup, + "azure.region": p.region, + }) } // CreatePod accepts a Pod definition and creates @@ -492,7 +491,7 @@ func addAzureAttributes(span *trace.Span, p *ACIProvider) { func (p *ACIProvider) CreatePod(ctx context.Context, pod *v1.Pod) error { ctx, span := trace.StartSpan(ctx, "aci.CreatePod") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) var containerGroup aci.ContainerGroup containerGroup.Location = p.region @@ -694,7 +693,7 @@ func (p *ACIProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error { func (p *ACIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error { ctx, span := trace.StartSpan(ctx, "aci.DeletePod") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) err := p.aciClient.DeleteContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", pod.Namespace, pod.Name)) return wrapError(err) @@ -705,7 +704,7 @@ func (p *ACIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error { func (p *ACIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) { ctx, span := trace.StartSpan(ctx, "aci.GetPod") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) cg, err, status := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, name)) if err != nil { @@ -726,7 +725,7 @@ func (p *ACIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { ctx, span := trace.StartSpan(ctx, "aci.GetContainerLogs") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) logContent := "" cg, err, _ := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName)) @@ -744,7 +743,6 @@ func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, cLogs, err := p.aciClient.GetContainerLogs(ctx, p.resourceGroup, cg.Name, containerName, tail) if err != nil { log.G(ctx).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying") - span.Annotate(nil, "Error getting container logs, retrying") time.Sleep(5000 * time.Millisecond) } else { logContent = cLogs.Content @@ -841,7 +839,7 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri func (p *ACIProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { ctx, span := trace.StartSpan(ctx, "aci.GetPodStatus") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) pod, err := p.GetPod(ctx, namespace, name) if err != nil { @@ -859,7 +857,7 @@ func (p *ACIProvider) GetPodStatus(ctx context.Context, namespace, name string) func (p *ACIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { ctx, span := trace.StartSpan(ctx, "aci.GetPods") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) cgs, err := p.aciClient.ListContainerGroups(ctx, p.resourceGroup) if err != nil { @@ -875,7 +873,7 @@ func (p *ACIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { p, err := containerGroupToPod(&c) if err != nil { - log.G(context.TODO()).WithFields(logrus.Fields{ + log.G(ctx).WithFields(log.Fields{ "name": c.Name, "id": c.ID, }).WithError(err).Error("error converting container group to pod") diff --git a/providers/azure/acsCredential.go b/providers/azure/acsCredential.go index b8dde860e..1b1e21c9e 100644 --- a/providers/azure/acsCredential.go +++ b/providers/azure/acsCredential.go @@ -25,7 +25,7 @@ type AcsCredential struct { // NewAcsCredential returns an AcsCredential struct from file path func NewAcsCredential(p string) (*AcsCredential, error) { logger := log.G(context.TODO()).WithField("method", "NewAcsCredential").WithField("file", p) - log.Trace(logger, "Reading ACS credential file") + logger.Debug("Reading ACS credential file") b, err := ioutil.ReadFile(p) if err != nil { @@ -38,6 +38,6 @@ func NewAcsCredential(p string) (*AcsCredential, error) { return nil, err } - log.Trace(logger, "Load ACS credential file successfully") + logger.Debug("Load ACS credential file successfully") return &cred, nil } diff --git a/providers/azure/metrics.go b/providers/azure/metrics.go index a6fc83473..e88e4f0f1 100644 --- a/providers/azure/metrics.go +++ b/providers/azure/metrics.go @@ -7,10 +7,11 @@ import ( "github.com/cpuguy83/strongerrors/status/ocstatus" "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci" - "go.opencensus.io/trace" + "github.com/virtual-kubelet/virtual-kubelet/trace" "golang.org/x/sync/errgroup" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" ) @@ -19,17 +20,24 @@ import ( func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summary, err error) { ctx, span := trace.StartSpan(ctx, "GetSummaryStats") defer span.End() - addAzureAttributes(span, p) + ctx = addAzureAttributes(ctx, span, p) p.metricsSync.Lock() defer p.metricsSync.Unlock() - span.Annotate(nil, "acquired metrics mutex") + + log.G(ctx).Debug("acquired metrics mutex") if time.Now().Sub(p.metricsSyncTime) < time.Minute { - span.AddAttributes(trace.BoolAttribute("preCachedResult", true), trace.StringAttribute("cachedResultSampleTime", p.metricsSyncTime.String())) + span.WithFields(ctx, log.Fields{ + "preCachedResult": true, + "cachedResultSampleTime": p.metricsSyncTime.String(), + }) return p.lastMetric, nil } - span.AddAttributes(trace.BoolAttribute("preCachedResult", false), trace.StringAttribute("cachedResultSampleTime", p.metricsSyncTime.String())) + ctx = span.WithFields(ctx, log.Fields{ + "preCachedResult": false, + "cachedResultSampleTime": p.metricsSyncTime.String(), + }) select { case <-ctx.Done(): @@ -62,11 +70,11 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa errGroup.Go(func() error { ctx, span := trace.StartSpan(ctx, "getPodMetrics") defer span.End() - span.AddAttributes( - trace.StringAttribute("UID", string(pod.UID)), - trace.StringAttribute("Name", pod.Name), - trace.StringAttribute("Namespace", pod.Namespace), - ) + logger := log.G(ctx).WithFields(log.Fields{ + "UID": string(pod.UID), + "Name": pod.Name, + "Namespace": pod.Namespace, + }) select { case <-ctx.Done(): @@ -77,7 +85,7 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa <-sema }() - span.Annotate(nil, "Acquired semaphore") + logger.Debug("Acquired semaphore") cgName := containerGroupName(pod) // cpu/mem and net stats are split because net stats do not support container level detail @@ -92,7 +100,7 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa span.SetStatus(ocstatus.FromError(err)) return errors.Wrapf(err, "error fetching cpu/mem stats for container group %s", cgName) } - span.Annotate(nil, "Got system stats") + logger.Debug("Got system stats") netStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{ Start: start, @@ -104,7 +112,7 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa span.SetStatus(ocstatus.FromError(err)) return errors.Wrapf(err, "error fetching network stats for container group %s", cgName) } - span.Annotate(nil, "Got network stats") + logger.Debug("Got network stats") chResult <- collectMetrics(pod, systemStats, netStats) return nil @@ -112,10 +120,11 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa } if err := errGroup.Wait(); err != nil { + span.SetStatus(ocstatus.FromError(err)) return nil, errors.Wrap(err, "error in request to fetch container group metrics") } close(chResult) - span.Annotate([]trace.Attribute{trace.Int64Attribute("nPods", int64(len(pods)))}, "Collected stats from Azure") + log.G(ctx).Debugf("Collected status from azure for %d pods", len(pods)) var s stats.Summary s.Node = stats.NodeStats{ diff --git a/providers/mock/mock.go b/providers/mock/mock.go index b48208c01..83fe86364 100644 --- a/providers/mock/mock.go +++ b/providers/mock/mock.go @@ -6,13 +6,14 @@ import ( "fmt" "io" "io/ioutil" - "log" "math/rand" "time" "github.com/cpuguy83/strongerrors" - "go.opencensus.io/trace" - "k8s.io/api/core/v1" + "github.com/cpuguy83/strongerrors/status/ocstatus" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/trace" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -114,9 +115,9 @@ func (p *MockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error { defer span.End() // Add the pod's coordinates to the current span. - addAttributes(span, namespaceKey, pod.Namespace, nameKey, pod.Name) + ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name) - log.Printf("receive CreatePod %q\n", pod.Name) + log.G(ctx).Info("receive CreatePod %q", pod.Name) key, err := buildKey(pod) if err != nil { @@ -134,9 +135,9 @@ func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error { defer span.End() // Add the pod's coordinates to the current span. - addAttributes(span, namespaceKey, pod.Namespace, nameKey, pod.Name) + ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name) - log.Printf("receive UpdatePod %q\n", pod.Name) + log.G(ctx).Info("receive UpdatePod %q", pod.Name) key, err := buildKey(pod) if err != nil { @@ -154,9 +155,9 @@ func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { defer span.End() // Add the pod's coordinates to the current span. - addAttributes(span, namespaceKey, pod.Namespace, nameKey, pod.Name) + ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name) - log.Printf("receive DeletePod %q\n", pod.Name) + log.G(ctx).Info("receive DeletePod %q", pod.Name) key, err := buildKey(pod) if err != nil { @@ -175,12 +176,15 @@ func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { // GetPod returns a pod by name that is stored in memory. func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) { ctx, span := trace.StartSpan(ctx, "GetPod") - defer span.End() + defer func() { + span.SetStatus(ocstatus.FromError(err)) + span.End() + }() // Add the pod's coordinates to the current span. - addAttributes(span, namespaceKey, namespace, nameKey, name) + ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, name) - log.Printf("receive GetPod %q\n", name) + log.G(ctx).Info("receive GetPod %q", name) key, err := buildKeyFromNames(namespace, name) if err != nil { @@ -199,9 +203,9 @@ func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, defer span.End() // Add pod and container attributes to the current span. - addAttributes(span, namespaceKey, namespace, nameKey, podName, containerNameKey, containerName) + ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, podName, containerNameKey, containerName) - log.Printf("receive GetContainerLogs %q\n", podName) + log.G(ctx).Info("receive GetContainerLogs %q", podName) return "", nil } @@ -214,7 +218,7 @@ func (p *MockProvider) GetPodFullName(namespace string, pod string) string { // ExecInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. func (p *MockProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { - log.Printf("receive ExecInContainer %q\n", container) + log.G(context.TODO()).Info("receive ExecInContainer %q", container) return nil } @@ -225,9 +229,9 @@ func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string) defer span.End() // Add namespace and name as attributes to the current span. - addAttributes(span, namespaceKey, namespace, nameKey, name) + ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, name) - log.Printf("receive GetPodStatus %q\n", name) + log.G(ctx).Info("receive GetPodStatus %q", name) now := metav1.NewTime(time.Now()) @@ -279,7 +283,7 @@ func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { ctx, span := trace.StartSpan(ctx, "GetPods") defer span.End() - log.Printf("receive GetPods\n") + log.G(ctx).Info("receive GetPods") var pods []*v1.Pod @@ -485,11 +489,12 @@ func buildKey(pod *v1.Pod) (string, error) { // attrs must be an even-sized list of string arguments. // Otherwise, the span won't be modified. // TODO: Refactor and move to a "tracing utilities" package. -func addAttributes(span *trace.Span, attrs ...string) { +func addAttributes(ctx context.Context, span trace.Span, attrs ...string) context.Context { if len(attrs)%2 == 1 { - return + return ctx } for i := 0; i < len(attrs); i += 2 { - span.AddAttributes(trace.StringAttribute(attrs[i], attrs[i+1])) + ctx = span.WithField(ctx, attrs[i], attrs[i+1]) } + return ctx } diff --git a/trace/nop.go b/trace/nop.go new file mode 100644 index 000000000..2954ec63c --- /dev/null +++ b/trace/nop.go @@ -0,0 +1,23 @@ +package trace + +import ( + "context" + + "github.com/virtual-kubelet/virtual-kubelet/log" + "go.opencensus.io/trace" +) + +type nopTracer struct{} + +func (nopTracer) StartSpan(ctx context.Context, _ string) (context.Context, Span) { + return ctx, &nopSpan{} +} + +type nopSpan struct{} + +func (nopSpan) End() {} +func (nopSpan) SetStatus(trace.Status) {} +func (nopSpan) Logger() log.Logger { return nil } + +func (nopSpan) WithField(ctx context.Context, _ string, _ interface{}) context.Context { return ctx } +func (nopSpan) WithFields(ctx context.Context, _ log.Fields) context.Context { return ctx } diff --git a/trace/opencensus/opencensus.go b/trace/opencensus/opencensus.go new file mode 100644 index 000000000..afaef4f4d --- /dev/null +++ b/trace/opencensus/opencensus.go @@ -0,0 +1,236 @@ +// Package opencensus implements a github.com/virtual-kubelet/virtual-kubelet/trace.Tracer +// using opencensus as a backend. +// +// Use this by setting `trace.T = Adapter{}` +package opencensus + +import ( + "context" + "fmt" + "sync" + + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/trace" + octrace "go.opencensus.io/trace" +) + +const ( + lDebug = "DEBUG" + lInfo = "INFO" + lWarn = "WARN" + lErr = "ERROR" + lFatal = "FATAL" +) + +// Adapter implements the trace.Tracer interface for OpenCensus +type Adapter struct{} + +// StartSpan creates a new span from opencensus using the given name. +func (Adapter) StartSpan(ctx context.Context, name string) (context.Context, trace.Span) { + ctx, ocs := octrace.StartSpan(ctx, name) + l := log.G(ctx).WithField("method", name) + + s := &span{s: ocs, l: l} + ctx = log.WithLogger(ctx, s.Logger()) + + return ctx, s +} + +type span struct { + mu sync.Mutex + s *octrace.Span + l log.Logger +} + +func (s *span) End() { + s.s.End() +} + +func (s *span) SetStatus(status trace.Status) { + s.s.SetStatus(status) +} + +func (s *span) WithField(ctx context.Context, key string, val interface{}) context.Context { + s.mu.Lock() + s.l = s.l.WithField(key, val) + ctx = log.WithLogger(ctx, &logger{s: s.s, l: s.l}) + s.mu.Unlock() + + if s.s.IsRecordingEvents() { + s.s.AddAttributes(makeAttribute(key, val)) + } + + return ctx +} + +func (s *span) WithFields(ctx context.Context, f log.Fields) context.Context { + s.mu.Lock() + s.l = s.l.WithFields(f) + ctx = log.WithLogger(ctx, &logger{s: s.s, l: s.l}) + s.mu.Unlock() + + if s.s.IsRecordingEvents() { + attrs := make([]octrace.Attribute, 0, len(f)) + for k, v := range f { + attrs = append(attrs, makeAttribute(k, v)) + } + s.s.AddAttributes(attrs...) + } + + return ctx +} + +func (s *span) Logger() log.Logger { + return &logger{s: s.s, l: s.l} +} + +type logger struct { + s *octrace.Span + l log.Logger + a []octrace.Attribute +} + +func (l *logger) Debug(args ...interface{}) { + if !l.s.IsRecordingEvents() { + l.l.Debug(args...) + return + } + + msg := fmt.Sprint(args...) + l.l.Debug(msg) + l.s.Annotate(withLevel(lDebug, l.a), msg) +} + +func (l *logger) Debugf(f string, args ...interface{}) { + l.l.Debugf(f, args) + l.s.Annotatef(withLevel(lDebug, l.a), f, args...) +} + +func (l *logger) Info(args ...interface{}) { + if !l.s.IsRecordingEvents() { + l.l.Info(args...) + return + } + + msg := fmt.Sprint(args...) + l.l.Info(msg) + l.s.Annotate(withLevel(lInfo, l.a), msg) +} + +func (l *logger) Infof(f string, args ...interface{}) { + l.l.Infof(f, args) + l.s.Annotatef(withLevel(lInfo, l.a), f, args...) +} + +func (l *logger) Warn(args ...interface{}) { + if !l.s.IsRecordingEvents() { + l.l.Warn(args...) + return + } + + msg := fmt.Sprint(args...) + l.l.Warn(msg) + l.s.Annotate(withLevel(lWarn, l.a), msg) +} + +func (l *logger) Warnf(f string, args ...interface{}) { + l.l.Warnf(f, args) + l.s.Annotatef(withLevel(lWarn, l.a), f, args...) +} + +func (l *logger) Error(args ...interface{}) { + if !l.s.IsRecordingEvents() { + l.l.Error(args...) + return + } + + msg := fmt.Sprint(args...) + l.l.Error(msg) + l.s.Annotate(withLevel(lErr, l.a), msg) +} + +func (l *logger) Errorf(f string, args ...interface{}) { + l.l.Errorf(f, args) + l.s.Annotatef(withLevel(lErr, l.a), f, args...) +} + +func (l *logger) Fatal(args ...interface{}) { + if !l.s.IsRecordingEvents() { + l.l.Fatal(args...) + return + } + + msg := fmt.Sprint(args...) + l.s.Annotate(withLevel(lFatal, l.a), msg) + l.l.Fatal(msg) +} + +func (l *logger) Fatalf(f string, args ...interface{}) { + l.s.Annotatef(withLevel(lFatal, l.a), f, args...) + l.l.Fatalf(f, args) +} + +func (l *logger) WithError(err error) log.Logger { + log := l.l.WithError(err) + + var a []octrace.Attribute + if l.s.IsRecordingEvents() { + a = make([]octrace.Attribute, len(l.a), len(l.a)+1) + copy(a, l.a) + a = append(l.a, makeAttribute("err", err)) + } + + return &logger{s: l.s, l: log, a: a} +} + +func (l *logger) WithField(k string, value interface{}) log.Logger { + log := l.l.WithField(k, value) + + var a []octrace.Attribute + + if l.s.IsRecordingEvents() { + a = make([]octrace.Attribute, len(l.a), len(l.a)+1) + copy(a, l.a) + a = append(a, makeAttribute(k, value)) + } + + return &logger{s: l.s, a: a, l: log} +} + +func (l *logger) WithFields(fields log.Fields) log.Logger { + log := l.l.WithFields(fields) + + var a []octrace.Attribute + if l.s.IsRecordingEvents() { + a = make([]octrace.Attribute, len(l.a), len(l.a)+len(fields)) + copy(a, l.a) + for k, v := range fields { + a = append(a, makeAttribute(k, v)) + } + } + + return &logger{s: l.s, a: a, l: log} +} + +func makeAttribute(key string, val interface{}) octrace.Attribute { + var attr octrace.Attribute + + switch v := val.(type) { + case string: + attr = octrace.StringAttribute(key, v) + case int64: + attr = octrace.Int64Attribute(key, v) + case bool: + attr = octrace.BoolAttribute(key, v) + case error: + attr = octrace.StringAttribute(key, v.Error()) + default: + attr = octrace.StringAttribute(key, fmt.Sprintf("%+v", val)) + } + + return attr +} + +func withLevel(l string, attrs []octrace.Attribute) []octrace.Attribute { + return append(attrs, octrace.StringAttribute("level", l)) +} diff --git a/trace/opencensus/opencensus_test.go b/trace/opencensus/opencensus_test.go new file mode 100644 index 000000000..ed3919341 --- /dev/null +++ b/trace/opencensus/opencensus_test.go @@ -0,0 +1,13 @@ +package opencensus + +import ( + "testing" + + "github.com/virtual-kubelet/virtual-kubelet/trace" +) + +func TestTracerImplementsTracer(t *testing.T) { + // ensure that Adapter implements trace.Tracer + if tt := trace.Tracer(Adapter{}); tt == nil { + } +} diff --git a/trace/trace.go b/trace/trace.go new file mode 100644 index 000000000..752432d4c --- /dev/null +++ b/trace/trace.go @@ -0,0 +1,58 @@ +// Package trace abstracts virtual-kubelet's tracing capabilties into a set of +// interfaces. +// While this does allow consumers to use whatever tracing library they want, +// the primary goal is to share logging data between the configured logger and +// tracing spans instead of duplicating calls. +package trace + +import ( + "context" + + "github.com/virtual-kubelet/virtual-kubelet/log" + "go.opencensus.io/trace" +) + +// Status is an alias to opencensus's trace status. +// The main reason we use this instead of implementing our own is library re-use, +// namely for converting an error to a tracing status. +// In the future this may be defined completely in this package. +type Status = trace.Status + +// Tracer is the interface used for creating a tracing span +type Tracer interface { + // StartSpan starts a new span. The span details are emebedded into the returned + // context + StartSpan(context.Context, string) (context.Context, Span) +} + +var ( + // T is the Tracer to use this should be initialized before starting up + // virtual-kubelet + T Tracer = nopTracer{} +) + +// StartSpan starts a span from the configured default tracer +func StartSpan(ctx context.Context, name string) (context.Context, Span) { + ctx, span := T.StartSpan(ctx, name) + ctx = log.WithLogger(ctx, span.Logger()) + return ctx, span +} + +// Span encapsulates a tracing event +type Span interface { + End() + SetStatus(Status) + + // WithField and WithFields adds attributes to an entire span + // + // This interface is a bit weird, but allows us to manage loggers in the context + // It is expected that implementations set `log.WithLogger` so the logger stored + // in the context is updated with the new fields. + WithField(context.Context, string, interface{}) context.Context + WithFields(context.Context, log.Fields) context.Context + + // Logger is used to log individual entries. + // Calls to functions like `WithField` and `WithFields` on the logger should + // not affect the rest of the span but rather individual entries. + Logger() log.Logger +} diff --git a/vkubelet/api/helpers.go b/vkubelet/api/helpers.go index 95e651b56..a9022aba7 100644 --- a/vkubelet/api/helpers.go +++ b/vkubelet/api/helpers.go @@ -25,7 +25,7 @@ func handleError(f handlerFunc) http.HandlerFunc { if code >= 500 { logger.Error("Internal server error on request") } else { - log.Trace(logger, "Error on request") + logger.Debug("Error on request") } } } diff --git a/vkubelet/apiserver.go b/vkubelet/apiserver.go index 39ca4d86c..b4f7251fc 100644 --- a/vkubelet/apiserver.go +++ b/vkubelet/apiserver.go @@ -3,7 +3,6 @@ package vkubelet import ( "net/http" - "github.com/Sirupsen/logrus" "github.com/gorilla/mux" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/providers" @@ -73,7 +72,7 @@ func AttachMetricsRoutes(p providers.Provider, mux ServeMux) { func instrumentRequest(r *http.Request) *http.Request { ctx := r.Context() - logger := log.G(ctx).WithFields(logrus.Fields{ + logger := log.G(ctx).WithFields(log.Fields{ "uri": r.RequestURI, "vars": mux.Vars(r), }) @@ -96,14 +95,12 @@ func InstrumentHandler(h http.Handler) http.Handler { // NotFound provides a handler for cases where the requested endpoint doesn't exist func NotFound(w http.ResponseWriter, r *http.Request) { - logger := log.G(r.Context()) - log.Trace(logger, "404 request not found") + log.G(r.Context()).Debug("404 request not found") http.Error(w, "404 request not found", http.StatusNotFound) } // NotImplemented provides a handler for cases where a provider does not implement a given API func NotImplemented(w http.ResponseWriter, r *http.Request) { - logger := log.G(r.Context()) - log.Trace(logger, "501 not implemented") + log.G(r.Context()).Debug("501 not implemented") http.Error(w, "501 not implemented", http.StatusNotImplemented) } diff --git a/vkubelet/node.go b/vkubelet/node.go index a6c990a9d..ce0b0bb5d 100644 --- a/vkubelet/node.go +++ b/vkubelet/node.go @@ -4,10 +4,10 @@ import ( "context" "strings" + "github.com/cpuguy83/strongerrors/status/ocstatus" "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/trace" "github.com/virtual-kubelet/virtual-kubelet/version" - - "go.opencensus.io/trace" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -57,12 +57,11 @@ func (s *Server) registerNode(ctx context.Context) error { DaemonEndpoints: *s.provider.NodeDaemonEndpoints(ctx), }, } - addNodeAttributes(span, node) + ctx = addNodeAttributes(ctx, span, node) if _, err := s.k8sClient.CoreV1().Nodes().Create(node); err != nil && !errors.IsAlreadyExists(err) { - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + span.SetStatus(ocstatus.FromError(err)) return err } - span.Annotate(nil, "Registered node with k8s") log.G(ctx).Info("Registered node") @@ -77,19 +76,20 @@ func (s *Server) updateNode(ctx context.Context) { opts := metav1.GetOptions{} n, err := s.k8sClient.CoreV1().Nodes().Get(s.nodeName, opts) if err != nil && !errors.IsNotFound(err) { - log.G(ctx).WithError(err).Error("Failed to retrieve node") - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + log.G(ctx).WithError(err).Error("Failed to retrive node") + span.SetStatus(ocstatus.FromError(err)) return } - addNodeAttributes(span, n) - span.Annotate(nil, "Fetched node details from k8s") + + ctx = addNodeAttributes(ctx, span, n) + log.G(ctx).Debug("Fetched node details from k8s") if errors.IsNotFound(err) { if err = s.registerNode(ctx); err != nil { log.G(ctx).WithError(err).Error("Failed to register node") - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + span.SetStatus(ocstatus.FromError(err)) } else { - span.Annotate(nil, "Registered node in k8s") + log.G(ctx).Debug("Registered node in k8s") } return } @@ -106,7 +106,7 @@ func (s *Server) updateNode(ctx context.Context) { n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n) if err != nil { log.G(ctx).WithError(err).Error("Failed to update node") - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + span.SetStatus(ocstatus.FromError(err)) return } } @@ -125,13 +125,11 @@ func (t taintsStringer) String() string { return s } -func addNodeAttributes(span *trace.Span, n *corev1.Node) { - span.AddAttributes( - trace.StringAttribute("UID", string(n.UID)), - trace.StringAttribute("name", n.Name), - trace.StringAttribute("cluster", n.ClusterName), - ) - if span.IsRecordingEvents() { - span.AddAttributes(trace.StringAttribute("taints", taintsStringer(n.Spec.Taints).String())) - } +func addNodeAttributes(ctx context.Context, span trace.Span, n *corev1.Node) context.Context { + return span.WithFields(ctx, log.Fields{ + "UID": string(n.UID), + "name": n.Name, + "cluster": n.ClusterName, + "tains": taintsStringer(n.Spec.Taints), + }) } diff --git a/vkubelet/pod.go b/vkubelet/pod.go index 9dff71d3c..90cf34fed 100644 --- a/vkubelet/pod.go +++ b/vkubelet/pod.go @@ -2,29 +2,27 @@ package vkubelet import ( "context" - "fmt" "sync" "time" "github.com/cpuguy83/strongerrors/status/ocstatus" pkgerrors "github.com/pkg/errors" - "go.opencensus.io/trace" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/trace" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" - - "github.com/virtual-kubelet/virtual-kubelet/log" ) -func addPodAttributes(span *trace.Span, pod *corev1.Pod) { - span.AddAttributes( - trace.StringAttribute("uid", string(pod.GetUID())), - trace.StringAttribute("namespace", pod.GetNamespace()), - trace.StringAttribute("name", pod.GetName()), - trace.StringAttribute("phase", string(pod.Status.Phase)), - trace.StringAttribute("reason", pod.Status.Reason), - ) +func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context { + return span.WithFields(ctx, log.Fields{ + "uid": string(pod.GetUID()), + "namespace": pod.GetNamespace(), + "name": pod.GetName(), + "phase": string(pod.Status.Phase), + "reason": pod.Status.Reason, + }) } func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorder record.EventRecorder) error { @@ -40,14 +38,17 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde ctx, span := trace.StartSpan(ctx, "createOrUpdatePod") defer span.End() - addPodAttributes(span, pod) + addPodAttributes(ctx, span, pod) if err := populateEnvironmentVariables(ctx, pod, s.resourceManager, recorder); err != nil { - span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: err.Error()}) + span.SetStatus(ocstatus.FromError(err)) return err } - logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()) + ctx = span.WithFields(ctx, log.Fields{ + "pod": pod.GetName(), + "namespace": pod.GetNamespace(), + }) if origErr := s.provider.CreatePod(ctx, pod); origErr != nil { podPhase := corev1.PodPending @@ -60,19 +61,23 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde pod.Status.Reason = podStatusReasonProviderFailed pod.Status.Message = origErr.Error() + logger := log.G(ctx).WithFields(log.Fields{ + "podPhase": podPhase, + "reason": pod.Status.Reason, + }) + _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) if err != nil { logger.WithError(err).Warn("Failed to update pod status") } else { - span.Annotate(nil, "Updated k8s pod status") + logger.Info("Updated k8s pod status") } - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: origErr.Error()}) + span.SetStatus(ocstatus.FromError(origErr)) return origErr } - span.Annotate(nil, "Created pod in provider") - logger.Info("Pod created") + log.G(ctx).Info("Created pod in provider") return nil } @@ -89,23 +94,22 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error { ctx, span := trace.StartSpan(ctx, "deletePod") defer span.End() - addPodAttributes(span, pod) + ctx = addPodAttributes(ctx, span, pod) var delErr error if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) { - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: delErr.Error()}) + span.SetStatus(ocstatus.FromError(delErr)) return delErr } - span.Annotate(nil, "Deleted pod from provider") - logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()) + log.G(ctx).Debug("Deleted pod from provider") + if !errors.IsNotFound(delErr) { if err := s.forceDeletePodResource(ctx, namespace, name); err != nil { span.SetStatus(ocstatus.FromError(err)) return err } - span.Annotate(nil, "Deleted pod from k8s") - logger.Info("Pod deleted") + log.G(ctx).Info("Deleted pod from Kubernetes") } return nil @@ -114,19 +118,19 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error { func (s *Server) forceDeletePodResource(ctx context.Context, namespace, name string) error { ctx, span := trace.StartSpan(ctx, "forceDeletePodResource") defer span.End() - span.AddAttributes( - trace.StringAttribute("namespace", namespace), - trace.StringAttribute("name", name), - ) + ctx = span.WithFields(ctx, log.Fields{ + "namespace": namespace, + "name": name, + }) var grace int64 if err := s.k8sClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil { if errors.IsNotFound(err) { - span.Annotate(nil, "Pod does not exist in Kubernetes, nothing to delete") + log.G(ctx).Debug("Pod does not exist in Kubernetes, nothing to delete") return nil } - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) - return fmt.Errorf("Failed to delete Kubernetes pod: %s", err) + span.SetStatus(ocstatus.FromError(err)) + return pkgerrors.Wrap(err, "Failed to delete Kubernetes pod") } return nil } @@ -138,7 +142,8 @@ func (s *Server) updatePodStatuses(ctx context.Context) { // Update all the pods with the provider status. pods := s.resourceManager.GetPods() - span.AddAttributes(trace.Int64Attribute("nPods", int64(len(pods)))) + + ctx = span.WithField(ctx, "nPods", int64(len(pods))) sema := make(chan struct{}, s.podSyncWorkers) var wg sync.WaitGroup @@ -157,8 +162,12 @@ func (s *Server) updatePodStatuses(ctx context.Context) { defer func() { <-sema }() if err := s.updatePodStatus(ctx, pod); err != nil { - logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()).WithField("status", pod.Status.Phase).WithField("reason", pod.Status.Reason) - logger.Error(err) + log.G(ctx).WithFields(log.Fields{ + "pod": pod.GetName(), + "namespace": pod.GetNamespace(), + "status": pod.Status.Phase, + "reason": pod.Status.Reason, + }).Error(err) } }(pod) @@ -170,7 +179,7 @@ func (s *Server) updatePodStatuses(ctx context.Context) { func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { ctx, span := trace.StartSpan(ctx, "updatePodStatus") defer span.End() - addPodAttributes(span, pod) + ctx = addPodAttributes(ctx, span, pod) if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed || @@ -214,9 +223,10 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { return pkgerrors.Wrap(err, "error while updating pod status in kubernetes") } - span.Annotate([]trace.Attribute{ - trace.StringAttribute("new phase", string(pod.Status.Phase)), - trace.StringAttribute("new reason", pod.Status.Reason), - }, "updated pod status in kubernetes") + log.G(ctx).WithFields(log.Fields{ + "new phase": string(pod.Status.Phase), + "new reason": pod.Status.Reason, + }).Debug("Updated pod status in kubernetes") + return nil } diff --git a/vkubelet/podcontroller.go b/vkubelet/podcontroller.go index 6b69e5e7d..d6a795dec 100644 --- a/vkubelet/podcontroller.go +++ b/vkubelet/podcontroller.go @@ -24,11 +24,11 @@ import ( "github.com/cpuguy83/strongerrors/status/ocstatus" pkgerrors "github.com/pkg/errors" - "go.opencensus.io/trace" + "github.com/virtual-kubelet/virtual-kubelet/trace" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/informers/core/v1" + v1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" corev1listers "k8s.io/client-go/listers/core/v1" @@ -169,7 +169,7 @@ func (pc *PodController) processNextWorkItem(ctx context.Context, workerId strin defer span.End() // Add the ID of the current worker as an attribute to the current span. - span.AddAttributes(trace.StringAttribute("workerId", workerId)) + ctx = span.WithField(ctx, "workerId", workerId) // We wrap this block in a func so we can defer pc.workqueue.Done. err := func(obj interface{}) error { @@ -190,7 +190,7 @@ func (pc *PodController) processNextWorkItem(ctx context.Context, workerId strin return nil } // Add the current key as an attribute to the current span. - span.AddAttributes(trace.StringAttribute("key", key)) + ctx = span.WithField(ctx, "key", key) // Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced. if err := pc.syncHandler(ctx, key); err != nil { if pc.workqueue.NumRequeues(key) < maxRetries { @@ -224,7 +224,7 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error { defer span.End() // Add the current key as an attribute to the current span. - span.AddAttributes(trace.StringAttribute("key", key)) + ctx = span.WithField(ctx, "key", key) // Convert the namespace/name string into a distinct namespace and name. namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -263,7 +263,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) defer span.End() // Add the pod's attributes to the current span. - addPodAttributes(span, pod) + ctx = addPodAttributes(ctx, span, pod) // Check whether the pod has been marked for deletion. // If it does, guarantee it is deleted in the provider and Kubernetes. @@ -344,7 +344,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int }() // Add the pod's attributes to the current span. - addPodAttributes(span, pod) + ctx = addPodAttributes(ctx, span, pod) // Actually delete the pod. if err := pc.server.deletePod(ctx, pod.Namespace, pod.Name); err != nil { span.SetStatus(ocstatus.FromError(err))