Make container logs available

Store and retrieve container logs using Cloudwatch Logs
This commit is contained in:
Johannes Würbach
2018-04-20 23:52:59 +02:00
committed by Robbie Zhang
parent 0a1acbc78e
commit 803b0218c8
6 changed files with 79 additions and 4 deletions

View File

@@ -42,6 +42,7 @@ type providerConfig struct {
SecurityGroups []string SecurityGroups []string
AssignPublicIPv4Address bool AssignPublicIPv4Address bool
ExecutionRoleArn string ExecutionRoleArn string
CloudWatchLogGroupName string
PlatformVersion string PlatformVersion string
OperatingSystem string OperatingSystem string
CPU string CPU string
@@ -133,6 +134,7 @@ func (p *FargateProvider) loadConfig(r io.Reader) error {
p.clusterName = config.ClusterName p.clusterName = config.ClusterName
p.assignPublicIPv4Address = config.AssignPublicIPv4Address p.assignPublicIPv4Address = config.AssignPublicIPv4Address
p.executionRoleArn = config.ExecutionRoleArn p.executionRoleArn = config.ExecutionRoleArn
p.cloudWatchLogGroupName = config.CloudWatchLogGroupName
p.platformVersion = config.PlatformVersion p.platformVersion = config.PlatformVersion
p.operatingSystem = config.OperatingSystem p.operatingSystem = config.OperatingSystem
p.capacity.cpu = config.CPU p.capacity.cpu = config.CPU

View File

@@ -28,6 +28,10 @@ AssignPublicIPv4Address = false
# https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_execution_IAM_role.html # https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_execution_IAM_role.html
ExecutionRoleArn = "" ExecutionRoleArn = ""
# AWS CloudWatch Log Group Name used to store container logs. Optional.
# If omitted, no container logs will be stored and retrievable.
CloudWatchLogGroupName = "/ecs/virtual-kubelet-logs"
# Fargate platform version. Optional. Defaults to "LATEST". # Fargate platform version. Optional. Defaults to "LATEST".
# https://docs.aws.amazon.com/AmazonECS/latest/developerguide/platform_versions.html # https://docs.aws.amazon.com/AmazonECS/latest/developerguide/platform_versions.html
PlatformVersion = "LATEST" PlatformVersion = "LATEST"

View File

@@ -5,6 +5,8 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/aws/aws-sdk-go/service/ecs" "github.com/aws/aws-sdk-go/service/ecs"
"github.com/aws/aws-sdk-go/service/ecs/ecsiface" "github.com/aws/aws-sdk-go/service/ecs/ecsiface"
) )
@@ -14,6 +16,7 @@ type Client struct {
region string region string
svc *ecs.ECS svc *ecs.ECS
api ecsiface.ECSAPI api ecsiface.ECSAPI
logsapi cloudwatchlogsiface.CloudWatchLogsAPI
} }
var client *Client var client *Client
@@ -42,6 +45,9 @@ func newClient(region string) (*Client, error) {
client.svc = ecs.New(session) client.svc = ecs.New(session)
client.api = client.svc client.api = client.svc
// Create the CloudWatch service client.
client.logsapi = cloudwatchlogs.New(session)
log.Println("Created Fargate service client.") log.Println("Created Fargate service client.")
return &client, nil return &client, nil

View File

@@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/ecs" "github.com/aws/aws-sdk-go/service/ecs"
k8sTypes "k8s.io/apimachinery/pkg/types" k8sTypes "k8s.io/apimachinery/pkg/types"
) )
@@ -21,6 +22,7 @@ type ClusterConfig struct {
SecurityGroups []string SecurityGroups []string
AssignPublicIPv4Address bool AssignPublicIPv4Address bool
ExecutionRoleArn string ExecutionRoleArn string
CloudWatchLogGroupName string
PlatformVersion string PlatformVersion string
} }
@@ -34,6 +36,7 @@ type Cluster struct {
securityGroups []string securityGroups []string
assignPublicIPv4Address bool assignPublicIPv4Address bool
executionRoleArn string executionRoleArn string
cloudWatchLogGroupName string
platformVersion string platformVersion string
pods map[string]*Pod pods map[string]*Pod
sync.RWMutex sync.RWMutex
@@ -68,6 +71,7 @@ func NewCluster(config *ClusterConfig) (*Cluster, error) {
securityGroups: config.SecurityGroups, securityGroups: config.SecurityGroups,
assignPublicIPv4Address: config.AssignPublicIPv4Address, assignPublicIPv4Address: config.AssignPublicIPv4Address,
executionRoleArn: config.ExecutionRoleArn, executionRoleArn: config.ExecutionRoleArn,
cloudWatchLogGroupName: config.CloudWatchLogGroupName,
platformVersion: config.PlatformVersion, platformVersion: config.PlatformVersion,
pods: make(map[string]*Pod), pods: make(map[string]*Pod),
} }
@@ -295,3 +299,48 @@ func (c *Cluster) RemovePod(tag string) {
delete(c.pods, tag) delete(c.pods, tag)
} }
// GetContainerLogs returns the logs of a container from this cluster.
func (c *Cluster) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
if c.cloudWatchLogGroupName == "" {
return "", fmt.Errorf("logs not configured, please specify a \"CloudWatchLogGroupName\"")
}
prefix := fmt.Sprintf("%s_%s", buildTaskDefinitionTag(c.name, namespace, podName), containerName)
describeResult, err := client.logsapi.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{
LogGroupName: aws.String(c.cloudWatchLogGroupName),
LogStreamNamePrefix: aws.String(prefix),
})
if err != nil {
return "", err
}
// Nothing logged yet.
if len(describeResult.LogStreams) == 0 {
return "", nil
}
logs := ""
err = client.logsapi.GetLogEventsPages(&cloudwatchlogs.GetLogEventsInput{
Limit: aws.Int64(int64(tail)),
LogGroupName: aws.String(c.cloudWatchLogGroupName),
LogStreamName: describeResult.LogStreams[0].LogStreamName,
}, func(page *cloudwatchlogs.GetLogEventsOutput, lastPage bool) bool {
for _, event := range page.Events {
logs += *event.Message
logs += "\n"
}
// Due to a issue in the aws-sdk last page is never true, but the we can stop
// as soon as no further results are returned.
// See https://github.com/aws/aws-sdk-ruby/pull/730.
return len(page.Events) > 0
})
if err != nil {
return "", err
}
return logs, nil
}

View File

@@ -92,6 +92,18 @@ func NewPod(cluster *Cluster, pod *corev1.Pod) (*Pod, error) {
return nil, err return nil, err
} }
if cluster.cloudWatchLogGroupName != "" {
// Configure container logs to be sent to the configured Cloudwatch Logs Log Group.
cntr.definition.LogConfiguration = &ecs.LogConfiguration{
LogDriver: aws.String(ecs.LogDriverAwslogs),
Options: map[string]*string{
"awslogs-group": aws.String(cluster.cloudWatchLogGroupName),
"awslogs-region": aws.String(cluster.region),
"awslogs-stream-prefix": aws.String(fmt.Sprintf("%s_%s", tag, containerSpec.Name)),
},
}
}
// Add the container's resource requirements to its pod's total resource requirements. // Add the container's resource requirements to its pod's total resource requirements.
fgPod.taskCPU += *cntr.definition.Cpu fgPod.taskCPU += *cntr.definition.Cpu
fgPod.taskMemory += *cntr.definition.Memory fgPod.taskMemory += *cntr.definition.Memory

View File

@@ -32,6 +32,7 @@ type FargateProvider struct {
capacity capacity capacity capacity
assignPublicIPv4Address bool assignPublicIPv4Address bool
executionRoleArn string executionRoleArn string
cloudWatchLogGroupName string
platformVersion string platformVersion string
lastTransitionTime time.Time lastTransitionTime time.Time
} }
@@ -86,6 +87,7 @@ func NewFargateProvider(
SecurityGroups: p.securityGroups, SecurityGroups: p.securityGroups,
AssignPublicIPv4Address: p.assignPublicIPv4Address, AssignPublicIPv4Address: p.assignPublicIPv4Address,
ExecutionRoleArn: p.executionRoleArn, ExecutionRoleArn: p.executionRoleArn,
CloudWatchLogGroupName: p.cloudWatchLogGroupName,
PlatformVersion: p.platformVersion, PlatformVersion: p.platformVersion,
} }
@@ -170,7 +172,7 @@ func (p *FargateProvider) GetPod(namespace, name string) (*corev1.Pod, error) {
// GetContainerLogs retrieves the logs of a container by name from the provider. // GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *FargateProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) { func (p *FargateProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
log.Printf("Received GetContainerLogs request for %s/%s/%s.\n", namespace, podName, containerName) log.Printf("Received GetContainerLogs request for %s/%s/%s.\n", namespace, podName, containerName)
return "", errNotImplemented return p.cluster.GetContainerLogs(namespace, podName, containerName, tail)
} }
// GetPodStatus retrieves the status of a pod by name from the provider. // GetPodStatus retrieves the status of a pod by name from the provider.