From 3cc051f7c2eeafb7528c10943d1a0923a8ceb37c Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Mon, 15 Apr 2019 12:25:51 -0700 Subject: [PATCH] Use I/O stream for provider logs interface Providers must still update the implementaiton to actually gain any benefit here, but this makes the provider interface a bit more sane. --- providers/alibabacloud/eci.go | 10 +++-- providers/aws/fargate/cluster.go | 17 +++++---- providers/aws/provider.go | 6 +-- providers/aws/provider_test.go | 15 ++++++-- providers/azure/aci.go | 14 ++++--- providers/azurebatch/batch.go | 59 +++++++++++++++++------------- providers/azurebatch/batch_test.go | 25 ++++++++++--- providers/cri/cri.go | 21 ++++++----- providers/huawei/cci.go | 5 ++- providers/mock/mock.go | 6 ++- providers/nomad/nomad.go | 9 +++-- providers/openstack/zun.go | 7 +++- providers/provider.go | 12 +++++- providers/web/broker.go | 58 ++++++++++++++++++----------- vkubelet/api/helpers.go | 26 +++++++++++++ vkubelet/api/logs.go | 23 ++++++++++-- 16 files changed, 212 insertions(+), 101 deletions(-) diff --git a/providers/alibabacloud/eci.go b/providers/alibabacloud/eci.go index 9382d4a8f..8c7883541 100644 --- a/providers/alibabacloud/eci.go +++ b/providers/alibabacloud/eci.go @@ -9,6 +9,8 @@ import ( "encoding/json" "errors" "fmt" + "io" + "io/ioutil" "os" "strconv" "strings" @@ -277,7 +279,7 @@ func (p *ECIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P } // GetContainerLogs returns the logs of a pod by name that is running inside ECI. -func (p *ECIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { +func (p *ECIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { eciId := "" for _, cg := range p.GetCgs() { if getECITagValue(&cg, "PodName") == podName && getECITagValue(&cg, "NameSpace") == namespace { @@ -286,13 +288,13 @@ func (p *ECIProvider) GetContainerLogs(ctx context.Context, namespace, podName, } } if eciId == "" { - return "", errors.New(fmt.Sprintf("GetContainerLogs can't find Pod %s-%s", namespace, podName)) + return nil, errors.New(fmt.Sprintf("GetContainerLogs can't find Pod %s-%s", namespace, podName)) } request := eci.CreateDescribeContainerLogRequest() request.ContainerGroupId = eciId request.ContainerName = containerName - request.Tail = requests.Integer(tail) + request.Tail = requests.Integer(opts.Tail) // get logs from cg logContent := "" @@ -309,7 +311,7 @@ func (p *ECIProvider) GetContainerLogs(ctx context.Context, namespace, podName, } } - return logContent, nil + return ioutil.NopCloser(strings.NewReader(logContent)), nil } // Get full pod name as defined in the provider context diff --git a/providers/aws/fargate/cluster.go b/providers/aws/fargate/cluster.go index 89e778f3b..d1466b2c5 100644 --- a/providers/aws/fargate/cluster.go +++ b/providers/aws/fargate/cluster.go @@ -2,6 +2,8 @@ package fargate import ( "fmt" + "io" + "io/ioutil" "log" "strings" "sync" @@ -11,6 +13,7 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/ecs" "github.com/cpuguy83/strongerrors" + "github.com/virtual-kubelet/virtual-kubelet/providers" k8sTypes "k8s.io/apimachinery/pkg/types" ) @@ -310,9 +313,9 @@ func (c *Cluster) RemovePod(tag string) { } // GetContainerLogs returns the logs of a container from this cluster. -func (c *Cluster) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) { +func (c *Cluster) GetContainerLogs(namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { if c.cloudWatchLogGroupName == "" { - return "", fmt.Errorf("logs not configured, please specify a \"CloudWatchLogGroupName\"") + return nil, fmt.Errorf("logs not configured, please specify a \"CloudWatchLogGroupName\"") } prefix := fmt.Sprintf("%s_%s", buildTaskDefinitionTag(c.name, namespace, podName), containerName) @@ -321,18 +324,18 @@ func (c *Cluster) GetContainerLogs(namespace, podName, containerName string, tai LogStreamNamePrefix: aws.String(prefix), }) if err != nil { - return "", err + return nil, err } // Nothing logged yet. if len(describeResult.LogStreams) == 0 { - return "", nil + return nil, nil } logs := "" err = client.logsapi.GetLogEventsPages(&cloudwatchlogs.GetLogEventsInput{ - Limit: aws.Int64(int64(tail)), + Limit: aws.Int64(int64(opts.Tail)), LogGroupName: aws.String(c.cloudWatchLogGroupName), LogStreamName: describeResult.LogStreams[0].LogStreamName, }, func(page *cloudwatchlogs.GetLogEventsOutput, lastPage bool) bool { @@ -348,8 +351,8 @@ func (c *Cluster) GetContainerLogs(namespace, podName, containerName string, tai }) if err != nil { - return "", err + return nil, err } - return logs, nil + return ioutil.NopCloser(strings.NewReader(logs)), nil } diff --git a/providers/aws/provider.go b/providers/aws/provider.go index 0582c8bf7..4122d4b51 100644 --- a/providers/aws/provider.go +++ b/providers/aws/provider.go @@ -3,13 +3,13 @@ package aws import ( "context" "fmt" + "io" "log" "time" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers/aws/fargate" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -172,9 +172,9 @@ func (p *FargateProvider) GetPod(ctx context.Context, namespace, name string) (* } // GetContainerLogs retrieves the logs of a container by name from the provider. -func (p *FargateProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { +func (p *FargateProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { log.Printf("Received GetContainerLogs request for %s/%s/%s.\n", namespace, podName, containerName) - return p.cluster.GetContainerLogs(namespace, podName, containerName, tail) + return p.cluster.GetContainerLogs(namespace, podName, containerName, opts) } // GetPodFullName retrieves the full pod name as defined in the provider context. diff --git a/providers/aws/provider_test.go b/providers/aws/provider_test.go index d95b9bbe4..bcc9abf28 100644 --- a/providers/aws/provider_test.go +++ b/providers/aws/provider_test.go @@ -15,8 +15,9 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ecs" "github.com/aws/aws-sdk-go/service/iam" + "github.com/virtual-kubelet/virtual-kubelet/providers" vkAWS "github.com/virtual-kubelet/virtual-kubelet/providers/aws" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -282,13 +283,19 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) { // Wait a few seconds for the logs to settle. time.Sleep(10 * time.Second) - logs, err := provider.GetContainerLogs(context.Background(), "default", podName, "echo-container", 100) + logs, err := provider.GetContainerLogs(context.Background(), "default", podName, "echo-container", providers.ContainerLogOpts{Tail: 100}) if err != nil { - t.Error(err) + t.Fatal(err) + } + defer logs.Close() + + b, err := ioutil.ReadAll(logs) + if err != nil { + t.Fatal(err) } // Test log output. - receivedLogs := strings.Split(logs, "\n") + receivedLogs := strings.Split(string(b), "\n") expectedLogs := []string{ "Started", pod.Spec.Containers[0].Env[0].Name + "=" + pod.Spec.Containers[0].Env[0].Value, diff --git a/providers/azure/aci.go b/providers/azure/aci.go index 1a99ae3c7..c1d23efb7 100644 --- a/providers/azure/aci.go +++ b/providers/azure/aci.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/cpuguy83/strongerrors" "github.com/gorilla/websocket" client "github.com/virtual-kubelet/azure-aci/client" "github.com/virtual-kubelet/azure-aci/client/aci" @@ -765,25 +766,26 @@ func (p *ACIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P } // GetContainerLogs returns the logs of a pod by name that is running inside ACI. -func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { +func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { ctx, span := trace.StartSpan(ctx, "aci.GetContainerLogs") defer span.End() ctx = addAzureAttributes(ctx, span, p) - logContent := "" cg, _, err := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName)) if err != nil { - return logContent, err + return nil, err } if cg.Tags["NodeName"] != p.nodeName { - return logContent, nil + return nil, strongerrors.NotFound(errors.New("got unexpected pod node name")) } + // get logs from cg retry := 10 + logContent := "" var retries int for retries = 0; retries < retry; retries++ { - cLogs, err := p.aciClient.GetContainerLogs(ctx, p.resourceGroup, cg.Name, containerName, tail) + cLogs, err := p.aciClient.GetContainerLogs(ctx, p.resourceGroup, cg.Name, containerName, opts.Tail) if err != nil { log.G(ctx).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying") time.Sleep(5000 * time.Millisecond) @@ -792,7 +794,7 @@ func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, break } } - return logContent, err + return ioutil.NopCloser(strings.NewReader(logContent)), err } // GetPodFullName as defined in the provider context diff --git a/providers/azurebatch/batch.go b/providers/azurebatch/batch.go index 2d6ddc4ed..8347b8410 100644 --- a/providers/azurebatch/batch.go +++ b/providers/azurebatch/batch.go @@ -4,17 +4,16 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "log" "net/http" "os" "strings" - "github.com/Azure/go-autorest/autorest" - - "github.com/Azure/go-autorest/autorest/azure" - "github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch" + "github.com/Azure/go-autorest/autorest" + "github.com/Azure/go-autorest/autorest/azure" "github.com/Azure/go-autorest/autorest/to" "github.com/lawrencegripper/pod2docker" "github.com/virtual-kubelet/virtual-kubelet/manager" @@ -254,8 +253,14 @@ func (p *Provider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, return pod, nil } +const ( + startingUpHeader = "Container still starting..\nShowing startup logs from Azure Batch node instead:\n" + stdoutHeader = "----- STDOUT -----\n" + stderrHeader = "----- STDERR -----\n" +) + // GetContainerLogs returns the logs of a container running in a pod by name. -func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { +func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { log.Println("Getting pod logs ....") taskID := getTaskIDForPod(namespace, podName) @@ -265,40 +270,44 @@ func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, con if containerLogReader.Response.Response != nil && containerLogReader.StatusCode == http.StatusNotFound { stdoutReader, err := p.getFileFromTask(taskID, "stdout.txt") if err != nil { - return "", err + return nil, err } stderrReader, err := p.getFileFromTask(taskID, "stderr.txt") if err != nil { - return "", err + return nil, err } - var builder strings.Builder - builderPtr := &builder - mustWriteString(builderPtr, "Container still starting....\n") - mustWriteString(builderPtr, "Showing startup logs from Azure Batch node instead:\n") - mustWriteString(builderPtr, "----- STDOUT -----\n") - stdoutBytes, _ := ioutil.ReadAll(*stdoutReader.Value) - mustWrite(builderPtr, stdoutBytes) - mustWriteString(builderPtr, "\n") - - mustWriteString(builderPtr, "----- STDERR -----\n") - stderrBytes, _ := ioutil.ReadAll(*stderrReader.Value) - mustWrite(builderPtr, stderrBytes) - mustWriteString(builderPtr, "\n") - - return builder.String(), nil + stdout := io.MultiReader(strings.NewReader(startingUpHeader), strings.NewReader(stdoutHeader), *stdoutReader.Value, strings.NewReader("\n")) + stderr := io.MultiReader(strings.NewReader(stderrHeader), *stderrReader.Value, strings.NewReader("\n")) + return &readCloser{ + Reader: io.MultiReader(stdout, stderr), + closer: func() error { + (*stdoutReader.Value).Close() + (*stderrReader.Value).Close() + return nil + }}, nil } if err != nil { - return "", err + return nil, err } + // TODO(@cpuguy83): don't convert stream to a string result, err := formatLogJSON(containerLogReader) if err != nil { - return "", fmt.Errorf("Container log formating failed err: %v", err) + return nil, fmt.Errorf("Container log formating failed err: %v", err) } - return result, nil + return ioutil.NopCloser(strings.NewReader(result)), nil +} + +type readCloser struct { + io.Reader + closer func() error +} + +func (r *readCloser) Close() error { + return r.closer() } // Get full pod name as defined in the provider context diff --git a/providers/azurebatch/batch_test.go b/providers/azurebatch/batch_test.go index 6040e8003..438acb04e 100644 --- a/providers/azurebatch/batch_test.go +++ b/providers/azurebatch/batch_test.go @@ -12,6 +12,7 @@ import ( "github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch" "github.com/Azure/go-autorest/autorest" + "github.com/virtual-kubelet/virtual-kubelet/providers" apiv1 "k8s.io/api/core/v1" ) @@ -120,12 +121,18 @@ func Test_readLogs_404Response_expectReturnStartupLogs(t *testing.T) { return batch.ReadCloser{}, fmt.Errorf("Failed in test mock of getFileFromTask") } - result, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, 0) + logs, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, providers.ContainerLogOpts{}) if err != nil { - t.Errorf("GetContainerLogs return error: %v", err) + t.Fatalf("GetContainerLogs return error: %v", err) + } + defer logs.Close() + + r, err := ioutil.ReadAll(logs) + if err != nil { + t.Fatal(err) } - fmt.Print(result) + result := string(r) if !strings.Contains(result, "stderrResponse") || !strings.Contains(result, "stdoutResponse") { t.Errorf("Result didn't contain expected content have: %v", result) @@ -154,13 +161,19 @@ func Test_readLogs_JsonResponse_expectFormattedLogs(t *testing.T) { return batch.ReadCloser{}, fmt.Errorf("Failed in test mock of getFileFromTask") } - result, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, 0) + logs, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, providers.ContainerLogOpts{}) if err != nil { t.Errorf("GetContainerLogs return error: %v", err) } + defer logs.Close() - fmt.Print(result) - if !strings.Contains(result, "Copy output data from the CUDA device to the host memory") || strings.Contains(result, "{") { + r, err := ioutil.ReadAll(logs) + if err != nil { + t.Fatal(err) + } + + result := string(r) + if !strings.Contains(string(result), "Copy output data from the CUDA device to the host memory") || strings.Contains(result, "{") { t.Errorf("Result didn't contain expected content have or had json: %v", result) } diff --git a/providers/cri/cri.go b/providers/cri/cri.go index 26f39a132..85f7682fb 100644 --- a/providers/cri/cri.go +++ b/providers/cri/cri.go @@ -6,6 +6,7 @@ import ( "bufio" "context" "fmt" + "io" "io/ioutil" "net" "os" @@ -606,10 +607,10 @@ func (p *CRIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P } // Reads a log file into a string -func readLogFile(filename string, tail int) (string, error) { +func readLogFile(filename string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { file, err := os.Open(filename) if err != nil { - return "", err + return nil, err } defer file.Close() @@ -619,31 +620,31 @@ func readLogFile(filename string, tail int) (string, error) { for scanner.Scan() { lines = append(lines, scanner.Text()) } - if tail > 0 && tail < len(lines) { - lines = lines[len(lines)-tail:] + if opts.Tail > 0 && opts.Tail < len(lines) { + lines = lines[len(lines)-opts.Tail:] } - return strings.Join(lines, ""), nil + return ioutil.NopCloser(strings.NewReader(strings.Join(lines, ""))), nil } // Provider function to read the logs of a container -func (p *CRIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { +func (p *CRIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { log.Printf("receive GetContainerLogs %q", containerName) err := p.refreshNodeState() if err != nil { - return "", err + return nil, err } pod := p.findPodByName(namespace, podName) if pod == nil { - return "", strongerrors.NotFound(fmt.Errorf("Pod %s in namespace %s not found", podName, namespace)) + return nil, strongerrors.NotFound(fmt.Errorf("Pod %s in namespace %s not found", podName, namespace)) } container := pod.containers[containerName] if container == nil { - return "", strongerrors.NotFound(fmt.Errorf("Cannot find container %s in pod %s namespace %s", containerName, podName, namespace)) + return nil, strongerrors.NotFound(fmt.Errorf("Cannot find container %s in pod %s namespace %s", containerName, podName, namespace)) } - return readLogFile(container.LogPath, tail) + return readLogFile(container.LogPath, opts) } // Get full pod name as defined in the provider context diff --git a/providers/huawei/cci.go b/providers/huawei/cci.go index 89111abf9..2a7d17946 100644 --- a/providers/huawei/cci.go +++ b/providers/huawei/cci.go @@ -12,6 +12,7 @@ import ( "log" "net/http" "os" + "strings" "time" "github.com/cpuguy83/strongerrors" @@ -298,8 +299,8 @@ func (p *CCIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P } // GetContainerLogs retrieves the logs of a container by name from the huawei CCI provider. -func (p *CCIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { - return "", nil +func (p *CCIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { + return ioutil.NopCloser(strings.NewReader("")), nil } // Get full pod name as defined in the provider context diff --git a/providers/mock/mock.go b/providers/mock/mock.go index c0b726c30..5279bd27c 100644 --- a/providers/mock/mock.go +++ b/providers/mock/mock.go @@ -4,8 +4,10 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "math/rand" + "strings" "time" "github.com/cpuguy83/strongerrors" @@ -195,7 +197,7 @@ func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod } // GetContainerLogs retrieves the logs of a container by name from the provider. -func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { +func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { ctx, span := trace.StartSpan(ctx, "GetContainerLogs") defer span.End() @@ -203,7 +205,7 @@ func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, podName, containerNameKey, containerName) log.G(ctx).Info("receive GetContainerLogs %q", podName) - return "", nil + return ioutil.NopCloser(strings.NewReader("")), nil } // Get full pod name as defined in the provider context diff --git a/providers/nomad/nomad.go b/providers/nomad/nomad.go index 8fc14478d..ad13564dc 100644 --- a/providers/nomad/nomad.go +++ b/providers/nomad/nomad.go @@ -3,14 +3,15 @@ package nomad import ( "context" "fmt" + "io" + "io/ioutil" "log" "os" "strings" + nomad "github.com/hashicorp/nomad/api" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" - - nomad "github.com/hashicorp/nomad/api" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -146,8 +147,8 @@ func (p *Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1. } // GetContainerLogs retrieves the logs of a container by name from the provider. -func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { - return "", nil +func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { + return ioutil.NopCloser(strings.NewReader("")), nil } // GetPodFullName as defined in the provider context diff --git a/providers/openstack/zun.go b/providers/openstack/zun.go index 6f3454872..81b34ecad 100644 --- a/providers/openstack/zun.go +++ b/providers/openstack/zun.go @@ -4,9 +4,12 @@ import ( "context" "encoding/json" "fmt" + "io" + "io/ioutil" "log" "os" "strconv" + "strings" "time" "github.com/gophercloud/gophercloud" @@ -232,8 +235,8 @@ func (p *ZunProvider) GetPodStatus(ctx context.Context, namespace, name string) return &pod.Status, nil } -func (p *ZunProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { - return "not support in Zun Provider", nil +func (p *ZunProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { + return ioutil.NopCloser(strings.NewReader("not support in Zun Provider")), nil } // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status diff --git a/providers/provider.go b/providers/provider.go index 62fe4e919..81b322d4c 100644 --- a/providers/provider.go +++ b/providers/provider.go @@ -3,6 +3,7 @@ package providers import ( "context" "io" + "time" v1 "k8s.io/api/core/v1" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" @@ -23,7 +24,7 @@ type Provider interface { GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) // GetContainerLogs retrieves the logs of a container by name from the provider. - GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) + GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts ContainerLogOpts) (io.ReadCloser, error) // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. @@ -54,6 +55,15 @@ type Provider interface { OperatingSystem() string } +// ContainerLogOpts are used to pass along options to be set on the container +// log stream. +type ContainerLogOpts struct { + Tail int + Since time.Duration + LimitBytes int + Timestamps bool +} + // PodMetricsProvider is an optional interface that providers can implement to expose pod stats type PodMetricsProvider interface { GetStatsSummary(context.Context) (*stats.Summary, error) diff --git a/providers/web/broker.go b/providers/web/broker.go index ca123d271..e6d5ee411 100644 --- a/providers/web/broker.go +++ b/providers/web/broker.go @@ -86,8 +86,7 @@ func (p *BrokerProvider) DeletePod(ctx context.Context, pod *v1.Pod) error { return err } - _, err = p.doRequest("DELETE", urlPath, podJSON, false) - return err + return checkResponseStatus(p.doRequest("DELETE", urlPath, podJSON)) } // GetPod returns a pod by name that is being managed by the web server @@ -110,20 +109,20 @@ func (p *BrokerProvider) GetPod(ctx context.Context, namespace, name string) (*v } // GetContainerLogs returns the logs of a container running in a pod by name. -func (p *BrokerProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { +func (p *BrokerProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) { urlPathStr := fmt.Sprintf( "/getContainerLogs?namespace=%s&podName=%s&containerName=%s&tail=%d", url.QueryEscape(namespace), url.QueryEscape(podName), url.QueryEscape(containerName), - tail) + opts.Tail) - response, err := p.doGetRequestBytes(urlPathStr) - if err != nil { - return "", err + r, err := p.doGetRequestRaw(urlPathStr) + if err := checkResponseStatus(r, err); err != nil { + return nil, err } - return string(response), nil + return r.Body, nil } // Get full pod name as defined in the provider context @@ -231,13 +230,16 @@ func (p *BrokerProvider) doGetRequest(urlPathStr string, v interface{}) error { return json.Unmarshal(response, &v) } -func (p *BrokerProvider) doGetRequestBytes(urlPathStr string) ([]byte, error) { +func (p *BrokerProvider) doGetRequestRaw(urlPathStr string) (*http.Response, error) { urlPath, err := url.Parse(urlPathStr) if err != nil { return nil, err } + return p.doRequest("GET", urlPath, nil) +} - return p.doRequest("GET", urlPath, nil, true) +func (p *BrokerProvider) doGetRequestBytes(urlPathStr string) ([]byte, error) { + return readResponse(p.doGetRequestRaw(urlPathStr)) } func (p *BrokerProvider) createUpdatePod(pod *v1.Pod, method, postPath string) error { @@ -252,11 +254,10 @@ func (p *BrokerProvider) createUpdatePod(pod *v1.Pod, method, postPath string) e if err != nil { return err } - _, err = p.doRequest(method, postPathURL, podJSON, false) - return err + return checkResponseStatus(p.doRequest(method, postPathURL, podJSON)) } -func (p *BrokerProvider) doRequest(method string, urlPath *url.URL, body []byte, readResponse bool) ([]byte, error) { +func (p *BrokerProvider) doRequest(method string, urlPath *url.URL, body []byte) (*http.Response, error) { // build full URL requestURL := p.endpoint.ResolveReference(urlPath) @@ -281,20 +282,33 @@ func (p *BrokerProvider) doRequest(method string, urlPath *url.URL, body []byte, return nil, err } - defer response.Body.Close() - if response.StatusCode < 200 || response.StatusCode > 299 { - switch response.StatusCode { + return response, nil +} + +func checkResponseStatus(r *http.Response, err error) error { + if err != nil { + return err + } + if r.StatusCode < 200 || r.StatusCode > 299 { + switch r.StatusCode { case http.StatusNotFound: - return nil, strongerrors.NotFound(errors.New(response.Status)) + return strongerrors.NotFound(errors.New(r.Status)) default: - return nil, errors.New(response.Status) + return errors.New(r.Status) } } + return nil +} - // read response body if asked to - if readResponse { - return ioutil.ReadAll(response.Body) +func readResponse(r *http.Response, err error) ([]byte, error) { + if r.Body != nil { + defer r.Body.Close() } - return nil, nil + if err := checkResponseStatus(r, err); err != nil { + return nil, err + } + + lr := io.LimitReader(r.Body, 1e6) + return ioutil.ReadAll(lr) } diff --git a/vkubelet/api/helpers.go b/vkubelet/api/helpers.go index a9022aba7..1d49f8273 100644 --- a/vkubelet/api/helpers.go +++ b/vkubelet/api/helpers.go @@ -29,3 +29,29 @@ func handleError(f handlerFunc) http.HandlerFunc { } } } + +func flushOnWrite(w io.Writer) io.Writer { + if fw, ok := w.(writeFlusher); ok { + return &flushWriter{fw} + } + return w +} + +type flushWriter struct { + w writeFlusher +} + +type writeFlusher interface { + Flush() error + Write([]byte) (int, error) +} + +func (fw *flushWriter) Write(p []byte) (int, error) { + n, err := fw.w.Write(p) + if n > 0 { + if err := fw.w.Flush(); err != nil { + return n, err + } + } + return n, err +} diff --git a/vkubelet/api/logs.go b/vkubelet/api/logs.go index d7eec9607..7689f3373 100644 --- a/vkubelet/api/logs.go +++ b/vkubelet/api/logs.go @@ -9,11 +9,13 @@ import ( "github.com/cpuguy83/strongerrors" "github.com/gorilla/mux" "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/providers" ) // ContainerLogsBackend is used in place of backend implementations for getting container logs type ContainerLogsBackend interface { - GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) + GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) } // PodLogsHandlerFunc creates an http handler function from a provider to serve logs from a pod @@ -40,12 +42,27 @@ func PodLogsHandlerFunc(p ContainerLogsBackend) http.HandlerFunc { tail = t } - podsLogs, err := p.GetContainerLogs(ctx, namespace, pod, container, tail) + // TODO(@cpuguy83): support v1.PodLogOptions + // The kubelet decoding here is not straight forward, so this needs to be disected + + opts := providers.ContainerLogOpts{ + Tail: tail, + } + + logs, err := p.GetContainerLogs(ctx, namespace, pod, container, opts) if err != nil { return errors.Wrap(err, "error getting container logs?)") } - if _, err := io.WriteString(w, podsLogs); err != nil { + defer logs.Close() + + req.Header.Set("Transfer-Encoding", "chunked") + + if _, ok := w.(writeFlusher); !ok { + log.G(ctx).Debug("http response writer does not support flushes") + } + + if _, err := io.Copy(flushOnWrite(w), logs); err != nil { return strongerrors.Unknown(errors.Wrap(err, "error writing response to client")) } return nil