Make tracing interface to coalesce logging/tracing (#519)

* Define and use an interface for logging.

This allows alternative implementations to use whatever logging package
they want.

Currently the interface just mimicks what logrus already implements,
with minor modifications to not rely on logrus itself. I think the
interface is pretty solid in terms of logging implementations being able
to do what they need to.

* Make tracing interface to coalesce logging/tracing

Allows us to share data between the tracer and the logger so we can
simplify log/trace handling wher we generally want data to go both
places.
This commit is contained in:
Brian Goff
2019-02-22 11:36:03 -08:00
committed by GitHub
parent bd103a43bd
commit 1bfffa975e
18 changed files with 605 additions and 188 deletions

View File

@@ -30,19 +30,21 @@ import (
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.opencensus.io/trace"
"github.com/virtual-kubelet/virtual-kubelet/log"
logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"github.com/virtual-kubelet/virtual-kubelet/providers/register"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"github.com/virtual-kubelet/virtual-kubelet/trace/opencensus"
"github.com/virtual-kubelet/virtual-kubelet/vkubelet"
octrace "go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kubeinformers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"github.com/virtual-kubelet/virtual-kubelet/providers/register"
"github.com/virtual-kubelet/virtual-kubelet/vkubelet"
)
const (
@@ -161,6 +163,10 @@ func (mv mapVar) Type() string {
}
func init() {
// make sure the default logger/tracer is initialized
log.L = logruslogger.FromLogrus(logrus.NewEntry(logrus.StandardLogger()))
trace.T = opencensus.Adapter{}
cobra.OnInitialize(initConfig)
// read default node name from environment variable.
@@ -242,19 +248,21 @@ func initConfig() {
log.G(context.TODO()).WithField("OperatingSystem", operatingSystem).Fatalf("Operating system not supported. Valid options are: %s", strings.Join(providers.ValidOperatingSystems.Names(), " | "))
}
level, err := log.ParseLevel(logLevel)
level, err := logrus.ParseLevel(logLevel)
if err != nil {
log.G(context.TODO()).WithField("logLevel", logLevel).Fatal("log level is not supported")
}
logrus.SetLevel(level)
logger := log.L.WithFields(logrus.Fields{
logger := logruslogger.FromLogrus(logrus.WithFields(logrus.Fields{
"provider": provider,
"operatingSystem": operatingSystem,
"node": nodeName,
"namespace": kubeNamespace,
})
}))
rootContext = log.WithLogger(rootContext, logger)
log.L = logger
if !disableTaint {
@@ -339,16 +347,16 @@ func initConfig() {
if err != nil {
log.L.WithError(err).WithField("exporter", e).Fatal("Cannot initialize exporter")
}
trace.RegisterExporter(exporter)
octrace.RegisterExporter(exporter)
}
if len(userTraceExporters) > 0 {
var s trace.Sampler
var s octrace.Sampler
switch strings.ToLower(traceSampler) {
case "":
case "always":
s = trace.AlwaysSample()
s = octrace.AlwaysSample()
case "never":
s = trace.NeverSample()
s = octrace.NeverSample()
default:
rate, err := strconv.Atoi(traceSampler)
if err != nil {
@@ -357,12 +365,12 @@ func initConfig() {
if rate < 0 || rate > 100 {
logger.WithField("rate", traceSampler).Fatal("trace sample rate must not be less than zero or greater than 100")
}
s = trace.ProbabilitySampler(float64(rate) / 100)
s = octrace.ProbabilitySampler(float64(rate) / 100)
}
if s != nil {
trace.ApplyConfig(
trace.Config{
octrace.ApplyConfig(
octrace.Config{
DefaultSampler: s,
},
)

View File

@@ -14,77 +14,71 @@
limitations under the License.
*/
// Package log defines the interfaces used for logging in virtual-kubelet.
// It uses a context.Context to store logger details. Additionally you can set
// the default logger to use by setting log.L. This is used when no logger is
// stored in the passed in context.
package log
import (
"context"
"sync/atomic"
"github.com/Sirupsen/logrus"
)
var (
// G is an alias for GetLogger.
//
// We may want to define this locally to a package to get package tagged log
// messages.
G = GetLogger
// L is an alias for the the standard logger.
L = logrus.NewEntry(logrus.StandardLogger())
// L is the default logger. It should be initialized before using `G` or `GetLogger`
// If L is unitialized and no logger is available in a provided context, a
// panic will occur.
L Logger = nopLogger{}
)
type (
loggerKey struct{}
)
// TraceLevel is the log level for tracing. Trace level is lower than debug level,
// and is usually used to trace detailed behavior of the program.
const TraceLevel = logrus.Level(uint32(logrus.DebugLevel + 1))
// Logger is the interface used for logging in virtual-kubelet
//
// virtual-kubelet will access the logger via context using `GetLogger` (or its alias, `G`)
// You can set the default logger to use by setting the `L` variable.
type Logger interface {
Debug(...interface{})
Debugf(string, ...interface{})
Info(...interface{})
Infof(string, ...interface{})
Warn(...interface{})
Warnf(string, ...interface{})
Error(...interface{})
Errorf(string, ...interface{})
Fatal(...interface{})
Fatalf(string, ...interface{})
// RFC3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to
// ensure the formatted time is always the same number of characters.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
// ParseLevel takes a string level and returns the Logrus log level constant.
// It supports trace level.
func ParseLevel(lvl string) (logrus.Level, error) {
if lvl == "trace" {
return TraceLevel, nil
}
return logrus.ParseLevel(lvl)
WithField(string, interface{}) Logger
WithFields(Fields) Logger
WithError(error) Logger
}
// Fields allows setting multiple fields on a logger at one time.
type Fields map[string]interface{}
// WithLogger returns a new context with the provided logger. Use in
// combination with logger.WithField(s) for great effect.
func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context {
func WithLogger(ctx context.Context, logger Logger) context.Context {
return context.WithValue(ctx, loggerKey{}, logger)
}
// GetLogger retrieves the current logger from the context. If no logger is
// available, the default logger is returned.
func GetLogger(ctx context.Context) *logrus.Entry {
func GetLogger(ctx context.Context) Logger {
logger := ctx.Value(loggerKey{})
if logger == nil {
if L == nil {
panic("default logger not initialized")
}
return L
}
return logger.(*logrus.Entry)
}
// Trace logs a message at level Trace with the log entry passed-in.
func Trace(e *logrus.Entry, args ...interface{}) {
level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level)))
if level >= TraceLevel {
e.Debug(args...)
}
}
// Tracef logs a message at level Trace with the log entry passed-in.
func Tracef(e *logrus.Entry, format string, args ...interface{}) {
level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level)))
if level >= TraceLevel {
e.Debugf(format, args...)
}
return logger.(Logger)
}

34
log/logrus/logrus.go Normal file
View File

@@ -0,0 +1,34 @@
// Package logrus implements a github.com/virtual-kubelet/virtual-kubelet/log.Logger using Logrus as a backend
// You can use this by creating a logrus logger and calling `FromLogrus(entry)`.
// If you want this to be the default logger for virtual-kubelet, set `log.L` to the value returned by `FromLogrus`
package logrus
import (
"github.com/Sirupsen/logrus"
"github.com/virtual-kubelet/virtual-kubelet/log"
)
// Adapter implements the `log.Logger` interface for logrus
type Adapter struct {
*logrus.Entry
}
// FromLogrus creates a new `log.Logger` from the provided entry
func FromLogrus(entry *logrus.Entry) log.Logger {
return &Adapter{entry}
}
// WithField adds a field to the log entry.
func (l *Adapter) WithField(key string, val interface{}) log.Logger {
return FromLogrus(l.Entry.WithField(key, val))
}
// WithFields adds multiple fields to a log entry.
func (l *Adapter) WithFields(f log.Fields) log.Logger {
return FromLogrus(l.Entry.WithFields(logrus.Fields(f)))
}
// WithError adds an error to the log entry
func (l *Adapter) WithError(err error) log.Logger {
return FromLogrus(l.Entry.WithError(err))
}

16
log/logrus/logrus_test.go Normal file
View File

@@ -0,0 +1,16 @@
package logrus
import (
"testing"
"github.com/Sirupsen/logrus"
"github.com/virtual-kubelet/virtual-kubelet/log"
)
func TestImplementsLoggerInterface(t *testing.T) {
l := FromLogrus(&logrus.Entry{})
if _, ok := l.(log.Logger); !ok {
t.Fatal("does not implement log.Logger interface")
}
}

18
log/nop.go Normal file
View File

@@ -0,0 +1,18 @@
package log
type nopLogger struct{}
func (nopLogger) Debug(...interface{}) {}
func (nopLogger) Debugf(string, ...interface{}) {}
func (nopLogger) Info(...interface{}) {}
func (nopLogger) Infof(string, ...interface{}) {}
func (nopLogger) Warn(...interface{}) {}
func (nopLogger) Warnf(string, ...interface{}) {}
func (nopLogger) Error(...interface{}) {}
func (nopLogger) Errorf(string, ...interface{}) {}
func (nopLogger) Fatal(...interface{}) {}
func (nopLogger) Fatalf(string, ...interface{}) {}
func (l nopLogger) WithField(string, interface{}) Logger { return l }
func (l nopLogger) WithFields(Fields) Logger { return l }
func (l nopLogger) WithError(error) Logger { return l }

View File

@@ -19,15 +19,14 @@ import (
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/gorilla/websocket"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
client "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/network"
"go.opencensus.io/trace"
"k8s.io/api/core/v1"
"github.com/virtual-kubelet/virtual-kubelet/trace"
v1 "k8s.io/api/core/v1"
k8serr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -480,11 +479,11 @@ func getKubeProxyExtension(secretPath, masterURI, clusterCIDR string) (*aci.Exte
return &extension, nil
}
func addAzureAttributes(span *trace.Span, p *ACIProvider) {
span.AddAttributes(
trace.StringAttribute("azure.resourceGroup", p.resourceGroup),
trace.StringAttribute("azure.region", p.region),
)
func addAzureAttributes(ctx context.Context, span trace.Span, p *ACIProvider) context.Context {
return span.WithFields(ctx, log.Fields{
"azure.resourceGroup": p.resourceGroup,
"azure.region": p.region,
})
}
// CreatePod accepts a Pod definition and creates
@@ -492,7 +491,7 @@ func addAzureAttributes(span *trace.Span, p *ACIProvider) {
func (p *ACIProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
ctx, span := trace.StartSpan(ctx, "aci.CreatePod")
defer span.End()
addAzureAttributes(span, p)
ctx = addAzureAttributes(ctx, span, p)
var containerGroup aci.ContainerGroup
containerGroup.Location = p.region
@@ -694,7 +693,7 @@ func (p *ACIProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
func (p *ACIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error {
ctx, span := trace.StartSpan(ctx, "aci.DeletePod")
defer span.End()
addAzureAttributes(span, p)
ctx = addAzureAttributes(ctx, span, p)
err := p.aciClient.DeleteContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", pod.Namespace, pod.Name))
return wrapError(err)
@@ -705,7 +704,7 @@ func (p *ACIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error {
func (p *ACIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) {
ctx, span := trace.StartSpan(ctx, "aci.GetPod")
defer span.End()
addAzureAttributes(span, p)
ctx = addAzureAttributes(ctx, span, p)
cg, err, status := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, name))
if err != nil {
@@ -726,7 +725,7 @@ func (p *ACIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P
func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
ctx, span := trace.StartSpan(ctx, "aci.GetContainerLogs")
defer span.End()
addAzureAttributes(span, p)
ctx = addAzureAttributes(ctx, span, p)
logContent := ""
cg, err, _ := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName))
@@ -744,7 +743,6 @@ func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName,
cLogs, err := p.aciClient.GetContainerLogs(ctx, p.resourceGroup, cg.Name, containerName, tail)
if err != nil {
log.G(ctx).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying")
span.Annotate(nil, "Error getting container logs, retrying")
time.Sleep(5000 * time.Millisecond)
} else {
logContent = cLogs.Content
@@ -841,7 +839,7 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri
func (p *ACIProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
ctx, span := trace.StartSpan(ctx, "aci.GetPodStatus")
defer span.End()
addAzureAttributes(span, p)
ctx = addAzureAttributes(ctx, span, p)
pod, err := p.GetPod(ctx, namespace, name)
if err != nil {
@@ -859,7 +857,7 @@ func (p *ACIProvider) GetPodStatus(ctx context.Context, namespace, name string)
func (p *ACIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
ctx, span := trace.StartSpan(ctx, "aci.GetPods")
defer span.End()
addAzureAttributes(span, p)
ctx = addAzureAttributes(ctx, span, p)
cgs, err := p.aciClient.ListContainerGroups(ctx, p.resourceGroup)
if err != nil {
@@ -875,7 +873,7 @@ func (p *ACIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
p, err := containerGroupToPod(&c)
if err != nil {
log.G(context.TODO()).WithFields(logrus.Fields{
log.G(ctx).WithFields(log.Fields{
"name": c.Name,
"id": c.ID,
}).WithError(err).Error("error converting container group to pod")

View File

@@ -25,7 +25,7 @@ type AcsCredential struct {
// NewAcsCredential returns an AcsCredential struct from file path
func NewAcsCredential(p string) (*AcsCredential, error) {
logger := log.G(context.TODO()).WithField("method", "NewAcsCredential").WithField("file", p)
log.Trace(logger, "Reading ACS credential file")
logger.Debug("Reading ACS credential file")
b, err := ioutil.ReadFile(p)
if err != nil {
@@ -38,6 +38,6 @@ func NewAcsCredential(p string) (*AcsCredential, error) {
return nil, err
}
log.Trace(logger, "Load ACS credential file successfully")
logger.Debug("Load ACS credential file successfully")
return &cred, nil
}

View File

@@ -7,10 +7,11 @@ import (
"github.com/cpuguy83/strongerrors/status/ocstatus"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
"go.opencensus.io/trace"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"golang.org/x/sync/errgroup"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
@@ -19,17 +20,24 @@ import (
func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summary, err error) {
ctx, span := trace.StartSpan(ctx, "GetSummaryStats")
defer span.End()
addAzureAttributes(span, p)
ctx = addAzureAttributes(ctx, span, p)
p.metricsSync.Lock()
defer p.metricsSync.Unlock()
span.Annotate(nil, "acquired metrics mutex")
log.G(ctx).Debug("acquired metrics mutex")
if time.Now().Sub(p.metricsSyncTime) < time.Minute {
span.AddAttributes(trace.BoolAttribute("preCachedResult", true), trace.StringAttribute("cachedResultSampleTime", p.metricsSyncTime.String()))
span.WithFields(ctx, log.Fields{
"preCachedResult": true,
"cachedResultSampleTime": p.metricsSyncTime.String(),
})
return p.lastMetric, nil
}
span.AddAttributes(trace.BoolAttribute("preCachedResult", false), trace.StringAttribute("cachedResultSampleTime", p.metricsSyncTime.String()))
ctx = span.WithFields(ctx, log.Fields{
"preCachedResult": false,
"cachedResultSampleTime": p.metricsSyncTime.String(),
})
select {
case <-ctx.Done():
@@ -62,11 +70,11 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa
errGroup.Go(func() error {
ctx, span := trace.StartSpan(ctx, "getPodMetrics")
defer span.End()
span.AddAttributes(
trace.StringAttribute("UID", string(pod.UID)),
trace.StringAttribute("Name", pod.Name),
trace.StringAttribute("Namespace", pod.Namespace),
)
logger := log.G(ctx).WithFields(log.Fields{
"UID": string(pod.UID),
"Name": pod.Name,
"Namespace": pod.Namespace,
})
select {
case <-ctx.Done():
@@ -77,7 +85,7 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa
<-sema
}()
span.Annotate(nil, "Acquired semaphore")
logger.Debug("Acquired semaphore")
cgName := containerGroupName(pod)
// cpu/mem and net stats are split because net stats do not support container level detail
@@ -92,7 +100,7 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa
span.SetStatus(ocstatus.FromError(err))
return errors.Wrapf(err, "error fetching cpu/mem stats for container group %s", cgName)
}
span.Annotate(nil, "Got system stats")
logger.Debug("Got system stats")
netStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{
Start: start,
@@ -104,7 +112,7 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa
span.SetStatus(ocstatus.FromError(err))
return errors.Wrapf(err, "error fetching network stats for container group %s", cgName)
}
span.Annotate(nil, "Got network stats")
logger.Debug("Got network stats")
chResult <- collectMetrics(pod, systemStats, netStats)
return nil
@@ -112,10 +120,11 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa
}
if err := errGroup.Wait(); err != nil {
span.SetStatus(ocstatus.FromError(err))
return nil, errors.Wrap(err, "error in request to fetch container group metrics")
}
close(chResult)
span.Annotate([]trace.Attribute{trace.Int64Attribute("nPods", int64(len(pods)))}, "Collected stats from Azure")
log.G(ctx).Debugf("Collected status from azure for %d pods", len(pods))
var s stats.Summary
s.Node = stats.NodeStats{

View File

@@ -6,13 +6,14 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"time"
"github.com/cpuguy83/strongerrors"
"go.opencensus.io/trace"
"k8s.io/api/core/v1"
"github.com/cpuguy83/strongerrors/status/ocstatus"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -114,9 +115,9 @@ func (p *MockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
defer span.End()
// Add the pod's coordinates to the current span.
addAttributes(span, namespaceKey, pod.Namespace, nameKey, pod.Name)
ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name)
log.Printf("receive CreatePod %q\n", pod.Name)
log.G(ctx).Info("receive CreatePod %q", pod.Name)
key, err := buildKey(pod)
if err != nil {
@@ -134,9 +135,9 @@ func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
defer span.End()
// Add the pod's coordinates to the current span.
addAttributes(span, namespaceKey, pod.Namespace, nameKey, pod.Name)
ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name)
log.Printf("receive UpdatePod %q\n", pod.Name)
log.G(ctx).Info("receive UpdatePod %q", pod.Name)
key, err := buildKey(pod)
if err != nil {
@@ -154,9 +155,9 @@ func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
defer span.End()
// Add the pod's coordinates to the current span.
addAttributes(span, namespaceKey, pod.Namespace, nameKey, pod.Name)
ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name)
log.Printf("receive DeletePod %q\n", pod.Name)
log.G(ctx).Info("receive DeletePod %q", pod.Name)
key, err := buildKey(pod)
if err != nil {
@@ -175,12 +176,15 @@ func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
// GetPod returns a pod by name that is stored in memory.
func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
ctx, span := trace.StartSpan(ctx, "GetPod")
defer span.End()
defer func() {
span.SetStatus(ocstatus.FromError(err))
span.End()
}()
// Add the pod's coordinates to the current span.
addAttributes(span, namespaceKey, namespace, nameKey, name)
ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, name)
log.Printf("receive GetPod %q\n", name)
log.G(ctx).Info("receive GetPod %q", name)
key, err := buildKeyFromNames(namespace, name)
if err != nil {
@@ -199,9 +203,9 @@ func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName,
defer span.End()
// Add pod and container attributes to the current span.
addAttributes(span, namespaceKey, namespace, nameKey, podName, containerNameKey, containerName)
ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, podName, containerNameKey, containerName)
log.Printf("receive GetContainerLogs %q\n", podName)
log.G(ctx).Info("receive GetContainerLogs %q", podName)
return "", nil
}
@@ -214,7 +218,7 @@ func (p *MockProvider) GetPodFullName(namespace string, pod string) string {
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *MockProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("receive ExecInContainer %q\n", container)
log.G(context.TODO()).Info("receive ExecInContainer %q", container)
return nil
}
@@ -225,9 +229,9 @@ func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string)
defer span.End()
// Add namespace and name as attributes to the current span.
addAttributes(span, namespaceKey, namespace, nameKey, name)
ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, name)
log.Printf("receive GetPodStatus %q\n", name)
log.G(ctx).Info("receive GetPodStatus %q", name)
now := metav1.NewTime(time.Now())
@@ -279,7 +283,7 @@ func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
ctx, span := trace.StartSpan(ctx, "GetPods")
defer span.End()
log.Printf("receive GetPods\n")
log.G(ctx).Info("receive GetPods")
var pods []*v1.Pod
@@ -485,11 +489,12 @@ func buildKey(pod *v1.Pod) (string, error) {
// attrs must be an even-sized list of string arguments.
// Otherwise, the span won't be modified.
// TODO: Refactor and move to a "tracing utilities" package.
func addAttributes(span *trace.Span, attrs ...string) {
func addAttributes(ctx context.Context, span trace.Span, attrs ...string) context.Context {
if len(attrs)%2 == 1 {
return
return ctx
}
for i := 0; i < len(attrs); i += 2 {
span.AddAttributes(trace.StringAttribute(attrs[i], attrs[i+1]))
ctx = span.WithField(ctx, attrs[i], attrs[i+1])
}
return ctx
}

23
trace/nop.go Normal file
View File

@@ -0,0 +1,23 @@
package trace
import (
"context"
"github.com/virtual-kubelet/virtual-kubelet/log"
"go.opencensus.io/trace"
)
type nopTracer struct{}
func (nopTracer) StartSpan(ctx context.Context, _ string) (context.Context, Span) {
return ctx, &nopSpan{}
}
type nopSpan struct{}
func (nopSpan) End() {}
func (nopSpan) SetStatus(trace.Status) {}
func (nopSpan) Logger() log.Logger { return nil }
func (nopSpan) WithField(ctx context.Context, _ string, _ interface{}) context.Context { return ctx }
func (nopSpan) WithFields(ctx context.Context, _ log.Fields) context.Context { return ctx }

View File

@@ -0,0 +1,236 @@
// Package opencensus implements a github.com/virtual-kubelet/virtual-kubelet/trace.Tracer
// using opencensus as a backend.
//
// Use this by setting `trace.T = Adapter{}`
package opencensus
import (
"context"
"fmt"
"sync"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
octrace "go.opencensus.io/trace"
)
const (
lDebug = "DEBUG"
lInfo = "INFO"
lWarn = "WARN"
lErr = "ERROR"
lFatal = "FATAL"
)
// Adapter implements the trace.Tracer interface for OpenCensus
type Adapter struct{}
// StartSpan creates a new span from opencensus using the given name.
func (Adapter) StartSpan(ctx context.Context, name string) (context.Context, trace.Span) {
ctx, ocs := octrace.StartSpan(ctx, name)
l := log.G(ctx).WithField("method", name)
s := &span{s: ocs, l: l}
ctx = log.WithLogger(ctx, s.Logger())
return ctx, s
}
type span struct {
mu sync.Mutex
s *octrace.Span
l log.Logger
}
func (s *span) End() {
s.s.End()
}
func (s *span) SetStatus(status trace.Status) {
s.s.SetStatus(status)
}
func (s *span) WithField(ctx context.Context, key string, val interface{}) context.Context {
s.mu.Lock()
s.l = s.l.WithField(key, val)
ctx = log.WithLogger(ctx, &logger{s: s.s, l: s.l})
s.mu.Unlock()
if s.s.IsRecordingEvents() {
s.s.AddAttributes(makeAttribute(key, val))
}
return ctx
}
func (s *span) WithFields(ctx context.Context, f log.Fields) context.Context {
s.mu.Lock()
s.l = s.l.WithFields(f)
ctx = log.WithLogger(ctx, &logger{s: s.s, l: s.l})
s.mu.Unlock()
if s.s.IsRecordingEvents() {
attrs := make([]octrace.Attribute, 0, len(f))
for k, v := range f {
attrs = append(attrs, makeAttribute(k, v))
}
s.s.AddAttributes(attrs...)
}
return ctx
}
func (s *span) Logger() log.Logger {
return &logger{s: s.s, l: s.l}
}
type logger struct {
s *octrace.Span
l log.Logger
a []octrace.Attribute
}
func (l *logger) Debug(args ...interface{}) {
if !l.s.IsRecordingEvents() {
l.l.Debug(args...)
return
}
msg := fmt.Sprint(args...)
l.l.Debug(msg)
l.s.Annotate(withLevel(lDebug, l.a), msg)
}
func (l *logger) Debugf(f string, args ...interface{}) {
l.l.Debugf(f, args)
l.s.Annotatef(withLevel(lDebug, l.a), f, args...)
}
func (l *logger) Info(args ...interface{}) {
if !l.s.IsRecordingEvents() {
l.l.Info(args...)
return
}
msg := fmt.Sprint(args...)
l.l.Info(msg)
l.s.Annotate(withLevel(lInfo, l.a), msg)
}
func (l *logger) Infof(f string, args ...interface{}) {
l.l.Infof(f, args)
l.s.Annotatef(withLevel(lInfo, l.a), f, args...)
}
func (l *logger) Warn(args ...interface{}) {
if !l.s.IsRecordingEvents() {
l.l.Warn(args...)
return
}
msg := fmt.Sprint(args...)
l.l.Warn(msg)
l.s.Annotate(withLevel(lWarn, l.a), msg)
}
func (l *logger) Warnf(f string, args ...interface{}) {
l.l.Warnf(f, args)
l.s.Annotatef(withLevel(lWarn, l.a), f, args...)
}
func (l *logger) Error(args ...interface{}) {
if !l.s.IsRecordingEvents() {
l.l.Error(args...)
return
}
msg := fmt.Sprint(args...)
l.l.Error(msg)
l.s.Annotate(withLevel(lErr, l.a), msg)
}
func (l *logger) Errorf(f string, args ...interface{}) {
l.l.Errorf(f, args)
l.s.Annotatef(withLevel(lErr, l.a), f, args...)
}
func (l *logger) Fatal(args ...interface{}) {
if !l.s.IsRecordingEvents() {
l.l.Fatal(args...)
return
}
msg := fmt.Sprint(args...)
l.s.Annotate(withLevel(lFatal, l.a), msg)
l.l.Fatal(msg)
}
func (l *logger) Fatalf(f string, args ...interface{}) {
l.s.Annotatef(withLevel(lFatal, l.a), f, args...)
l.l.Fatalf(f, args)
}
func (l *logger) WithError(err error) log.Logger {
log := l.l.WithError(err)
var a []octrace.Attribute
if l.s.IsRecordingEvents() {
a = make([]octrace.Attribute, len(l.a), len(l.a)+1)
copy(a, l.a)
a = append(l.a, makeAttribute("err", err))
}
return &logger{s: l.s, l: log, a: a}
}
func (l *logger) WithField(k string, value interface{}) log.Logger {
log := l.l.WithField(k, value)
var a []octrace.Attribute
if l.s.IsRecordingEvents() {
a = make([]octrace.Attribute, len(l.a), len(l.a)+1)
copy(a, l.a)
a = append(a, makeAttribute(k, value))
}
return &logger{s: l.s, a: a, l: log}
}
func (l *logger) WithFields(fields log.Fields) log.Logger {
log := l.l.WithFields(fields)
var a []octrace.Attribute
if l.s.IsRecordingEvents() {
a = make([]octrace.Attribute, len(l.a), len(l.a)+len(fields))
copy(a, l.a)
for k, v := range fields {
a = append(a, makeAttribute(k, v))
}
}
return &logger{s: l.s, a: a, l: log}
}
func makeAttribute(key string, val interface{}) octrace.Attribute {
var attr octrace.Attribute
switch v := val.(type) {
case string:
attr = octrace.StringAttribute(key, v)
case int64:
attr = octrace.Int64Attribute(key, v)
case bool:
attr = octrace.BoolAttribute(key, v)
case error:
attr = octrace.StringAttribute(key, v.Error())
default:
attr = octrace.StringAttribute(key, fmt.Sprintf("%+v", val))
}
return attr
}
func withLevel(l string, attrs []octrace.Attribute) []octrace.Attribute {
return append(attrs, octrace.StringAttribute("level", l))
}

View File

@@ -0,0 +1,13 @@
package opencensus
import (
"testing"
"github.com/virtual-kubelet/virtual-kubelet/trace"
)
func TestTracerImplementsTracer(t *testing.T) {
// ensure that Adapter implements trace.Tracer
if tt := trace.Tracer(Adapter{}); tt == nil {
}
}

58
trace/trace.go Normal file
View File

@@ -0,0 +1,58 @@
// Package trace abstracts virtual-kubelet's tracing capabilties into a set of
// interfaces.
// While this does allow consumers to use whatever tracing library they want,
// the primary goal is to share logging data between the configured logger and
// tracing spans instead of duplicating calls.
package trace
import (
"context"
"github.com/virtual-kubelet/virtual-kubelet/log"
"go.opencensus.io/trace"
)
// Status is an alias to opencensus's trace status.
// The main reason we use this instead of implementing our own is library re-use,
// namely for converting an error to a tracing status.
// In the future this may be defined completely in this package.
type Status = trace.Status
// Tracer is the interface used for creating a tracing span
type Tracer interface {
// StartSpan starts a new span. The span details are emebedded into the returned
// context
StartSpan(context.Context, string) (context.Context, Span)
}
var (
// T is the Tracer to use this should be initialized before starting up
// virtual-kubelet
T Tracer = nopTracer{}
)
// StartSpan starts a span from the configured default tracer
func StartSpan(ctx context.Context, name string) (context.Context, Span) {
ctx, span := T.StartSpan(ctx, name)
ctx = log.WithLogger(ctx, span.Logger())
return ctx, span
}
// Span encapsulates a tracing event
type Span interface {
End()
SetStatus(Status)
// WithField and WithFields adds attributes to an entire span
//
// This interface is a bit weird, but allows us to manage loggers in the context
// It is expected that implementations set `log.WithLogger` so the logger stored
// in the context is updated with the new fields.
WithField(context.Context, string, interface{}) context.Context
WithFields(context.Context, log.Fields) context.Context
// Logger is used to log individual entries.
// Calls to functions like `WithField` and `WithFields` on the logger should
// not affect the rest of the span but rather individual entries.
Logger() log.Logger
}

View File

@@ -25,7 +25,7 @@ func handleError(f handlerFunc) http.HandlerFunc {
if code >= 500 {
logger.Error("Internal server error on request")
} else {
log.Trace(logger, "Error on request")
logger.Debug("Error on request")
}
}
}

View File

@@ -3,7 +3,6 @@ package vkubelet
import (
"net/http"
"github.com/Sirupsen/logrus"
"github.com/gorilla/mux"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/providers"
@@ -73,7 +72,7 @@ func AttachMetricsRoutes(p providers.Provider, mux ServeMux) {
func instrumentRequest(r *http.Request) *http.Request {
ctx := r.Context()
logger := log.G(ctx).WithFields(logrus.Fields{
logger := log.G(ctx).WithFields(log.Fields{
"uri": r.RequestURI,
"vars": mux.Vars(r),
})
@@ -96,14 +95,12 @@ func InstrumentHandler(h http.Handler) http.Handler {
// NotFound provides a handler for cases where the requested endpoint doesn't exist
func NotFound(w http.ResponseWriter, r *http.Request) {
logger := log.G(r.Context())
log.Trace(logger, "404 request not found")
log.G(r.Context()).Debug("404 request not found")
http.Error(w, "404 request not found", http.StatusNotFound)
}
// NotImplemented provides a handler for cases where a provider does not implement a given API
func NotImplemented(w http.ResponseWriter, r *http.Request) {
logger := log.G(r.Context())
log.Trace(logger, "501 not implemented")
log.G(r.Context()).Debug("501 not implemented")
http.Error(w, "501 not implemented", http.StatusNotImplemented)
}

View File

@@ -4,10 +4,10 @@ import (
"context"
"strings"
"github.com/cpuguy83/strongerrors/status/ocstatus"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"github.com/virtual-kubelet/virtual-kubelet/version"
"go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -57,12 +57,11 @@ func (s *Server) registerNode(ctx context.Context) error {
DaemonEndpoints: *s.provider.NodeDaemonEndpoints(ctx),
},
}
addNodeAttributes(span, node)
ctx = addNodeAttributes(ctx, span, node)
if _, err := s.k8sClient.CoreV1().Nodes().Create(node); err != nil && !errors.IsAlreadyExists(err) {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
span.SetStatus(ocstatus.FromError(err))
return err
}
span.Annotate(nil, "Registered node with k8s")
log.G(ctx).Info("Registered node")
@@ -77,19 +76,20 @@ func (s *Server) updateNode(ctx context.Context) {
opts := metav1.GetOptions{}
n, err := s.k8sClient.CoreV1().Nodes().Get(s.nodeName, opts)
if err != nil && !errors.IsNotFound(err) {
log.G(ctx).WithError(err).Error("Failed to retrieve node")
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
log.G(ctx).WithError(err).Error("Failed to retrive node")
span.SetStatus(ocstatus.FromError(err))
return
}
addNodeAttributes(span, n)
span.Annotate(nil, "Fetched node details from k8s")
ctx = addNodeAttributes(ctx, span, n)
log.G(ctx).Debug("Fetched node details from k8s")
if errors.IsNotFound(err) {
if err = s.registerNode(ctx); err != nil {
log.G(ctx).WithError(err).Error("Failed to register node")
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
span.SetStatus(ocstatus.FromError(err))
} else {
span.Annotate(nil, "Registered node in k8s")
log.G(ctx).Debug("Registered node in k8s")
}
return
}
@@ -106,7 +106,7 @@ func (s *Server) updateNode(ctx context.Context) {
n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n)
if err != nil {
log.G(ctx).WithError(err).Error("Failed to update node")
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
span.SetStatus(ocstatus.FromError(err))
return
}
}
@@ -125,13 +125,11 @@ func (t taintsStringer) String() string {
return s
}
func addNodeAttributes(span *trace.Span, n *corev1.Node) {
span.AddAttributes(
trace.StringAttribute("UID", string(n.UID)),
trace.StringAttribute("name", n.Name),
trace.StringAttribute("cluster", n.ClusterName),
)
if span.IsRecordingEvents() {
span.AddAttributes(trace.StringAttribute("taints", taintsStringer(n.Spec.Taints).String()))
}
func addNodeAttributes(ctx context.Context, span trace.Span, n *corev1.Node) context.Context {
return span.WithFields(ctx, log.Fields{
"UID": string(n.UID),
"name": n.Name,
"cluster": n.ClusterName,
"tains": taintsStringer(n.Spec.Taints),
})
}

View File

@@ -2,29 +2,27 @@ package vkubelet
import (
"context"
"fmt"
"sync"
"time"
"github.com/cpuguy83/strongerrors/status/ocstatus"
pkgerrors "github.com/pkg/errors"
"go.opencensus.io/trace"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"github.com/virtual-kubelet/virtual-kubelet/log"
)
func addPodAttributes(span *trace.Span, pod *corev1.Pod) {
span.AddAttributes(
trace.StringAttribute("uid", string(pod.GetUID())),
trace.StringAttribute("namespace", pod.GetNamespace()),
trace.StringAttribute("name", pod.GetName()),
trace.StringAttribute("phase", string(pod.Status.Phase)),
trace.StringAttribute("reason", pod.Status.Reason),
)
func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
return span.WithFields(ctx, log.Fields{
"uid": string(pod.GetUID()),
"namespace": pod.GetNamespace(),
"name": pod.GetName(),
"phase": string(pod.Status.Phase),
"reason": pod.Status.Reason,
})
}
func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorder record.EventRecorder) error {
@@ -40,14 +38,17 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde
ctx, span := trace.StartSpan(ctx, "createOrUpdatePod")
defer span.End()
addPodAttributes(span, pod)
addPodAttributes(ctx, span, pod)
if err := populateEnvironmentVariables(ctx, pod, s.resourceManager, recorder); err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: err.Error()})
span.SetStatus(ocstatus.FromError(err))
return err
}
logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
ctx = span.WithFields(ctx, log.Fields{
"pod": pod.GetName(),
"namespace": pod.GetNamespace(),
})
if origErr := s.provider.CreatePod(ctx, pod); origErr != nil {
podPhase := corev1.PodPending
@@ -60,19 +61,23 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde
pod.Status.Reason = podStatusReasonProviderFailed
pod.Status.Message = origErr.Error()
logger := log.G(ctx).WithFields(log.Fields{
"podPhase": podPhase,
"reason": pod.Status.Reason,
})
_, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
if err != nil {
logger.WithError(err).Warn("Failed to update pod status")
} else {
span.Annotate(nil, "Updated k8s pod status")
logger.Info("Updated k8s pod status")
}
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: origErr.Error()})
span.SetStatus(ocstatus.FromError(origErr))
return origErr
}
span.Annotate(nil, "Created pod in provider")
logger.Info("Pod created")
log.G(ctx).Info("Created pod in provider")
return nil
}
@@ -89,23 +94,22 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error {
ctx, span := trace.StartSpan(ctx, "deletePod")
defer span.End()
addPodAttributes(span, pod)
ctx = addPodAttributes(ctx, span, pod)
var delErr error
if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: delErr.Error()})
span.SetStatus(ocstatus.FromError(delErr))
return delErr
}
span.Annotate(nil, "Deleted pod from provider")
logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
log.G(ctx).Debug("Deleted pod from provider")
if !errors.IsNotFound(delErr) {
if err := s.forceDeletePodResource(ctx, namespace, name); err != nil {
span.SetStatus(ocstatus.FromError(err))
return err
}
span.Annotate(nil, "Deleted pod from k8s")
logger.Info("Pod deleted")
log.G(ctx).Info("Deleted pod from Kubernetes")
}
return nil
@@ -114,19 +118,19 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error {
func (s *Server) forceDeletePodResource(ctx context.Context, namespace, name string) error {
ctx, span := trace.StartSpan(ctx, "forceDeletePodResource")
defer span.End()
span.AddAttributes(
trace.StringAttribute("namespace", namespace),
trace.StringAttribute("name", name),
)
ctx = span.WithFields(ctx, log.Fields{
"namespace": namespace,
"name": name,
})
var grace int64
if err := s.k8sClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil {
if errors.IsNotFound(err) {
span.Annotate(nil, "Pod does not exist in Kubernetes, nothing to delete")
log.G(ctx).Debug("Pod does not exist in Kubernetes, nothing to delete")
return nil
}
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
return fmt.Errorf("Failed to delete Kubernetes pod: %s", err)
span.SetStatus(ocstatus.FromError(err))
return pkgerrors.Wrap(err, "Failed to delete Kubernetes pod")
}
return nil
}
@@ -138,7 +142,8 @@ func (s *Server) updatePodStatuses(ctx context.Context) {
// Update all the pods with the provider status.
pods := s.resourceManager.GetPods()
span.AddAttributes(trace.Int64Attribute("nPods", int64(len(pods))))
ctx = span.WithField(ctx, "nPods", int64(len(pods)))
sema := make(chan struct{}, s.podSyncWorkers)
var wg sync.WaitGroup
@@ -157,8 +162,12 @@ func (s *Server) updatePodStatuses(ctx context.Context) {
defer func() { <-sema }()
if err := s.updatePodStatus(ctx, pod); err != nil {
logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()).WithField("status", pod.Status.Phase).WithField("reason", pod.Status.Reason)
logger.Error(err)
log.G(ctx).WithFields(log.Fields{
"pod": pod.GetName(),
"namespace": pod.GetNamespace(),
"status": pod.Status.Phase,
"reason": pod.Status.Reason,
}).Error(err)
}
}(pod)
@@ -170,7 +179,7 @@ func (s *Server) updatePodStatuses(ctx context.Context) {
func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "updatePodStatus")
defer span.End()
addPodAttributes(span, pod)
ctx = addPodAttributes(ctx, span, pod)
if pod.Status.Phase == corev1.PodSucceeded ||
pod.Status.Phase == corev1.PodFailed ||
@@ -214,9 +223,10 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {
return pkgerrors.Wrap(err, "error while updating pod status in kubernetes")
}
span.Annotate([]trace.Attribute{
trace.StringAttribute("new phase", string(pod.Status.Phase)),
trace.StringAttribute("new reason", pod.Status.Reason),
}, "updated pod status in kubernetes")
log.G(ctx).WithFields(log.Fields{
"new phase": string(pod.Status.Phase),
"new reason": pod.Status.Reason,
}).Debug("Updated pod status in kubernetes")
return nil
}

View File

@@ -24,11 +24,11 @@ import (
"github.com/cpuguy83/strongerrors/status/ocstatus"
pkgerrors "github.com/pkg/errors"
"go.opencensus.io/trace"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers/core/v1"
v1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
@@ -169,7 +169,7 @@ func (pc *PodController) processNextWorkItem(ctx context.Context, workerId strin
defer span.End()
// Add the ID of the current worker as an attribute to the current span.
span.AddAttributes(trace.StringAttribute("workerId", workerId))
ctx = span.WithField(ctx, "workerId", workerId)
// We wrap this block in a func so we can defer pc.workqueue.Done.
err := func(obj interface{}) error {
@@ -190,7 +190,7 @@ func (pc *PodController) processNextWorkItem(ctx context.Context, workerId strin
return nil
}
// Add the current key as an attribute to the current span.
span.AddAttributes(trace.StringAttribute("key", key))
ctx = span.WithField(ctx, "key", key)
// Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced.
if err := pc.syncHandler(ctx, key); err != nil {
if pc.workqueue.NumRequeues(key) < maxRetries {
@@ -224,7 +224,7 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
defer span.End()
// Add the current key as an attribute to the current span.
span.AddAttributes(trace.StringAttribute("key", key))
ctx = span.WithField(ctx, "key", key)
// Convert the namespace/name string into a distinct namespace and name.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -263,7 +263,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod)
defer span.End()
// Add the pod's attributes to the current span.
addPodAttributes(span, pod)
ctx = addPodAttributes(ctx, span, pod)
// Check whether the pod has been marked for deletion.
// If it does, guarantee it is deleted in the provider and Kubernetes.
@@ -344,7 +344,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int
}()
// Add the pod's attributes to the current span.
addPodAttributes(span, pod)
ctx = addPodAttributes(ctx, span, pod)
// Actually delete the pod.
if err := pc.server.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
span.SetStatus(ocstatus.FromError(err))