From 7dd49516d85bcf8ed8ed888fd4d1e5133fba2daa Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Fri, 17 May 2019 17:01:05 -0700 Subject: [PATCH] Decouple vkubelet/* packages from providers (#626) This makes the concept of a `Provider` wholely implemented in the cli implementation in cmd/virtual-kubelet. It allows us to slim down the interfaces used in vkubelet (and vkubelet/api) to what is actually used there rather than a huge interface that is only there to serve the CLI's needs. --- cmd/virtual-kubelet/commands/root/http.go | 20 ++++++-- providers/alibabacloud/eci.go | 6 +-- providers/aws/fargate/cluster.go | 4 +- providers/aws/provider.go | 6 +-- providers/aws/provider_test.go | 4 +- providers/azure/aci.go | 8 +-- providers/azurebatch/batch.go | 6 +-- providers/azurebatch/batch_test.go | 6 +-- providers/cri/cri.go | 7 +-- providers/huawei/cci.go | 6 +-- providers/mock/mock.go | 9 ++-- providers/node.go | 28 ----------- providers/nomad/nomad.go | 5 +- providers/openstack/zun.go | 5 +- providers/provider.go | 60 ++--------------------- providers/web/broker.go | 6 +-- vkubelet/api/exec.go | 41 +++++++++++----- vkubelet/api/logs.go | 26 +++++++--- vkubelet/api/pods.go | 12 ++--- vkubelet/{apiserver.go => api/server.go} | 56 ++++++++++++--------- vkubelet/api/stats.go | 15 +++--- vkubelet/doc.go | 6 +-- vkubelet/node.go | 26 ++++++++-- vkubelet/node_test.go | 3 +- vkubelet/podcontroller.go | 33 +++++++++++++ vkubelet/vkubelet.go | 7 ++- 26 files changed, 217 insertions(+), 194 deletions(-) delete mode 100644 providers/node.go rename vkubelet/{apiserver.go => api/server.go} (64%) diff --git a/cmd/virtual-kubelet/commands/root/http.go b/cmd/virtual-kubelet/commands/root/http.go index 14e00c388..3668fe0a8 100644 --- a/cmd/virtual-kubelet/commands/root/http.go +++ b/cmd/virtual-kubelet/commands/root/http.go @@ -26,7 +26,7 @@ import ( "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/providers" - "github.com/virtual-kubelet/virtual-kubelet/vkubelet" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" ) // AcceptedCiphers is the list of accepted TLS ciphers, with known weak ciphers elided @@ -86,7 +86,13 @@ func setupHTTPServer(ctx context.Context, p providers.Provider, cfg *apiServerCo } mux := http.NewServeMux() - vkubelet.AttachPodRoutes(p, mux, true) + + podRoutes := api.PodHandlerConfig{ + RunInContainer: p.RunInContainer, + GetContainerLogs: p.GetContainerLogs, + GetPods: p.GetPods, + } + api.AttachPodRoutes(podRoutes, mux, true) s := &http.Server{ Handler: mux, @@ -105,7 +111,15 @@ func setupHTTPServer(ctx context.Context, p providers.Provider, cfg *apiServerCo } mux := http.NewServeMux() - vkubelet.AttachMetricsRoutes(p, mux) + + var summaryHandlerFunc api.PodStatsSummaryHandlerFunc + if mp, ok := p.(providers.PodMetricsProvider); ok { + summaryHandlerFunc = mp.GetStatsSummary + } + podMetricsRoutes := api.PodMetricsConfig{ + GetStatsSummary: summaryHandlerFunc, + } + api.AttachPodMetricsRoutes(podMetricsRoutes, mux) s := &http.Server{ Handler: mux, } diff --git a/providers/alibabacloud/eci.go b/providers/alibabacloud/eci.go index 8c7883541..af919a178 100644 --- a/providers/alibabacloud/eci.go +++ b/providers/alibabacloud/eci.go @@ -20,8 +20,8 @@ import ( "github.com/cpuguy83/strongerrors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers/alibabacloud/eci" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" v1 "k8s.io/api/core/v1" k8serr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -279,7 +279,7 @@ func (p *ECIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P } // GetContainerLogs returns the logs of a pod by name that is running inside ECI. -func (p *ECIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { +func (p *ECIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { eciId := "" for _, cg := range p.GetCgs() { if getECITagValue(&cg, "PodName") == podName && getECITagValue(&cg, "NameSpace") == namespace { @@ -321,7 +321,7 @@ func (p *ECIProvider) GetPodFullName(namespace string, pod string) string { // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. -func (p *ECIProvider) RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach providers.AttachIO) error { +func (p *ECIProvider) RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error { return nil } diff --git a/providers/aws/fargate/cluster.go b/providers/aws/fargate/cluster.go index d1466b2c5..57f27e642 100644 --- a/providers/aws/fargate/cluster.go +++ b/providers/aws/fargate/cluster.go @@ -13,7 +13,7 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/ecs" "github.com/cpuguy83/strongerrors" - "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" k8sTypes "k8s.io/apimachinery/pkg/types" ) @@ -313,7 +313,7 @@ func (c *Cluster) RemovePod(tag string) { } // GetContainerLogs returns the logs of a container from this cluster. -func (c *Cluster) GetContainerLogs(namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { +func (c *Cluster) GetContainerLogs(namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { if c.cloudWatchLogGroupName == "" { return nil, fmt.Errorf("logs not configured, please specify a \"CloudWatchLogGroupName\"") } diff --git a/providers/aws/provider.go b/providers/aws/provider.go index 4122d4b51..accfcc360 100644 --- a/providers/aws/provider.go +++ b/providers/aws/provider.go @@ -8,8 +8,8 @@ import ( "time" "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers/aws/fargate" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -172,7 +172,7 @@ func (p *FargateProvider) GetPod(ctx context.Context, namespace, name string) (* } // GetContainerLogs retrieves the logs of a container by name from the provider. -func (p *FargateProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { +func (p *FargateProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { log.Printf("Received GetContainerLogs request for %s/%s/%s.\n", namespace, podName, containerName) return p.cluster.GetContainerLogs(namespace, podName, containerName, opts) } @@ -184,7 +184,7 @@ func (p *FargateProvider) GetPodFullName(namespace string, pod string) string { // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. -func (p *FargateProvider) RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach providers.AttachIO) error { +func (p *FargateProvider) RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error { return errNotImplemented } diff --git a/providers/aws/provider_test.go b/providers/aws/provider_test.go index bcc9abf28..03b0efe81 100644 --- a/providers/aws/provider_test.go +++ b/providers/aws/provider_test.go @@ -15,8 +15,8 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ecs" "github.com/aws/aws-sdk-go/service/iam" - "github.com/virtual-kubelet/virtual-kubelet/providers" vkAWS "github.com/virtual-kubelet/virtual-kubelet/providers/aws" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -283,7 +283,7 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) { // Wait a few seconds for the logs to settle. time.Sleep(10 * time.Second) - logs, err := provider.GetContainerLogs(context.Background(), "default", podName, "echo-container", providers.ContainerLogOpts{Tail: 100}) + logs, err := provider.GetContainerLogs(context.Background(), "default", podName, "echo-container", api.ContainerLogOpts{Tail: 100}) if err != nil { t.Fatal(err) } diff --git a/providers/azure/aci.go b/providers/azure/aci.go index c1d23efb7..bb0b9a32c 100644 --- a/providers/azure/aci.go +++ b/providers/azure/aci.go @@ -26,8 +26,8 @@ import ( "github.com/virtual-kubelet/azure-aci/client/network" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/trace" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" v1 "k8s.io/api/core/v1" k8serr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -766,7 +766,7 @@ func (p *ACIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P } // GetContainerLogs returns the logs of a pod by name that is running inside ACI. -func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { +func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { ctx, span := trace.StartSpan(ctx, "aci.GetContainerLogs") defer span.End() ctx = addAzureAttributes(ctx, span, p) @@ -804,7 +804,7 @@ func (p *ACIProvider) GetPodFullName(namespace string, pod string) string { // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. -func (p *ACIProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { +func (p *ACIProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error { out := attach.Stdout() if out != nil { defer out.Close() @@ -816,7 +816,7 @@ func (p *ACIProvider) RunInContainer(ctx context.Context, namespace, name, conta } // Set default terminal size - size := providers.TermSize{ + size := api.TermSize{ Height: 60, Width: 120, } diff --git a/providers/azurebatch/batch.go b/providers/azurebatch/batch.go index 8347b8410..0f8bffd92 100644 --- a/providers/azurebatch/batch.go +++ b/providers/azurebatch/batch.go @@ -17,8 +17,8 @@ import ( "github.com/Azure/go-autorest/autorest/to" "github.com/lawrencegripper/pod2docker" "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers" azureCreds "github.com/virtual-kubelet/virtual-kubelet/providers/azure" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -260,7 +260,7 @@ const ( ) // GetContainerLogs returns the logs of a container running in a pod by name. -func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { +func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { log.Println("Getting pod logs ....") taskID := getTaskIDForPod(namespace, podName) @@ -319,7 +319,7 @@ func (p *Provider) GetPodFullName(namespace string, pod string) string { // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. // TODO: Implementation -func (p *Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { +func (p *Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error { log.Printf("receive ExecInContainer %q\n", container) return nil } diff --git a/providers/azurebatch/batch_test.go b/providers/azurebatch/batch_test.go index 438acb04e..dc7b9dd6c 100644 --- a/providers/azurebatch/batch_test.go +++ b/providers/azurebatch/batch_test.go @@ -12,7 +12,7 @@ import ( "github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch" "github.com/Azure/go-autorest/autorest" - "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" apiv1 "k8s.io/api/core/v1" ) @@ -121,7 +121,7 @@ func Test_readLogs_404Response_expectReturnStartupLogs(t *testing.T) { return batch.ReadCloser{}, fmt.Errorf("Failed in test mock of getFileFromTask") } - logs, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, providers.ContainerLogOpts{}) + logs, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, api.ContainerLogOpts{}) if err != nil { t.Fatalf("GetContainerLogs return error: %v", err) } @@ -161,7 +161,7 @@ func Test_readLogs_JsonResponse_expectFormattedLogs(t *testing.T) { return batch.ReadCloser{}, fmt.Errorf("Failed in test mock of getFileFromTask") } - logs, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, providers.ContainerLogOpts{}) + logs, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, api.ContainerLogOpts{}) if err != nil { t.Errorf("GetContainerLogs return error: %v", err) } diff --git a/providers/cri/cri.go b/providers/cri/cri.go index 85f7682fb..e89b252f8 100644 --- a/providers/cri/cri.go +++ b/providers/cri/cri.go @@ -20,6 +20,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" "google.golang.org/grpc" v1 "k8s.io/api/core/v1" k8serr "k8s.io/apimachinery/pkg/api/errors" @@ -607,7 +608,7 @@ func (p *CRIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P } // Reads a log file into a string -func readLogFile(filename string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { +func readLogFile(filename string, opts api.ContainerLogOpts) (io.ReadCloser, error) { file, err := os.Open(filename) if err != nil { return nil, err @@ -627,7 +628,7 @@ func readLogFile(filename string, opts providers.ContainerLogOpts) (io.ReadClose } // Provider function to read the logs of a container -func (p *CRIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { +func (p *CRIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { log.Printf("receive GetContainerLogs %q", containerName) err := p.refreshNodeState() @@ -656,7 +657,7 @@ func (p *CRIProvider) GetPodFullName(namespace string, pod string) string { // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. // TODO: Implementation -func (p *CRIProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { +func (p *CRIProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error { log.Printf("receive ExecInContainer %q\n", container) return nil } diff --git a/providers/huawei/cci.go b/providers/huawei/cci.go index 2a7d17946..941d6496a 100644 --- a/providers/huawei/cci.go +++ b/providers/huawei/cci.go @@ -17,8 +17,8 @@ import ( "github.com/cpuguy83/strongerrors" "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers/huawei/auth" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -299,7 +299,7 @@ func (p *CCIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P } // GetContainerLogs retrieves the logs of a container by name from the huawei CCI provider. -func (p *CCIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { +func (p *CCIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { return ioutil.NopCloser(strings.NewReader("")), nil } @@ -312,7 +312,7 @@ func (p *CCIProvider) GetPodFullName(namespace string, pod string) string { // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. // TODO: Implementation -func (p *CCIProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { +func (p *CCIProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error { log.Printf("receive ExecInContainer %q\n", container) return nil } diff --git a/providers/mock/mock.go b/providers/mock/mock.go index 900730cd9..67ee94547 100644 --- a/providers/mock/mock.go +++ b/providers/mock/mock.go @@ -14,12 +14,11 @@ import ( "github.com/cpuguy83/strongerrors/status/ocstatus" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" - - "github.com/virtual-kubelet/virtual-kubelet/providers" ) const ( @@ -223,7 +222,7 @@ func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod } // GetContainerLogs retrieves the logs of a container by name from the provider. -func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { +func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { ctx, span := trace.StartSpan(ctx, "GetContainerLogs") defer span.End() @@ -242,7 +241,7 @@ func (p *MockProvider) GetPodFullName(namespace string, pod string) string { // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. -func (p *MockProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { +func (p *MockProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error { log.G(context.TODO()).Infof("receive ExecInContainer %q", container) return nil } @@ -413,7 +412,7 @@ func (p *MockProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEn // OperatingSystem returns the operating system for this provider. // This is a noop to default to Linux for now. func (p *MockProvider) OperatingSystem() string { - return providers.OperatingSystemLinux + return "Linux" } // GetStatsSummary returns dummy stats for all pods known by this provider. diff --git a/providers/node.go b/providers/node.go deleted file mode 100644 index 343840f42..000000000 --- a/providers/node.go +++ /dev/null @@ -1,28 +0,0 @@ -package providers - -import ( - "context" - - v1 "k8s.io/api/core/v1" -) - -// NodeProvider is the interface used for registering a node and updating its -// status in Kubernetes. -// -// Note: Implementers can choose to manage a node themselves, in which case -// it is not needed to provide an implementation for this interface. -type NodeProvider interface { - // Ping checks if the node is still active. - // This is intended to be lightweight as it will be called periodically as a - // heartbeat to keep the node marked as ready in Kubernetes. - Ping(context.Context) error - - // NotifyNodeStatus is used to asynchronously monitor the node. - // The passed in callback should be called any time there is a change to the - // node's status. - // This will generally trigger a call to the Kubernetes API server to update - // the status. - // - // NotifyNodeStatus should not block callers. - NotifyNodeStatus(ctx context.Context, cb func(*v1.Node)) -} diff --git a/providers/nomad/nomad.go b/providers/nomad/nomad.go index ad13564dc..60ce9d8ca 100644 --- a/providers/nomad/nomad.go +++ b/providers/nomad/nomad.go @@ -12,6 +12,7 @@ import ( nomad "github.com/hashicorp/nomad/api" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -147,7 +148,7 @@ func (p *Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1. } // GetContainerLogs retrieves the logs of a container by name from the provider. -func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { +func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { return ioutil.NopCloser(strings.NewReader("")), nil } @@ -159,7 +160,7 @@ func (p *Provider) GetPodFullName(ctx context.Context, namespace string, pod str // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. // TODO: Implementation -func (p *Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { +func (p *Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error { log.Printf("ExecInContainer %q\n", container) return nil } diff --git a/providers/openstack/zun.go b/providers/openstack/zun.go index 81b34ecad..29debbc78 100644 --- a/providers/openstack/zun.go +++ b/providers/openstack/zun.go @@ -18,6 +18,7 @@ import ( "github.com/gophercloud/gophercloud/pagination" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -215,7 +216,7 @@ func (p *ZunProvider) getContainers(ctx context.Context, pod *v1.Pod) ([]Contain // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. -func (p *ZunProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { +func (p *ZunProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error { log.Printf("receive ExecInContainer %q\n", container) return nil } @@ -235,7 +236,7 @@ func (p *ZunProvider) GetPodStatus(ctx context.Context, namespace, name string) return &pod.Status, nil } -func (p *ZunProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { +func (p *ZunProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { return ioutil.NopCloser(strings.NewReader("not support in Zun Provider")), nil } diff --git a/providers/provider.go b/providers/provider.go index 81b322d4c..36fcf2afc 100644 --- a/providers/provider.go +++ b/providers/provider.go @@ -3,38 +3,23 @@ package providers import ( "context" "io" - "time" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" v1 "k8s.io/api/core/v1" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" ) // Provider contains the methods required to implement a virtual-kubelet provider. type Provider interface { - // CreatePod takes a Kubernetes Pod and deploys it within the provider. - CreatePod(ctx context.Context, pod *v1.Pod) error - - // UpdatePod takes a Kubernetes Pod and updates it within the provider. - UpdatePod(ctx context.Context, pod *v1.Pod) error - - // DeletePod takes a Kubernetes Pod and deletes it from the provider. - DeletePod(ctx context.Context, pod *v1.Pod) error - - // GetPod retrieves a pod by name from the provider (can be cached). - GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) + vkubelet.PodLifecycleHandler // GetContainerLogs retrieves the logs of a container by name from the provider. - GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts ContainerLogOpts) (io.ReadCloser, error) + GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. - RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach AttachIO) error - - // GetPodStatus retrieves the status of a pod by name from the provider. - GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) - - // GetPods retrieves a list of all pods running on the provider (can be cached). - GetPods(context.Context) ([]*v1.Pod, error) + RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error // Capacity returns a resource list with the capacity constraints of the provider. Capacity(context.Context) v1.ResourceList @@ -55,42 +40,7 @@ type Provider interface { OperatingSystem() string } -// ContainerLogOpts are used to pass along options to be set on the container -// log stream. -type ContainerLogOpts struct { - Tail int - Since time.Duration - LimitBytes int - Timestamps bool -} - // PodMetricsProvider is an optional interface that providers can implement to expose pod stats type PodMetricsProvider interface { GetStatsSummary(context.Context) (*stats.Summary, error) } - -// PodNotifier notifies callers of pod changes. -// Providers should implement this interface to enable callers to be notified -// of pod status updates asyncronously. -type PodNotifier interface { - // NotifyPods instructs the notifier to call the passed in function when - // the pod status changes. - // - // NotifyPods should not block callers. - NotifyPods(context.Context, func(*v1.Pod)) -} - -// AttachIO is used to pass in streams to attach to a container process -type AttachIO interface { - Stdin() io.Reader - Stdout() io.WriteCloser - Stderr() io.WriteCloser - TTY() bool - Resize() <-chan TermSize -} - -// TermSize is used to set the terminal size from attached clients. -type TermSize struct { - Width uint16 - Height uint16 -} diff --git a/providers/web/broker.go b/providers/web/broker.go index e6d5ee411..9587f2351 100644 --- a/providers/web/broker.go +++ b/providers/web/broker.go @@ -30,7 +30,7 @@ import ( "github.com/cenkalti/backoff" "github.com/cpuguy83/strongerrors" - "github.com/virtual-kubelet/virtual-kubelet/providers" + "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" v1 "k8s.io/api/core/v1" ) @@ -109,7 +109,7 @@ func (p *BrokerProvider) GetPod(ctx context.Context, namespace, name string) (*v } // GetContainerLogs returns the logs of a container running in a pod by name. -func (p *BrokerProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { +func (p *BrokerProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { urlPathStr := fmt.Sprintf( "/getContainerLogs?namespace=%s&podName=%s&containerName=%s&tail=%d", url.QueryEscape(namespace), @@ -134,7 +134,7 @@ func (p *BrokerProvider) GetPodFullName(namespace string, pod string) string { // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. // TODO: Implementation -func (p *BrokerProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { +func (p *BrokerProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error { log.Printf("receive ExecInContainer %q\n", container) return nil } diff --git a/vkubelet/api/exec.go b/vkubelet/api/exec.go index 021e330bf..ebbacd9ae 100644 --- a/vkubelet/api/exec.go +++ b/vkubelet/api/exec.go @@ -10,21 +10,38 @@ import ( "github.com/cpuguy83/strongerrors" "github.com/gorilla/mux" "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/providers" "k8s.io/apimachinery/pkg/types" remoteutils "k8s.io/client-go/tools/remotecommand" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" ) -type ExecBackend interface { - RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach providers.AttachIO) error +// ContainerExecHandlerFunc defines the handler function used for "execing" into a +// container in a pod. +type ContainerExecHandlerFunc func(ctx context.Context, namespace, podName, containerName string, cmd []string, attach AttachIO) error + +// AttachIO is used to pass in streams to attach to a container process +type AttachIO interface { + Stdin() io.Reader + Stdout() io.WriteCloser + Stderr() io.WriteCloser + TTY() bool + Resize() <-chan TermSize } -// PodExecHandlerFunc makes an http handler func from a Provider which execs a command in a pod's container +// TermSize is used to set the terminal size from attached clients. +type TermSize struct { + Width uint16 + Height uint16 +} + +// HandleContainerExec makes an http handler func from a Provider which execs a command in a pod's container // Note that this handler currently depends on gorrilla/mux to get url parts as variables. // TODO(@cpuguy83): don't force gorilla/mux on consumers of this function -func PodExecHandlerFunc(backend ExecBackend) http.HandlerFunc { +func HandleContainerExec(h ContainerExecHandlerFunc) http.HandlerFunc { + if h == nil { + return NotImplemented + } return handleError(func(w http.ResponseWriter, req *http.Request) error { vars := mux.Vars(req) @@ -48,7 +65,7 @@ func PodExecHandlerFunc(backend ExecBackend) http.HandlerFunc { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - exec := &containerExecContext{ctx: ctx, b: backend, pod: pod, namespace: namespace, container: container} + exec := &containerExecContext{ctx: ctx, h: h, pod: pod, namespace: namespace, container: container} remotecommand.ServeExec(w, req, exec, "", "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols) return nil @@ -77,7 +94,7 @@ func getExecOptions(req *http.Request) (*remotecommand.Options, error) { } type containerExecContext struct { - b ExecBackend + h ContainerExecHandlerFunc eio *execIO namespace, pod, container string ctx context.Context @@ -95,7 +112,7 @@ func (c *containerExecContext) ExecInContainer(name string, uid types.UID, conta } if tty { - eio.chResize = make(chan providers.TermSize) + eio.chResize = make(chan TermSize) } ctx, cancel := context.WithCancel(c.ctx) @@ -105,7 +122,7 @@ func (c *containerExecContext) ExecInContainer(name string, uid types.UID, conta go func() { send := func(s remoteutils.TerminalSize) bool { select { - case eio.chResize <- providers.TermSize{Width: s.Width, Height: s.Height}: + case eio.chResize <- TermSize{Width: s.Width, Height: s.Height}: return false case <-ctx.Done(): return true @@ -125,7 +142,7 @@ func (c *containerExecContext) ExecInContainer(name string, uid types.UID, conta }() } - return c.b.RunInContainer(c.ctx, c.namespace, c.pod, c.container, cmd, eio) + return c.h(c.ctx, c.namespace, c.pod, c.container, cmd, eio) } type execIO struct { @@ -133,7 +150,7 @@ type execIO struct { stdin io.Reader stdout io.WriteCloser stderr io.WriteCloser - chResize chan providers.TermSize + chResize chan TermSize } func (e *execIO) TTY() bool { @@ -152,6 +169,6 @@ func (e *execIO) Stderr() io.WriteCloser { return e.stderr } -func (e *execIO) Resize() <-chan providers.TermSize { +func (e *execIO) Resize() <-chan TermSize { return e.chResize } diff --git a/vkubelet/api/logs.go b/vkubelet/api/logs.go index 7689f3373..d062c98e2 100644 --- a/vkubelet/api/logs.go +++ b/vkubelet/api/logs.go @@ -5,21 +5,31 @@ import ( "io" "net/http" "strconv" + "time" "github.com/cpuguy83/strongerrors" "github.com/gorilla/mux" "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" - "github.com/virtual-kubelet/virtual-kubelet/providers" ) -// ContainerLogsBackend is used in place of backend implementations for getting container logs -type ContainerLogsBackend interface { - GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) +// ContainerLogsHandlerFunc is used in place of backend implementations for getting container logs +type ContainerLogsHandlerFunc func(ctx context.Context, namespace, podName, containerName string, opts ContainerLogOpts) (io.ReadCloser, error) + +// ContainerLogOpts are used to pass along options to be set on the container +// log stream. +type ContainerLogOpts struct { + Tail int + Since time.Duration + LimitBytes int + Timestamps bool } -// PodLogsHandlerFunc creates an http handler function from a provider to serve logs from a pod -func PodLogsHandlerFunc(p ContainerLogsBackend) http.HandlerFunc { +// HandleContainerLogs creates an http handler function from a provider to serve logs from a pod +func HandleContainerLogs(h ContainerLogsHandlerFunc) http.HandlerFunc { + if h == nil { + return NotImplemented + } return handleError(func(w http.ResponseWriter, req *http.Request) error { vars := mux.Vars(req) if len(vars) != 3 { @@ -45,11 +55,11 @@ func PodLogsHandlerFunc(p ContainerLogsBackend) http.HandlerFunc { // TODO(@cpuguy83): support v1.PodLogOptions // The kubelet decoding here is not straight forward, so this needs to be disected - opts := providers.ContainerLogOpts{ + opts := ContainerLogOpts{ Tail: tail, } - logs, err := p.GetContainerLogs(ctx, namespace, pod, container, opts) + logs, err := h(ctx, namespace, pod, container, opts) if err != nil { return errors.Wrap(err, "error getting container logs?)") } diff --git a/vkubelet/api/pods.go b/vkubelet/api/pods.go index 6d7600128..e0dc3376a 100644 --- a/vkubelet/api/pods.go +++ b/vkubelet/api/pods.go @@ -6,18 +6,14 @@ import ( "github.com/cpuguy83/strongerrors" "github.com/virtual-kubelet/virtual-kubelet/log" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" ) -// ContainerLogsBackend is used in place of backend implementations for the provider's pods -type RunningPodsBackend interface { - // GetPods retrieves a list of all pods running on the provider (can be cached). - GetPods(context.Context) ([]*v1.Pod, error) -} +type PodListerFunc func(context.Context) ([]*v1.Pod, error) -func RunningPodsHandlerFunc(p RunningPodsBackend) http.HandlerFunc { +func HandleRunningPods(getPods PodListerFunc) http.HandlerFunc { scheme := runtime.NewScheme() v1.SchemeBuilder.AddToScheme(scheme) codecs := serializer.NewCodecFactory(scheme) @@ -25,7 +21,7 @@ func RunningPodsHandlerFunc(p RunningPodsBackend) http.HandlerFunc { return handleError(func(w http.ResponseWriter, req *http.Request) error { ctx := req.Context() ctx = log.WithLogger(ctx, log.L) - pods, err := p.GetPods(ctx) + pods, err := getPods(ctx) if err != nil { return err } diff --git a/vkubelet/apiserver.go b/vkubelet/api/server.go similarity index 64% rename from vkubelet/apiserver.go rename to vkubelet/api/server.go index b5b8b824a..42997ca74 100644 --- a/vkubelet/apiserver.go +++ b/vkubelet/api/server.go @@ -1,12 +1,10 @@ -package vkubelet +package api import ( "net/http" "github.com/gorilla/mux" "github.com/virtual-kubelet/virtual-kubelet/log" - "github.com/virtual-kubelet/virtual-kubelet/providers" - "github.com/virtual-kubelet/virtual-kubelet/vkubelet/api" "go.opencensus.io/plugin/ochttp" "go.opencensus.io/plugin/ochttp/propagation/b3" ) @@ -20,37 +18,40 @@ type ServeMux interface { Handle(path string, h http.Handler) } +type PodHandlerConfig struct { + RunInContainer ContainerExecHandlerFunc + GetContainerLogs ContainerLogsHandlerFunc + GetPods PodListerFunc +} + // PodHandler creates an http handler for interacting with pods/containers. -func PodHandler(p providers.Provider, debug bool) http.Handler { +func PodHandler(p PodHandlerConfig, debug bool) http.Handler { r := mux.NewRouter() // This matches the behaviour in the reference kubelet r.StrictSlash(true) if debug { - r.HandleFunc("/runningpods/", api.RunningPodsHandlerFunc(p)).Methods("GET") + r.HandleFunc("/runningpods/", HandleRunningPods(p.GetPods)).Methods("GET") } - r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", api.PodLogsHandlerFunc(p)).Methods("GET") - r.HandleFunc("/exec/{namespace}/{pod}/{container}", api.PodExecHandlerFunc(p)).Methods("POST") + r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", HandleContainerLogs(p.GetContainerLogs)).Methods("GET") + r.HandleFunc("/exec/{namespace}/{pod}/{container}", HandleContainerExec(p.RunInContainer)).Methods("POST") r.NotFoundHandler = http.HandlerFunc(NotFound) return r } -// MetricsSummaryHandler creates an http handler for serving pod metrics. +// PodStatsSummaryHandler creates an http handler for serving pod metrics. // -// If the passed in provider does not implement providers.PodMetricsProvider, -// it will create handlers that just serves http.StatusNotImplemented -func MetricsSummaryHandler(p providers.Provider) http.Handler { +// If the passed in handler func is nil this will create handlers which only +// serves http.StatusNotImplemented +func PodStatsSummaryHandler(f PodStatsSummaryHandlerFunc) http.Handler { + if f == nil { + return http.HandlerFunc(NotImplemented) + } + r := mux.NewRouter() const summaryRoute = "/stats/summary" - var h http.HandlerFunc - - mp, ok := p.(providers.PodMetricsProvider) - if !ok { - h = NotImplemented - } else { - h = api.PodMetricsHandlerFunc(mp) - } + h := HandlePodStatsSummary(f) r.Handle(summaryRoute, ochttp.WithRouteTag(h, "PodStatsSummaryHandler")).Methods("GET") r.Handle(summaryRoute+"/", ochttp.WithRouteTag(h, "PodStatsSummaryHandler")).Methods("GET") @@ -63,16 +64,25 @@ func MetricsSummaryHandler(p providers.Provider) http.Handler { // // Callers should take care to namespace the serve mux as they see fit, however // these routes get called by the Kubernetes API server. -func AttachPodRoutes(p providers.Provider, mux ServeMux, debug bool) { +func AttachPodRoutes(p PodHandlerConfig, mux ServeMux, debug bool) { mux.Handle("/", InstrumentHandler(PodHandler(p, debug))) } -// AttachMetricsRoutes adds the http routes for pod/node metrics to the passed in serve mux. +// PodMetricsConfig stores the handlers for pod metrics routes +// It is used by AttachPodMetrics. +// +// The main reason for this struct is in case of expansion we do not need to break +// the package level API. +type PodMetricsConfig struct { + GetStatsSummary PodStatsSummaryHandlerFunc +} + +// AttachPodMetricsRoutes adds the http routes for pod/node metrics to the passed in serve mux. // // Callers should take care to namespace the serve mux as they see fit, however // these routes get called by the Kubernetes API server. -func AttachMetricsRoutes(p providers.Provider, mux ServeMux) { - mux.Handle("/", InstrumentHandler(MetricsSummaryHandler(p))) +func AttachPodMetricsRoutes(p PodMetricsConfig, mux ServeMux) { + mux.Handle("/", InstrumentHandler(HandlePodStatsSummary(p.GetStatsSummary))) } func instrumentRequest(r *http.Request) *http.Request { diff --git a/vkubelet/api/stats.go b/vkubelet/api/stats.go index afa7b23a7..444e26b21 100644 --- a/vkubelet/api/stats.go +++ b/vkubelet/api/stats.go @@ -10,15 +10,16 @@ import ( stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" ) -// PodMetricsBackend is used in place of backend implementations to get k8s pod metrics. -type PodMetricsBackend interface { - GetStatsSummary(context.Context) (*stats.Summary, error) -} +// PodStatsSummaryHandlerFunc defines the handler for getting pod stats summaries +type PodStatsSummaryHandlerFunc func(context.Context) (*stats.Summary, error) -// PodMetricsHandlerFunc makes an HTTP handler for implementing the kubelet summary stats endpoint -func PodMetricsHandlerFunc(b PodMetricsBackend) http.HandlerFunc { +// HandlePodStatsSummary makes an HTTP handler for implementing the kubelet summary stats endpoint +func HandlePodStatsSummary(h PodStatsSummaryHandlerFunc) http.HandlerFunc { + if h == nil { + return NotImplemented + } return handleError(func(w http.ResponseWriter, req *http.Request) error { - stats, err := b.GetStatsSummary(req.Context()) + stats, err := h(req.Context()) if err != nil { if errors.Cause(err) == context.Canceled { return strongerrors.Cancelled(err) diff --git a/vkubelet/doc.go b/vkubelet/doc.go index 061cf69fd..47597ba77 100644 --- a/vkubelet/doc.go +++ b/vkubelet/doc.go @@ -19,11 +19,11 @@ controller. Up to this point you have a running virtual kubelet controller, but no HTTP handlers to deal with requests forwarded from the API server for things like -pod logs (e.g. user calls `kubectl logs myVKPod`). This package provides some -helpers for this: `AttachPodRoutes` and `AttachMetricsRoutes`. +pod logs (e.g. user calls `kubectl logs myVKPod`). The api package provides some +helpers for this: `api.AttachPodRoutes` and `api.AttachMetricsRoutes`. mux := http.NewServeMux() - vkubelet.AttachPodRoutes(provider, mux) + api.AttachPodRoutes(provider, mux) You must configure your own HTTP server, but these helpers will add handlers at the correct URI paths to your serve mux. You are not required to use go's diff --git a/vkubelet/node.go b/vkubelet/node.go index 8e3e17618..145315822 100644 --- a/vkubelet/node.go +++ b/vkubelet/node.go @@ -9,7 +9,6 @@ import ( "github.com/cpuguy83/strongerrors/status/ocstatus" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" - "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/trace" coord "k8s.io/api/coordination/v1beta1" corev1 "k8s.io/api/core/v1" @@ -21,12 +20,33 @@ import ( v1 "k8s.io/client-go/kubernetes/typed/core/v1" ) +// NodeProvider is the interface used for registering a node and updating its +// status in Kubernetes. +// +// Note: Implementers can choose to manage a node themselves, in which case +// it is not needed to provide an implementation for this interface. +type NodeProvider interface { + // Ping checks if the node is still active. + // This is intended to be lightweight as it will be called periodically as a + // heartbeat to keep the node marked as ready in Kubernetes. + Ping(context.Context) error + + // NotifyNodeStatus is used to asynchronously monitor the node. + // The passed in callback should be called any time there is a change to the + // node's status. + // This will generally trigger a call to the Kubernetes API server to update + // the status. + // + // NotifyNodeStatus should not block callers. + NotifyNodeStatus(ctx context.Context, cb func(*corev1.Node)) +} + // NewNode creates a new node. // This does not have any side-effects on the system or kubernetes. // // Use the node's `Run` method to register and run the loops to update the node // in Kubernetes. -func NewNode(p providers.NodeProvider, node *corev1.Node, leases v1beta1.LeaseInterface, nodes v1.NodeInterface, opts ...NodeOpt) (*Node, error) { +func NewNode(p NodeProvider, node *corev1.Node, leases v1beta1.LeaseInterface, nodes v1.NodeInterface, opts ...NodeOpt) (*Node, error) { n := &Node{p: p, n: node, leases: leases, nodes: nodes} for _, o := range opts { if err := o(n); err != nil { @@ -79,7 +99,7 @@ func WithNodeLease(l *coord.Lease) NodeOpt { // Node deals with creating and managing a node object in Kubernetes. // It can register a node with Kubernetes and periodically update its status. type Node struct { - p providers.NodeProvider + p NodeProvider n *corev1.Node leases v1beta1.LeaseInterface diff --git a/vkubelet/node_test.go b/vkubelet/node_test.go index 169a0ae0c..ddd6bf8c5 100644 --- a/vkubelet/node_test.go +++ b/vkubelet/node_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/virtual-kubelet/virtual-kubelet/providers" "gotest.tools/assert" "gotest.tools/assert/cmp" coord "k8s.io/api/coordination/v1beta1" @@ -232,7 +231,7 @@ func testNode(t *testing.T) *corev1.Node { } type testNodeProvider struct { - providers.NodeProvider + NodeProvider statusHandlers []func(*corev1.Node) } diff --git a/vkubelet/podcontroller.go b/vkubelet/podcontroller.go index e66385ce4..dbebcbeaa 100644 --- a/vkubelet/podcontroller.go +++ b/vkubelet/podcontroller.go @@ -39,6 +39,39 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/log" ) +// PodLifecycleHandler defines the interface used by the PodController to react +// to new and changed pods scheduled to the node that is being managed. +type PodLifecycleHandler interface { + // CreatePod takes a Kubernetes Pod and deploys it within the provider. + CreatePod(ctx context.Context, pod *corev1.Pod) error + + // UpdatePod takes a Kubernetes Pod and updates it within the provider. + UpdatePod(ctx context.Context, pod *corev1.Pod) error + + // DeletePod takes a Kubernetes Pod and deletes it from the provider. + DeletePod(ctx context.Context, pod *corev1.Pod) error + + // GetPod retrieves a pod by name from the provider (can be cached). + GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) + + // GetPodStatus retrieves the status of a pod by name from the provider. + GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) + + // GetPods retrieves a list of all pods running on the provider (can be cached). + GetPods(context.Context) ([]*corev1.Pod, error) +} + +// PodNotifier notifies callers of pod changes. +// Providers should implement this interface to enable callers to be notified +// of pod status updates asyncronously. +type PodNotifier interface { + // NotifyPods instructs the notifier to call the passed in function when + // the pod status changes. + // + // NotifyPods should not block callers. + NotifyPods(context.Context, func(*corev1.Pod)) +} + // PodController is the controller implementation for Pod resources. type PodController struct { // server is the instance to which this controller belongs. diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index 1ac1731f9..2805cb7ad 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -6,7 +6,6 @@ import ( "time" "github.com/virtual-kubelet/virtual-kubelet/manager" - "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/trace" corev1 "k8s.io/api/core/v1" corev1informers "k8s.io/client-go/informers/core/v1" @@ -23,7 +22,7 @@ type Server struct { namespace string nodeName string k8sClient kubernetes.Interface - provider providers.Provider + provider PodLifecycleHandler resourceManager *manager.ResourceManager podSyncWorkers int podInformer corev1informers.PodInformer @@ -35,7 +34,7 @@ type Config struct { Client *kubernetes.Clientset Namespace string NodeName string - Provider providers.Provider + Provider PodLifecycleHandler ResourceManager *manager.ResourceManager PodSyncWorkers int PodInformer corev1informers.PodInformer @@ -68,7 +67,7 @@ func (s *Server) Run(ctx context.Context) error { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "podStatusUpdate") s.runProviderSyncWorkers(ctx, q) - if pn, ok := s.provider.(providers.PodNotifier); ok { + if pn, ok := s.provider.(PodNotifier); ok { pn.NotifyPods(ctx, func(pod *corev1.Pod) { s.enqueuePodStatusUpdate(ctx, q, pod) })