diff --git a/.circleci/config.yml b/.circleci/config.yml
index 025aca688..2420fe113 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -23,4 +23,4 @@ jobs:
           command: V=1 make build
       - run:
           name: Tests
-          command: V=1 CI=1 make test
+          command: V=1 CI=1 SKIP_AWS_E2E=1 make test
diff --git a/providers/aws/helpers_test.go b/providers/aws/helpers_test.go
new file mode 100644
index 000000000..08de98e24
--- /dev/null
+++ b/providers/aws/helpers_test.go
@@ -0,0 +1,309 @@
+package aws_test
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/ec2"
+)
+
+// createVpcWithInternetAccess creates a VPC with a single subnet whose route
+// table sends 0.0.0.0/0 through a newly attached internet gateway. Each
+// resource is tagged right after creation so deleteVpc can find it again. It
+// returns the ID of the subnet. Nothing is rolled back here on failure; the
+// caller is expected to run deleteVpc.
+func createVpcWithInternetAccess(ec2Client *ec2.EC2) (*string, error) {
+	vpcCreateResponse, err := ec2Client.CreateVpc(&ec2.CreateVpcInput{
+		CidrBlock: aws.String("172.31.0.0/16"),
+	})
+	if err != nil {
+		return nil, err
+	}
+	vpcID := vpcCreateResponse.Vpc.VpcId
+
+	if err = tagResource(ec2Client, vpcID); err != nil {
+		return nil, err
+	}
+
+	subnetResponse, err := ec2Client.CreateSubnet(&ec2.CreateSubnetInput{
+		CidrBlock: aws.String("172.31.0.0/16"),
+		VpcId:     vpcID,
+	})
+	if err != nil {
+		return nil, err
+	}
+	subnetID := subnetResponse.Subnet.SubnetId
+
+	if err = tagResource(ec2Client, subnetID); err != nil {
+		return nil, err
+	}
+
+	igResponse, err := ec2Client.CreateInternetGateway(&ec2.CreateInternetGatewayInput{})
+	if err != nil {
+		return nil, err
+	}
+	igID := igResponse.InternetGateway.InternetGatewayId
+
+	if err = tagResource(ec2Client, igID); err != nil {
+		return nil, err
+	}
+
+	_, err = ec2Client.AttachInternetGateway(&ec2.AttachInternetGatewayInput{
+		InternetGatewayId: igID,
+		VpcId:             vpcID,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	routeTableResponse, err := ec2Client.CreateRouteTable(&ec2.CreateRouteTableInput{
+		VpcId: vpcID,
+	})
+	if err != nil {
+		return nil, err
+	}
+	routeTableID := routeTableResponse.RouteTable.RouteTableId
+
+	if err = tagResource(ec2Client, routeTableID); err != nil {
+		return nil, err
+	}
+
+	_, err = ec2Client.AssociateRouteTable(&ec2.AssociateRouteTableInput{
+		RouteTableId: routeTableID,
+		SubnetId:     subnetID,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	_, err = ec2Client.CreateRoute(&ec2.CreateRouteInput{
+		DestinationCidrBlock: aws.String("0.0.0.0/0"),
+		GatewayId:            igID,
+		RouteTableId:         routeTableID,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return subnetID, nil
+}
+
+// tagResource tags an EC2 resource with the test's Name tag. The call is
+// wrapped in retry to work around EC2 eventual consistency.
+func tagResource(ec2Client *ec2.EC2, resourceID *string) error {
+	fmt.Printf("Tagging: %s\n", *resourceID)
+	return retry(func() error {
+		_, err := ec2Client.CreateTags(&ec2.CreateTagsInput{
+			Resources: []*string{resourceID},
+			Tags: []*ec2.Tag{{
+				Key:   aws.String("Name"),
+				Value: aws.String(testName),
+			}},
+		})
+
+		return err
+	})
+}
+
+// deleteVpc removes everything createVpcWithInternetAccess created. Resources
+// are looked up by tag and deleted in dependency order (route tables, internet
+// gateways, subnets, then the VPC), retrying each step to cope with EC2
+// eventual consistency. The first step that still fails after all retries
+// aborts the cleanup and its error is returned.
+func deleteVpc(ec2Client *ec2.EC2) error {
+	steps := []struct {
+		name   string
+		action fn
+	}{
+		{"route tables", func() error { return deleteRouteTables(ec2Client) }},
+		{"internet gateways", func() error { return deleteInternetGateways(ec2Client) }},
+		{"subnets", func() error { return deleteSubnets(ec2Client) }},
+		{"VPCs", func() error { return deleteVpcs(ec2Client) }},
+	}
+
+	for _, step := range steps {
+		if err := retry(step.action); err != nil {
+			return fmt.Errorf("removing %s: %v", step.name, err)
+		}
+	}
+
+	return nil
+}
+
+// deleteRouteTables disassociates and deletes all tagged route tables.
+func deleteRouteTables(ec2Client *ec2.EC2) error {
+	resourceIDs, err := findResourceByTag(ec2Client, "route-table")
+	if err != nil {
+		return err
+	}
+	// An empty ID list would make DescribeRouteTables return every route table
+	// visible to the account, not just the ones this test created.
+	if len(resourceIDs) == 0 {
+		return nil
+	}
+
+	describeResponse, err := ec2Client.DescribeRouteTables(&ec2.DescribeRouteTablesInput{
+		RouteTableIds: resourceIDs,
+	})
+	if err != nil {
+		return err
+	}
+
+	for _, routeTable := range describeResponse.RouteTables {
+		for _, association := range routeTable.Associations {
+			_, err = ec2Client.DisassociateRouteTable(&ec2.DisassociateRouteTableInput{
+				AssociationId: association.RouteTableAssociationId,
+			})
+			if err != nil {
+				return err
+			}
+		}
+
+		_, err = ec2Client.DeleteRouteTable(&ec2.DeleteRouteTableInput{
+			RouteTableId: routeTable.RouteTableId,
+		})
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// deleteInternetGateways detaches and deletes all tagged internet gateways.
+func deleteInternetGateways(ec2Client *ec2.EC2) error {
+	resourceIDs, err := findResourceByTag(ec2Client, "internet-gateway")
+	if err != nil {
+		return err
+	}
+	// An empty ID list would make DescribeInternetGateways return every
+	// internet gateway visible to the account, not just the test's own.
+	if len(resourceIDs) == 0 {
+		return nil
+	}
+
+	describeResponse, err := ec2Client.DescribeInternetGateways(&ec2.DescribeInternetGatewaysInput{
+		InternetGatewayIds: resourceIDs,
+	})
+	if err != nil {
+		return err
+	}
+
+	for _, internetGateway := range describeResponse.InternetGateways {
+		for _, attachment := range internetGateway.Attachments {
+			_, err = ec2Client.DetachInternetGateway(&ec2.DetachInternetGatewayInput{
+				InternetGatewayId: internetGateway.InternetGatewayId,
+				VpcId:             attachment.VpcId,
+			})
+			if err != nil {
+				return err
+			}
+		}
+
+		_, err = ec2Client.DeleteInternetGateway(&ec2.DeleteInternetGatewayInput{
+			InternetGatewayId: internetGateway.InternetGatewayId,
+		})
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// deleteSubnets deletes all tagged subnets.
+func deleteSubnets(ec2Client *ec2.EC2) error {
+	resourceIDs, err := findResourceByTag(ec2Client, "subnet")
+	if err != nil {
+		return err
+	}
+
+	for _, resourceID := range resourceIDs {
+		_, err = ec2Client.DeleteSubnet(&ec2.DeleteSubnetInput{
+			SubnetId: resourceID,
+		})
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// deleteVpcs deletes all tagged VPCs.
+func deleteVpcs(ec2Client *ec2.EC2) error {
+	resourceIDs, err := findResourceByTag(ec2Client, "vpc")
+	if err != nil {
+		return err
+	}
+
+	for _, resourceID := range resourceIDs {
+		_, err = ec2Client.DeleteVpc(&ec2.DeleteVpcInput{
+			VpcId: resourceID,
+		})
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// findResourceByTag returns the IDs of all EC2 resources of the given resource
+// type (e.g. "vpc" or "subnet") that carry the test's Name tag.
+func findResourceByTag(ec2Client *ec2.EC2, resourceType string) ([]*string, error) {
+	describeResponse, err := ec2Client.DescribeTags(&ec2.DescribeTagsInput{
+		Filters: []*ec2.Filter{
+			{
+				Name:   aws.String("key"),
+				Values: []*string{aws.String("Name")},
+			},
+			{
+				Name:   aws.String("value"),
+				Values: []*string{aws.String(testName)},
+			},
+			{
+				Name:   aws.String("resource-type"),
+				Values: []*string{aws.String(resourceType)},
+			},
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	resourceIDs := make([]*string, len(describeResponse.Tags))
+	for i, tag := range describeResponse.Tags {
+		resourceIDs[i] = tag.ResourceId
+	}
+
+	return resourceIDs, nil
+}
+
+// fn is an action that can be retried.
+type fn func() error
+
+// retry runs action until it succeeds, up to 10 times with a 10 second pause
+// between attempts. If every attempt fails, the returned error includes the
+// error of the last attempt.
+func retry(action fn) error {
+	const (
+		attempts = 10
+		sleep    = 10 * time.Second
+	)
+
+	var err error
+	for attempt := 1; attempt <= attempts; attempt++ {
+		if err = action(); err == nil {
+			return nil
+		}
+
+		if attempt < attempts {
+			fmt.Printf("action failed, err: %s retrying...\n", err)
+			time.Sleep(sleep)
+		}
+	}
+
+	return fmt.Errorf("action failed, maximum attempts reached: %v", err)
+}
diff --git a/providers/aws/provider_test.go b/providers/aws/provider_test.go
new file mode 100644
index 000000000..4e3c30477
--- /dev/null
+++ b/providers/aws/provider_test.go
@@ -0,0 +1,319 @@
+package aws_test
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
+	"github.com/aws/aws-sdk-go/service/ec2"
+	"github.com/aws/aws-sdk-go/service/ecs"
+	"github.com/aws/aws-sdk-go/service/iam"
+	vkAWS "github.com/virtual-kubelet/virtual-kubelet/providers/aws"
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+)
+
+const testRegion = "us-east-1"
+
+const executorRoleAssumePolicy = `{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Sid": "",
+      "Effect": "Allow",
+      "Principal": {
+        "Service": "ecs-tasks.amazonaws.com"
+      },
+      "Action": "sts:AssumeRole"
+    }
+  ]
+}`
+
+const testConfig = `
+Region = "%s"
+ClusterName = "%s"
+CloudWatchLogGroupName = "%s"
+ExecutionRoleArn = "%s"
+Subnets = [ "%s" ]
+SecurityGroups = [ ]
+# Required to pull the busybox image
+AssignPublicIPv4Address = true
+`
+
+const testName = "vk-aws-e2e-test"
+
+// executionRolePolicyArn is the AWS managed policy allowing log writes and ECR
+// pulls.
+const executionRolePolicyArn = "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
+
+// TestAWS is an end-to-end test against a real AWS account. It provisions a
+// VPC, log group, execution role and ECS cluster, runs a pod through the
+// Fargate provider, and removes everything again. Set SKIP_AWS_E2E=1 to skip.
+func TestAWS(t *testing.T) {
+	if os.Getenv("SKIP_AWS_E2E") == "1" {
+		t.Skip("skipping AWS e2e tests")
+	}
+
+	sess, err := session.NewSession(&aws.Config{
+		Region: aws.String(testRegion),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	ec2Client := ec2.New(sess)
+	ecsClient := ecs.New(sess)
+	cloudwatchClient := cloudwatchlogs.New(sess)
+	iamClient := iam.New(sess)
+
+	// Create a new VPC with one subnet and internet access
+	// Internet access is required to pull public images from the docker registry
+	subnetID, err := createVpcWithInternetAccess(ec2Client)
+	if err != nil {
+		t.Error(err)
+	}
+
+	// Create the Cloudwatch Logs Log Group used by container logs
+	logGroupID := aws.String("/ecs/" + testName)
+	_, err = cloudwatchClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
+		LogGroupName: logGroupID,
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	// Create the role used by Fargate to write logs and pull ECR images
+	executorRoleID := aws.String(testName)
+	_, err = iamClient.CreateRole(&iam.CreateRoleInput{
+		RoleName:                 executorRoleID,
+		AssumeRolePolicyDocument: aws.String(executorRoleAssumePolicy),
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	// Default policy allowing log writes and ECR pulls
+	_, err = iamClient.AttachRolePolicy(&iam.AttachRolePolicyInput{
+		PolicyArn: aws.String(executionRolePolicyArn),
+		RoleName:  executorRoleID,
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	// Create a cluster for the E2E test
+	var clusterID *string
+	createResponse, err := ecsClient.CreateCluster(&ecs.CreateClusterInput{
+		ClusterName: aws.String(testName),
+	})
+	if err != nil {
+		t.Error(err)
+	} else {
+		clusterID = createResponse.Cluster.ClusterArn
+	}
+
+	time.Sleep(10 * time.Second)
+
+	t.Run("Create, list and delete pod", func(t *testing.T) {
+		if clusterID == nil || logGroupID == nil || executorRoleID == nil || subnetID == nil {
+			t.Fatal("Can't start tests without all requirements.")
+		}
+
+		// Write config file with our test configuration
+		config := fmt.Sprintf(testConfig, testRegion, testName, *logGroupID, *executorRoleID, *subnetID)
+		fmt.Printf("Test with config:\n%s", config)
+
+		tmpfile, err := ioutil.TempFile("", "example")
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		defer os.Remove(tmpfile.Name())
+
+		if _, err = tmpfile.Write([]byte(config)); err != nil {
+			t.Fatal(err)
+		}
+
+		if err = tmpfile.Close(); err != nil {
+			t.Fatal(err)
+		}
+
+		// Start the FargateProvider
+		provider, err := vkAWS.NewFargateProvider(tmpfile.Name(), nil, "vk-aws-test", "Linux", "1.2.3.4", 10250)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// It's a new cluster, there shouldn't be anything running
+		pods, err := provider.GetPods()
+		if err != nil {
+			t.Error(err)
+		}
+		if len(pods) != 0 {
+			t.Errorf("Expect zero pods, but received %d pods\n%v", len(pods), pods)
+		}
+
+		// Create a test pod
+		podName := fmt.Sprintf("test_%d", time.Now().UnixNano()/1000)
+
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      podName,
+				Namespace: "default",
+				UID:       types.UID("unique"),
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{{
+					Name:  "echo-container",
+					Image: "busybox",
+					Command: []string{
+						"/bin/sh",
+					},
+					Args: []string{
+						"-c", "echo \"Started\"; while true; do sleep 1; done",
+					},
+					Resources: v1.ResourceRequirements{
+						Limits: v1.ResourceList{
+							v1.ResourceCPU:    resource.MustParse("200m"),
+							v1.ResourceMemory: resource.MustParse("450Mi"),
+						},
+						Requests: v1.ResourceList{
+							v1.ResourceCPU:    resource.MustParse("100m"),
+							v1.ResourceMemory: resource.MustParse("256Mi"),
+						},
+					},
+				}},
+			},
+		}
+
+		err = provider.CreatePod(pod)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// Now there should be a pod running
+
+		pods, err = provider.GetPods()
+		if err != nil {
+			t.Error(err)
+		}
+		if len(pods) != 1 {
+			t.Errorf("Expect one pods, but received %d pods\n%v", len(pods), pods)
+		}
+
+		// Wait until it's scheduled and running
+		err = waitUntilPodStatus(provider, podName, v1.PodRunning)
+		if err != nil {
+			t.Error(err)
+		}
+
+		// Some additional time for the logs to settle
+		time.Sleep(10 * time.Second)
+
+		logs, err := provider.GetContainerLogs("default", podName, "echo-container", 100)
+		if err != nil {
+			t.Error(err)
+		}
+
+		if logs != "Started\n" {
+			t.Errorf("Expected logs to be \"Started\\n\", but received \"%v\"", logs)
+		}
+
+		// Remove the pod
+		err = provider.DeletePod(pod)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		err = waitUntilPodStatus(provider, podName, v1.PodSucceeded)
+		if err != nil {
+			t.Error(err)
+		}
+
+		// The cluster should be empty again
+		pods, err = provider.GetPods()
+		if err != nil {
+			t.Error(err)
+		}
+		if len(pods) != 0 {
+			t.Errorf("Expect zero pods, but received %d pods\n%v", len(pods), pods)
+		}
+	})
+
+	// Remove the cluster
+	if clusterID != nil {
+		_, err = ecsClient.DeleteCluster(&ecs.DeleteClusterInput{
+			Cluster: clusterID,
+		})
+		if err != nil {
+			t.Error(err)
+		}
+	}
+
+	// Remove the execution role
+	_, err = iamClient.DetachRolePolicy(&iam.DetachRolePolicyInput{
+		PolicyArn: aws.String(executionRolePolicyArn),
+		RoleName:  executorRoleID,
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	_, err = iamClient.DeleteRole(&iam.DeleteRoleInput{
+		RoleName: executorRoleID,
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	// Remove the log group
+	_, err = cloudwatchClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
+		LogGroupName: logGroupID,
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	// Remove the VPC
+	err = deleteVpc(ec2Client)
+	if err != nil {
+		t.Error(err)
+	}
+}
+
+// waitUntilPodStatus polls the pod status every 3 seconds until the pod reaches
+// desiredStatus or is reported as not found, both of which return nil. It
+// gives up with the context's error after 60 seconds.
+func waitUntilPodStatus(provider *vkAWS.FargateProvider, podName string, desiredStatus v1.PodPhase) error {
+	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	defer cancel()
+
+	for {
+		status, err := provider.GetPodStatus("default", podName)
+		if err != nil {
+			if strings.Contains(err.Error(), "is not found") {
+				return nil
+			}
+
+			return err
+		}
+		if status.Phase == desiredStatus {
+			return nil
+		}
+
+		// Sleep between polls, but wake up as soon as the deadline passes.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(3 * time.Second):
+		}
+	}
+}