diff --git a/.circleci/config.yml b/.circleci/config.yml index c262c06f4..ae6692f99 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -2,7 +2,7 @@ version: 2 jobs: validate: docker: - - image: circleci/golang:1.10 + - image: circleci/golang:1.12 working_directory: /go/src/github.com/virtual-kubelet/virtual-kubelet steps: - checkout @@ -21,7 +21,7 @@ jobs: test: docker: - - image: circleci/golang:1.10 + - image: circleci/golang:1.12 working_directory: /go/src/github.com/virtual-kubelet/virtual-kubelet steps: - checkout @@ -102,6 +102,11 @@ jobs: - run: name: Run the end-to-end test suite command: | + mkdir $HOME/.go + export PATH=$HOME/.go/bin:${PATH} + curl -fsSL -o "/tmp/go.tar.gz" "https://dl.google.com/go/go1.12.5.linux-amd64.tar.gz" + tar -C $HOME/.go --strip-components=1 -xzf "/tmp/go.tar.gz" + go version make e2e - run: name: Collect logs on failure from vkubelet-mock-0 diff --git a/Gopkg.lock b/Gopkg.lock index c5592d306..6cafc76e8 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -145,18 +145,6 @@ revision = "24333298e36590ea0716598caacc8959fc393c48" version = "v0.0.2" -[[projects]] - digest = "1:62e5b997b5ada9b5f71e759c3474f2a0de8de1b21473bab9e4f98c5aa69c05eb" - name = "github.com/cpuguy83/strongerrors" - packages = [ - ".", - "status", - "status/ocstatus", - ] - pruneopts = "NUT" - revision = "05f877ca1e627f0a1e01902ba78c63f1ef7db5a3" - version = "v0.2.1" - [[projects]] digest = "1:a2c1d0e43bd3baaa071d1b9ed72c27d78169b2b269f71c105ac4ba34b1be4a39" name = "github.com/davecgh/go-spew" @@ -1272,7 +1260,7 @@ revision = "0317810137be915b9cf888946c6e115c1bfac693" [[projects]] - digest = "1:4ff70ea1888545c3a245a76657ba38a308d2c068bd26fa4310f63560ebaf265c" + digest = "1:b78561dbff036b36f419a270993ca22bb6fdafd84085686f24e4a5f8e31056be" name = "k8s.io/kubernetes" packages = [ "pkg/api/v1/pod", @@ -1285,6 +1273,7 @@ "pkg/kubelet/apis/stats/v1alpha1", "pkg/kubelet/envvars", "pkg/kubelet/server/remotecommand", + "third_party/forked/golang/expansion", ] pruneopts = "NUT" revision = "c27b913fddd1a6c480c229191a087698aa92f0b1" @@ -1331,9 +1320,6 @@ "github.com/aws/aws-sdk-go/service/ecs/ecsiface", "github.com/aws/aws-sdk-go/service/iam", "github.com/cenkalti/backoff", - "github.com/cpuguy83/strongerrors", - "github.com/cpuguy83/strongerrors/status", - "github.com/cpuguy83/strongerrors/status/ocstatus", "github.com/davecgh/go-spew/spew", "github.com/google/go-cmp/cmp", "github.com/google/uuid", @@ -1409,6 +1395,7 @@ "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1", "k8s.io/kubernetes/pkg/kubelet/envvars", "k8s.io/kubernetes/pkg/kubelet/server/remotecommand", + "k8s.io/kubernetes/third_party/forked/golang/expansion", ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 15856e36a..952f82398 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -70,10 +70,6 @@ name = "github.com/gophercloud/gophercloud" revision = "954aa14363ced787c28efcfcd15ae6945eb862fb" -[[constraint]] - name = "github.com/cpuguy83/strongerrors" - version = "0.2.1" - [[override]] name = "github.com/docker/docker" revision = "49bf474f9ed7ce7143a59d1964ff7b7fd9b52178" diff --git a/Makefile b/Makefile index 65f8cacc7..3b0eb9548 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ include Makefile.e2e # Also, we will want to lock our tool versions using go mod: # https://github.com/golang/go/wiki/Modules#how-can-i-track-tool-dependencies-for-a-module gobin_tool ?= $(shell which gobin || echo $(GOPATH)/bin/gobin) -goimports := golang.org/x/tools/cmd/goimports@release-branch.go1.10 +goimports := golang.org/x/tools/cmd/goimports@release-branch.go1.12 gocovmerge := github.com/wadey/gocovmerge@b5bfa59ec0adc420475f97f89b58045c721d761c goreleaser := github.com/goreleaser/goreleaser@v0.82.2 gox := github.com/mitchellh/gox@v1.0.1 @@ -36,7 +36,7 @@ build: build_tags := netgo osusergo build: OUTPUT_DIR ?= bin build: authors @echo "Building..." - $Q CGO_ENABLED=0 go build -a --tags '$(shell scripts/process_build_tags.sh $(build_tags) $(VK_BUILD_TAGS))' -ldflags '-extldflags "-static"' -o $(OUTPUT_DIR)/$(binary) $(if $V,-v) $(VERSION_FLAGS) ./cmd/$(binary) + $Q CGO_ENABLED=0 go build --tags '$(shell scripts/process_build_tags.sh $(build_tags) $(VK_BUILD_TAGS))' -ldflags '-extldflags "-static"' -o $(OUTPUT_DIR)/$(binary) $(if $V,-v) $(VERSION_FLAGS) ./cmd/$(binary) .PHONY: tags tags: diff --git a/cmd/virtual-kubelet/commands/root/node.go b/cmd/virtual-kubelet/commands/root/node.go index c949abd64..728575688 100644 --- a/cmd/virtual-kubelet/commands/root/node.go +++ b/cmd/virtual-kubelet/commands/root/node.go @@ -18,8 +18,7 @@ import ( "context" "strings" - "github.com/cpuguy83/strongerrors" - "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/version" corev1 "k8s.io/api/core/v1" @@ -100,7 +99,7 @@ func getTaint(c Opts) (*corev1.Taint, error) { case "PreferNoSchedule": effect = corev1.TaintEffectPreferNoSchedule default: - return nil, strongerrors.InvalidArgument(errors.Errorf("taint effect %q is not supported", effectEnv)) + return nil, errdefs.InvalidInputf("taint effect %q is not supported", effectEnv) } return &corev1.Taint{ diff --git a/cmd/virtual-kubelet/commands/root/root.go b/cmd/virtual-kubelet/commands/root/root.go index caacdf63f..c8813f1fe 100644 --- a/cmd/virtual-kubelet/commands/root/root.go +++ b/cmd/virtual-kubelet/commands/root/root.go @@ -17,23 +17,28 @@ package root import ( "context" "os" + "path" "time" - "github.com/cpuguy83/strongerrors" "github.com/pkg/errors" "github.com/spf13/cobra" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers/register" "github.com/virtual-kubelet/virtual-kubelet/vkubelet" corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/record" ) // NewCommand creates a new top-level command. @@ -59,11 +64,11 @@ func runRootCommand(ctx context.Context, c Opts) error { defer cancel() if ok := providers.ValidOperatingSystems[c.OperatingSystem]; !ok { - return strongerrors.InvalidArgument(errors.Errorf("operating system %q is not supported", c.OperatingSystem)) + return errdefs.InvalidInputf("operating system %q is not supported", c.OperatingSystem) } if c.PodSyncWorkers == 0 { - return strongerrors.InvalidArgument(errors.New("pod sync workers must be greater than 0")) + return errdefs.InvalidInput("pod sync workers must be greater than 0") } var taint *corev1.Taint @@ -88,7 +93,6 @@ func runRootCommand(ctx context.Context, c Opts) error { kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) { options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String() })) - // Create a pod informer so we can pass its lister to the resource manager. podInformer := podInformerFactory.Core().V1().Pods() // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors). @@ -143,20 +147,40 @@ func runRootCommand(ctx context.Context, c Opts) error { client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease), client.CoreV1().Nodes(), vkubelet.WithNodeDisableLease(!c.EnableNodeLease), + vkubelet.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error { + if !k8serrors.IsNotFound(err) { + return err + } + + log.G(ctx).Debug("node not found") + newNode := pNode.DeepCopy() + newNode.ResourceVersion = "" + _, err = client.CoreV1().Nodes().Create(newNode) + if err != nil { + return err + } + log.G(ctx).Debug("created new node") + return nil + }), ) if err != nil { log.G(ctx).Fatal(err) } - vk := vkubelet.New(vkubelet.Config{ - Client: client, - Namespace: c.KubeNamespace, - NodeName: pNode.Name, + eb := record.NewBroadcaster() + eb.StartLogging(log.G(ctx).Infof) + eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(c.KubeNamespace)}) + + pc, err := vkubelet.NewPodController(vkubelet.PodControllerConfig{ + PodClient: client.CoreV1(), + PodInformer: podInformer, + EventRecorder: eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(pNode.Name, "pod-controller")}), Provider: p, ResourceManager: rm, - PodSyncWorkers: c.PodSyncWorkers, - PodInformer: podInformer, }) + if err != nil { + return errors.Wrap(err, "error setting up pod controller") + } cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig) if err != nil { @@ -165,7 +189,7 @@ func runRootCommand(ctx context.Context, c Opts) error { defer cancelHTTP() go func() { - if err := vk.Run(ctx); err != nil && errors.Cause(err) != context.Canceled { + if err := pc.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled { log.G(ctx).Fatal(err) } }() @@ -174,7 +198,7 @@ func runRootCommand(ctx context.Context, c Opts) error { // If there is a startup timeout, it does two things: // 1. It causes the VK to shutdown if we haven't gotten into an operational state in a time period // 2. It prevents node advertisement from happening until we're in an operational state - err = waitForVK(ctx, c.StartupTimeout, vk) + err = waitFor(ctx, c.StartupTimeout, pc.Ready()) if err != nil { return err } @@ -192,7 +216,7 @@ func runRootCommand(ctx context.Context, c Opts) error { return nil } -func waitForVK(ctx context.Context, time time.Duration, vk *vkubelet.Server) error { +func waitFor(ctx context.Context, time time.Duration, ready <-chan struct{}) error { ctx, cancel := context.WithTimeout(ctx, time) defer cancel() @@ -200,7 +224,7 @@ func waitForVK(ctx context.Context, time time.Duration, vk *vkubelet.Server) err log.G(ctx).Info("Waiting for pod controller / VK to be ready") select { - case <-vk.Ready(): + case <-ready: return nil case <-ctx.Done(): return errors.Wrap(ctx.Err(), "Error while starting up VK") diff --git a/cmd/virtual-kubelet/commands/root/tracing.go b/cmd/virtual-kubelet/commands/root/tracing.go index 2851b1f37..445273d34 100644 --- a/cmd/virtual-kubelet/commands/root/tracing.go +++ b/cmd/virtual-kubelet/commands/root/tracing.go @@ -22,8 +22,8 @@ import ( "strconv" "strings" - "github.com/cpuguy83/strongerrors" "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace/opencensus" octrace "go.opencensus.io/trace" @@ -41,7 +41,7 @@ var ( func setupTracing(ctx context.Context, c Opts) error { for k := range c.TraceConfig.Tags { if reservedTagNames[k] { - return strongerrors.InvalidArgument(errors.Errorf("invalid trace tag %q, must not use a reserved tag key")) + return errdefs.InvalidInputf("invalid trace tag %q, must not use a reserved tag key", k) } } if c.TraceConfig.Tags == nil { @@ -72,10 +72,10 @@ func setupTracing(ctx context.Context, c Opts) error { default: rate, err := strconv.Atoi(c.TraceSampleRate) if err != nil { - return strongerrors.InvalidArgument(errors.Wrap(err, "unsupported trace sample rate")) + return errdefs.AsInvalidInput(errors.Wrap(err, "unsupported trace sample rate")) } if rate < 0 || rate > 100 { - return strongerrors.InvalidArgument(errors.Wrap(err, "trace sample rate must be between 0 and 100")) + return errdefs.AsInvalidInput(errors.Wrap(err, "trace sample rate must be between 0 and 100")) } s = octrace.ProbabilitySampler(float64(rate) / 100) } diff --git a/errdefs/invalid.go b/errdefs/invalid.go new file mode 100644 index 000000000..4816557b0 --- /dev/null +++ b/errdefs/invalid.go @@ -0,0 +1,65 @@ +package errdefs + +import ( + "errors" + "fmt" +) + +// InvalidInput is an error interface which denotes whether the opration failed due +// to a the resource not being found. +type ErrInvalidInput interface { + InvalidInput() bool + error +} + +type invalidInputError struct { + error +} + +func (e *invalidInputError) InvalidInput() bool { + return true +} + +func (e *invalidInputError) Cause() error { + return e.error +} + +// AsInvalidInput wraps the passed in error to make it of type ErrInvalidInput +// +// Callers should make sure the passed in error has exactly the error message +// it wants as this function does not decorate the message. +func AsInvalidInput(err error) error { + if err == nil { + return nil + } + return &invalidInputError{err} +} + +// InvalidInput makes an ErrInvalidInput from the provided error message +func InvalidInput(msg string) error { + return &invalidInputError{errors.New(msg)} +} + +// InvalidInputf makes an ErrInvalidInput from the provided error format and args +func InvalidInputf(format string, args ...interface{}) error { + return &invalidInputError{fmt.Errorf(format, args...)} +} + +// IsInvalidInput determines if the passed in error is of type ErrInvalidInput +// +// This will traverse the causal chain (`Cause() error`), until it finds an error +// which implements the `InvalidInput` interface. +func IsInvalidInput(err error) bool { + if err == nil { + return false + } + if e, ok := err.(ErrInvalidInput); ok { + return e.InvalidInput() + } + + if e, ok := err.(causal); ok { + return IsInvalidInput(e.Cause()) + } + + return false +} diff --git a/errdefs/invalid_test.go b/errdefs/invalid_test.go new file mode 100644 index 000000000..e9954f227 --- /dev/null +++ b/errdefs/invalid_test.go @@ -0,0 +1,82 @@ +package errdefs + +import ( + "fmt" + "testing" + + "github.com/pkg/errors" + "gotest.tools/assert" + "gotest.tools/assert/cmp" +) + +type testingInvalidInputError bool + +func (e testingInvalidInputError) Error() string { + return fmt.Sprintf("%v", bool(e)) +} + +func (e testingInvalidInputError) InvalidInput() bool { + return bool(e) +} + +func TestIsInvalidInput(t *testing.T) { + type testCase struct { + name string + err error + xMsg string + xInvalidInput bool + } + + for _, c := range []testCase{ + { + name: "InvalidInputf", + err: InvalidInputf("%s not found", "foo"), + xMsg: "foo not found", + xInvalidInput: true, + }, + { + name: "AsInvalidInput", + err: AsInvalidInput(errors.New("this is a test")), + xMsg: "this is a test", + xInvalidInput: true, + }, + { + name: "AsInvalidInputWithNil", + err: AsInvalidInput(nil), + xMsg: "", + xInvalidInput: false, + }, + { + name: "nilError", + err: nil, + xMsg: "", + xInvalidInput: false, + }, + { + name: "customInvalidInputFalse", + err: testingInvalidInputError(false), + xMsg: "false", + xInvalidInput: false, + }, + { + name: "customInvalidInputTrue", + err: testingInvalidInputError(true), + xMsg: "true", + xInvalidInput: true, + }, + } { + t.Run(c.name, func(t *testing.T) { + assert.Check(t, cmp.Equal(IsInvalidInput(c.err), c.xInvalidInput)) + if c.err != nil { + assert.Check(t, cmp.Equal(c.err.Error(), c.xMsg)) + } + }) + } +} + +func TestInvalidInputCause(t *testing.T) { + err := errors.New("test") + e := &invalidInputError{err} + assert.Check(t, cmp.Equal(e.Cause(), err)) + assert.Check(t, IsInvalidInput(errors.Wrap(e, "some details"))) +} diff --git a/errdefs/notfound.go b/errdefs/notfound.go new file mode 100644 index 000000000..cdc60ca25 --- /dev/null +++ b/errdefs/notfound.go @@ -0,0 +1,65 @@ +package errdefs + +import ( + "errors" + "fmt" +) + +// NotFound is an error interface which denotes whether the opration failed due +// to a the resource not being found. +type ErrNotFound interface { + NotFound() bool + error +} + +type notFoundError struct { + error +} + +func (e *notFoundError) NotFound() bool { + return true +} + +func (e *notFoundError) Cause() error { + return e.error +} + +// AsNotFound wraps the passed in error to make it of type ErrNotFound +// +// Callers should make sure the passed in error has exactly the error message +// it wants as this function does not decorate the message. +func AsNotFound(err error) error { + if err == nil { + return nil + } + return ¬FoundError{err} +} + +// NotFound makes an ErrNotFound from the provided error message +func NotFound(msg string) error { + return ¬FoundError{errors.New(msg)} +} + +// NotFoundf makes an ErrNotFound from the provided error format and args +func NotFoundf(format string, args ...interface{}) error { + return ¬FoundError{fmt.Errorf(format, args...)} +} + +// IsNotFound determines if the passed in error is of type ErrNotFound +// +// This will traverse the causal chain (`Cause() error`), until it finds an error +// which implements the `NotFound` interface. +func IsNotFound(err error) bool { + if err == nil { + return false + } + if e, ok := err.(ErrNotFound); ok { + return e.NotFound() + } + + if e, ok := err.(causal); ok { + return IsNotFound(e.Cause()) + } + + return false +} diff --git a/errdefs/notfound_test.go b/errdefs/notfound_test.go new file mode 100644 index 000000000..2690de84a --- /dev/null +++ b/errdefs/notfound_test.go @@ -0,0 +1,82 @@ +package errdefs + +import ( + "fmt" + "testing" + + "github.com/pkg/errors" + "gotest.tools/assert" + "gotest.tools/assert/cmp" +) + +type testingNotFoundError bool + +func (e testingNotFoundError) Error() string { + return fmt.Sprintf("%v", bool(e)) +} + +func (e testingNotFoundError) NotFound() bool { + return bool(e) +} + +func TestIsNotFound(t *testing.T) { + type testCase struct { + name string + err error + xMsg string + xNotFound bool + } + + for _, c := range []testCase{ + { + name: "NotFoundf", + err: NotFoundf("%s not found", "foo"), + xMsg: "foo not found", + xNotFound: true, + }, + { + name: "AsNotFound", + err: AsNotFound(errors.New("this is a test")), + xMsg: "this is a test", + xNotFound: true, + }, + { + name: "AsNotFoundWithNil", + err: AsNotFound(nil), + xMsg: "", + xNotFound: false, + }, + { + name: "nilError", + err: nil, + xMsg: "", + xNotFound: false, + }, + { + name: "customNotFoundFalse", + err: testingNotFoundError(false), + xMsg: "false", + xNotFound: false, + }, + { + name: "customNotFoundTrue", + err: testingNotFoundError(true), + xMsg: "true", + xNotFound: true, + }, + } { + t.Run(c.name, func(t *testing.T) { + assert.Check(t, cmp.Equal(IsNotFound(c.err), c.xNotFound)) + if c.err != nil { + assert.Check(t, cmp.Equal(c.err.Error(), c.xMsg)) + } + }) + } +} + +func TestNotFoundCause(t *testing.T) { + err := errors.New("test") + e := ¬FoundError{err} + assert.Check(t, cmp.Equal(e.Cause(), err)) + assert.Check(t, IsNotFound(errors.Wrap(e, "some details"))) +} diff --git a/errdefs/wrapped.go b/errdefs/wrapped.go new file mode 100644 index 000000000..446239311 --- /dev/null +++ b/errdefs/wrapped.go @@ -0,0 +1,10 @@ +package errdefs + +// Causal is an error interface for errors which have wrapped another error +// in a non-opaque way. +// +// This pattern is used by github.com/pkg/errors +type causal interface { + Cause() error + error +} diff --git a/manager/resource.go b/manager/resource.go index e8d220afc..cfdbf6531 100644 --- a/manager/resource.go +++ b/manager/resource.go @@ -1,7 +1,7 @@ package manager import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" corev1listers "k8s.io/client-go/listers/core/v1" diff --git a/manager/resource_test.go b/manager/resource_test.go index 216daf96b..fc740349b 100644 --- a/manager/resource_test.go +++ b/manager/resource_test.go @@ -3,7 +3,7 @@ package manager_test import ( "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" diff --git a/providers/alibabacloud/eci.go b/providers/alibabacloud/eci.go index af919a178..080ef0549 100644 --- a/providers/alibabacloud/eci.go +++ b/providers/alibabacloud/eci.go @@ -17,7 +17,7 @@ import ( "time" "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" - "github.com/cpuguy83/strongerrors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers/alibabacloud/eci" @@ -254,7 +254,7 @@ func (p *ECIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error { } } if eciId == "" { - return strongerrors.NotFound(fmt.Errorf("DeletePod can't find Pod %s-%s", pod.Namespace, pod.Name)) + return errdefs.NotFoundf("DeletePod can't find Pod %s-%s", pod.Namespace, pod.Name) } request := eci.CreateDeleteContainerGroupRequest() diff --git a/providers/alibabacloud/errors.go b/providers/alibabacloud/errors.go index 97807a4c4..1040eb196 100644 --- a/providers/alibabacloud/errors.go +++ b/providers/alibabacloud/errors.go @@ -4,7 +4,7 @@ import ( "net/http" "github.com/aliyun/alibaba-cloud-sdk-go/sdk/errors" - "github.com/cpuguy83/strongerrors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" ) func wrapError(err error) error { @@ -19,7 +19,7 @@ func wrapError(err error) error { switch se.HttpStatus() { case http.StatusNotFound: - return strongerrors.NotFound(err) + return errdefs.AsNotFound(err) default: return err } diff --git a/providers/aws/fargate/cluster.go b/providers/aws/fargate/cluster.go index 57f27e642..dc07be79d 100644 --- a/providers/aws/fargate/cluster.go +++ b/providers/aws/fargate/cluster.go @@ -12,7 +12,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/ecs" - "github.com/cpuguy83/strongerrors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" k8sTypes "k8s.io/apimachinery/pkg/types" ) @@ -276,7 +276,7 @@ func (c *Cluster) GetPod(namespace string, name string) (*Pod, error) { tag := buildTaskDefinitionTag(c.name, namespace, name) pod, ok := c.pods[tag] if !ok { - return nil, strongerrors.NotFound(fmt.Errorf("pod %s/%s is not found", namespace, name)) + return nil, errdefs.NotFoundf("pod %s/%s is not found", namespace, name) } return pod, nil diff --git a/providers/aws/fargate/region.go b/providers/aws/fargate/region.go index 6c6e0cfb4..37beec372 100644 --- a/providers/aws/fargate/region.go +++ b/providers/aws/fargate/region.go @@ -12,12 +12,17 @@ var ( // FargateRegions are AWS regions where Fargate is available. FargateRegions = Regions{ "ap-northeast-1", // Asia Pacific (Tokyo) + "ap-northeast-2", // Asia Pacific (Seoul) "ap-southeast-1", // Asia Pacific (Singapore) "ap-southeast-2", // Asia Pacific (Sydney) + "ap-south-1", // Asia Pacific (Mumbai) + "ca-central-1", // Canada (Central) "eu-central-1", // EU (Frankfurt) "eu-west-1", // EU (Ireland) + "eu-west-2", // EU (London) "us-east-1", // US East (N. Virginia) "us-east-2", // US East (Ohio) + "us-west-1", // US West (N. California) "us-west-2", // US West (Oregon) } ) diff --git a/providers/azure/aci.go b/providers/azure/aci.go index bb0b9a32c..10355e42e 100644 --- a/providers/azure/aci.go +++ b/providers/azure/aci.go @@ -19,11 +19,11 @@ import ( "sync" "time" - "github.com/cpuguy83/strongerrors" "github.com/gorilla/websocket" client "github.com/virtual-kubelet/azure-aci/client" "github.com/virtual-kubelet/azure-aci/client/aci" "github.com/virtual-kubelet/azure-aci/client/network" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/trace" @@ -777,7 +777,7 @@ func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, } if cg.Tags["NodeName"] != p.nodeName { - return nil, strongerrors.NotFound(errors.New("got unexpected pod node name")) + return nil, errdefs.NotFound("got unexpected pod node name") } // get logs from cg diff --git a/providers/azure/errors.go b/providers/azure/errors.go index 876b00e01..61115eb19 100644 --- a/providers/azure/errors.go +++ b/providers/azure/errors.go @@ -3,8 +3,8 @@ package azure import ( "net/http" - "github.com/cpuguy83/strongerrors" "github.com/virtual-kubelet/azure-aci/client/api" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" ) func wrapError(err error) error { @@ -19,7 +19,7 @@ func wrapError(err error) error { switch e.StatusCode { case http.StatusNotFound: - return strongerrors.NotFound(err) + return errdefs.AsNotFound(err) default: return err } diff --git a/providers/azure/metrics.go b/providers/azure/metrics.go index eeb140d4a..893fe1c43 100644 --- a/providers/azure/metrics.go +++ b/providers/azure/metrics.go @@ -5,7 +5,6 @@ import ( "strings" "time" - "github.com/cpuguy83/strongerrors/status/ocstatus" "github.com/pkg/errors" "github.com/virtual-kubelet/azure-aci/client/aci" "github.com/virtual-kubelet/virtual-kubelet/log" @@ -97,7 +96,7 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa Types: []aci.MetricType{aci.MetricTypeCPUUsage, aci.MetricTypeMemoryUsage}, }) if err != nil { - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) return errors.Wrapf(err, "error fetching cpu/mem stats for container group %s", cgName) } logger.Debug("Got system stats") @@ -109,7 +108,7 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa Types: []aci.MetricType{aci.MetricTyperNetworkBytesRecievedPerSecond, aci.MetricTyperNetworkBytesTransmittedPerSecond}, }) if err != nil { - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) return errors.Wrapf(err, "error fetching network stats for container group %s", cgName) } logger.Debug("Got network stats") @@ -120,7 +119,7 @@ func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summa } if err := errGroup.Wait(); err != nil { - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) return nil, errors.Wrap(err, "error in request to fetch container group metrics") } close(chResult) diff --git a/providers/azurebatch/errors.go b/providers/azurebatch/errors.go index 811484cca..6aaff40d5 100644 --- a/providers/azurebatch/errors.go +++ b/providers/azurebatch/errors.go @@ -5,7 +5,7 @@ import ( "github.com/Azure/go-autorest/autorest" "github.com/Azure/go-autorest/autorest/azure" - "github.com/cpuguy83/strongerrors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" ) func wrapError(err error) error { @@ -14,7 +14,7 @@ func wrapError(err error) error { } switch { case isStatus(err, http.StatusNotFound): - return strongerrors.NotFound(err) + return errdefs.AsNotFound(err) default: return err } diff --git a/providers/cri/cri.go b/providers/cri/cri.go index e89b252f8..93368a4e0 100644 --- a/providers/cri/cri.go +++ b/providers/cri/cri.go @@ -16,8 +16,8 @@ import ( "syscall" "time" - "github.com/cpuguy83/strongerrors" log "github.com/sirupsen/logrus" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" @@ -569,7 +569,7 @@ func (p *CRIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error { ps, ok := p.podStatus[pod.UID] if !ok { - return strongerrors.NotFound(fmt.Errorf("Pod %s not found", pod.UID)) + return errdefs.NotFoundf("Pod %s not found", pod.UID) } // TODO: Check pod status for running state @@ -601,7 +601,7 @@ func (p *CRIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P pod := p.findPodByName(namespace, name) if pod == nil { - return nil, strongerrors.NotFound(fmt.Errorf("Pod %s in namespace %s could not be found on the node", name, namespace)) + return nil, errdefs.NotFoundf("Pod %s in namespace %s could not be found on the node", name, namespace) } return createPodSpecFromCRI(pod, p.nodeName), nil @@ -638,11 +638,11 @@ func (p *CRIProvider) GetContainerLogs(ctx context.Context, namespace, podName, pod := p.findPodByName(namespace, podName) if pod == nil { - return nil, strongerrors.NotFound(fmt.Errorf("Pod %s in namespace %s not found", podName, namespace)) + return nil, errdefs.NotFoundf("Pod %s in namespace %s not found", podName, namespace) } container := pod.containers[containerName] if container == nil { - return nil, strongerrors.NotFound(fmt.Errorf("Cannot find container %s in pod %s namespace %s", containerName, podName, namespace)) + return nil, errdefs.NotFoundf("Cannot find container %s in pod %s namespace %s", containerName, podName, namespace) } return readLogFile(container.LogPath, opts) @@ -686,7 +686,7 @@ func (p *CRIProvider) GetPodStatus(ctx context.Context, namespace, name string) pod := p.findPodByName(namespace, name) if pod == nil { - return nil, strongerrors.NotFound(fmt.Errorf("Pod %s in namespace %s could not be found on the node", name, namespace)) + return nil, errdefs.NotFoundf("Pod %s in namespace %s could not be found on the node", name, namespace) } return createPodStatusFromCRI(pod), nil diff --git a/providers/huawei/cci.go b/providers/huawei/cci.go index 941d6496a..5a722eb0d 100644 --- a/providers/huawei/cci.go +++ b/providers/huawei/cci.go @@ -15,7 +15,7 @@ import ( "strings" "time" - "github.com/cpuguy83/strongerrors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers/huawei/auth" "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" @@ -257,7 +257,7 @@ func errorFromResponse(resp *http.Response) error { switch resp.StatusCode { case http.StatusNotFound: - return strongerrors.NotFound(err) + return errdefs.AsNotFound(err) default: return err } diff --git a/providers/huawei/cciMock.go b/providers/huawei/cciMock.go index 3e4415374..02d4ec5bd 100644 --- a/providers/huawei/cciMock.go +++ b/providers/huawei/cciMock.go @@ -7,7 +7,7 @@ import ( "net/http/httptest" "github.com/gorilla/mux" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" ) // CCIMock implements a CCI service mock server. diff --git a/providers/mock/mock.go b/providers/mock/mock.go index 967f87d45..2d4e2ae83 100644 --- a/providers/mock/mock.go +++ b/providers/mock/mock.go @@ -10,8 +10,7 @@ import ( "strings" "time" - "github.com/cpuguy83/strongerrors" - "github.com/cpuguy83/strongerrors/status/ocstatus" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" @@ -252,7 +251,7 @@ func (p *MockLegacyProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err er } if _, exists := p.pods[key]; !exists { - return strongerrors.NotFound(fmt.Errorf("pod not found")) + return errdefs.NotFound("pod not found") } now := metav1.Now() @@ -281,7 +280,7 @@ func (p *MockLegacyProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err er func (p *MockLegacyProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) { ctx, span := trace.StartSpan(ctx, "GetPod") defer func() { - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) span.End() }() @@ -298,7 +297,7 @@ func (p *MockLegacyProvider) GetPod(ctx context.Context, namespace, name string) if pod, ok := p.pods[key]; ok { return pod, nil } - return nil, strongerrors.NotFound(fmt.Errorf("pod \"%s/%s\" is not known to the provider", namespace, name)) + return nil, errdefs.NotFoundf("pod \"%s/%s\" is not known to the provider", namespace, name) } // GetContainerLogs retrieves the logs of a container by name from the provider. diff --git a/providers/provider.go b/providers/provider.go index 36fcf2afc..f0976fc7a 100644 --- a/providers/provider.go +++ b/providers/provider.go @@ -11,6 +11,10 @@ import ( ) // Provider contains the methods required to implement a virtual-kubelet provider. +// +// Errors produced by these methods should implement an interface from +// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the +// core logic to be able to understand the type of failure. type Provider interface { vkubelet.PodLifecycleHandler diff --git a/providers/register/register.go b/providers/register/register.go index b8d979fde..51a75d039 100644 --- a/providers/register/register.go +++ b/providers/register/register.go @@ -3,8 +3,7 @@ package register import ( "sort" - "github.com/cpuguy83/strongerrors" - "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" ) @@ -27,7 +26,7 @@ type initFunc func(InitConfig) (providers.Provider, error) func GetProvider(name string, cfg InitConfig) (providers.Provider, error) { f, ok := providerInits[name] if !ok { - return nil, strongerrors.NotFound(errors.Errorf("provider not found: %s", name)) + return nil, errdefs.NotFoundf("provider not found: %s", name) } return f(cfg) } diff --git a/providers/web/broker.go b/providers/web/broker.go index 9587f2351..32e33f336 100644 --- a/providers/web/broker.go +++ b/providers/web/broker.go @@ -29,7 +29,7 @@ import ( "time" "github.com/cenkalti/backoff" - "github.com/cpuguy83/strongerrors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" v1 "k8s.io/api/core/v1" ) @@ -292,7 +292,7 @@ func checkResponseStatus(r *http.Response, err error) error { if r.StatusCode < 200 || r.StatusCode > 299 { switch r.StatusCode { case http.StatusNotFound: - return strongerrors.NotFound(errors.New(r.Status)) + return errdefs.NotFound(r.Status) default: return errors.New(r.Status) } diff --git a/test/e2e/main_test.go b/test/e2e/main_test.go index b0b0d1e47..eacacab51 100644 --- a/test/e2e/main_test.go +++ b/test/e2e/main_test.go @@ -7,7 +7,7 @@ import ( "os" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "github.com/virtual-kubelet/virtual-kubelet/test/e2e/framework" ) diff --git a/test/e2e/node_test.go b/test/e2e/node_test.go index ede1810c8..5c81022b4 100644 --- a/test/e2e/node_test.go +++ b/test/e2e/node_test.go @@ -9,7 +9,7 @@ import ( "gotest.tools/assert" is "gotest.tools/assert/cmp" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" watchapi "k8s.io/apimachinery/pkg/watch" @@ -26,7 +26,7 @@ func TestNodeCreateAfterDelete(t *testing.T) { }) assert.NilError(t, err) - assert.Assert(t, is.Len(podList.Items, 0), "Kubernetes does not allow node deletion with dependent objects (pods) in existence: %v", podList.Items) + assert.Assert(t, is.Len(podList.Items, 0), "Kubernetes does not allow node deletion with dependent objects (pods) in existence: %v") chErr := make(chan error, 1) diff --git a/trace/nop.go b/trace/nop.go index 2954ec63c..7a50bcf61 100644 --- a/trace/nop.go +++ b/trace/nop.go @@ -4,7 +4,6 @@ import ( "context" "github.com/virtual-kubelet/virtual-kubelet/log" - "go.opencensus.io/trace" ) type nopTracer struct{} @@ -15,9 +14,9 @@ func (nopTracer) StartSpan(ctx context.Context, _ string) (context.Context, Span type nopSpan struct{} -func (nopSpan) End() {} -func (nopSpan) SetStatus(trace.Status) {} -func (nopSpan) Logger() log.Logger { return nil } +func (nopSpan) End() {} +func (nopSpan) SetStatus(error) {} +func (nopSpan) Logger() log.Logger { return nil } func (nopSpan) WithField(ctx context.Context, _ string, _ interface{}) context.Context { return ctx } func (nopSpan) WithFields(ctx context.Context, _ log.Fields) context.Context { return ctx } diff --git a/trace/opencensus/ocagent.go b/trace/opencensus/ocagent.go index 8c813eb50..7b219a65b 100644 --- a/trace/opencensus/ocagent.go +++ b/trace/opencensus/ocagent.go @@ -6,8 +6,7 @@ import ( "os" "contrib.go.opencensus.io/exporter/ocagent" - "github.com/cpuguy83/strongerrors" - "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "go.opencensus.io/trace" ) @@ -22,7 +21,7 @@ func NewOCAgentExporter(opts TracingExporterOptions) (trace.Exporter, error) { if endpoint := os.Getenv("OCAGENT_ENDPOINT"); endpoint != "" { agentOpts = append(agentOpts, ocagent.WithAddress(endpoint)) } else { - return nil, strongerrors.InvalidArgument(errors.New("must set endpoint address in OCAGENT_ENDPOINT")) + return nil, errdefs.InvalidInput("must set endpoint address in OCAGENT_ENDPOINT") } switch os.Getenv("OCAGENT_INSECURE") { @@ -30,7 +29,7 @@ func NewOCAgentExporter(opts TracingExporterOptions) (trace.Exporter, error) { case "1", "yes", "y", "on": agentOpts = append(agentOpts, ocagent.WithInsecure()) default: - return nil, strongerrors.InvalidArgument(errors.New("invalid value for OCAGENT_INSECURE")) + return nil, errdefs.InvalidInput("invalid value for OCAGENT_INSECURE") } return ocagent.NewExporter(agentOpts...) diff --git a/trace/opencensus/opencensus.go b/trace/opencensus/opencensus.go index afaef4f4d..ae9023d4b 100644 --- a/trace/opencensus/opencensus.go +++ b/trace/opencensus/opencensus.go @@ -9,6 +9,7 @@ import ( "fmt" "sync" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" octrace "go.opencensus.io/trace" @@ -46,7 +47,30 @@ func (s *span) End() { s.s.End() } -func (s *span) SetStatus(status trace.Status) { +func (s *span) SetStatus(err error) { + if !s.s.IsRecordingEvents() { + return + } + + var status octrace.Status + + if err == nil { + status.Code = octrace.StatusCodeOK + s.s.SetStatus(status) + return + } + + switch { + case errdefs.IsNotFound(err): + status.Code = octrace.StatusCodeNotFound + case errdefs.IsInvalidInput(err): + status.Code = octrace.StatusCodeInvalidArgument + // TODO: other error types + default: + status.Code = octrace.StatusCodeUnknown + } + + status.Message = err.Error() s.s.SetStatus(status) } diff --git a/trace/opencensus/register.go b/trace/opencensus/register.go index 2a74a95a4..a77b5c3f6 100644 --- a/trace/opencensus/register.go +++ b/trace/opencensus/register.go @@ -1,8 +1,7 @@ package opencensus import ( - "github.com/cpuguy83/strongerrors" - "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "go.opencensus.io/trace" ) @@ -30,7 +29,7 @@ func RegisterTracingExporter(name string, f TracingExporterInitFunc) { func GetTracingExporter(name string, opts TracingExporterOptions) (trace.Exporter, error) { f, ok := tracingExporters[name] if !ok { - return nil, strongerrors.NotFound(errors.Errorf("tracing exporter %q not found", name)) + return nil, errdefs.NotFoundf("tracing exporter %q not found", name) } return f(opts) } diff --git a/trace/opencensus/register_test.go b/trace/opencensus/register_test.go index 140f0ba77..02c2ee4ab 100644 --- a/trace/opencensus/register_test.go +++ b/trace/opencensus/register_test.go @@ -3,8 +3,7 @@ package opencensus import ( "testing" - "github.com/cpuguy83/strongerrors" - + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "go.opencensus.io/trace" ) @@ -16,7 +15,7 @@ func TestGetTracingExporter(t *testing.T) { } _, err := GetTracingExporter("notexist", TracingExporterOptions{}) - if !strongerrors.IsNotFound(err) { + if !errdefs.IsNotFound(err) { t.Fatalf("expected not found error, got: %v", err) } diff --git a/trace/trace.go b/trace/trace.go index 752432d4c..27e3ce27e 100644 --- a/trace/trace.go +++ b/trace/trace.go @@ -9,15 +9,8 @@ import ( "context" "github.com/virtual-kubelet/virtual-kubelet/log" - "go.opencensus.io/trace" ) -// Status is an alias to opencensus's trace status. -// The main reason we use this instead of implementing our own is library re-use, -// namely for converting an error to a tracing status. -// In the future this may be defined completely in this package. -type Status = trace.Status - // Tracer is the interface used for creating a tracing span type Tracer interface { // StartSpan starts a new span. The span details are emebedded into the returned @@ -41,7 +34,13 @@ func StartSpan(ctx context.Context, name string) (context.Context, Span) { // Span encapsulates a tracing event type Span interface { End() - SetStatus(Status) + + // SetStatus sets the final status of the span. + // errors passed to this should use interfaces defined in + // github.com/virtual-kubelet/virtual-kubelet/errdefs + // + // If the error is nil, the span should be considered successful. + SetStatus(err error) // WithField and WithFields adds attributes to an entire span // diff --git a/vendor/github.com/cpuguy83/strongerrors/LICENSE b/vendor/github.com/cpuguy83/strongerrors/LICENSE deleted file mode 100644 index 261eeb9e9..000000000 --- a/vendor/github.com/cpuguy83/strongerrors/LICENSE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/vendor/github.com/cpuguy83/strongerrors/defs.go b/vendor/github.com/cpuguy83/strongerrors/defs.go deleted file mode 100644 index e67ced8fd..000000000 --- a/vendor/github.com/cpuguy83/strongerrors/defs.go +++ /dev/null @@ -1,84 +0,0 @@ -package strongerrors - -// ErrNotFound signals that the requested object doesn't exist -type ErrNotFound interface { - NotFound() -} - -// ErrInvalidArgument signals that the user input is invalid -type ErrInvalidArgument interface { - InvalidArgument() -} - -// ErrConflict signals that some internal state conflicts with the requested action and can't be performed. -// A change in state should be able to clear this error. -type ErrConflict interface { - Conflict() -} - -// ErrUnauthorized is used to signify that the user is not authorized to perform a specific action -type ErrUnauthorized interface { - Unauthorized() -} - -// ErrUnauthenticated is used to indicate that the caller cannot be identified. -type ErrUnauthenticated interface { - Unauthenticated() -} - -// ErrUnavailable signals that the requested action/subsystem is not available. -type ErrUnavailable interface { - Unavailable() -} - -// ErrForbidden signals that the requested action cannot be performed under any circumstances. -// When a ErrForbidden is returned, the caller should never retry the action. -type ErrForbidden interface { - Forbidden() -} - -// ErrSystem signals that some internal error occurred. -// An example of this would be a failed mount request. -type ErrSystem interface { - System() -} - -// ErrNotModified signals that an action can't be performed because it's already in the desired state -type ErrNotModified interface { - NotModified() -} - -// ErrAlreadyExists is a special case of ErrNotModified which signals that the desired object already exists -type ErrAlreadyExists interface { - AlreadyExists() -} - -// ErrNotImplemented signals that the requested action/feature is not implemented on the system as configured. -type ErrNotImplemented interface { - NotImplemented() -} - -// ErrUnknown signals that the kind of error that occurred is not known. -type ErrUnknown interface { - Unknown() -} - -// ErrCancelled signals that the action was cancelled. -type ErrCancelled interface { - Cancelled() -} - -// ErrDeadline signals that the deadline was reached before the action completed. -type ErrDeadline interface { - DeadlineExceeded() -} - -// ErrExhausted indicates that the action cannot be performed because some resource is exhausted. -type ErrExhausted interface { - Exhausted() -} - -// ErrDataLoss indicates that data was lost or there is data corruption. -type ErrDataLoss interface { - DataLoss() -} diff --git a/vendor/github.com/cpuguy83/strongerrors/doc.go b/vendor/github.com/cpuguy83/strongerrors/doc.go deleted file mode 100644 index f262bf98e..000000000 --- a/vendor/github.com/cpuguy83/strongerrors/doc.go +++ /dev/null @@ -1,12 +0,0 @@ -// Package strongerrors defines a set of error interfaces that packages should use for communicating classes of errors. -// Errors that cross the package boundary should implement one (and only one) of these interfaces. -// -// Packages should not reference these interfaces directly, only implement them. -// To check if a particular error implements one of these interfaces, there are helper -// functions provided (e.g. `Is`) which can be used rather than asserting the interfaces directly. -// If you must assert on these interfaces, be sure to check the causal chain (`err.Cause()`). -// -// A set of helper functions are provided to take any error and turn it into a specific error class. -// This frees you from defining the same error classes all over your code. However, you can still -// implement the error classes ony our own if you desire. -package strongerrors diff --git a/vendor/github.com/cpuguy83/strongerrors/helpers.go b/vendor/github.com/cpuguy83/strongerrors/helpers.go deleted file mode 100644 index 9efe54e25..000000000 --- a/vendor/github.com/cpuguy83/strongerrors/helpers.go +++ /dev/null @@ -1,272 +0,0 @@ -package strongerrors - -import "context" - -type errNotFound struct{ error } - -func (errNotFound) NotFound() {} - -func (e errNotFound) Cause() error { - return e.error -} - -// NotFound is a helper to create an error of the class with the same name from any error type -func NotFound(err error) error { - if err == nil { - return nil - } - return errNotFound{err} -} - -type errInvalidArg struct{ error } - -func (errInvalidArg) InvalidArgument() {} - -func (e errInvalidArg) Cause() error { - return e.error -} - -// InvalidArgument is a helper to create an error of the class with the same name from any error type -func InvalidArgument(err error) error { - if err == nil { - return nil - } - return errInvalidArg{err} -} - -type errConflict struct{ error } - -func (errConflict) Conflict() {} - -func (e errConflict) Cause() error { - return e.error -} - -// Conflict is a helper to create an error of the class with the same name from any error type -func Conflict(err error) error { - if err == nil { - return nil - } - return errConflict{err} -} - -type errUnauthorized struct{ error } - -func (errUnauthorized) Unauthorized() {} - -func (e errUnauthorized) Cause() error { - return e.error -} - -// Unauthorized is a helper to create an error of the class with the same name from any error type -func Unauthorized(err error) error { - if err == nil { - return nil - } - return errUnauthorized{err} -} - -type errUnauthenticated struct{ error } - -func (errUnauthenticated) Unauthenticated() {} - -func (e errUnauthenticated) Cause() error { - return e.error -} - -// Unauthenticated is a helper to create an error of the class with the same name from any error type -func Unauthenticated(err error) error { - if err == nil { - return nil - } - return errUnauthenticated{err} -} - -type errUnavailable struct{ error } - -func (errUnavailable) Unavailable() {} - -func (e errUnavailable) Cause() error { - return e.error -} - -// Unavailable is a helper to create an error of the class with the same name from any error type -func Unavailable(err error) error { - return errUnavailable{err} -} - -type errForbidden struct{ error } - -func (errForbidden) Forbidden() {} - -func (e errForbidden) Cause() error { - return e.error -} - -// Forbidden is a helper to create an error of the class with the same name from any error type -func Forbidden(err error) error { - if err == nil { - return nil - } - return errForbidden{err} -} - -type errSystem struct{ error } - -func (errSystem) System() {} - -func (e errSystem) Cause() error { - return e.error -} - -// System is a helper to create an error of the class with the same name from any error type -func System(err error) error { - if err == nil { - return nil - } - return errSystem{err} -} - -type errNotModified struct{ error } - -func (errNotModified) NotModified() {} - -func (e errNotModified) Cause() error { - return e.error -} - -// NotModified is a helper to create an error of the class with the same name from any error type -func NotModified(err error) error { - if err == nil { - return nil - } - return errNotModified{err} -} - -type errAlreadyExists struct{ error } - -func (errAlreadyExists) AlreadyExists() {} - -func (e errAlreadyExists) Cause() error { - return e.error -} - -// AlreadyExists is a helper to create an error of the class with the same name from any error type -func AlreadyExists(err error) error { - if err == nil { - return nil - } - return errAlreadyExists{err} -} - -type errNotImplemented struct{ error } - -func (errNotImplemented) NotImplemented() {} - -func (e errNotImplemented) Cause() error { - return e.error -} - -// NotImplemented is a helper to create an error of the class with the same name from any error type -func NotImplemented(err error) error { - if err == nil { - return nil - } - return errNotImplemented{err} -} - -type errUnknown struct{ error } - -func (errUnknown) Unknown() {} - -func (e errUnknown) Cause() error { - return e.error -} - -// Unknown is a helper to create an error of the class with the same name from any error type -func Unknown(err error) error { - if err == nil { - return nil - } - return errUnknown{err} -} - -type errCancelled struct{ error } - -func (errCancelled) Cancelled() {} - -func (e errCancelled) Cause() error { - return e.error -} - -// Cancelled is a helper to create an error of the class with the same name from any error type -func Cancelled(err error) error { - if err == nil { - return nil - } - return errCancelled{err} -} - -type errDeadline struct{ error } - -func (errDeadline) DeadlineExceeded() {} - -func (e errDeadline) Cause() error { - return e.error -} - -// Deadline is a helper to create an error of the class with the same name from any error type -func Deadline(err error) error { - if err == nil { - return nil - } - return errDeadline{err} -} - -type errExhausted struct{ error } - -func (errExhausted) Exhausted() {} - -func (e errExhausted) Cause() error { - return e.error -} - -// Exhausted is a helper to create an error of the class with the same name from any error type -func Exhausted(err error) error { - if err == nil { - return nil - } - return errExhausted{err} -} - -type errDataLoss struct{ error } - -func (errDataLoss) DataLoss() {} - -func (e errDataLoss) Cause() error { - return e.error -} - -// DataLoss is a helper to create an error of the class with the same name from any error type -func DataLoss(err error) error { - if err == nil { - return nil - } - return errDataLoss{err} -} - -// FromContext returns the error class from the passed in context -func FromContext(ctx context.Context) error { - e := ctx.Err() - if e == nil { - return nil - } - - if e == context.Canceled { - return Cancelled(e) - } - if e == context.DeadlineExceeded { - return Deadline(e) - } - return Unknown(e) -} diff --git a/vendor/github.com/cpuguy83/strongerrors/is.go b/vendor/github.com/cpuguy83/strongerrors/is.go deleted file mode 100644 index 2d94546d7..000000000 --- a/vendor/github.com/cpuguy83/strongerrors/is.go +++ /dev/null @@ -1,128 +0,0 @@ -package strongerrors - -type causer interface { - Cause() error -} - -func getImplementer(err error) error { - switch e := err.(type) { - case - ErrNotFound, - ErrInvalidArgument, - ErrConflict, - ErrUnauthorized, - ErrUnauthenticated, - ErrUnavailable, - ErrForbidden, - ErrSystem, - ErrNotModified, - ErrAlreadyExists, - ErrNotImplemented, - ErrCancelled, - ErrDeadline, - ErrDataLoss, - ErrExhausted, - ErrUnknown: - return err - case causer: - return getImplementer(e.Cause()) - default: - return err - } -} - -// IsNotFound returns if the passed in error is an ErrNotFound -func IsNotFound(err error) bool { - _, ok := getImplementer(err).(ErrNotFound) - return ok -} - -// IsInvalidArgument returns if the passed in error is an ErrInvalidParameter -func IsInvalidArgument(err error) bool { - _, ok := getImplementer(err).(ErrInvalidArgument) - return ok -} - -// IsConflict returns if the passed in error is an ErrConflict -func IsConflict(err error) bool { - _, ok := getImplementer(err).(ErrConflict) - return ok -} - -// IsUnauthorized returns if the the passed in error is an ErrUnauthorized -func IsUnauthorized(err error) bool { - _, ok := getImplementer(err).(ErrUnauthorized) - return ok -} - -// IsUnauthenticated returns if the the passed in error is an ErrUnauthenticated -func IsUnauthenticated(err error) bool { - _, ok := getImplementer(err).(ErrUnauthenticated) - return ok -} - -// IsUnavailable returns if the passed in error is an ErrUnavailable -func IsUnavailable(err error) bool { - _, ok := getImplementer(err).(ErrUnavailable) - return ok -} - -// IsForbidden returns if the passed in error is an ErrForbidden -func IsForbidden(err error) bool { - _, ok := getImplementer(err).(ErrForbidden) - return ok -} - -// IsSystem returns if the passed in error is an ErrSystem -func IsSystem(err error) bool { - _, ok := getImplementer(err).(ErrSystem) - return ok -} - -// IsNotModified returns if the passed in error is a NotModified error -func IsNotModified(err error) bool { - _, ok := getImplementer(err).(ErrNotModified) - return ok -} - -// IsAlreadyExists returns if the passed in error is a AlreadyExists error -func IsAlreadyExists(err error) bool { - _, ok := getImplementer(err).(ErrAlreadyExists) - return ok -} - -// IsNotImplemented returns if the passed in error is an ErrNotImplemented -func IsNotImplemented(err error) bool { - _, ok := getImplementer(err).(ErrNotImplemented) - return ok -} - -// IsUnknown returns if the passed in error is an ErrUnknown -func IsUnknown(err error) bool { - _, ok := getImplementer(err).(ErrUnknown) - return ok -} - -// IsCancelled returns if the passed in error is an ErrCancelled -func IsCancelled(err error) bool { - _, ok := getImplementer(err).(ErrCancelled) - return ok -} - -// IsDeadline returns if the passed in error is an ErrDeadline -func IsDeadline(err error) bool { - _, ok := getImplementer(err).(ErrDeadline) - return ok -} - -// IsExhausted returns if the passed in error is an ErrDeadline -func IsExhausted(err error) bool { - _, ok := getImplementer(err).(ErrExhausted) - return ok -} - -// IsDataLoss returns if the passed in error is an ErrDataLoss -func IsDataLoss(err error) bool { - _, ok := getImplementer(err).(ErrDataLoss) - return ok -} diff --git a/vendor/github.com/cpuguy83/strongerrors/status/grpc.go b/vendor/github.com/cpuguy83/strongerrors/status/grpc.go deleted file mode 100644 index c446a86a5..000000000 --- a/vendor/github.com/cpuguy83/strongerrors/status/grpc.go +++ /dev/null @@ -1,85 +0,0 @@ -package status - -import ( - "github.com/cpuguy83/strongerrors" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// FromGRPC returns an error class from the provided GPRC status -// If the status is nil or OK, this will return nil -// nolint: gocyclo -func FromGRPC(s *status.Status) error { - if s == nil || s.Code() == codes.OK { - return nil - } - - switch s.Code() { - case codes.InvalidArgument: - return strongerrors.InvalidArgument(s.Err()) - case codes.NotFound: - return strongerrors.NotFound(s.Err()) - case codes.Unimplemented: - return strongerrors.NotImplemented(s.Err()) - case codes.DeadlineExceeded: - return strongerrors.Deadline(s.Err()) - case codes.Canceled: - return strongerrors.Cancelled(s.Err()) - case codes.AlreadyExists: - return strongerrors.AlreadyExists(s.Err()) - case codes.PermissionDenied: - return strongerrors.Unauthorized(s.Err()) - case codes.Unauthenticated: - return strongerrors.Unauthenticated(s.Err()) - // TODO(cpuguy83): consider more granular errors for these cases - case codes.FailedPrecondition, codes.Aborted, codes.Unavailable, codes.OutOfRange: - return strongerrors.Conflict(s.Err()) - case codes.ResourceExhausted: - return strongerrors.Exhausted(s.Err()) - case codes.DataLoss: - return strongerrors.DataLoss(s.Err()) - default: - return strongerrors.Unknown(s.Err()) - } -} - -// ToGRPC takes the passed in error and converts it to a GRPC status error -// If the passed in error is already a gprc status error, then it is returned unmodified -// If the passed in error is nil, then a nil error is returned. -// nolint: gocyclo -func ToGRPC(err error) error { - if _, ok := status.FromError(err); ok { - return err - } - - switch { - case strongerrors.IsNotFound(err): - return status.Error(codes.NotFound, err.Error()) - case strongerrors.IsConflict(err), strongerrors.IsNotModified(err): - return status.Error(codes.FailedPrecondition, err.Error()) - case strongerrors.IsInvalidArgument(err): - return status.Error(codes.InvalidArgument, err.Error()) - case strongerrors.IsAlreadyExists(err): - return status.Error(codes.AlreadyExists, err.Error()) - case strongerrors.IsCancelled(err): - return status.Error(codes.Canceled, err.Error()) - case strongerrors.IsDeadline(err): - return status.Error(codes.DeadlineExceeded, err.Error()) - case strongerrors.IsUnauthorized(err): - return status.Error(codes.PermissionDenied, err.Error()) - case strongerrors.IsUnauthenticated(err): - return status.Error(codes.Unauthenticated, err.Error()) - case strongerrors.IsForbidden(err), strongerrors.IsNotImplemented(err): - return status.Error(codes.Unimplemented, err.Error()) - case strongerrors.IsExhausted(err): - return status.Error(codes.ResourceExhausted, err.Error()) - case strongerrors.IsDataLoss(err): - return status.Error(codes.DataLoss, err.Error()) - case strongerrors.IsSystem(err): - return status.Error(codes.Internal, err.Error()) - case strongerrors.IsUnavailable(err): - return status.Error(codes.Unavailable, err.Error()) - default: - return status.Error(codes.Unknown, err.Error()) - } -} diff --git a/vendor/github.com/cpuguy83/strongerrors/status/http.go b/vendor/github.com/cpuguy83/strongerrors/status/http.go deleted file mode 100644 index fa970ce61..000000000 --- a/vendor/github.com/cpuguy83/strongerrors/status/http.go +++ /dev/null @@ -1,37 +0,0 @@ -package status - -import ( - "net/http" - - "github.com/cpuguy83/strongerrors" -) - -// HTTPCode takes an error and returns the HTTP status code for the given error -// If a match is found then the second return argument will be true, otherwise it will be false. -// nolint: gocyclo -func HTTPCode(err error) (int, bool) { - switch { - case strongerrors.IsNotFound(err): - return http.StatusNotFound, true - case strongerrors.IsInvalidArgument(err): - return http.StatusBadRequest, true - case strongerrors.IsConflict(err): - return http.StatusConflict, true - case strongerrors.IsUnauthenticated(err), strongerrors.IsForbidden(err): - return http.StatusForbidden, true - case strongerrors.IsUnauthorized(err): - return http.StatusUnauthorized, true - case strongerrors.IsUnavailable(err): - return http.StatusServiceUnavailable, true - case strongerrors.IsForbidden(err): - return http.StatusForbidden, true - case strongerrors.IsAlreadyExists(err), strongerrors.IsNotModified(err): - return http.StatusNotModified, true - case strongerrors.IsNotImplemented(err): - return http.StatusNotImplemented, true - case strongerrors.IsSystem(err) || strongerrors.IsUnknown(err) || strongerrors.IsDataLoss(err) || strongerrors.IsExhausted(err): - return http.StatusInternalServerError, true - default: - return http.StatusInternalServerError, false - } -} diff --git a/vendor/github.com/cpuguy83/strongerrors/status/ocstatus/status.go b/vendor/github.com/cpuguy83/strongerrors/status/ocstatus/status.go deleted file mode 100644 index e76e19654..000000000 --- a/vendor/github.com/cpuguy83/strongerrors/status/ocstatus/status.go +++ /dev/null @@ -1,49 +0,0 @@ -// Package ocstatus provides error status conversions to opencencus status trace.StatusCode -package ocstatus - -import ( - "github.com/cpuguy83/strongerrors" - "go.opencensus.io/trace" -) - -// FromError makes an opencencus trace.Status from the passed in error. -func FromError(err error) trace.Status { - if err == nil { - return trace.Status{Code: trace.StatusCodeOK} - } - - switch { - case strongerrors.IsNotFound(err): - return status(trace.StatusCodeNotFound, err) - case strongerrors.IsConflict(err), strongerrors.IsNotModified(err): - return status(trace.StatusCodeFailedPrecondition, err) - case strongerrors.IsInvalidArgument(err): - return status(trace.StatusCodeInvalidArgument, err) - case strongerrors.IsAlreadyExists(err): - return status(trace.StatusCodeAlreadyExists, err) - case strongerrors.IsCancelled(err): - return status(trace.StatusCodeCancelled, err) - case strongerrors.IsDeadline(err): - return status(trace.StatusCodeDeadlineExceeded, err) - case strongerrors.IsUnauthorized(err): - return status(trace.StatusCodePermissionDenied, err) - case strongerrors.IsUnauthenticated(err): - return status(trace.StatusCodeUnauthenticated, err) - case strongerrors.IsForbidden(err), strongerrors.IsNotImplemented(err): - return status(trace.StatusCodeUnimplemented, err) - case strongerrors.IsExhausted(err): - return status(trace.StatusCodeResourceExhausted, err) - case strongerrors.IsDataLoss(err): - return status(trace.StatusCodeDataLoss, err) - case strongerrors.IsSystem(err): - return status(trace.StatusCodeInternal, err) - case strongerrors.IsUnavailable(err): - return status(trace.StatusCodeUnavailable, err) - default: - return status(trace.StatusCodeUnknown, err) - } -} - -func status(code int32, err error) trace.Status { - return trace.Status{Code: code, Message: err.Error()} -} diff --git a/vendor/k8s.io/kubernetes/third_party/forked/golang/expansion/expand.go b/vendor/k8s.io/kubernetes/third_party/forked/golang/expansion/expand.go new file mode 100644 index 000000000..6bf0ea8ce --- /dev/null +++ b/vendor/k8s.io/kubernetes/third_party/forked/golang/expansion/expand.go @@ -0,0 +1,102 @@ +package expansion + +import ( + "bytes" +) + +const ( + operator = '$' + referenceOpener = '(' + referenceCloser = ')' +) + +// syntaxWrap returns the input string wrapped by the expansion syntax. +func syntaxWrap(input string) string { + return string(operator) + string(referenceOpener) + input + string(referenceCloser) +} + +// MappingFuncFor returns a mapping function for use with Expand that +// implements the expansion semantics defined in the expansion spec; it +// returns the input string wrapped in the expansion syntax if no mapping +// for the input is found. +func MappingFuncFor(context ...map[string]string) func(string) string { + return func(input string) string { + for _, vars := range context { + val, ok := vars[input] + if ok { + return val + } + } + + return syntaxWrap(input) + } +} + +// Expand replaces variable references in the input string according to +// the expansion spec using the given mapping function to resolve the +// values of variables. +func Expand(input string, mapping func(string) string) string { + var buf bytes.Buffer + checkpoint := 0 + for cursor := 0; cursor < len(input); cursor++ { + if input[cursor] == operator && cursor+1 < len(input) { + // Copy the portion of the input string since the last + // checkpoint into the buffer + buf.WriteString(input[checkpoint:cursor]) + + // Attempt to read the variable name as defined by the + // syntax from the input string + read, isVar, advance := tryReadVariableName(input[cursor+1:]) + + if isVar { + // We were able to read a variable name correctly; + // apply the mapping to the variable name and copy the + // bytes into the buffer + buf.WriteString(mapping(read)) + } else { + // Not a variable name; copy the read bytes into the buffer + buf.WriteString(read) + } + + // Advance the cursor in the input string to account for + // bytes consumed to read the variable name expression + cursor += advance + + // Advance the checkpoint in the input string + checkpoint = cursor + 1 + } + } + + // Return the buffer and any remaining unwritten bytes in the + // input string. + return buf.String() + input[checkpoint:] +} + +// tryReadVariableName attempts to read a variable name from the input +// string and returns the content read from the input, whether that content +// represents a variable name to perform mapping on, and the number of bytes +// consumed in the input string. +// +// The input string is assumed not to contain the initial operator. +func tryReadVariableName(input string) (string, bool, int) { + switch input[0] { + case operator: + // Escaped operator; return it. + return input[0:1], false, 1 + case referenceOpener: + // Scan to expression closer + for i := 1; i < len(input); i++ { + if input[i] == referenceCloser { + return input[1:i], true, i + 1 + } + } + + // Incomplete reference; return it. + return string(operator) + string(referenceOpener), false, 1 + default: + // Not the beginning of an expression, ie, an operator + // that doesn't begin an expression. Return the operator + // and the first rune in the string. + return (string(operator) + string(input[0])), false, 1 + } +} diff --git a/vkubelet/api/exec.go b/vkubelet/api/exec.go index ebbacd9ae..310ac3fa8 100644 --- a/vkubelet/api/exec.go +++ b/vkubelet/api/exec.go @@ -7,9 +7,9 @@ import ( "strings" "time" - "github.com/cpuguy83/strongerrors" "github.com/gorilla/mux" "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "k8s.io/apimachinery/pkg/types" remoteutils "k8s.io/client-go/tools/remotecommand" api "k8s.io/kubernetes/pkg/apis/core" @@ -56,7 +56,7 @@ func HandleContainerExec(h ContainerExecHandlerFunc) http.HandlerFunc { streamOpts, err := getExecOptions(req) if err != nil { - return strongerrors.InvalidArgument(err) + return errdefs.AsInvalidInput(err) } idleTimeout := time.Second * 30 diff --git a/vkubelet/api/helpers.go b/vkubelet/api/helpers.go index 1d49f8273..9d8010d1f 100644 --- a/vkubelet/api/helpers.go +++ b/vkubelet/api/helpers.go @@ -4,7 +4,7 @@ import ( "io" "net/http" - "github.com/cpuguy83/strongerrors/status" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" ) @@ -17,7 +17,7 @@ func handleError(f handlerFunc) http.HandlerFunc { return } - code, _ := status.HTTPCode(err) + code := httpStatusCode(err) w.WriteHeader(code) io.WriteString(w, err.Error()) logger := log.G(req.Context()).WithError(err).WithField("httpStatusCode", code) @@ -55,3 +55,16 @@ func (fw *flushWriter) Write(p []byte) (int, error) { } return n, err } + +func httpStatusCode(err error) int { + switch { + case err == nil: + return http.StatusOK + case errdefs.IsNotFound(err): + return http.StatusNotFound + case errdefs.IsInvalidInput(err): + return http.StatusBadRequest + default: + return http.StatusInternalServerError + } +} diff --git a/vkubelet/api/logs.go b/vkubelet/api/logs.go index d062c98e2..b1dc3a279 100644 --- a/vkubelet/api/logs.go +++ b/vkubelet/api/logs.go @@ -7,9 +7,9 @@ import ( "strconv" "time" - "github.com/cpuguy83/strongerrors" "github.com/gorilla/mux" "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" ) @@ -33,7 +33,7 @@ func HandleContainerLogs(h ContainerLogsHandlerFunc) http.HandlerFunc { return handleError(func(w http.ResponseWriter, req *http.Request) error { vars := mux.Vars(req) if len(vars) != 3 { - return strongerrors.NotFound(errors.New("not found")) + return errdefs.NotFound("not found") } ctx := req.Context() @@ -47,7 +47,7 @@ func HandleContainerLogs(h ContainerLogsHandlerFunc) http.HandlerFunc { if queryTail := q.Get("tailLines"); queryTail != "" { t, err := strconv.Atoi(queryTail) if err != nil { - return strongerrors.InvalidArgument(errors.Wrap(err, "could not parse \"tailLines\"")) + return errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"tailLines\"")) } tail = t } @@ -73,7 +73,7 @@ func HandleContainerLogs(h ContainerLogsHandlerFunc) http.HandlerFunc { } if _, err := io.Copy(flushOnWrite(w), logs); err != nil { - return strongerrors.Unknown(errors.Wrap(err, "error writing response to client")) + return errors.Wrap(err, "error writing response to client") } return nil }) diff --git a/vkubelet/api/pods.go b/vkubelet/api/pods.go index e0dc3376a..37275d374 100644 --- a/vkubelet/api/pods.go +++ b/vkubelet/api/pods.go @@ -4,7 +4,6 @@ import ( "context" "net/http" - "github.com/cpuguy83/strongerrors" "github.com/virtual-kubelet/virtual-kubelet/log" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -36,13 +35,13 @@ func HandleRunningPods(getPods PodListerFunc) http.HandlerFunc { codec := codecs.LegacyCodec(v1.SchemeGroupVersion) data, err := runtime.Encode(codec, podList) if err != nil { - return strongerrors.System(err) + return err } w.Header().Set("Content-Type", "application/json") _, err = w.Write(data) if err != nil { - return strongerrors.System(err) + return err } return nil }) diff --git a/vkubelet/api/stats.go b/vkubelet/api/stats.go index 444e26b21..a2a6fbe6f 100644 --- a/vkubelet/api/stats.go +++ b/vkubelet/api/stats.go @@ -5,7 +5,6 @@ import ( "encoding/json" "net/http" - "github.com/cpuguy83/strongerrors" "github.com/pkg/errors" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" ) @@ -21,20 +20,36 @@ func HandlePodStatsSummary(h PodStatsSummaryHandlerFunc) http.HandlerFunc { return handleError(func(w http.ResponseWriter, req *http.Request) error { stats, err := h(req.Context()) if err != nil { - if errors.Cause(err) == context.Canceled { - return strongerrors.Cancelled(err) + if isCancelled(err) { + return err } return errors.Wrap(err, "error getting status from provider") } b, err := json.Marshal(stats) if err != nil { - return strongerrors.Unknown(errors.Wrap(err, "error marshalling stats")) + return errors.Wrap(err, "error marshalling stats") } if _, err := w.Write(b); err != nil { - return strongerrors.Unknown(errors.Wrap(err, "could not write to client")) + return errors.Wrap(err, "could not write to client") } return nil }) } + +func isCancelled(err error) bool { + if err == context.Canceled { + return true + } + + if e, ok := err.(causal); ok { + return isCancelled(e.Cause()) + } + return false +} + +type causal interface { + Cause() error + error +} diff --git a/vkubelet/env.go b/vkubelet/env.go index 140720ec0..542c15110 100755 --- a/vkubelet/env.go +++ b/vkubelet/env.go @@ -16,6 +16,7 @@ import ( v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" fieldpath "k8s.io/kubernetes/pkg/fieldpath" "k8s.io/kubernetes/pkg/kubelet/envvars" + "k8s.io/kubernetes/third_party/forked/golang/expansion" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" @@ -78,12 +79,13 @@ func populateEnvironmentVariables(ctx context.Context, pod *corev1.Pod, rm *mana // populateContainerEnvironment populates the environment of a single container in the specified pod. func populateContainerEnvironment(ctx context.Context, pod *corev1.Pod, container *corev1.Container, rm *manager.ResourceManager, recorder record.EventRecorder) error { // Create an "environment map" based on the value of the specified container's ".envFrom" field. - envFrom, err := makeEnvironmentMapBasedOnEnvFrom(ctx, pod, container, rm, recorder) + tmpEnv, err := makeEnvironmentMapBasedOnEnvFrom(ctx, pod, container, rm, recorder) if err != nil { return err } - // Create an "environment map" based on the value of the specified container's ".env" field. - env, err := makeEnvironmentMapBasedOnEnv(ctx, pod, container, rm, recorder) + // Create the final "environment map" for the container using the ".env" and ".envFrom" field + // and service environment variables. + err = makeEnvironmentMap(ctx, pod, container, rm, recorder, tmpEnv) if err != nil { return err } @@ -92,7 +94,17 @@ func populateContainerEnvironment(ctx context.Context, pod *corev1.Pod, containe // This is in accordance with what the Kubelet itself does. // https://github.com/kubernetes/kubernetes/blob/v1.13.1/pkg/kubelet/kubelet_pods.go#L557-L558 container.EnvFrom = []corev1.EnvFromSource{} - container.Env = mergeEnvironments(envFrom, env) + + res := make([]corev1.EnvVar, 0) + + for key, val := range tmpEnv { + res = append(res, corev1.EnvVar{ + Name: key, + Value: val, + }) + } + container.Env = res + return nil } @@ -265,17 +277,37 @@ loop: return res, nil } -// makeEnvironmentMapBasedOnEnv returns a map representing the resolved environment of the specified container after being populated from the entries in the ".env" field. -func makeEnvironmentMapBasedOnEnv(ctx context.Context, pod *corev1.Pod, container *corev1.Container, rm *manager.ResourceManager, recorder record.EventRecorder) (map[string]string, error) { - // Create a map to hold the resolved environment variables. - res := make(map[string]string, len(container.Env)) +// makeEnvironmentMap returns a map representing the resolved environment of the specified container after being populated from the entries in the ".env" and ".envFrom" field. +func makeEnvironmentMap(ctx context.Context, pod *corev1.Pod, container *corev1.Container, rm *manager.ResourceManager, recorder record.EventRecorder, res map[string]string) error { + + // TODO If pod.Spec.EnableServiceLinks is nil then fail as per 1.14 kubelet. + enableServiceLinks := corev1.DefaultEnableServiceLinks + if pod.Spec.EnableServiceLinks != nil { + enableServiceLinks = *pod.Spec.EnableServiceLinks + } + + // Note that there is a race between Kubelet seeing the pod and kubelet seeing the service. + // To avoid this users can: (1) wait between starting a service and starting; or (2) detect + // missing service env var and exit and be restarted; or (3) use DNS instead of env vars + // and keep trying to resolve the DNS name of the service (recommended). + svcEnv, err := getServiceEnvVarMap(rm, pod.Namespace, enableServiceLinks) + if err != nil { + return err + } + + // If the variable's Value is set, expand the `$(var)` references to other + // variables in the .Value field; the sources of variables are the declared + // variables of the container and the service environment variables. + mappingFunc := expansion.MappingFuncFor(res, svcEnv) + // Iterate over environment variables in order to populate the map. loop: for _, env := range container.Env { switch { // Handle values that have been directly provided. case env.Value != "": - res[env.Name] = env.Value + // Expand variable references + res[env.Name] = expansion.Expand(env.Value, mappingFunc) continue loop // Handle population from a configmap key. case env.ValueFrom != nil && env.ValueFrom.ConfigMapKeyRef != nil: @@ -303,10 +335,10 @@ loop: // Hence, we should return a meaningful error. if errors.IsNotFound(err) { recorder.Eventf(pod, corev1.EventTypeWarning, ReasonMandatoryConfigMapNotFound, "configmap %q not found", vf.Name) - return nil, fmt.Errorf("configmap %q not found", vf.Name) + return fmt.Errorf("configmap %q not found", vf.Name) } recorder.Eventf(pod, corev1.EventTypeWarning, ReasonFailedToReadMandatoryConfigMap, "failed to read configmap %q", vf.Name) - return nil, fmt.Errorf("failed to read configmap %q: %v", vf.Name, err) + return fmt.Errorf("failed to read configmap %q: %v", vf.Name, err) } // At this point we have successfully fetched the target configmap. // We must now try to grab the requested key. @@ -325,7 +357,7 @@ loop: // At this point we know the key reference is mandatory. // Hence, we should fail. recorder.Eventf(pod, corev1.EventTypeWarning, ReasonMandatoryConfigMapKeyNotFound, "key %q does not exist in configmap %q", vf.Key, vf.Name) - return nil, fmt.Errorf("configmap %q doesn't contain the %q key required by pod %s", vf.Name, vf.Key, pod.Name) + return fmt.Errorf("configmap %q doesn't contain the %q key required by pod %s", vf.Name, vf.Key, pod.Name) } // Populate the environment variable and continue on to the next reference. res[env.Name] = keyValue @@ -355,10 +387,10 @@ loop: // Hence, we should return a meaningful error. if errors.IsNotFound(err) { recorder.Eventf(pod, corev1.EventTypeWarning, ReasonMandatorySecretNotFound, "secret %q not found", vf.Name) - return nil, fmt.Errorf("secret %q not found", vf.Name) + return fmt.Errorf("secret %q not found", vf.Name) } recorder.Eventf(pod, corev1.EventTypeWarning, ReasonFailedToReadMandatorySecret, "failed to read secret %q", vf.Name) - return nil, fmt.Errorf("failed to read secret %q: %v", vf.Name, err) + return fmt.Errorf("failed to read secret %q: %v", vf.Name, err) } // At this point we have successfully fetched the target secret. // We must now try to grab the requested key. @@ -377,7 +409,7 @@ loop: // At this point we know the key reference is mandatory. // Hence, we should fail. recorder.Eventf(pod, corev1.EventTypeWarning, ReasonMandatorySecretKeyNotFound, "key %q does not exist in secret %q", vf.Key, vf.Name) - return nil, fmt.Errorf("secret %q doesn't contain the %q key required by pod %s", vf.Name, vf.Key, pod.Name) + return fmt.Errorf("secret %q doesn't contain the %q key required by pod %s", vf.Name, vf.Key, pod.Name) } // Populate the environment variable and continue on to the next reference. res[env.Name] = string(keyValue) @@ -389,7 +421,7 @@ loop: runtimeVal, err := podFieldSelectorRuntimeValue(vf, pod) if err != nil { - return res, err + return err } res[env.Name] = runtimeVal @@ -402,21 +434,6 @@ loop: } } - // TODO If pod.Spec.EnableServiceLinks is nil then fail as per 1.14 kubelet. - enableServiceLinks := corev1.DefaultEnableServiceLinks - if pod.Spec.EnableServiceLinks != nil { - enableServiceLinks = *pod.Spec.EnableServiceLinks - } - - // Note that there is a race between Kubelet seeing the pod and kubelet seeing the service. - // To avoid this users can: (1) wait between starting a service and starting; or (2) detect - // missing service env var and exit and be restarted; or (3) use DNS instead of env vars - // and keep trying to resolve the DNS name of the service (recommended). - svcEnv, err := getServiceEnvVarMap(rm, pod.Namespace, enableServiceLinks) - if err != nil { - return nil, err - } - // Append service env vars. for k, v := range svcEnv { if _, present := res[k]; !present { @@ -424,8 +441,7 @@ loop: } } - // Return the populated environment. - return res, nil + return nil } // podFieldSelectorRuntimeValue returns the runtime value of the given @@ -444,25 +460,3 @@ func podFieldSelectorRuntimeValue(fs *corev1.ObjectFieldSelector, pod *corev1.Po } return fieldpath.ExtractFieldPathAsString(pod, internalFieldPath) } - -// mergeEnvironments creates the final environment for a container by merging "envFrom" and "env". -// Values in "env" override any values with the same key defined in "envFrom". -// This is in accordance with what the Kubelet itself does. -// https://github.com/kubernetes/kubernetes/blob/v1.13.1/pkg/kubelet/kubelet_pods.go#L557-L558 -func mergeEnvironments(envFrom map[string]string, env map[string]string) []corev1.EnvVar { - tmp := make(map[string]string, 0) - res := make([]corev1.EnvVar, 0) - for key, val := range envFrom { - tmp[key] = val - } - for key, val := range env { - tmp[key] = val - } - for key, val := range tmp { - res = append(res, corev1.EnvVar{ - Name: key, - Value: val, - }) - } - return res -} diff --git a/vkubelet/env_internal_test.go b/vkubelet/env_internal_test.go index ff71d65af..c0ec982af 100644 --- a/vkubelet/env_internal_test.go +++ b/vkubelet/env_internal_test.go @@ -23,6 +23,10 @@ const ( envVarValue1 = "foo_value" // envVarName2 is a string that can be used as the name of an environment value. envVarName2 = "BAR" + // envVarValue2 is a string meant to be used as the value of the "envVarName2" environment value. + envVarValue2 = "bar_value" + // envVarName12 is a key that can be used as the name of an environment variable. + envVarName12 = "FOOBAR" // envVarName3 is a string that can be used as the name of an environment value. envVarName3 = "CHO" // envVarName4 is a string that can be used as the name of an environment value. @@ -1011,3 +1015,64 @@ func TestServiceEnvVar(t *testing.T) { } } + +// TestComposingEnv tests that env var can be composed from the existing env vars. +func TestComposingEnv(t *testing.T) { + rm := testutil.FakeResourceManager() + er := testutil.FakeEventRecorder(defaultEventRecorderBufferSize) + + // Create a pod object having a single container. + // The container's third environment variable is composed of the previous two. + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "pod-0", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Env: []corev1.EnvVar{ + { + Name: envVarName1, + Value: envVarValue1, + }, + { + Name: envVarName2, + Value: envVarValue2, + }, + { + Name: envVarName12, + Value: "$(" + envVarName1 + ")$(" + envVarName2 + ")", // "$(envVarName1)$(envVarName2)" + }, + }, + }, + }, + EnableServiceLinks: &bFalse, + }, + } + + // Populate the pods's environment. + err := populateEnvironmentVariables(context.Background(), pod, rm, er) + assert.Check(t, err) + + // Make sure that the container's environment contains all the expected keys and values. + assert.Check(t, is.DeepEqual(pod.Spec.Containers[0].Env, []corev1.EnvVar{ + { + Name: envVarName1, + Value: envVarValue1, + }, + { + Name: envVarName2, + Value: envVarValue2, + }, + { + Name: envVarName12, + Value: envVarValue1 + envVarValue2, + }, + }, + sortOpt, + )) + + // Make sure that no events have been recorded. + assert.Check(t, is.Len(er.Events, 0)) +} diff --git a/vkubelet/node.go b/vkubelet/node.go index 145315822..4fa73bc21 100644 --- a/vkubelet/node.go +++ b/vkubelet/node.go @@ -6,7 +6,6 @@ import ( "fmt" "time" - "github.com/cpuguy83/strongerrors/status/ocstatus" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" @@ -47,7 +46,7 @@ type NodeProvider interface { // Use the node's `Run` method to register and run the loops to update the node // in Kubernetes. func NewNode(p NodeProvider, node *corev1.Node, leases v1beta1.LeaseInterface, nodes v1.NodeInterface, opts ...NodeOpt) (*Node, error) { - n := &Node{p: p, n: node, leases: leases, nodes: nodes} + n := &Node{p: p, n: node, leases: leases, nodes: nodes, chReady: make(chan struct{})} for _, o := range opts { if err := o(n); err != nil { return nil, pkgerrors.Wrap(err, "error applying node option") @@ -87,6 +86,25 @@ func WithNodeStatusUpdateInterval(d time.Duration) NodeOpt { } } +// WithNodeStatusUpdateErrorHandler adds an error handler for cases where there is an error +// when updating the node status. +// This allows the caller to have some control on how errors are dealt with when +// updating a node's status. +// +// The error passed to the handler will be the error received from kubernetes +// when updating node status. +func WithNodeStatusUpdateErrorHandler(h ErrorHandler) NodeOpt { + return func(n *Node) error { + n.nodeStatusUpdateErrorHandler = h + return nil + } +} + +// ErrorHandler is a type of function used to allow callbacks for handling errors. +// It is expected that if a nil error is returned that the error is handled and +// progress can continue (or a retry is possible). +type ErrorHandler func(context.Context, error) error + // WithNodeLease sets the base node lease to use. // If a lease time is set, it will be ignored. func WithNodeLease(l *coord.Lease) NodeOpt { @@ -110,6 +128,10 @@ type Node struct { statusInterval time.Duration lease *coord.Lease chStatusUpdate chan *corev1.Node + + nodeStatusUpdateErrorHandler ErrorHandler + + chReady chan struct{} } // The default intervals used for lease and status updates. @@ -138,16 +160,15 @@ func (n *Node) Run(ctx context.Context) error { n.statusInterval = DefaultStatusUpdateInterval } - if err := n.updateStatus(ctx); err != nil { - return pkgerrors.Wrap(err, "error registering node with kubernetes") - } - log.G(ctx).Info("Created node") - n.chStatusUpdate = make(chan *corev1.Node) n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) { n.chStatusUpdate <- node }) + if err := n.ensureNode(ctx); err != nil { + return err + } + if !n.disableLease { n.lease = newLease(n.lease) setLeaseAttrs(n.lease, n.n, n.pingInterval*5) @@ -169,11 +190,32 @@ func (n *Node) Run(ctx context.Context) error { log.G(ctx).Info("Node leases not supported, falling back to only node status updates") } - n.controlLoop(ctx) + return n.controlLoop(ctx) +} + +func (n *Node) ensureNode(ctx context.Context) error { + err := n.updateStatus(ctx, true) + if err == nil || !errors.IsNotFound(err) { + return err + } + + node, err := n.nodes.Create(n.n) + if err != nil { + return pkgerrors.Wrap(err, "error registering node with kubernetes") + } + n.n = node + return nil } -func (n *Node) controlLoop(ctx context.Context) { +// Ready returns a channel that gets closed when the node is fully up and +// running. Note that if there is an error on startup this channel will never +// be started. +func (n *Node) Ready() <-chan struct{} { + return n.chReady +} + +func (n *Node) controlLoop(ctx context.Context) error { pingTimer := time.NewTimer(n.pingInterval) defer pingTimer.Stop() @@ -186,10 +228,12 @@ func (n *Node) controlLoop(ctx context.Context) { } } + close(n.chReady) + for { select { case <-ctx.Done(): - return + return nil case updated := <-n.chStatusUpdate: var t *time.Timer if n.disableLease { @@ -206,12 +250,12 @@ func (n *Node) controlLoop(ctx context.Context) { } n.n.Status = updated.Status - if err := n.updateStatus(ctx); err != nil { + if err := n.updateStatus(ctx, false); err != nil { log.G(ctx).WithError(err).Error("Error handling node status update") } t.Reset(n.statusInterval) case <-statusTimer.C: - if err := n.updateStatus(ctx); err != nil { + if err := n.updateStatus(ctx, false); err != nil { log.G(ctx).WithError(err).Error("Error handling node status update") } statusTimer.Reset(n.statusInterval) @@ -230,7 +274,7 @@ func (n *Node) handlePing(ctx context.Context) (retErr error) { ctx, span := trace.StartSpan(ctx, "node.handlePing") defer span.End() defer func() { - span.SetStatus(ocstatus.FromError(retErr)) + span.SetStatus(retErr) }() if err := n.p.Ping(ctx); err != nil { @@ -238,7 +282,7 @@ func (n *Node) handlePing(ctx context.Context) (retErr error) { } if n.disableLease { - return n.updateStatus(ctx) + return n.updateStatus(ctx, false) } return n.updateLease(ctx) @@ -254,12 +298,22 @@ func (n *Node) updateLease(ctx context.Context) error { return nil } -func (n *Node) updateStatus(ctx context.Context) error { +func (n *Node) updateStatus(ctx context.Context, skipErrorCb bool) error { updateNodeStatusHeartbeat(n.n) node, err := UpdateNodeStatus(ctx, n.nodes, n.n) if err != nil { - return err + if skipErrorCb || n.nodeStatusUpdateErrorHandler == nil { + return err + } + if err := n.nodeStatusUpdateErrorHandler(ctx, err); err != nil { + return err + } + + node, err = UpdateNodeStatus(ctx, n.nodes, n.n) + if err != nil { + return err + } } n.n = node @@ -312,7 +366,7 @@ func UpdateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease * l, err = ensureLease(ctx, leases, lease) } if err != nil { - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) return nil, err } log.G(ctx).Debug("created new lease") @@ -370,27 +424,18 @@ func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Nod // // If you use this function, it is up to you to syncronize this with other operations. // This reduces the time to second-level precision. -func UpdateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (*corev1.Node, error) { +func UpdateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (_ *corev1.Node, retErr error) { ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus") - defer span.End() + defer func() { + span.End() + span.SetStatus(retErr) + }() + var node *corev1.Node oldNode, err := nodes.Get(n.Name, emptyGetOptions) if err != nil { - if !errors.IsNotFound(err) { - span.SetStatus(ocstatus.FromError(err)) - return nil, err - } - - log.G(ctx).Debug("node not found") - newNode := n.DeepCopy() - newNode.ResourceVersion = "" - node, err = nodes.Create(newNode) - if err != nil { - return nil, err - } - log.G(ctx).Debug("created new node") - return node, nil + return nil, err } log.G(ctx).Debug("got node from api server") @@ -440,7 +485,7 @@ func setLeaseAttrs(l *coord.Lease, n *corev1.Node, dur time.Duration) { func updateNodeStatusHeartbeat(n *corev1.Node) { now := metav1.NewTime(time.Now()) - for i, _ := range n.Status.Conditions { + for i := range n.Status.Conditions { n.Status.Conditions[i].LastHeartbeatTime = now } } diff --git a/vkubelet/node_test.go b/vkubelet/node_test.go index ddd6bf8c5..a6c69ab34 100644 --- a/vkubelet/node_test.go +++ b/vkubelet/node_test.go @@ -149,6 +149,56 @@ func testNodeRun(t *testing.T, enableLease bool) { } } +func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) { + c := testclient.NewSimpleClientset() + testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}} + nodes := c.CoreV1().Nodes() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + node, err := NewNode(testP, testNode(t), nil, nodes, + WithNodeStatusUpdateErrorHandler(func(_ context.Context, err error) error { + cancel() + return nil + }), + WithNodeDisableLease(true), + ) + assert.NilError(t, err) + + chErr := make(chan error, 1) + go func() { + chErr <- node.Run(ctx) + }() + + timer := time.NewTimer(10 * time.Second) + defer timer.Stop() + + // wait for the node to be ready + select { + case <-timer.C: + t.Fatal("timeout waiting for node to be ready") + case <-chErr: + t.Fatalf("node.Run returned earlier than expected: %v", err) + case <-node.Ready(): + } + + err = nodes.Delete(node.n.Name, nil) + assert.NilError(t, err) + + testP.triggerStatusUpdate(node.n.DeepCopy()) + + timer = time.NewTimer(10 * time.Second) + defer timer.Stop() + + select { + case err := <-chErr: + assert.Equal(t, err, nil) + case <-timer.C: + t.Fatal("timeout waiting for node shutdown") + } +} + func TestEnsureLease(t *testing.T) { c := testclient.NewSimpleClientset().Coordination().Leases(corev1.NamespaceNodeLease) n := testNode(t) @@ -177,6 +227,14 @@ func TestUpdateNodeStatus(t *testing.T) { ctx := context.Background() updated, err := UpdateNodeStatus(ctx, nodes, n.DeepCopy()) + assert.Equal(t, errors.IsNotFound(err), true, err) + + _, err = nodes.Create(n) + assert.NilError(t, err) + + updated, err = UpdateNodeStatus(ctx, nodes, n.DeepCopy()) + assert.NilError(t, err) + assert.NilError(t, err) assert.Check(t, cmp.DeepEqual(n.Status, updated.Status)) @@ -191,10 +249,8 @@ func TestUpdateNodeStatus(t *testing.T) { _, err = nodes.Get(n.Name, metav1.GetOptions{}) assert.Equal(t, errors.IsNotFound(err), true, err) - updated, err = UpdateNodeStatus(ctx, nodes, updated.DeepCopy()) - assert.NilError(t, err) - _, err = nodes.Get(n.Name, metav1.GetOptions{}) - assert.NilError(t, err) + _, err = UpdateNodeStatus(ctx, nodes, updated.DeepCopy()) + assert.Equal(t, errors.IsNotFound(err), true, err) } func TestUpdateNodeLease(t *testing.T) { diff --git a/vkubelet/pod.go b/vkubelet/pod.go index 3338a52fc..7f8720922 100644 --- a/vkubelet/pod.go +++ b/vkubelet/pod.go @@ -5,9 +5,9 @@ import ( "hash/fnv" "time" - "github.com/cpuguy83/strongerrors/status/ocstatus" "github.com/davecgh/go-spew/spew" pkgerrors "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" corev1 "k8s.io/api/core/v1" @@ -15,10 +15,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ) +const ( + podStatusReasonProviderFailed = "ProviderFailed" +) + func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context { return span.WithFields(ctx, log.Fields{ "uid": string(pod.GetUID()), @@ -29,7 +32,7 @@ func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) con }) } -func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorder record.EventRecorder) error { +func (pc *PodController) createOrUpdatePod(ctx context.Context, pod *corev1.Pod) error { ctx, span := trace.StartSpan(ctx, "createOrUpdatePod") defer span.End() @@ -40,15 +43,15 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde "namespace": pod.GetNamespace(), }) - if err := populateEnvironmentVariables(ctx, pod, s.resourceManager, recorder); err != nil { - span.SetStatus(ocstatus.FromError(err)) + if err := populateEnvironmentVariables(ctx, pod, pc.resourceManager, pc.recorder); err != nil { + span.SetStatus(err) return err } // Check if the pod is already known by the provider. // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't. // Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod). - if pp, _ := s.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil { + if pp, _ := pc.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil { // Pod Update Only Permits update of: // - `spec.containers[*].image` // - `spec.initContainers[*].image` @@ -58,15 +61,15 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde expected := hashPodSpec(pp.Spec) if actual := hashPodSpec(pod.Spec); actual != expected { log.G(ctx).Debugf("Pod %s exists, updating pod in provider", pp.Name) - if origErr := s.provider.UpdatePod(ctx, pod); origErr != nil { - s.handleProviderError(ctx, span, origErr, pod) + if origErr := pc.provider.UpdatePod(ctx, pod); origErr != nil { + pc.handleProviderError(ctx, span, origErr, pod) return origErr } log.G(ctx).Info("Updated pod in provider") } } else { - if origErr := s.provider.CreatePod(ctx, pod); origErr != nil { - s.handleProviderError(ctx, span, origErr, pod) + if origErr := pc.provider.CreatePod(ctx, pod); origErr != nil { + pc.handleProviderError(ctx, span, origErr, pod) return origErr } log.G(ctx).Info("Created pod in provider") @@ -88,7 +91,7 @@ func hashPodSpec(spec corev1.PodSpec) uint64 { return uint64(hash.Sum32()) } -func (s *Server) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) { +func (pc *PodController) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) { podPhase := corev1.PodPending if pod.Spec.RestartPolicy == corev1.RestartPolicyNever { podPhase = corev1.PodFailed @@ -104,23 +107,23 @@ func (s *Server) handleProviderError(ctx context.Context, span trace.Span, origE "reason": pod.Status.Reason, }) - _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) + _, err := pc.client.Pods(pod.Namespace).UpdateStatus(pod) if err != nil { logger.WithError(err).Warn("Failed to update pod status") } else { logger.Info("Updated k8s pod status") } - span.SetStatus(ocstatus.FromError(origErr)) + span.SetStatus(origErr) } -func (s *Server) deletePod(ctx context.Context, namespace, name string) error { +func (pc *PodController) deletePod(ctx context.Context, namespace, name string) error { // Grab the pod as known by the provider. // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't. // Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod). - pod, _ := s.provider.GetPod(ctx, namespace, name) + pod, _ := pc.provider.GetPod(ctx, namespace, name) if pod == nil { // The provider is not aware of the pod, but we must still delete the Kubernetes API resource. - return s.forceDeletePodResource(ctx, namespace, name) + return pc.forceDeletePodResource(ctx, namespace, name) } ctx, span := trace.StartSpan(ctx, "deletePod") @@ -128,16 +131,16 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error { ctx = addPodAttributes(ctx, span, pod) var delErr error - if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) { - span.SetStatus(ocstatus.FromError(delErr)) + if delErr = pc.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) { + span.SetStatus(delErr) return delErr } log.G(ctx).Debug("Deleted pod from provider") if !errors.IsNotFound(delErr) { - if err := s.forceDeletePodResource(ctx, namespace, name); err != nil { - span.SetStatus(ocstatus.FromError(err)) + if err := pc.forceDeletePodResource(ctx, namespace, name); err != nil { + span.SetStatus(err) return err } log.G(ctx).Info("Deleted pod from Kubernetes") @@ -146,7 +149,7 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error { return nil } -func (s *Server) forceDeletePodResource(ctx context.Context, namespace, name string) error { +func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace, name string) error { ctx, span := trace.StartSpan(ctx, "forceDeletePodResource") defer span.End() ctx = span.WithFields(ctx, log.Fields{ @@ -155,27 +158,27 @@ func (s *Server) forceDeletePodResource(ctx context.Context, namespace, name str }) var grace int64 - if err := s.k8sClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil { + if err := pc.client.Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil { if errors.IsNotFound(err) { log.G(ctx).Debug("Pod does not exist in Kubernetes, nothing to delete") return nil } - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) return pkgerrors.Wrap(err, "Failed to delete Kubernetes pod") } return nil } // updatePodStatuses syncs the providers pod status with the kubernetes pod status. -func (s *Server) updatePodStatuses(ctx context.Context, q workqueue.RateLimitingInterface) { +func (pc *PodController) updatePodStatuses(ctx context.Context, q workqueue.RateLimitingInterface) { ctx, span := trace.StartSpan(ctx, "updatePodStatuses") defer span.End() // Update all the pods with the provider status. - pods, err := s.podInformer.Lister().List(labels.Everything()) + pods, err := pc.podsLister.List(labels.Everything()) if err != nil { err = pkgerrors.Wrap(err, "error getting pod list") - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) log.G(ctx).WithError(err).Error("Error updating pod statuses") return } @@ -183,7 +186,7 @@ func (s *Server) updatePodStatuses(ctx context.Context, q workqueue.RateLimiting for _, pod := range pods { if !shouldSkipPodStatusUpdate(pod) { - s.enqueuePodStatusUpdate(ctx, q, pod) + enqueuePodStatusUpdate(ctx, q, pod) } } } @@ -194,7 +197,7 @@ func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool { pod.Status.Reason == podStatusReasonProviderFailed } -func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { +func (pc *PodController) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { if shouldSkipPodStatusUpdate(pod) { return nil } @@ -203,9 +206,9 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { defer span.End() ctx = addPodAttributes(ctx, span, pod) - status, err := s.provider.GetPodStatus(ctx, pod.Namespace, pod.Name) - if err != nil { - span.SetStatus(ocstatus.FromError(err)) + status, err := pc.provider.GetPodStatus(ctx, pod.Namespace, pod.Name) + if err != nil && !errdefs.IsNotFound(err) { + span.SetStatus(err) return pkgerrors.Wrap(err, "error retreiving pod status") } @@ -234,8 +237,8 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { } } - if _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod); err != nil { - span.SetStatus(ocstatus.FromError(err)) + if _, err := pc.client.Pods(pod.Namespace).UpdateStatus(pod); err != nil { + span.SetStatus(err) return pkgerrors.Wrap(err, "error while updating pod status in kubernetes") } @@ -247,7 +250,7 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error { return nil } -func (s *Server) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) { +func enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) { if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil { log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key") } else { @@ -255,24 +258,32 @@ func (s *Server) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLim } } -func (s *Server) podStatusHandler(ctx context.Context, key string) (retErr error) { +func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) { ctx, span := trace.StartSpan(ctx, "podStatusHandler") defer span.End() - defer func() { - span.SetStatus(ocstatus.FromError(retErr)) - }() ctx = span.WithField(ctx, "key", key) + log.G(ctx).Debug("processing pod status update") + defer func() { + span.SetStatus(retErr) + if retErr != nil { + log.G(ctx).WithError(retErr).Error("Error processing pod status update") + } + }() namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return pkgerrors.Wrap(err, "error spliting cache key") } - pod, err := s.podInformer.Lister().Pods(namespace).Get(name) + pod, err := pc.podsLister.Pods(namespace).Get(name) if err != nil { + if errors.IsNotFound(err) { + log.G(ctx).WithError(err).Debug("Skipping pod status update for pod missing in Kubernetes") + return nil + } return pkgerrors.Wrap(err, "error looking up pod") } - return s.updatePodStatus(ctx, pod) + return pc.updatePodStatus(ctx, pod) } diff --git a/vkubelet/pod_test.go b/vkubelet/pod_test.go index 9e02c788f..88cb642d5 100644 --- a/vkubelet/pod_test.go +++ b/vkubelet/pod_test.go @@ -2,9 +2,10 @@ package vkubelet import ( "context" + "path" "testing" - "github.com/virtual-kubelet/virtual-kubelet/providers/mock" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" testutil "github.com/virtual-kubelet/virtual-kubelet/test/util" "gotest.tools/assert" is "gotest.tools/assert/cmp" @@ -12,63 +13,82 @@ import ( "k8s.io/client-go/kubernetes/fake" ) -type FakeProvider struct { - *mock.MockProvider - createFn func() - updateFn func() +type mockProvider struct { + pods map[string]*corev1.Pod + + creates int + updates int + deletes int } -func (f *FakeProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error { - f.createFn() - return f.MockProvider.CreatePod(ctx, pod) +func (m *mockProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error { + m.pods[path.Join(pod.GetNamespace(), pod.GetName())] = pod + m.creates++ + return nil } -func (f *FakeProvider) UpdatePod(ctx context.Context, pod *corev1.Pod) error { - f.updateFn() - return f.MockProvider.CreatePod(ctx, pod) +func (m *mockProvider) UpdatePod(ctx context.Context, pod *corev1.Pod) error { + m.pods[path.Join(pod.GetNamespace(), pod.GetName())] = pod + m.updates++ + return nil } -type TestServer struct { - *Server - mock *FakeProvider +func (m *mockProvider) GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) { + p := m.pods[path.Join(namespace, name)] + if p == nil { + return nil, errdefs.NotFound("not found") + } + return p, nil +} + +func (m *mockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) { + p := m.pods[path.Join(namespace, name)] + if p == nil { + return nil, errdefs.NotFound("not found") + } + return &p.Status, nil +} + +func (m *mockProvider) DeletePod(ctx context.Context, p *corev1.Pod) error { + delete(m.pods, path.Join(p.GetNamespace(), p.GetName())) + m.deletes++ + return nil +} + +func (m *mockProvider) GetPods(_ context.Context) ([]*corev1.Pod, error) { + ls := make([]*corev1.Pod, 0, len(m.pods)) + for _, p := range ls { + ls = append(ls, p) + } + return ls, nil +} + +type TestController struct { + *PodController + mock *mockProvider client *fake.Clientset } -func newMockProvider(t *testing.T) (*mock.MockProvider, error) { - return mock.NewMockProviderMockConfig( - mock.MockConfig{}, - "vk123", - "linux", - "127.0.0.1", - 443, - ) +func newMockProvider() *mockProvider { + return &mockProvider{pods: make(map[string]*corev1.Pod)} } -func newTestServer(t *testing.T) *TestServer { - - mockProvider, err := newMockProvider(t) - assert.Check(t, is.Nil(err)) - +func newTestController() *TestController { fk8s := fake.NewSimpleClientset() - fakeProvider := &FakeProvider{ - MockProvider: mockProvider, - } - rm := testutil.FakeResourceManager() + p := newMockProvider() - tsvr := &TestServer{ - Server: &Server{ - namespace: "default", - nodeName: "vk123", - provider: fakeProvider, + return &TestController{ + PodController: &PodController{ + client: fk8s.CoreV1(), + provider: p, resourceManager: rm, - k8sClient: fk8s, + recorder: testutil.FakeEventRecorder(5), }, - mock: fakeProvider, + mock: p, client: fk8s, } - return tsvr } func TestPodHashingEqual(t *testing.T) { @@ -146,7 +166,7 @@ func TestPodHashingDifferent(t *testing.T) { } func TestPodCreateNewPod(t *testing.T) { - svr := newTestServer(t) + svr := newTestController() pod := &corev1.Pod{} pod.ObjectMeta.Namespace = "default" @@ -166,25 +186,16 @@ func TestPodCreateNewPod(t *testing.T) { }, } - created := false - updated := false - // The pod doesn't exist, we should invoke the CreatePod() method of the provider - svr.mock.createFn = func() { - created = true - } - svr.mock.updateFn = func() { - updated = true - } - er := testutil.FakeEventRecorder(5) - err := svr.createOrUpdatePod(context.Background(), pod, er) + err := svr.createOrUpdatePod(context.Background(), pod) + assert.Check(t, is.Nil(err)) // createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist - assert.Check(t, created) - assert.Check(t, !updated) + assert.Check(t, is.Equal(svr.mock.creates, 1)) + assert.Check(t, is.Equal(svr.mock.updates, 0)) } func TestPodUpdateExisting(t *testing.T) { - svr := newTestServer(t) + svr := newTestController() pod := &corev1.Pod{} pod.ObjectMeta.Namespace = "default" @@ -204,17 +215,10 @@ func TestPodUpdateExisting(t *testing.T) { }, } - err := svr.mock.MockProvider.CreatePod(context.Background(), pod) + err := svr.provider.CreatePod(context.Background(), pod) assert.Check(t, is.Nil(err)) - created := false - updated := false - // The pod doesn't exist, we should invoke the CreatePod() method of the provider - svr.mock.createFn = func() { - created = true - } - svr.mock.updateFn = func() { - updated = true - } + assert.Check(t, is.Equal(svr.mock.creates, 1)) + assert.Check(t, is.Equal(svr.mock.updates, 0)) pod2 := &corev1.Pod{} pod2.ObjectMeta.Namespace = "default" @@ -234,17 +238,16 @@ func TestPodUpdateExisting(t *testing.T) { }, } - er := testutil.FakeEventRecorder(5) - err = svr.createOrUpdatePod(context.Background(), pod2, er) + err = svr.createOrUpdatePod(context.Background(), pod2) assert.Check(t, is.Nil(err)) // createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed - assert.Check(t, !created) - assert.Check(t, updated) + assert.Check(t, is.Equal(svr.mock.creates, 1)) + assert.Check(t, is.Equal(svr.mock.updates, 1)) } func TestPodNoSpecChange(t *testing.T) { - svr := newTestServer(t) + svr := newTestController() pod := &corev1.Pod{} pod.ObjectMeta.Namespace = "default" @@ -264,23 +267,15 @@ func TestPodNoSpecChange(t *testing.T) { }, } - err := svr.mock.MockProvider.CreatePod(context.Background(), pod) + err := svr.mock.CreatePod(context.Background(), pod) assert.Check(t, is.Nil(err)) - created := false - updated := false - // The pod doesn't exist, we should invoke the CreatePod() method of the provider - svr.mock.createFn = func() { - created = true - } - svr.mock.updateFn = func() { - updated = true - } + assert.Check(t, is.Equal(svr.mock.creates, 1)) + assert.Check(t, is.Equal(svr.mock.updates, 0)) - er := testutil.FakeEventRecorder(5) - err = svr.createOrUpdatePod(context.Background(), pod, er) + err = svr.createOrUpdatePod(context.Background(), pod) assert.Check(t, is.Nil(err)) // createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change - assert.Check(t, !created) - assert.Check(t, !updated) + assert.Check(t, is.Equal(svr.mock.creates, 1)) + assert.Check(t, is.Equal(svr.mock.updates, 0)) } diff --git a/vkubelet/podcontroller.go b/vkubelet/podcontroller.go index dbebcbeaa..503c78f8b 100644 --- a/vkubelet/podcontroller.go +++ b/vkubelet/podcontroller.go @@ -22,25 +22,28 @@ import ( "sync" "time" - "github.com/cpuguy83/strongerrors/status/ocstatus" pkgerrors "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/trace" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/wait" - v1 "k8s.io/client-go/informers/core/v1" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + corev1informers "k8s.io/client-go/informers/core/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" - - "github.com/virtual-kubelet/virtual-kubelet/log" ) // PodLifecycleHandler defines the interface used by the PodController to react // to new and changed pods scheduled to the node that is being managed. +// +// Errors produced by these methods should implement an interface from +// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the +// core logic to be able to understand the type of failure. type PodLifecycleHandler interface { // CreatePod takes a Kubernetes Pod and deploys it within the provider. CreatePod(ctx context.Context, pod *corev1.Pod) error @@ -74,42 +77,75 @@ type PodNotifier interface { // PodController is the controller implementation for Pod resources. type PodController struct { - // server is the instance to which this controller belongs. - server *Server + provider PodLifecycleHandler + // podsInformer is an informer for Pod resources. - podsInformer v1.PodInformer + podsInformer corev1informers.PodInformer // podsLister is able to list/get Pod resources from a shared informer's store. podsLister corev1listers.PodLister - // workqueue is a rate limited work queue. - // This is used to queue work to be processed instead of performing it as soon as a change happens. - // This means we can ensure we only process a fixed amount of resources at a time, and makes it easy to ensure we are never processing the same item simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface // recorder is an event recorder for recording Event resources to the Kubernetes API. recorder record.EventRecorder - // inSync is a channel which will be closed once the pod controller has become in-sync with apiserver - // it will never close if startup fails, or if the run context is cancelled prior to initialization completing - inSyncCh chan struct{} + // ready is a channel which will be closed once the pod controller is fully up and running. + // this channel will never be closed if there is an error on startup. + ready chan struct{} + + client corev1client.PodsGetter + + resourceManager *manager.ResourceManager // TODO: can we eliminate this? } -// NewPodController returns a new instance of PodController. -func NewPodController(server *Server) *PodController { - // Create an event broadcaster. - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(log.L.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: server.k8sClient.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: fmt.Sprintf("%s/pod-controller", server.nodeName)}) +// PodControllerConfig is used to configure a new PodController. +type PodControllerConfig struct { + // PodClient is used to perform actions on the k8s API, such as updating pod status + // This field is required + PodClient corev1client.PodsGetter - // Create an instance of PodController having a work queue that uses the rate limiter created above. - pc := &PodController{ - server: server, - podsInformer: server.podInformer, - podsLister: server.podInformer.Lister(), - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pods"), - recorder: recorder, - inSyncCh: make(chan struct{}), + // PodInformer is used as a local cache for pods + // This should be configured to only look at pods scheduled to the node which the controller will be managing + PodInformer corev1informers.PodInformer + + EventRecorder record.EventRecorder + + Provider PodLifecycleHandler + + // TODO: get rid of this + ResourceManager *manager.ResourceManager +} + +func NewPodController(cfg PodControllerConfig) (*PodController, error) { + if cfg.PodClient == nil { + return nil, errdefs.InvalidInput("must set core client") } + if cfg.EventRecorder == nil { + return nil, errdefs.InvalidInput("must set event recorder") + } + if cfg.PodInformer == nil { + return nil, errdefs.InvalidInput("must set informer") + } + + return &PodController{ + client: cfg.PodClient, + podsInformer: cfg.PodInformer, + podsLister: cfg.PodInformer.Lister(), + provider: cfg.Provider, + resourceManager: cfg.ResourceManager, + ready: make(chan struct{}), + recorder: cfg.EventRecorder, + }, nil +} + +// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers. +// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items. +func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error { + k8sQ := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes") + defer k8sQ.ShutDown() + + podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider") + pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers) + pc.runSyncFromProvider(ctx, podStatusQueue) + defer podStatusQueue.ShutDown() // Set up event handlers for when Pod resources change. pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -117,7 +153,7 @@ func NewPodController(server *Server) *PodController { if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil { log.L.Error(err) } else { - pc.workqueue.AddRateLimited(key) + k8sQ.AddRateLimited(key) } }, UpdateFunc: func(oldObj, newObj interface{}) { @@ -136,49 +172,39 @@ func NewPodController(server *Server) *PodController { if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil { log.L.Error(err) } else { - pc.workqueue.AddRateLimited(key) + k8sQ.AddRateLimited(key) } }, DeleteFunc: func(pod interface{}) { if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil { log.L.Error(err) } else { - pc.workqueue.AddRateLimited(key) + k8sQ.AddRateLimited(key) } }, }) - // Return the instance of PodController back to the caller. - return pc -} - -// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers. -// It will block until stopCh is closed, at which point it will shutdown the work queue and wait for workers to finish processing their current work items. -func (pc *PodController) Run(ctx context.Context, threadiness int) error { - defer pc.workqueue.ShutDown() - - // Wait for the caches to be synced before starting workers. + // Wait for the caches to be synced *before* starting workers. if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok { return pkgerrors.New("failed to wait for caches to sync") } log.G(ctx).Info("Pod cache in-sync") - close(pc.inSyncCh) - // Perform a reconciliation step that deletes any dangling pods from the provider. // This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis. // If by any reason the provider fails to delete a dangling pod, it will stay in the provider and deletion won't be retried. - pc.deleteDanglingPods(ctx, threadiness) + pc.deleteDanglingPods(ctx, podSyncWorkers) - // Launch "threadiness" workers to process Pod resources. log.G(ctx).Info("starting workers") - for id := 0; id < threadiness; id++ { + for id := 0; id < podSyncWorkers; id++ { go wait.Until(func() { // Use the worker's "index" as its ID so we can use it for tracing. - pc.runWorker(ctx, strconv.Itoa(id)) + pc.runWorker(ctx, strconv.Itoa(id), k8sQ) }, time.Second, ctx.Done()) } + close(pc.ready) + log.G(ctx).Info("started workers") <-ctx.Done() log.G(ctx).Info("shutting down workers") @@ -186,14 +212,21 @@ func (pc *PodController) Run(ctx context.Context, threadiness int) error { return nil } +// Ready returns a channel which gets closed once the PodController is ready to handle scheduled pods. +// This channel will never close if there is an error on startup. +// The status of this channel after sthudown is indeterminate. +func (pc *PodController) Ready() <-chan struct{} { + return pc.ready +} + // runWorker is a long-running function that will continually call the processNextWorkItem function in order to read and process an item on the work queue. -func (pc *PodController) runWorker(ctx context.Context, workerId string) { - for pc.processNextWorkItem(ctx, workerId) { +func (pc *PodController) runWorker(ctx context.Context, workerId string, q workqueue.RateLimitingInterface) { + for pc.processNextWorkItem(ctx, workerId, q) { } } // processNextWorkItem will read a single work item off the work queue and attempt to process it,by calling the syncHandler. -func (pc *PodController) processNextWorkItem(ctx context.Context, workerId string) bool { +func (pc *PodController) processNextWorkItem(ctx context.Context, workerId string, q workqueue.RateLimitingInterface) bool { // We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item. ctx, span := trace.StartSpan(ctx, "processNextWorkItem") @@ -201,7 +234,7 @@ func (pc *PodController) processNextWorkItem(ctx context.Context, workerId strin // Add the ID of the current worker as an attribute to the current span. ctx = span.WithField(ctx, "workerId", workerId) - return handleQueueItem(ctx, pc.workqueue, pc.syncHandler) + return handleQueueItem(ctx, q, pc.syncHandler) } // syncHandler compares the actual state with the desired, and attempts to converge the two. @@ -227,14 +260,14 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error { // We've failed to fetch the pod from the lister, but the error is not a 404. // Hence, we add the key back to the work queue so we can retry processing it later. err := pkgerrors.Wrapf(err, "failed to fetch pod with key %q from lister", key) - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) return err } // At this point we know the Pod resource doesn't exist, which most probably means it was deleted. // Hence, we must delete it from the provider if it still exists there. - if err := pc.server.deletePod(ctx, namespace, name); err != nil { + if err := pc.deletePod(ctx, namespace, name); err != nil { err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name)) - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) return err } return nil @@ -254,9 +287,9 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) // Check whether the pod has been marked for deletion. // If it does, guarantee it is deleted in the provider and Kubernetes. if pod.DeletionTimestamp != nil { - if err := pc.server.deletePod(ctx, pod.Namespace, pod.Name); err != nil { + if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil { err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod)) - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) return err } return nil @@ -269,9 +302,9 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) } // Create or update the pod in the provider. - if err := pc.server.createOrUpdatePod(ctx, pod, pc.recorder); err != nil { + if err := pc.createOrUpdatePod(ctx, pod); err != nil { err := pkgerrors.Wrapf(err, "failed to sync pod %q in the provider", loggablePodName(pod)) - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) return err } return nil @@ -283,10 +316,10 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int defer span.End() // Grab the list of pods known to the provider. - pps, err := pc.server.provider.GetPods(ctx) + pps, err := pc.provider.GetPods(ctx) if err != nil { err := pkgerrors.Wrap(err, "failed to fetch the list of pods from the provider") - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) log.G(ctx).Error(err) return } @@ -305,7 +338,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int } // For some reason we couldn't fetch the pod from the lister, so we propagate the error. err := pkgerrors.Wrap(err, "failed to fetch pod from the lister") - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) log.G(ctx).Error(err) return } @@ -332,8 +365,8 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int // Add the pod's attributes to the current span. ctx = addPodAttributes(ctx, span, pod) // Actually delete the pod. - if err := pc.server.deletePod(ctx, pod.Namespace, pod.Name); err != nil { - span.SetStatus(ocstatus.FromError(err)) + if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil { + span.SetStatus(err) log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod)) } else { log.G(ctx).Infof("deleted leaked pod %q in provider", loggablePodName(pod)) diff --git a/vkubelet/queue.go b/vkubelet/queue.go index 880cf1bc2..c215d6eee 100644 --- a/vkubelet/queue.go +++ b/vkubelet/queue.go @@ -2,11 +2,13 @@ package vkubelet import ( "context" + "strconv" + "time" - "github.com/cpuguy83/strongerrors/status/ocstatus" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/util/workqueue" ) @@ -26,7 +28,10 @@ func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, han return false } + log.G(ctx).Debug("Got queue object") + err := func(obj interface{}) error { + defer log.G(ctx).Debug("Processed queue item") // We call Done here so the work queue knows we have finished processing this item. // We also must remember to call Forget if we do not want this work item being re-queued. // For example, we do not call Forget if a transient error occurs. @@ -43,13 +48,14 @@ func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, han log.G(ctx).Warnf("expected string in work queue item but got %#v", obj) return nil } + // Add the current key as an attribute to the current span. ctx = span.WithField(ctx, "key", key) // Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced. if err := handler(ctx, key); err != nil { if q.NumRequeues(key) < maxRetries { // Put the item back on the work queue to handle any transient errors. - log.G(ctx).Warnf("requeuing %q due to failed sync: %v", key, err) + log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", key) q.AddRateLimited(key) return nil } @@ -64,10 +70,71 @@ func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, han if err != nil { // We've actually hit an error, so we set the span's status based on the error. - span.SetStatus(ocstatus.FromError(err)) + span.SetStatus(err) log.G(ctx).Error(err) return true } return true } + +func (pc *PodController) runProviderSyncWorkers(ctx context.Context, q workqueue.RateLimitingInterface, numWorkers int) { + for i := 0; i < numWorkers; i++ { + go func(index int) { + workerID := strconv.Itoa(index) + pc.runProviderSyncWorker(ctx, workerID, q) + }(i) + } +} + +func (pc *PodController) runProviderSyncWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) { + for pc.processPodStatusUpdate(ctx, workerID, q) { + } +} + +func (pc *PodController) processPodStatusUpdate(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool { + ctx, span := trace.StartSpan(ctx, "processPodStatusUpdate") + defer span.End() + + // Add the ID of the current worker as an attribute to the current span. + ctx = span.WithField(ctx, "workerID", workerID) + + return handleQueueItem(ctx, q, pc.podStatusHandler) +} + +// providerSyncLoop syncronizes pod states from the provider back to kubernetes +// Deprecated: This is only used when the provider does not support async updates +// Providers should implement async update support, even if it just means copying +// something like this in. +func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) { + const sleepTime = 5 * time.Second + + t := time.NewTimer(sleepTime) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + t.Stop() + + ctx, span := trace.StartSpan(ctx, "syncActualState") + pc.updatePodStatuses(ctx, q) + span.End() + + // restart the timer + t.Reset(sleepTime) + } + } +} + +func (pc *PodController) runSyncFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) { + if pn, ok := pc.provider.(PodNotifier); ok { + pn.NotifyPods(ctx, func(pod *corev1.Pod) { + enqueuePodStatusUpdate(ctx, q, pod) + }) + } else { + go pc.providerSyncLoop(ctx, q) + } +} diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go deleted file mode 100644 index 2805cb7ad..000000000 --- a/vkubelet/vkubelet.go +++ /dev/null @@ -1,150 +0,0 @@ -package vkubelet - -import ( - "context" - "strconv" - "time" - - "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/trace" - corev1 "k8s.io/api/core/v1" - corev1informers "k8s.io/client-go/informers/core/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/util/workqueue" -) - -const ( - podStatusReasonProviderFailed = "ProviderFailed" -) - -// Server masquarades itself as a kubelet and allows for the virtual node to be backed by non-vm/node providers. -type Server struct { - namespace string - nodeName string - k8sClient kubernetes.Interface - provider PodLifecycleHandler - resourceManager *manager.ResourceManager - podSyncWorkers int - podInformer corev1informers.PodInformer - readyCh chan struct{} -} - -// Config is used to configure a new server. -type Config struct { - Client *kubernetes.Clientset - Namespace string - NodeName string - Provider PodLifecycleHandler - ResourceManager *manager.ResourceManager - PodSyncWorkers int - PodInformer corev1informers.PodInformer -} - -// New creates a new virtual-kubelet server. -// This is the entrypoint to this package. -// -// This creates but does not start the server. -// You must call `Run` on the returned object to start the server. -func New(cfg Config) *Server { - return &Server{ - nodeName: cfg.NodeName, - namespace: cfg.Namespace, - k8sClient: cfg.Client, - resourceManager: cfg.ResourceManager, - provider: cfg.Provider, - podSyncWorkers: cfg.PodSyncWorkers, - podInformer: cfg.PodInformer, - readyCh: make(chan struct{}), - } -} - -// Run creates and starts an instance of the pod controller, blocking until it stops. -// -// Note that this does not setup the HTTP routes that are used to expose pod -// info to the Kubernetes API Server, such as logs, metrics, exec, etc. -// See `AttachPodRoutes` and `AttachMetricsRoutes` to set these up. -func (s *Server) Run(ctx context.Context) error { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "podStatusUpdate") - s.runProviderSyncWorkers(ctx, q) - - if pn, ok := s.provider.(PodNotifier); ok { - pn.NotifyPods(ctx, func(pod *corev1.Pod) { - s.enqueuePodStatusUpdate(ctx, q, pod) - }) - } else { - go s.providerSyncLoop(ctx, q) - } - - pc := NewPodController(s) - - go func() { - select { - case <-pc.inSyncCh: - case <-ctx.Done(): - } - close(s.readyCh) - }() - - return pc.Run(ctx, s.podSyncWorkers) -} - -// Ready returns a channel which will be closed once the VKubelet is running -func (s *Server) Ready() <-chan struct{} { - // TODO: right now all this waits on is the in-sync channel. Later, we might either want to expose multiple types - // of ready, for example: - // * In Sync - // * Control Loop running - // * Provider state synchronized with API Server state - return s.readyCh -} - -// providerSyncLoop syncronizes pod states from the provider back to kubernetes -// Deprecated: This is only used when the provider does not support async updates -// Providers should implement async update support, even if it just means copying -// something like this in. -func (s *Server) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) { - const sleepTime = 5 * time.Second - - t := time.NewTimer(sleepTime) - defer t.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-t.C: - t.Stop() - - ctx, span := trace.StartSpan(ctx, "syncActualState") - s.updatePodStatuses(ctx, q) - span.End() - - // restart the timer - t.Reset(sleepTime) - } - } -} - -func (s *Server) runProviderSyncWorkers(ctx context.Context, q workqueue.RateLimitingInterface) { - for i := 0; i < s.podSyncWorkers; i++ { - go func(index int) { - workerID := strconv.Itoa(index) - s.runProviderSyncWorker(ctx, workerID, q) - }(i) - } -} - -func (s *Server) runProviderSyncWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) { - for s.processPodStatusUpdate(ctx, workerID, q) { - } -} - -func (s *Server) processPodStatusUpdate(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool { - ctx, span := trace.StartSpan(ctx, "processPodStatusUpdate") - defer span.End() - - // Add the ID of the current worker as an attribute to the current span. - ctx = span.WithField(ctx, "workerID", workerID) - - return handleQueueItem(ctx, q, s.podStatusHandler) -}