From 803b0218c83d9a284a2660508aa65a715a5f8cb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20W=C3=BCrbach?= Date: Fri, 20 Apr 2018 23:52:59 +0200 Subject: [PATCH] Make container logs available Store and retrieve container logs using Cloudwatch Logs --- providers/aws/config.go | 2 ++ providers/aws/fargate.toml | 4 +++ providers/aws/fargate/client.go | 12 ++++++-- providers/aws/fargate/cluster.go | 49 ++++++++++++++++++++++++++++++++ providers/aws/fargate/pod.go | 12 ++++++++ providers/aws/provider.go | 4 ++- 6 files changed, 79 insertions(+), 4 deletions(-) diff --git a/providers/aws/config.go b/providers/aws/config.go index bab5e3d82..4370d2904 100644 --- a/providers/aws/config.go +++ b/providers/aws/config.go @@ -42,6 +42,7 @@ type providerConfig struct { SecurityGroups []string AssignPublicIPv4Address bool ExecutionRoleArn string + CloudWatchLogGroupName string PlatformVersion string OperatingSystem string CPU string @@ -133,6 +134,7 @@ func (p *FargateProvider) loadConfig(r io.Reader) error { p.clusterName = config.ClusterName p.assignPublicIPv4Address = config.AssignPublicIPv4Address p.executionRoleArn = config.ExecutionRoleArn + p.cloudWatchLogGroupName = config.CloudWatchLogGroupName p.platformVersion = config.PlatformVersion p.operatingSystem = config.OperatingSystem p.capacity.cpu = config.CPU diff --git a/providers/aws/fargate.toml b/providers/aws/fargate.toml index bd7c8de92..db302b5fb 100644 --- a/providers/aws/fargate.toml +++ b/providers/aws/fargate.toml @@ -28,6 +28,10 @@ AssignPublicIPv4Address = false # https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_execution_IAM_role.html ExecutionRoleArn = "" +# AWS CloudWatch Log Group Name used to store container logs. Optional. +# If omitted, no container logs will be stored and retrievable. +CloudWatchLogGroupName = "/ecs/virtual-kubelet-logs" + # Fargate platform version. Optional. Defaults to "LATEST". # https://docs.aws.amazon.com/AmazonECS/latest/developerguide/platform_versions.html PlatformVersion = "LATEST" diff --git a/providers/aws/fargate/client.go b/providers/aws/fargate/client.go index 86eb7ca2a..5c6ba1f5f 100644 --- a/providers/aws/fargate/client.go +++ b/providers/aws/fargate/client.go @@ -5,15 +5,18 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface" "github.com/aws/aws-sdk-go/service/ecs" "github.com/aws/aws-sdk-go/service/ecs/ecsiface" ) // Client communicates with the regional AWS Fargate service. type Client struct { - region string - svc *ecs.ECS - api ecsiface.ECSAPI + region string + svc *ecs.ECS + api ecsiface.ECSAPI + logsapi cloudwatchlogsiface.CloudWatchLogsAPI } var client *Client @@ -42,6 +45,9 @@ func newClient(region string) (*Client, error) { client.svc = ecs.New(session) client.api = client.svc + // Create the CloudWatch service client. + client.logsapi = cloudwatchlogs.New(session) + log.Println("Created Fargate service client.") return &client, nil diff --git a/providers/aws/fargate/cluster.go b/providers/aws/fargate/cluster.go index c7422ce29..cf8350ed2 100644 --- a/providers/aws/fargate/cluster.go +++ b/providers/aws/fargate/cluster.go @@ -8,6 +8,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/ecs" k8sTypes "k8s.io/apimachinery/pkg/types" ) @@ -21,6 +22,7 @@ type ClusterConfig struct { SecurityGroups []string AssignPublicIPv4Address bool ExecutionRoleArn string + CloudWatchLogGroupName string PlatformVersion string } @@ -34,6 +36,7 @@ type Cluster struct { securityGroups []string assignPublicIPv4Address bool executionRoleArn string + cloudWatchLogGroupName string platformVersion string pods map[string]*Pod sync.RWMutex @@ -68,6 +71,7 @@ func NewCluster(config *ClusterConfig) (*Cluster, error) { securityGroups: config.SecurityGroups, assignPublicIPv4Address: config.AssignPublicIPv4Address, executionRoleArn: config.ExecutionRoleArn, + cloudWatchLogGroupName: config.CloudWatchLogGroupName, platformVersion: config.PlatformVersion, pods: make(map[string]*Pod), } @@ -295,3 +299,48 @@ func (c *Cluster) RemovePod(tag string) { delete(c.pods, tag) } + +// GetContainerLogs returns the logs of a container from this cluster. +func (c *Cluster) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) { + if c.cloudWatchLogGroupName == "" { + return "", fmt.Errorf("logs not configured, please specify a \"CloudWatchLogGroupName\"") + } + + prefix := fmt.Sprintf("%s_%s", buildTaskDefinitionTag(c.name, namespace, podName), containerName) + describeResult, err := client.logsapi.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{ + LogGroupName: aws.String(c.cloudWatchLogGroupName), + LogStreamNamePrefix: aws.String(prefix), + }) + if err != nil { + return "", err + } + + // Nothing logged yet. + if len(describeResult.LogStreams) == 0 { + return "", nil + } + + logs := "" + + err = client.logsapi.GetLogEventsPages(&cloudwatchlogs.GetLogEventsInput{ + Limit: aws.Int64(int64(tail)), + LogGroupName: aws.String(c.cloudWatchLogGroupName), + LogStreamName: describeResult.LogStreams[0].LogStreamName, + }, func(page *cloudwatchlogs.GetLogEventsOutput, lastPage bool) bool { + for _, event := range page.Events { + logs += *event.Message + logs += "\n" + } + + // Due to a issue in the aws-sdk last page is never true, but the we can stop + // as soon as no further results are returned. + // See https://github.com/aws/aws-sdk-ruby/pull/730. + return len(page.Events) > 0 + }) + + if err != nil { + return "", err + } + + return logs, nil +} diff --git a/providers/aws/fargate/pod.go b/providers/aws/fargate/pod.go index c40cdf0cf..ece3fb894 100644 --- a/providers/aws/fargate/pod.go +++ b/providers/aws/fargate/pod.go @@ -92,6 +92,18 @@ func NewPod(cluster *Cluster, pod *corev1.Pod) (*Pod, error) { return nil, err } + if cluster.cloudWatchLogGroupName != "" { + // Configure container logs to be sent to the configured Cloudwatch Logs Log Group. + cntr.definition.LogConfiguration = &ecs.LogConfiguration{ + LogDriver: aws.String(ecs.LogDriverAwslogs), + Options: map[string]*string{ + "awslogs-group": aws.String(cluster.cloudWatchLogGroupName), + "awslogs-region": aws.String(cluster.region), + "awslogs-stream-prefix": aws.String(fmt.Sprintf("%s_%s", tag, containerSpec.Name)), + }, + } + } + // Add the container's resource requirements to its pod's total resource requirements. fgPod.taskCPU += *cntr.definition.Cpu fgPod.taskMemory += *cntr.definition.Memory diff --git a/providers/aws/provider.go b/providers/aws/provider.go index f3bcfbf21..2bc60cc9a 100644 --- a/providers/aws/provider.go +++ b/providers/aws/provider.go @@ -32,6 +32,7 @@ type FargateProvider struct { capacity capacity assignPublicIPv4Address bool executionRoleArn string + cloudWatchLogGroupName string platformVersion string lastTransitionTime time.Time } @@ -86,6 +87,7 @@ func NewFargateProvider( SecurityGroups: p.securityGroups, AssignPublicIPv4Address: p.assignPublicIPv4Address, ExecutionRoleArn: p.executionRoleArn, + CloudWatchLogGroupName: p.cloudWatchLogGroupName, PlatformVersion: p.platformVersion, } @@ -170,7 +172,7 @@ func (p *FargateProvider) GetPod(namespace, name string) (*corev1.Pod, error) { // GetContainerLogs retrieves the logs of a container by name from the provider. func (p *FargateProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) { log.Printf("Received GetContainerLogs request for %s/%s/%s.\n", namespace, podName, containerName) - return "", errNotImplemented + return p.cluster.GetContainerLogs(namespace, podName, containerName, tail) } // GetPodStatus retrieves the status of a pod by name from the provider.