diff --git a/virtual-kubelet/commands/root/flag.go b/virtual-kubelet/commands/root/flag.go index f2d1e97..85e04ae 100644 --- a/virtual-kubelet/commands/root/flag.go +++ b/virtual-kubelet/commands/root/flag.go @@ -22,7 +22,6 @@ import ( "github.com/pkg/errors" "github.com/spf13/pflag" - "github.com/virtual-kubelet/virtual-kubelet/trace/opencensus" "k8s.io/klog" ) @@ -74,7 +73,7 @@ func installFlags(flags *pflag.FlagSet, c *Opts) { flags.IntVar(&c.PodSyncWorkers, "pod-sync-workers", c.PodSyncWorkers, `set the number of pod synchronization workers`) flags.BoolVar(&c.EnableNodeLease, "enable-node-lease", c.EnableNodeLease, `use node leases (1.13) for node heartbeats`) - flags.StringSliceVar(&c.TraceExporters, "trace-exporter", c.TraceExporters, fmt.Sprintf("sets the tracing exporter to use, available exporters: %s", opencensus.AvailableTraceExporters())) + flags.StringSliceVar(&c.TraceExporters, "trace-exporter", c.TraceExporters, fmt.Sprintf("sets the tracing exporter to use, available exporters: %s", AvailableTraceExporters())) flags.StringVar(&c.TraceConfig.ServiceName, "trace-service-name", c.TraceConfig.ServiceName, "sets the name of the service used to register with the trace exporter") flags.Var(mapVar(c.TraceConfig.Tags), "trace-tag", "add tags to include with traces in key=value form") flags.StringVar(&c.TraceSampleRate, "trace-sample-rate", c.TraceSampleRate, "set probability of tracing samples") diff --git a/virtual-kubelet/commands/root/http.go b/virtual-kubelet/commands/root/http.go index 3668fe0..8ffba86 100644 --- a/virtual-kubelet/commands/root/http.go +++ b/virtual-kubelet/commands/root/http.go @@ -25,8 +25,8 @@ import ( "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/node/api" "github.com/virtual-kubelet/virtual-kubelet/providers" - "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" ) // AcceptedCiphers is the list of accepted TLS ciphers, with known weak ciphers elided diff --git a/virtual-kubelet/commands/root/opts.go b/virtual-kubelet/commands/root/opts.go index 7bb8496..7b3c46d 100644 --- a/virtual-kubelet/commands/root/opts.go +++ b/virtual-kubelet/commands/root/opts.go @@ -22,7 +22,6 @@ import ( "github.com/mitchellh/go-homedir" "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/trace/opencensus" corev1 "k8s.io/api/core/v1" ) @@ -77,7 +76,7 @@ type Opts struct { TraceExporters []string TraceSampleRate string - TraceConfig opencensus.TracingExporterOptions + TraceConfig TracingExporterOptions // Startup Timeout is how long to wait for the kubelet to start StartupTimeout time.Duration diff --git a/virtual-kubelet/commands/root/root.go b/virtual-kubelet/commands/root/root.go index 0c91fe6..cafbcb8 100644 --- a/virtual-kubelet/commands/root/root.go +++ b/virtual-kubelet/commands/root/root.go @@ -25,9 +25,9 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/node" "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers/register" - "github.com/virtual-kubelet/virtual-kubelet/vkubelet" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -147,12 +147,12 @@ func runRootCommand(ctx context.Context, c Opts) error { } pNode := NodeFromProvider(ctx, c.NodeName, taint, p) - node, err := vkubelet.NewNode( - vkubelet.NaiveNodeProvider{}, + nodeRunner, err := node.NewNodeController( + node.NaiveNodeProvider{}, pNode, client.CoreV1().Nodes(), - vkubelet.WithNodeEnableLeaseV1Beta1(leaseClient, nil), - vkubelet.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error { + node.WithNodeEnableLeaseV1Beta1(leaseClient, nil), + node.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error { if !k8serrors.IsNotFound(err) { return err } @@ -176,7 +176,7 @@ func runRootCommand(ctx context.Context, c Opts) error { eb.StartLogging(log.G(ctx).Infof) eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(c.KubeNamespace)}) - pc, err := vkubelet.NewPodController(vkubelet.PodControllerConfig{ + pc, err := node.NewPodController(node.PodControllerConfig{ PodClient: client.CoreV1(), PodInformer: podInformer, EventRecorder: eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(pNode.Name, "pod-controller")}), @@ -210,7 +210,7 @@ func runRootCommand(ctx context.Context, c Opts) error { } go func() { - if err := node.Run(ctx); err != nil { + if err := nodeRunner.Run(ctx); err != nil { log.G(ctx).Fatal(err) } }() diff --git a/virtual-kubelet/commands/root/tracing.go b/virtual-kubelet/commands/root/tracing.go index 445273d..e3dc619 100644 --- a/virtual-kubelet/commands/root/tracing.go +++ b/virtual-kubelet/commands/root/tracing.go @@ -25,7 +25,6 @@ import ( "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" - "github.com/virtual-kubelet/virtual-kubelet/trace/opencensus" octrace "go.opencensus.io/trace" "go.opencensus.io/zpages" ) @@ -55,7 +54,7 @@ func setupTracing(ctx context.Context, c Opts) error { setupZpages(ctx) continue } - exporter, err := opencensus.GetTracingExporter(e, c.TraceConfig) + exporter, err := GetTracingExporter(e, c.TraceConfig) if err != nil { return err } diff --git a/virtual-kubelet/commands/root/tracing_register.go b/virtual-kubelet/commands/root/tracing_register.go new file mode 100644 index 0000000..6ce7cdb --- /dev/null +++ b/virtual-kubelet/commands/root/tracing_register.go @@ -0,0 +1,44 @@ +package root + +import ( + "github.com/virtual-kubelet/virtual-kubelet/errdefs" + "go.opencensus.io/trace" +) + +type TracingExporterOptions struct { + Tags map[string]string + ServiceName string +} + +var ( + tracingExporters = make(map[string]TracingExporterInitFunc) +) + +// TracingExporterInitFunc is the function that is called to initialize an exporter. +// This is used when registering an exporter and called when a user specifed they want to use the exporter. +type TracingExporterInitFunc func(TracingExporterOptions) (trace.Exporter, error) + +// RegisterTracingExporter registers a tracing exporter. +// For a user to select an exporter, it must be registered here. +func RegisterTracingExporter(name string, f TracingExporterInitFunc) { + tracingExporters[name] = f +} + +// GetTracingExporter gets the specified tracing exporter passing in the options to the exporter init function. +// For an exporter to be availbale here it must be registered with `RegisterTracingExporter`. +func GetTracingExporter(name string, opts TracingExporterOptions) (trace.Exporter, error) { + f, ok := tracingExporters[name] + if !ok { + return nil, errdefs.NotFoundf("tracing exporter %q not found", name) + } + return f(opts) +} + +// AvailableTraceExporters gets the list of registered exporters +func AvailableTraceExporters() []string { + out := make([]string, 0, len(tracingExporters)) + for k := range tracingExporters { + out = append(out, k) + } + return out +} diff --git a/virtual-kubelet/commands/root/tracing_register_jaeger.go b/virtual-kubelet/commands/root/tracing_register_jaeger.go new file mode 100644 index 0000000..ce89328 --- /dev/null +++ b/virtual-kubelet/commands/root/tracing_register_jaeger.go @@ -0,0 +1,37 @@ +// +build !no_jaeger_exporter + +package root + +import ( + "errors" + "os" + + "go.opencensus.io/exporter/jaeger" + "go.opencensus.io/trace" +) + +func init() { + RegisterTracingExporter("jaeger", NewJaegerExporter) +} + +// NewJaegerExporter creates a new opencensus tracing exporter. +func NewJaegerExporter(opts TracingExporterOptions) (trace.Exporter, error) { + jOpts := jaeger.Options{ + Endpoint: os.Getenv("JAEGER_ENDPOINT"), + AgentEndpoint: os.Getenv("JAEGER_AGENT_ENDPOINT"), + Username: os.Getenv("JAEGER_USER"), + Password: os.Getenv("JAEGER_PASSWORD"), + Process: jaeger.Process{ + ServiceName: opts.ServiceName, + }, + } + + if jOpts.Endpoint == "" && jOpts.AgentEndpoint == "" { + return nil, errors.New("Must specify either JAEGER_ENDPOINT or JAEGER_AGENT_ENDPOINT") + } + + for k, v := range opts.Tags { + jOpts.Process.Tags = append(jOpts.Process.Tags, jaeger.StringTag(k, v)) + } + return jaeger.NewExporter(jOpts) +} diff --git a/virtual-kubelet/commands/root/tracing_register_ocagent.go b/virtual-kubelet/commands/root/tracing_register_ocagent.go new file mode 100644 index 0000000..c950280 --- /dev/null +++ b/virtual-kubelet/commands/root/tracing_register_ocagent.go @@ -0,0 +1,36 @@ +// +build !no_ocagent_exporter + +package root + +import ( + "os" + + "contrib.go.opencensus.io/exporter/ocagent" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" + "go.opencensus.io/trace" +) + +func init() { + RegisterTracingExporter("ocagent", NewOCAgentExporter) +} + +// NewOCAgentExporter creates a new opencensus tracing exporter using the opencensus agent forwarder. +func NewOCAgentExporter(opts TracingExporterOptions) (trace.Exporter, error) { + agentOpts := append([]ocagent.ExporterOption{}, ocagent.WithServiceName(opts.ServiceName)) + + if endpoint := os.Getenv("OCAGENT_ENDPOINT"); endpoint != "" { + agentOpts = append(agentOpts, ocagent.WithAddress(endpoint)) + } else { + return nil, errdefs.InvalidInput("must set endpoint address in OCAGENT_ENDPOINT") + } + + switch os.Getenv("OCAGENT_INSECURE") { + case "0", "no", "n", "off", "": + case "1", "yes", "y", "on": + agentOpts = append(agentOpts, ocagent.WithInsecure()) + default: + return nil, errdefs.InvalidInput("invalid value for OCAGENT_INSECURE") + } + + return ocagent.NewExporter(agentOpts...) +} diff --git a/virtual-kubelet/commands/root/tracing_register_test.go b/virtual-kubelet/commands/root/tracing_register_test.go new file mode 100644 index 0000000..1de04a9 --- /dev/null +++ b/virtual-kubelet/commands/root/tracing_register_test.go @@ -0,0 +1,44 @@ +package root + +import ( + "testing" + + "github.com/virtual-kubelet/virtual-kubelet/errdefs" + "go.opencensus.io/trace" +) + +func TestGetTracingExporter(t *testing.T) { + defer delete(tracingExporters, "mock") + + mockExporterFn := func(_ TracingExporterOptions) (trace.Exporter, error) { + return nil, nil + } + + _, err := GetTracingExporter("notexist", TracingExporterOptions{}) + if !errdefs.IsNotFound(err) { + t.Fatalf("expected not found error, got: %v", err) + } + + RegisterTracingExporter("mock", mockExporterFn) + + if _, err := GetTracingExporter("mock", TracingExporterOptions{}); err != nil { + t.Fatal(err) + } +} + +func TestAvailableExporters(t *testing.T) { + defer delete(tracingExporters, "mock") + + mockExporterFn := func(_ TracingExporterOptions) (trace.Exporter, error) { + return nil, nil + } + RegisterTracingExporter("mock", mockExporterFn) + + for _, e := range AvailableTraceExporters() { + if e == "mock" { + return + } + } + + t.Fatal("could not find mock exporter in list of registered exporters") +}