Merge pull request #574 from cpuguy83/streaming_logs

Use I/O stream for provider logs interface
This commit is contained in:
Brian Goff
2019-05-08 09:25:19 -07:00
committed by GitHub
16 changed files with 212 additions and 101 deletions

View File

@@ -9,6 +9,8 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"io/ioutil"
"os" "os"
"strconv" "strconv"
"strings" "strings"
@@ -277,7 +279,7 @@ func (p *ECIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P
} }
// GetContainerLogs returns the logs of a pod by name that is running inside ECI. // GetContainerLogs returns the logs of a pod by name that is running inside ECI.
func (p *ECIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { func (p *ECIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) {
eciId := "" eciId := ""
for _, cg := range p.GetCgs() { for _, cg := range p.GetCgs() {
if getECITagValue(&cg, "PodName") == podName && getECITagValue(&cg, "NameSpace") == namespace { if getECITagValue(&cg, "PodName") == podName && getECITagValue(&cg, "NameSpace") == namespace {
@@ -286,13 +288,13 @@ func (p *ECIProvider) GetContainerLogs(ctx context.Context, namespace, podName,
} }
} }
if eciId == "" { if eciId == "" {
return "", errors.New(fmt.Sprintf("GetContainerLogs can't find Pod %s-%s", namespace, podName)) return nil, errors.New(fmt.Sprintf("GetContainerLogs can't find Pod %s-%s", namespace, podName))
} }
request := eci.CreateDescribeContainerLogRequest() request := eci.CreateDescribeContainerLogRequest()
request.ContainerGroupId = eciId request.ContainerGroupId = eciId
request.ContainerName = containerName request.ContainerName = containerName
request.Tail = requests.Integer(tail) request.Tail = requests.Integer(opts.Tail)
// get logs from cg // get logs from cg
logContent := "" logContent := ""
@@ -309,7 +311,7 @@ func (p *ECIProvider) GetContainerLogs(ctx context.Context, namespace, podName,
} }
} }
return logContent, nil return ioutil.NopCloser(strings.NewReader(logContent)), nil
} }
// Get full pod name as defined in the provider context // Get full pod name as defined in the provider context

View File

@@ -2,6 +2,8 @@ package fargate
import ( import (
"fmt" "fmt"
"io"
"io/ioutil"
"log" "log"
"strings" "strings"
"sync" "sync"
@@ -11,6 +13,7 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/ecs" "github.com/aws/aws-sdk-go/service/ecs"
"github.com/cpuguy83/strongerrors" "github.com/cpuguy83/strongerrors"
"github.com/virtual-kubelet/virtual-kubelet/providers"
k8sTypes "k8s.io/apimachinery/pkg/types" k8sTypes "k8s.io/apimachinery/pkg/types"
) )
@@ -310,9 +313,9 @@ func (c *Cluster) RemovePod(tag string) {
} }
// GetContainerLogs returns the logs of a container from this cluster. // GetContainerLogs returns the logs of a container from this cluster.
func (c *Cluster) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) { func (c *Cluster) GetContainerLogs(namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) {
if c.cloudWatchLogGroupName == "" { if c.cloudWatchLogGroupName == "" {
return "", fmt.Errorf("logs not configured, please specify a \"CloudWatchLogGroupName\"") return nil, fmt.Errorf("logs not configured, please specify a \"CloudWatchLogGroupName\"")
} }
prefix := fmt.Sprintf("%s_%s", buildTaskDefinitionTag(c.name, namespace, podName), containerName) prefix := fmt.Sprintf("%s_%s", buildTaskDefinitionTag(c.name, namespace, podName), containerName)
@@ -321,18 +324,18 @@ func (c *Cluster) GetContainerLogs(namespace, podName, containerName string, tai
LogStreamNamePrefix: aws.String(prefix), LogStreamNamePrefix: aws.String(prefix),
}) })
if err != nil { if err != nil {
return "", err return nil, err
} }
// Nothing logged yet. // Nothing logged yet.
if len(describeResult.LogStreams) == 0 { if len(describeResult.LogStreams) == 0 {
return "", nil return nil, nil
} }
logs := "" logs := ""
err = client.logsapi.GetLogEventsPages(&cloudwatchlogs.GetLogEventsInput{ err = client.logsapi.GetLogEventsPages(&cloudwatchlogs.GetLogEventsInput{
Limit: aws.Int64(int64(tail)), Limit: aws.Int64(int64(opts.Tail)),
LogGroupName: aws.String(c.cloudWatchLogGroupName), LogGroupName: aws.String(c.cloudWatchLogGroupName),
LogStreamName: describeResult.LogStreams[0].LogStreamName, LogStreamName: describeResult.LogStreams[0].LogStreamName,
}, func(page *cloudwatchlogs.GetLogEventsOutput, lastPage bool) bool { }, func(page *cloudwatchlogs.GetLogEventsOutput, lastPage bool) bool {
@@ -348,8 +351,8 @@ func (c *Cluster) GetContainerLogs(namespace, podName, containerName string, tai
}) })
if err != nil { if err != nil {
return "", err return nil, err
} }
return logs, nil return ioutil.NopCloser(strings.NewReader(logs)), nil
} }

View File

@@ -3,13 +3,13 @@ package aws
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"log" "log"
"time" "time"
"github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers"
"github.com/virtual-kubelet/virtual-kubelet/providers/aws/fargate" "github.com/virtual-kubelet/virtual-kubelet/providers/aws/fargate"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -172,9 +172,9 @@ func (p *FargateProvider) GetPod(ctx context.Context, namespace, name string) (*
} }
// GetContainerLogs retrieves the logs of a container by name from the provider. // GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *FargateProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { func (p *FargateProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) {
log.Printf("Received GetContainerLogs request for %s/%s/%s.\n", namespace, podName, containerName) log.Printf("Received GetContainerLogs request for %s/%s/%s.\n", namespace, podName, containerName)
return p.cluster.GetContainerLogs(namespace, podName, containerName, tail) return p.cluster.GetContainerLogs(namespace, podName, containerName, opts)
} }
// GetPodFullName retrieves the full pod name as defined in the provider context. // GetPodFullName retrieves the full pod name as defined in the provider context.

View File

@@ -15,8 +15,9 @@ import (
"github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ecs" "github.com/aws/aws-sdk-go/service/ecs"
"github.com/aws/aws-sdk-go/service/iam" "github.com/aws/aws-sdk-go/service/iam"
"github.com/virtual-kubelet/virtual-kubelet/providers"
vkAWS "github.com/virtual-kubelet/virtual-kubelet/providers/aws" vkAWS "github.com/virtual-kubelet/virtual-kubelet/providers/aws"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
@@ -282,13 +283,19 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) {
// Wait a few seconds for the logs to settle. // Wait a few seconds for the logs to settle.
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
logs, err := provider.GetContainerLogs(context.Background(), "default", podName, "echo-container", 100) logs, err := provider.GetContainerLogs(context.Background(), "default", podName, "echo-container", providers.ContainerLogOpts{Tail: 100})
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
}
defer logs.Close()
b, err := ioutil.ReadAll(logs)
if err != nil {
t.Fatal(err)
} }
// Test log output. // Test log output.
receivedLogs := strings.Split(logs, "\n") receivedLogs := strings.Split(string(b), "\n")
expectedLogs := []string{ expectedLogs := []string{
"Started", "Started",
pod.Spec.Containers[0].Env[0].Name + "=" + pod.Spec.Containers[0].Env[0].Value, pod.Spec.Containers[0].Env[0].Name + "=" + pod.Spec.Containers[0].Env[0].Value,

View File

@@ -19,6 +19,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/cpuguy83/strongerrors"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
client "github.com/virtual-kubelet/azure-aci/client" client "github.com/virtual-kubelet/azure-aci/client"
"github.com/virtual-kubelet/azure-aci/client/aci" "github.com/virtual-kubelet/azure-aci/client/aci"
@@ -765,25 +766,26 @@ func (p *ACIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P
} }
// GetContainerLogs returns the logs of a pod by name that is running inside ACI. // GetContainerLogs returns the logs of a pod by name that is running inside ACI.
func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) {
ctx, span := trace.StartSpan(ctx, "aci.GetContainerLogs") ctx, span := trace.StartSpan(ctx, "aci.GetContainerLogs")
defer span.End() defer span.End()
ctx = addAzureAttributes(ctx, span, p) ctx = addAzureAttributes(ctx, span, p)
logContent := ""
cg, _, err := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName)) cg, _, err := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName))
if err != nil { if err != nil {
return logContent, err return nil, err
} }
if cg.Tags["NodeName"] != p.nodeName { if cg.Tags["NodeName"] != p.nodeName {
return logContent, nil return nil, strongerrors.NotFound(errors.New("got unexpected pod node name"))
} }
// get logs from cg // get logs from cg
retry := 10 retry := 10
logContent := ""
var retries int var retries int
for retries = 0; retries < retry; retries++ { for retries = 0; retries < retry; retries++ {
cLogs, err := p.aciClient.GetContainerLogs(ctx, p.resourceGroup, cg.Name, containerName, tail) cLogs, err := p.aciClient.GetContainerLogs(ctx, p.resourceGroup, cg.Name, containerName, opts.Tail)
if err != nil { if err != nil {
log.G(ctx).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying") log.G(ctx).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying")
time.Sleep(5000 * time.Millisecond) time.Sleep(5000 * time.Millisecond)
@@ -792,7 +794,7 @@ func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName,
break break
} }
} }
return logContent, err return ioutil.NopCloser(strings.NewReader(logContent)), err
} }
// GetPodFullName as defined in the provider context // GetPodFullName as defined in the provider context

View File

@@ -4,17 +4,16 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
"os" "os"
"strings" "strings"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch" "github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/to" "github.com/Azure/go-autorest/autorest/to"
"github.com/lawrencegripper/pod2docker" "github.com/lawrencegripper/pod2docker"
"github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/manager"
@@ -254,8 +253,14 @@ func (p *Provider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod,
return pod, nil return pod, nil
} }
const (
startingUpHeader = "Container still starting..\nShowing startup logs from Azure Batch node instead:\n"
stdoutHeader = "----- STDOUT -----\n"
stderrHeader = "----- STDERR -----\n"
)
// GetContainerLogs returns the logs of a container running in a pod by name. // GetContainerLogs returns the logs of a container running in a pod by name.
func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) {
log.Println("Getting pod logs ....") log.Println("Getting pod logs ....")
taskID := getTaskIDForPod(namespace, podName) taskID := getTaskIDForPod(namespace, podName)
@@ -265,40 +270,44 @@ func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, con
if containerLogReader.Response.Response != nil && containerLogReader.StatusCode == http.StatusNotFound { if containerLogReader.Response.Response != nil && containerLogReader.StatusCode == http.StatusNotFound {
stdoutReader, err := p.getFileFromTask(taskID, "stdout.txt") stdoutReader, err := p.getFileFromTask(taskID, "stdout.txt")
if err != nil { if err != nil {
return "", err return nil, err
} }
stderrReader, err := p.getFileFromTask(taskID, "stderr.txt") stderrReader, err := p.getFileFromTask(taskID, "stderr.txt")
if err != nil { if err != nil {
return "", err return nil, err
} }
var builder strings.Builder stdout := io.MultiReader(strings.NewReader(startingUpHeader), strings.NewReader(stdoutHeader), *stdoutReader.Value, strings.NewReader("\n"))
builderPtr := &builder stderr := io.MultiReader(strings.NewReader(stderrHeader), *stderrReader.Value, strings.NewReader("\n"))
mustWriteString(builderPtr, "Container still starting....\n") return &readCloser{
mustWriteString(builderPtr, "Showing startup logs from Azure Batch node instead:\n") Reader: io.MultiReader(stdout, stderr),
mustWriteString(builderPtr, "----- STDOUT -----\n") closer: func() error {
stdoutBytes, _ := ioutil.ReadAll(*stdoutReader.Value) (*stdoutReader.Value).Close()
mustWrite(builderPtr, stdoutBytes) (*stderrReader.Value).Close()
mustWriteString(builderPtr, "\n") return nil
}}, nil
mustWriteString(builderPtr, "----- STDERR -----\n")
stderrBytes, _ := ioutil.ReadAll(*stderrReader.Value)
mustWrite(builderPtr, stderrBytes)
mustWriteString(builderPtr, "\n")
return builder.String(), nil
} }
if err != nil { if err != nil {
return "", err return nil, err
} }
// TODO(@cpuguy83): don't convert stream to a string
result, err := formatLogJSON(containerLogReader) result, err := formatLogJSON(containerLogReader)
if err != nil { if err != nil {
return "", fmt.Errorf("Container log formating failed err: %v", err) return nil, fmt.Errorf("Container log formating failed err: %v", err)
} }
return result, nil return ioutil.NopCloser(strings.NewReader(result)), nil
}
type readCloser struct {
io.Reader
closer func() error
}
func (r *readCloser) Close() error {
return r.closer()
} }
// Get full pod name as defined in the provider context // Get full pod name as defined in the provider context

View File

@@ -12,6 +12,7 @@ import (
"github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch" "github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch"
"github.com/Azure/go-autorest/autorest" "github.com/Azure/go-autorest/autorest"
"github.com/virtual-kubelet/virtual-kubelet/providers"
apiv1 "k8s.io/api/core/v1" apiv1 "k8s.io/api/core/v1"
) )
@@ -120,12 +121,18 @@ func Test_readLogs_404Response_expectReturnStartupLogs(t *testing.T) {
return batch.ReadCloser{}, fmt.Errorf("Failed in test mock of getFileFromTask") return batch.ReadCloser{}, fmt.Errorf("Failed in test mock of getFileFromTask")
} }
result, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, 0) logs, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, providers.ContainerLogOpts{})
if err != nil { if err != nil {
t.Errorf("GetContainerLogs return error: %v", err) t.Fatalf("GetContainerLogs return error: %v", err)
}
defer logs.Close()
r, err := ioutil.ReadAll(logs)
if err != nil {
t.Fatal(err)
} }
fmt.Print(result) result := string(r)
if !strings.Contains(result, "stderrResponse") || !strings.Contains(result, "stdoutResponse") { if !strings.Contains(result, "stderrResponse") || !strings.Contains(result, "stdoutResponse") {
t.Errorf("Result didn't contain expected content have: %v", result) t.Errorf("Result didn't contain expected content have: %v", result)
@@ -154,13 +161,19 @@ func Test_readLogs_JsonResponse_expectFormattedLogs(t *testing.T) {
return batch.ReadCloser{}, fmt.Errorf("Failed in test mock of getFileFromTask") return batch.ReadCloser{}, fmt.Errorf("Failed in test mock of getFileFromTask")
} }
result, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, 0) logs, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, providers.ContainerLogOpts{})
if err != nil { if err != nil {
t.Errorf("GetContainerLogs return error: %v", err) t.Errorf("GetContainerLogs return error: %v", err)
} }
defer logs.Close()
fmt.Print(result) r, err := ioutil.ReadAll(logs)
if !strings.Contains(result, "Copy output data from the CUDA device to the host memory") || strings.Contains(result, "{") { if err != nil {
t.Fatal(err)
}
result := string(r)
if !strings.Contains(string(result), "Copy output data from the CUDA device to the host memory") || strings.Contains(result, "{") {
t.Errorf("Result didn't contain expected content have or had json: %v", result) t.Errorf("Result didn't contain expected content have or had json: %v", result)
} }

View File

@@ -6,6 +6,7 @@ import (
"bufio" "bufio"
"context" "context"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"net" "net"
"os" "os"
@@ -606,10 +607,10 @@ func (p *CRIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P
} }
// Reads a log file into a string // Reads a log file into a string
func readLogFile(filename string, tail int) (string, error) { func readLogFile(filename string, opts providers.ContainerLogOpts) (io.ReadCloser, error) {
file, err := os.Open(filename) file, err := os.Open(filename)
if err != nil { if err != nil {
return "", err return nil, err
} }
defer file.Close() defer file.Close()
@@ -619,31 +620,31 @@ func readLogFile(filename string, tail int) (string, error) {
for scanner.Scan() { for scanner.Scan() {
lines = append(lines, scanner.Text()) lines = append(lines, scanner.Text())
} }
if tail > 0 && tail < len(lines) { if opts.Tail > 0 && opts.Tail < len(lines) {
lines = lines[len(lines)-tail:] lines = lines[len(lines)-opts.Tail:]
} }
return strings.Join(lines, ""), nil return ioutil.NopCloser(strings.NewReader(strings.Join(lines, ""))), nil
} }
// Provider function to read the logs of a container // Provider function to read the logs of a container
func (p *CRIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { func (p *CRIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) {
log.Printf("receive GetContainerLogs %q", containerName) log.Printf("receive GetContainerLogs %q", containerName)
err := p.refreshNodeState() err := p.refreshNodeState()
if err != nil { if err != nil {
return "", err return nil, err
} }
pod := p.findPodByName(namespace, podName) pod := p.findPodByName(namespace, podName)
if pod == nil { if pod == nil {
return "", strongerrors.NotFound(fmt.Errorf("Pod %s in namespace %s not found", podName, namespace)) return nil, strongerrors.NotFound(fmt.Errorf("Pod %s in namespace %s not found", podName, namespace))
} }
container := pod.containers[containerName] container := pod.containers[containerName]
if container == nil { if container == nil {
return "", strongerrors.NotFound(fmt.Errorf("Cannot find container %s in pod %s namespace %s", containerName, podName, namespace)) return nil, strongerrors.NotFound(fmt.Errorf("Cannot find container %s in pod %s namespace %s", containerName, podName, namespace))
} }
return readLogFile(container.LogPath, tail) return readLogFile(container.LogPath, opts)
} }
// Get full pod name as defined in the provider context // Get full pod name as defined in the provider context

View File

@@ -12,6 +12,7 @@ import (
"log" "log"
"net/http" "net/http"
"os" "os"
"strings"
"time" "time"
"github.com/cpuguy83/strongerrors" "github.com/cpuguy83/strongerrors"
@@ -298,8 +299,8 @@ func (p *CCIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P
} }
// GetContainerLogs retrieves the logs of a container by name from the huawei CCI provider. // GetContainerLogs retrieves the logs of a container by name from the huawei CCI provider.
func (p *CCIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { func (p *CCIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) {
return "", nil return ioutil.NopCloser(strings.NewReader("")), nil
} }
// Get full pod name as defined in the provider context // Get full pod name as defined in the provider context

View File

@@ -4,8 +4,10 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"strings"
"time" "time"
"github.com/cpuguy83/strongerrors" "github.com/cpuguy83/strongerrors"
@@ -195,7 +197,7 @@ func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod
} }
// GetContainerLogs retrieves the logs of a container by name from the provider. // GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) {
ctx, span := trace.StartSpan(ctx, "GetContainerLogs") ctx, span := trace.StartSpan(ctx, "GetContainerLogs")
defer span.End() defer span.End()
@@ -203,7 +205,7 @@ func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName,
ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, podName, containerNameKey, containerName) ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, podName, containerNameKey, containerName)
log.G(ctx).Info("receive GetContainerLogs %q", podName) log.G(ctx).Info("receive GetContainerLogs %q", podName)
return "", nil return ioutil.NopCloser(strings.NewReader("")), nil
} }
// Get full pod name as defined in the provider context // Get full pod name as defined in the provider context

View File

@@ -3,14 +3,15 @@ package nomad
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"io/ioutil"
"log" "log"
"os" "os"
"strings" "strings"
nomad "github.com/hashicorp/nomad/api"
"github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers"
nomad "github.com/hashicorp/nomad/api"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -146,8 +147,8 @@ func (p *Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.
} }
// GetContainerLogs retrieves the logs of a container by name from the provider. // GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) {
return "", nil return ioutil.NopCloser(strings.NewReader("")), nil
} }
// GetPodFullName as defined in the provider context // GetPodFullName as defined in the provider context

View File

@@ -4,9 +4,12 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"io/ioutil"
"log" "log"
"os" "os"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/gophercloud/gophercloud" "github.com/gophercloud/gophercloud"
@@ -232,8 +235,8 @@ func (p *ZunProvider) GetPodStatus(ctx context.Context, namespace, name string)
return &pod.Status, nil return &pod.Status, nil
} }
func (p *ZunProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { func (p *ZunProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) {
return "not support in Zun Provider", nil return ioutil.NopCloser(strings.NewReader("not support in Zun Provider")), nil
} }
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status

View File

@@ -3,6 +3,7 @@ package providers
import ( import (
"context" "context"
"io" "io"
"time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
@@ -23,7 +24,7 @@ type Provider interface {
GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error)
// GetContainerLogs retrieves the logs of a container by name from the provider. // GetContainerLogs retrieves the logs of a container by name from the provider.
GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts ContainerLogOpts) (io.ReadCloser, error)
// RunInContainer executes a command in a container in the pod, copying data // RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr. // between in/out/err and the container's stdin/stdout/stderr.
@@ -54,6 +55,15 @@ type Provider interface {
OperatingSystem() string OperatingSystem() string
} }
// ContainerLogOpts are used to pass along options to be set on the container
// log stream.
type ContainerLogOpts struct {
Tail int
Since time.Duration
LimitBytes int
Timestamps bool
}
// PodMetricsProvider is an optional interface that providers can implement to expose pod stats // PodMetricsProvider is an optional interface that providers can implement to expose pod stats
type PodMetricsProvider interface { type PodMetricsProvider interface {
GetStatsSummary(context.Context) (*stats.Summary, error) GetStatsSummary(context.Context) (*stats.Summary, error)

View File

@@ -86,8 +86,7 @@ func (p *BrokerProvider) DeletePod(ctx context.Context, pod *v1.Pod) error {
return err return err
} }
_, err = p.doRequest("DELETE", urlPath, podJSON, false) return checkResponseStatus(p.doRequest("DELETE", urlPath, podJSON))
return err
} }
// GetPod returns a pod by name that is being managed by the web server // GetPod returns a pod by name that is being managed by the web server
@@ -110,20 +109,20 @@ func (p *BrokerProvider) GetPod(ctx context.Context, namespace, name string) (*v
} }
// GetContainerLogs returns the logs of a container running in a pod by name. // GetContainerLogs returns the logs of a container running in a pod by name.
func (p *BrokerProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { func (p *BrokerProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error) {
urlPathStr := fmt.Sprintf( urlPathStr := fmt.Sprintf(
"/getContainerLogs?namespace=%s&podName=%s&containerName=%s&tail=%d", "/getContainerLogs?namespace=%s&podName=%s&containerName=%s&tail=%d",
url.QueryEscape(namespace), url.QueryEscape(namespace),
url.QueryEscape(podName), url.QueryEscape(podName),
url.QueryEscape(containerName), url.QueryEscape(containerName),
tail) opts.Tail)
response, err := p.doGetRequestBytes(urlPathStr) r, err := p.doGetRequestRaw(urlPathStr)
if err != nil { if err := checkResponseStatus(r, err); err != nil {
return "", err return nil, err
} }
return string(response), nil return r.Body, nil
} }
// Get full pod name as defined in the provider context // Get full pod name as defined in the provider context
@@ -231,13 +230,16 @@ func (p *BrokerProvider) doGetRequest(urlPathStr string, v interface{}) error {
return json.Unmarshal(response, &v) return json.Unmarshal(response, &v)
} }
func (p *BrokerProvider) doGetRequestBytes(urlPathStr string) ([]byte, error) { func (p *BrokerProvider) doGetRequestRaw(urlPathStr string) (*http.Response, error) {
urlPath, err := url.Parse(urlPathStr) urlPath, err := url.Parse(urlPathStr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return p.doRequest("GET", urlPath, nil)
}
return p.doRequest("GET", urlPath, nil, true) func (p *BrokerProvider) doGetRequestBytes(urlPathStr string) ([]byte, error) {
return readResponse(p.doGetRequestRaw(urlPathStr))
} }
func (p *BrokerProvider) createUpdatePod(pod *v1.Pod, method, postPath string) error { func (p *BrokerProvider) createUpdatePod(pod *v1.Pod, method, postPath string) error {
@@ -252,11 +254,10 @@ func (p *BrokerProvider) createUpdatePod(pod *v1.Pod, method, postPath string) e
if err != nil { if err != nil {
return err return err
} }
_, err = p.doRequest(method, postPathURL, podJSON, false) return checkResponseStatus(p.doRequest(method, postPathURL, podJSON))
return err
} }
func (p *BrokerProvider) doRequest(method string, urlPath *url.URL, body []byte, readResponse bool) ([]byte, error) { func (p *BrokerProvider) doRequest(method string, urlPath *url.URL, body []byte) (*http.Response, error) {
// build full URL // build full URL
requestURL := p.endpoint.ResolveReference(urlPath) requestURL := p.endpoint.ResolveReference(urlPath)
@@ -281,20 +282,33 @@ func (p *BrokerProvider) doRequest(method string, urlPath *url.URL, body []byte,
return nil, err return nil, err
} }
defer response.Body.Close() return response, nil
if response.StatusCode < 200 || response.StatusCode > 299 { }
switch response.StatusCode {
case http.StatusNotFound: func checkResponseStatus(r *http.Response, err error) error {
return nil, strongerrors.NotFound(errors.New(response.Status)) if err != nil {
default: return err
return nil, errors.New(response.Status) }
} if r.StatusCode < 200 || r.StatusCode > 299 {
} switch r.StatusCode {
case http.StatusNotFound:
// read response body if asked to return strongerrors.NotFound(errors.New(r.Status))
if readResponse { default:
return ioutil.ReadAll(response.Body) return errors.New(r.Status)
} }
}
return nil, nil return nil
}
func readResponse(r *http.Response, err error) ([]byte, error) {
if r.Body != nil {
defer r.Body.Close()
}
if err := checkResponseStatus(r, err); err != nil {
return nil, err
}
lr := io.LimitReader(r.Body, 1e6)
return ioutil.ReadAll(lr)
} }

View File

@@ -29,3 +29,29 @@ func handleError(f handlerFunc) http.HandlerFunc {
} }
} }
} }
func flushOnWrite(w io.Writer) io.Writer {
if fw, ok := w.(writeFlusher); ok {
return &flushWriter{fw}
}
return w
}
type flushWriter struct {
w writeFlusher
}
type writeFlusher interface {
Flush() error
Write([]byte) (int, error)
}
func (fw *flushWriter) Write(p []byte) (int, error) {
n, err := fw.w.Write(p)
if n > 0 {
if err := fw.w.Flush(); err != nil {
return n, err
}
}
return n, err
}

View File

@@ -9,11 +9,13 @@ import (
"github.com/cpuguy83/strongerrors" "github.com/cpuguy83/strongerrors"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/providers"
) )
// ContainerLogsBackend is used in place of backend implementations for getting container logs // ContainerLogsBackend is used in place of backend implementations for getting container logs
type ContainerLogsBackend interface { type ContainerLogsBackend interface {
GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts providers.ContainerLogOpts) (io.ReadCloser, error)
} }
// PodLogsHandlerFunc creates an http handler function from a provider to serve logs from a pod // PodLogsHandlerFunc creates an http handler function from a provider to serve logs from a pod
@@ -40,12 +42,27 @@ func PodLogsHandlerFunc(p ContainerLogsBackend) http.HandlerFunc {
tail = t tail = t
} }
podsLogs, err := p.GetContainerLogs(ctx, namespace, pod, container, tail) // TODO(@cpuguy83): support v1.PodLogOptions
// The kubelet decoding here is not straight forward, so this needs to be disected
opts := providers.ContainerLogOpts{
Tail: tail,
}
logs, err := p.GetContainerLogs(ctx, namespace, pod, container, opts)
if err != nil { if err != nil {
return errors.Wrap(err, "error getting container logs?)") return errors.Wrap(err, "error getting container logs?)")
} }
if _, err := io.WriteString(w, podsLogs); err != nil { defer logs.Close()
req.Header.Set("Transfer-Encoding", "chunked")
if _, ok := w.(writeFlusher); !ok {
log.G(ctx).Debug("http response writer does not support flushes")
}
if _, err := io.Copy(flushOnWrite(w), logs); err != nil {
return strongerrors.Unknown(errors.Wrap(err, "error writing response to client")) return strongerrors.Unknown(errors.Wrap(err, "error writing response to client"))
} }
return nil return nil