diff --git a/providers/aws/fargate/cluster.go b/providers/aws/fargate/cluster.go index a8cffe16b..51402a817 100644 --- a/providers/aws/fargate/cluster.go +++ b/providers/aws/fargate/cluster.go @@ -5,9 +5,11 @@ import ( "log" "strings" "sync" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ecs" + k8sTypes "k8s.io/apimachinery/pkg/types" ) // ClusterConfig contains a Fargate cluster's configurable parameters. @@ -31,6 +33,7 @@ type Cluster struct { securityGroups []string assignPublicIPv4Address bool platformVersion string + pods map[string]*Pod sync.RWMutex } @@ -63,6 +66,7 @@ func NewCluster(config *ClusterConfig) (*Cluster, error) { securityGroups: config.SecurityGroups, assignPublicIPv4Address: config.AssignPublicIPv4Address, platformVersion: config.PlatformVersion, + pods: make(map[string]*Pod), } // Check if the cluster already exists. @@ -80,6 +84,12 @@ func NewCluster(config *ClusterConfig) (*Cluster, error) { } } + // Load existing pod state from Fargate to the local cache. + err = cluster.loadPodState() + if err != nil { + return nil, err + } + return cluster, nil } @@ -132,3 +142,150 @@ func (c *Cluster) describe() error { return nil } + +// LoadPodState rebuilds pod and container objects in this cluster by loading existing tasks from +// Fargate. This is done during startup and whenever the local state is suspected to be out of sync +// with the actual state in Fargate. Caching state locally minimizes the number of service calls. +func (c *Cluster) loadPodState() error { + api := client.api + + log.Printf("Loading pod state from cluster %s.", c.name) + + taskArns := make([]*string, 0) + + // Get a list of all Fargate tasks running on this cluster. + err := api.ListTasksPages( + &ecs.ListTasksInput{ + Cluster: aws.String(c.name), + DesiredStatus: aws.String(ecs.DesiredStatusRunning), + LaunchType: aws.String(ecs.LaunchTypeFargate), + }, + func(page *ecs.ListTasksOutput, lastPage bool) bool { + taskArns = append(taskArns, page.TaskArns...) + return !lastPage + }, + ) + + if err != nil { + err := fmt.Errorf("failed to load pod state: %v", err) + log.Println(err) + return err + } + + log.Printf("Found %d tasks on cluster %s.", len(taskArns), c.name) + + pods := make(map[string]*Pod) + + // For each task running on this Fargate cluster... + for _, taskArn := range taskArns { + // Describe the task. + describeTasksOutput, err := api.DescribeTasks( + &ecs.DescribeTasksInput{ + Cluster: aws.String(c.name), + Tasks: []*string{taskArn}, + }, + ) + + if err != nil || len(describeTasksOutput.Tasks) != 1 { + log.Printf("Failed to describe task %s. Skipping.", *taskArn) + continue + } + + task := describeTasksOutput.Tasks[0] + + // Describe the task definition. + describeTaskDefinitionOutput, err := api.DescribeTaskDefinition( + &ecs.DescribeTaskDefinitionInput{ + TaskDefinition: task.TaskDefinitionArn, + }, + ) + + if err != nil { + log.Printf("Failed to describe task definition %s. Skipping.", *task.TaskDefinitionArn) + continue + } + + taskDef := describeTaskDefinitionOutput.TaskDefinition + + // A pod's tag is stored in its task definition's Family field. + tag := *taskDef.Family + + // Rebuild the pod object. + // Not all tasks are necessarily pods. Skip tasks that do not have a valid tag. + pod, err := NewPodFromTag(c, tag) + if err != nil { + log.Printf("Skipping unknown task %s: %v", *taskArn, err) + continue + } + + pod.uid = k8sTypes.UID(*task.StartedBy) + pod.taskDefArn = *task.TaskDefinitionArn + pod.taskArn = *task.TaskArn + pod.taskStatus = *task.LastStatus + pod.taskRefreshTime = time.Now() + + // Rebuild the container objects. + for _, cntrDef := range taskDef.ContainerDefinitions { + cntr, _ := newContainerFromDefinition(cntrDef, task.CreatedAt) + + pod.taskCPU += *cntr.definition.Cpu + pod.taskMemory += *cntr.definition.Memory + pod.containers[*cntrDef.Name] = cntr + + log.Printf("Found pod %s/%s on cluster %s.", pod.namespace, pod.name, c.name) + } + + pods[tag] = pod + } + + // Update local state. + c.Lock() + c.pods = pods + c.Unlock() + + return nil +} + +// GetPod returns a Kubernetes pod deployed on this cluster. +func (c *Cluster) GetPod(namespace string, name string) (*Pod, error) { + c.RLock() + defer c.RUnlock() + + tag := buildTaskDefinitionTag(c.name, namespace, name) + pod, ok := c.pods[tag] + if !ok { + return nil, fmt.Errorf("pod %s/%s is not found", namespace, name) + } + + return pod, nil +} + +// GetPods returns all Kubernetes pods deployed on this cluster. +func (c *Cluster) GetPods() ([]*Pod, error) { + c.RLock() + defer c.RUnlock() + + pods := make([]*Pod, 0, len(c.pods)) + + for _, pod := range c.pods { + pods = append(pods, pod) + } + + return pods, nil +} + +// InsertPod inserts a Kubernetes pod to this cluster. +func (c *Cluster) InsertPod(pod *Pod, tag string) { + c.Lock() + defer c.Unlock() + + c.pods[tag] = pod +} + +// RemovePod removes a Kubernetes pod from this cluster. +func (c *Cluster) RemovePod(tag string) { + c.Lock() + defer c.Unlock() + + delete(c.pods, tag) +} diff --git a/providers/aws/fargate/container.go b/providers/aws/fargate/container.go new file mode 100644 index 000000000..5722c5a90 --- /dev/null +++ b/providers/aws/fargate/container.go @@ -0,0 +1,211 @@ +package fargate + +import ( + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ecs" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + // Container status strings. + containerStatusProvisioning = "PROVISIONING" + containerStatusPending = "PENDING" + containerStatusRunning = "RUNNING" + containerStatusStopped = "STOPPED" + + // Default container resource limits. + containerDefaultCPULimit = VCPU / 4 + containerDefaultMemoryLimit = 512 * MiB +) + +// Container is the representation of a Kubernetes container in Fargate. +type container struct { + definition ecs.ContainerDefinition + startTime time.Time + finishTime time.Time +} + +// NewContainer creates a new container from a Kubernetes container spec. +func newContainer(spec *corev1.Container) (*container, error) { + var cntr container + + // Translate the Kubernetes container spec to a Fargate container definition. + cntr.definition = ecs.ContainerDefinition{ + Name: aws.String(spec.Name), + Image: aws.String(spec.Image), + EntryPoint: aws.StringSlice(spec.Command), + Command: aws.StringSlice(spec.Args), + } + + if spec.WorkingDir != "" { + cntr.definition.WorkingDirectory = aws.String(spec.WorkingDir) + } + + // Translate the Kubernetes container resource requirements to Fargate units. + cntr.setResourceRequirements(&spec.Resources) + + return &cntr, nil +} + +// NewContainerFromDefinition creates a new container from a Fargate container definition. +func newContainerFromDefinition(def *ecs.ContainerDefinition, startTime *time.Time) (*container, error) { + var cntr container + + cntr.definition = *def + + if startTime != nil { + cntr.startTime = *startTime + } + + return &cntr, nil +} + +// GetStatus returns the status of a container running in Fargate. +func (cntr *container) getStatus(runtimeState *ecs.Container) corev1.ContainerStatus { + var reason string + var state corev1.ContainerState + var isReady bool + + if runtimeState.Reason != nil { + reason = *runtimeState.Reason + } + + switch *runtimeState.LastStatus { + case containerStatusProvisioning, + containerStatusPending: + state = corev1.ContainerState{ + Waiting: &corev1.ContainerStateWaiting{ + Reason: reason, + Message: "", + }, + } + + case containerStatusRunning: + if cntr.startTime.IsZero() { + cntr.startTime = time.Now() + } + + isReady = true + + state = corev1.ContainerState{ + Running: &corev1.ContainerStateRunning{ + StartedAt: metav1.NewTime(cntr.startTime), + }, + } + + case containerStatusStopped: + if cntr.finishTime.IsZero() { + cntr.finishTime = time.Now() + } + + var exitCode int32 + if runtimeState.ExitCode != nil { + exitCode = int32(*runtimeState.ExitCode) + } + + state = corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + ExitCode: exitCode, + Signal: 0, + Reason: reason, + Message: "", + StartedAt: metav1.NewTime(cntr.startTime), + FinishedAt: metav1.NewTime(cntr.finishTime), + ContainerID: "", + }, + } + } + + return corev1.ContainerStatus{ + Name: *runtimeState.Name, + State: state, + Ready: isReady, + RestartCount: 0, + Image: *cntr.definition.Image, + ImageID: "", + ContainerID: "", + } +} + +// SetResourceRequirements translates Kubernetes container resource requirements to Fargate units. +func (cntr *container) setResourceRequirements(reqs *corev1.ResourceRequirements) { + // + // Kubernetes container resource requirements consist of "limits" and "requests" for each + // resource type. Limits are the maximum amount of resources allowed. Requests are the minimum + // amount of resources reserved for the container. Both are optional. If requests are omitted, + // they default to limits. If limits are also omitted, they both default to an + // implementation-defined value. + // + // Fargate container resource requirements consist of CPU shares and memory limits. Memory is a + // hard limit, which when exceeded, causes the container to be killed. MemoryReservation is a + // the amount of resources reserved for the container. At least one must be specified. + // + var quantity resource.Quantity + var reqQuantity resource.Quantity + var limQuantity resource.Quantity + var ok bool + var reqOk bool + var limOk bool + + // Use the defaults if the container does not have any resource requirements. + cpu := containerDefaultCPULimit + memory := containerDefaultMemoryLimit + memoryReservation := containerDefaultMemoryLimit + + // Compute CPU requirements. + if reqs != nil { + // Fargate tasks do not share resources with other tasks. Therefore the task and each + // container in it must be allocated their resource limits. Hence limits are preferred + // over requests. + if reqs.Limits != nil { + quantity, ok = reqs.Limits[corev1.ResourceCPU] + } + if !ok && reqs.Requests != nil { + quantity, ok = reqs.Requests[corev1.ResourceCPU] + } + if ok { + // Because Fargate task CPU limit is the sum of the task's containers' CPU shares, + // the container's CPU share equals its CPU limit. + // + // Convert CPU unit from Kubernetes milli-CPUs to EC2 vCPUs. + cpu = quantity.ScaledValue(resource.Milli) * VCPU / 1000 + } + } + + // Compute memory requirements. + if reqs != nil { + // Find the memory request and limit, if available. + if reqs.Requests != nil { + reqQuantity, reqOk = reqs.Requests[corev1.ResourceMemory] + } + if reqs.Limits != nil { + limQuantity, limOk = reqs.Limits[corev1.ResourceMemory] + } + + // If one is omitted, use the other one's value. + if !limOk && reqOk { + limQuantity = reqQuantity + } else if !reqOk && limOk { + reqQuantity = limQuantity + } + + // If at least one is specified... + if reqOk || limOk { + // Convert memory unit from bytes to MiBs, rounding up to the next MiB. + // This is necessary because Fargate container definition memory reservations and + // limits are both in MiBs. + memoryReservation = (reqQuantity.Value() + MiB - 1) / MiB + memory = (limQuantity.Value() + MiB - 1) / MiB + } + } + + // Set final values. + cntr.definition.Cpu = aws.Int64(cpu) + cntr.definition.Memory = aws.Int64(memory) + cntr.definition.MemoryReservation = aws.Int64(memoryReservation) +} diff --git a/providers/aws/fargate/fargate.go b/providers/aws/fargate/fargate.go new file mode 100644 index 000000000..49290cff3 --- /dev/null +++ b/providers/aws/fargate/fargate.go @@ -0,0 +1,47 @@ +package fargate + +const ( + // EC2 compute resource units. + + // VCPU is one virtual CPU core in EC2. + VCPU int64 = 1024 + // MiB is 2^20 bytes. + MiB int64 = 1024 * 1024 + // GiB is 2^30 bytes. + GiB int64 = 1024 * MiB +) + +// TaskSize represents a Fargate task size. +type taskSize struct { + cpu int64 + memory memorySizeRange +} + +// MemorySizeRange represents a range of Fargate task memory sizes. +type memorySizeRange struct { + min int64 + max int64 + inc int64 +} + +var ( + // Fargate task size table. + // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definition_parameters.html#task_size + // + // VCPU Memory (in MiBs, available in 1GiB increments) + // ==== =================== + // 256 512, 1024 ... 2048 + // 512 1024 ... 4096 + // 1024 2048 ... 8192 + // 2048 4096 ... 16384 + // 4096 8192 ... 30720 + // + taskSizeTable = []taskSize{ + {VCPU / 4, memorySizeRange{512 * MiB, 512 * MiB, 1}}, + {VCPU / 4, memorySizeRange{1 * GiB, 2 * GiB, 1 * GiB}}, + {VCPU / 2, memorySizeRange{1 * GiB, 4 * GiB, 1 * GiB}}, + {1 * VCPU, memorySizeRange{2 * GiB, 8 * GiB, 1 * GiB}}, + {2 * VCPU, memorySizeRange{4 * GiB, 16 * GiB, 1 * GiB}}, + {4 * VCPU, memorySizeRange{8 * GiB, 30 * GiB, 1 * GiB}}, + } +) diff --git a/providers/aws/fargate/pod.go b/providers/aws/fargate/pod.go new file mode 100644 index 000000000..f106f59df --- /dev/null +++ b/providers/aws/fargate/pod.go @@ -0,0 +1,486 @@ +package fargate + +import ( + "fmt" + "log" + "strconv" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ecs" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sTypes "k8s.io/apimachinery/pkg/types" +) + +const ( + // Prefixes for objects created in Fargate. + taskDefFamilyPrefix = "vk-podspec" + taskTagPrefix = "vk-pod" + + // Task status strings. + taskStatusProvisioning = "PROVISIONING" + taskStatusPending = "PENDING" + taskStatusRunning = "RUNNING" + taskStatusStopped = "STOPPED" + + // Task attachment types. + taskAttachmentENI = "ElasticNetworkInterface" + taskAttachmentENIPrivateIPv4Address = "privateIPv4Address" + + // Reason used for task state changes. + taskGenericReason = "Initiated by user" +) + +// Pod is the representation of a Kubernetes pod in Fargate. +type Pod struct { + // Kubernetes pod properties. + namespace string + name string + uid k8sTypes.UID + + // Fargate task properties. + cluster *Cluster + taskDefArn string + taskArn string + taskStatus string + taskRefreshTime time.Time + taskCPU int64 + taskMemory int64 + containers map[string]*container +} + +// NewPod creates a new Kubernetes pod on Fargate. +func NewPod(cluster *Cluster, pod *corev1.Pod) (*Pod, error) { + api := client.api + + // Initialize the pod. + fgPod := &Pod{ + namespace: pod.Namespace, + name: pod.Name, + uid: pod.UID, + cluster: cluster, + containers: make(map[string]*container), + } + + tag := fgPod.buildTaskDefinitionTag() + + // Create a task definition matching the pod spec. + taskDef := &ecs.RegisterTaskDefinitionInput{ + Family: aws.String(tag), + RequiresCompatibilities: []*string{aws.String(ecs.CompatibilityFargate)}, + NetworkMode: aws.String(ecs.NetworkModeAwsvpc), + ContainerDefinitions: []*ecs.ContainerDefinition{}, + } + + // For each container in the pod... + for _, containerSpec := range pod.Spec.Containers { + // Create a container definition. + cntr, err := newContainer(&containerSpec) + if err != nil { + return nil, err + } + + // Add the container's resource requirements to its pod's total resource requirements. + fgPod.taskCPU += *cntr.definition.Cpu + fgPod.taskMemory += *cntr.definition.Memory + + // Insert the container to its pod. + fgPod.containers[containerSpec.Name] = cntr + + // Insert container definition to the task definition. + taskDef.ContainerDefinitions = append(taskDef.ContainerDefinitions, &cntr.definition) + } + + // Set task resource limits. + err := fgPod.mapTaskSize() + if err != nil { + return nil, err + } + + taskDef.Cpu = aws.String(strconv.Itoa(int(fgPod.taskCPU))) + taskDef.Memory = aws.String(strconv.Itoa(int(fgPod.taskMemory))) + + // Register the task definition with Fargate. + log.Printf("RegisterTaskDefinition input:%+v", taskDef) + output, err := api.RegisterTaskDefinition(taskDef) + log.Printf("RegisterTaskDefinition err:%+v output:%+v", err, output) + if err != nil { + err = fmt.Errorf("failed to register task definition: %v", err) + return nil, err + } + + // Save the registered task definition ARN. + fgPod.taskDefArn = *output.TaskDefinition.TaskDefinitionArn + + if cluster != nil { + cluster.InsertPod(fgPod, tag) + } + + return fgPod, nil +} + +// NewPodFromTag creates a new pod identified by a tag. +func NewPodFromTag(cluster *Cluster, tag string) (*Pod, error) { + data := strings.Split(tag, "_") + + if len(data) < 4 || + data[0] != taskDefFamilyPrefix || + data[1] != cluster.name { + return nil, fmt.Errorf("invalid tag") + } + + pod := &Pod{ + namespace: data[2], + name: data[3], + cluster: cluster, + containers: make(map[string]*container), + } + + return pod, nil +} + +// Start deploys and runs a Kubernetes pod on Fargate. +func (pod *Pod) Start() error { + api := client.api + + // Pods always get an ENI with a private IPv4 address in customer subnet. + // Assign a public IPv4 address to the ENI only if requested. + assignPublicIPAddress := ecs.AssignPublicIpDisabled + if pod.cluster.assignPublicIPv4Address { + assignPublicIPAddress = ecs.AssignPublicIpEnabled + } + + // Start the task. + runTaskInput := &ecs.RunTaskInput{ + Cluster: aws.String(pod.cluster.name), + Count: aws.Int64(1), + LaunchType: aws.String(ecs.LaunchTypeFargate), + NetworkConfiguration: &ecs.NetworkConfiguration{ + AwsvpcConfiguration: &ecs.AwsVpcConfiguration{ + AssignPublicIp: aws.String(assignPublicIPAddress), + SecurityGroups: aws.StringSlice(pod.cluster.securityGroups), + Subnets: aws.StringSlice(pod.cluster.subnets), + }, + }, + PlatformVersion: aws.String(pod.cluster.platformVersion), + StartedBy: aws.String(pod.buildTaskTag()), + TaskDefinition: aws.String(pod.taskDefArn), + } + + log.Printf("RunTask input:%+v", runTaskInput) + runTaskOutput, err := api.RunTask(runTaskInput) + log.Printf("RunTask err:%+v output:%+v", err, runTaskOutput) + if err != nil || len(runTaskOutput.Tasks) == 0 { + err = fmt.Errorf("failed to run task: %v", err) + return err + } + + // Save the task ARN. + pod.taskArn = *runTaskOutput.Tasks[0].TaskArn + + return nil +} + +// Stop stops a running Kubernetes pod on Fargate. +func (pod *Pod) Stop() error { + api := client.api + + // Stop the task. + stopTaskInput := &ecs.StopTaskInput{ + Cluster: aws.String(pod.cluster.name), + Reason: aws.String(taskGenericReason), + Task: aws.String(pod.taskArn), + } + + log.Printf("StopTask input:%+v", stopTaskInput) + stopTaskOutput, err := api.StopTask(stopTaskInput) + log.Printf("StopTask err:%+v output:%+v", err, stopTaskOutput) + if err != nil { + err = fmt.Errorf("failed to stop task: %v", err) + return err + } + + // Deregister the task definition. + _, err = api.DeregisterTaskDefinition(&ecs.DeregisterTaskDefinitionInput{ + TaskDefinition: aws.String(pod.taskDefArn), + }) + if err != nil { + log.Printf("Failed to deregister task definition: %v", err) + } + + // Remove the pod from its cluster. + if pod.cluster != nil { + pod.cluster.RemovePod(pod.buildTaskDefinitionTag()) + } + + return nil +} + +// GetSpec returns the specification of a Kubernetes pod on Fargate. +func (pod *Pod) GetSpec() (*corev1.Pod, error) { + task, err := pod.describe() + if err != nil { + return nil, err + } + + return pod.getSpec(task) +} + +// GetStatus returns the status of a Kubernetes pod on Fargate. +func (pod *Pod) GetStatus() corev1.PodStatus { + task, err := pod.describe() + if err != nil { + return corev1.PodStatus{Phase: corev1.PodUnknown} + } + + return pod.getStatus(task) +} + +// BuildTaskDefinitionTag returns the task definition tag for this pod. +func (pod *Pod) buildTaskDefinitionTag() string { + return buildTaskDefinitionTag(pod.cluster.name, pod.namespace, pod.name) +} + +// buildTaskDefinitionTag builds a task definition tag from its components. +func buildTaskDefinitionTag(clusterName string, namespace string, name string) string { + // vk-podspec_cluster_namespacae_podname + return fmt.Sprintf("%s_%s_%s_%s", taskDefFamilyPrefix, clusterName, namespace, name) +} + +// BuildTaskTag returns the pod's task tag, used for mapping a task back to its pod. +func (pod *Pod) buildTaskTag() string { + return fmt.Sprintf("%s", pod.uid) +} + +// mapTaskSize maps Kubernetes pod resource requirements to a Fargate task size. +func (pod *Pod) mapTaskSize() error { + // + // Kubernetes pods do not have explicit resource requirements; their containers do. Pod resource + // requirements are the sum of the pod's containers' requirements. + // + // Fargate tasks have explicit CPU and memory limits. Both are required and specify the maximum + // amount of resources for the task. The limits must match a task size on taskSizeTable. + // + var cpu int64 + var memory int64 + + // Find the smallest Fargate task size that can satisfy the total resource request. + for _, row := range taskSizeTable { + if pod.taskCPU <= row.cpu { + for mem := row.memory.min; mem <= row.memory.max; mem += row.memory.inc { + if pod.taskMemory <= mem/MiB { + cpu = row.cpu + memory = mem / MiB + break + } + } + + if cpu != 0 { + break + } + } + } + + log.Printf("Mapped resource requirements (cpu:%v, memory:%v) to task size (cpu:%v, memory:%v)", + pod.taskCPU, pod.taskMemory, cpu, memory) + + // Fail if the resource requirements cannot be satisfied by any Fargate task size. + if cpu == 0 { + return fmt.Errorf("resource requirements (cpu:%v, memory:%v) are too high", + pod.taskCPU, pod.taskMemory) + } + + // Fargate task CPU size is specified in vCPU/1024s and memory size is specified in MiBs. + pod.taskCPU = cpu + pod.taskMemory = memory + + return nil +} + +// Describe retrieves the status of a Kubernetes pod from Fargate. +func (pod *Pod) describe() (*ecs.Task, error) { + api := client.api + + // Describe the task. + describeTasksInput := &ecs.DescribeTasksInput{ + Cluster: aws.String(pod.cluster.name), + Tasks: []*string{aws.String(pod.taskArn)}, + } + + describeTasksOutput, err := api.DescribeTasks(describeTasksInput) + if err != nil { + return nil, err + } + + task := describeTasksOutput.Tasks[0] + + pod.taskStatus = *task.LastStatus + pod.taskRefreshTime = time.Now() + + return task, nil +} + +// GetSpec returns the specification of a Kubernetes pod on Fargate. +func (pod *Pod) getSpec(task *ecs.Task) (*corev1.Pod, error) { + containers := make([]corev1.Container, 0, len(task.Containers)) + + for _, c := range task.Containers { + cntrDef := pod.containers[*c.Name].definition + + cntr := corev1.Container{ + Name: *c.Name, + Image: *cntrDef.Image, + Command: aws.StringValueSlice(cntrDef.EntryPoint), + Args: aws.StringValueSlice(cntrDef.Command), + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", *cntrDef.Cpu)), + corev1.ResourceMemory: resource.MustParse(fmt.Sprintf("%dMi", *cntrDef.Memory)), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", *cntrDef.Cpu)), + corev1.ResourceMemory: resource.MustParse(fmt.Sprintf("%dMi", *cntrDef.MemoryReservation)), + }, + }, + Ports: make([]corev1.ContainerPort, 0, len(cntrDef.PortMappings)), + Env: make([]corev1.EnvVar, 0, len(cntrDef.Environment)), + } + + if cntrDef.WorkingDirectory != nil { + cntr.WorkingDir = *cntrDef.WorkingDirectory + } + + for _, mapping := range cntrDef.PortMappings { + cntr.Ports = append(cntr.Ports, corev1.ContainerPort{ + ContainerPort: int32(*mapping.ContainerPort), + HostPort: int32(*mapping.HostPort), + Protocol: corev1.ProtocolTCP, + }) + } + + for _, env := range cntrDef.Environment { + cntr.Env = append(cntr.Env, corev1.EnvVar{ + Name: *env.Name, + Value: *env.Value, + }) + } + + containers = append(containers, cntr) + } + + podSpec := corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: pod.namespace, + Name: pod.name, + UID: pod.uid, + }, + Spec: corev1.PodSpec{ + NodeName: pod.cluster.nodeName, + Volumes: []corev1.Volume{}, + Containers: containers, + }, + Status: pod.getStatus(task), + } + + return &podSpec, nil +} + +// GetStatus returns the status of a Kubernetes pod on Fargate. +func (pod *Pod) getStatus(task *ecs.Task) corev1.PodStatus { + // Translate task status to pod phase. + phase := corev1.PodUnknown + + switch pod.taskStatus { + case taskStatusProvisioning: + phase = corev1.PodPending + case taskStatusPending: + phase = corev1.PodPending + case taskStatusRunning: + phase = corev1.PodRunning + case taskStatusStopped: + phase = corev1.PodSucceeded + } + + // Set pod conditions based on task's last known status. + isScheduled := corev1.ConditionFalse + isInitialized := corev1.ConditionFalse + isReady := corev1.ConditionFalse + + switch pod.taskStatus { + case taskStatusProvisioning: + isScheduled = corev1.ConditionTrue + case taskStatusPending: + isScheduled = corev1.ConditionTrue + case taskStatusRunning: + isScheduled = corev1.ConditionTrue + isInitialized = corev1.ConditionTrue + isReady = corev1.ConditionTrue + case taskStatusStopped: + isScheduled = corev1.ConditionTrue + isInitialized = corev1.ConditionTrue + isReady = corev1.ConditionTrue + } + + conditions := []corev1.PodCondition{ + corev1.PodCondition{ + Type: corev1.PodScheduled, + Status: isScheduled, + }, + corev1.PodCondition{ + Type: corev1.PodInitialized, + Status: isInitialized, + }, + corev1.PodCondition{ + Type: corev1.PodReady, + Status: isReady, + }, + } + + // Set the pod start time as the task creation time. + var startTime metav1.Time + if task.CreatedAt != nil { + startTime = metav1.NewTime(*task.CreatedAt) + } + + // Set the pod IP address from the task ENI information. + privateIPv4Address := "" + for _, attachment := range task.Attachments { + if *attachment.Type == taskAttachmentENI { + for _, detail := range attachment.Details { + if *detail.Name == taskAttachmentENIPrivateIPv4Address { + privateIPv4Address = *detail.Value + } + } + } + } + + // Get statuses from all containers in this pod. + containerStatuses := make([]corev1.ContainerStatus, 0, len(task.Containers)) + for _, cntr := range task.Containers { + containerStatuses = append(containerStatuses, pod.containers[*cntr.Name].getStatus(cntr)) + } + + // Build the pod status structure to be reported. + status := corev1.PodStatus{ + Phase: phase, + Conditions: conditions, + Message: "", + Reason: "", + HostIP: privateIPv4Address, + PodIP: privateIPv4Address, + StartTime: &startTime, + InitContainerStatuses: nil, + ContainerStatuses: containerStatuses, + QOSClass: corev1.PodQOSBestEffort, + } + + return status +} diff --git a/providers/aws/provider.go b/providers/aws/provider.go index a188b3c9a..cf0dbe2df 100644 --- a/providers/aws/provider.go +++ b/providers/aws/provider.go @@ -6,6 +6,7 @@ import ( "time" "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers/aws/fargate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -26,6 +27,7 @@ type FargateProvider struct { securityGroups []string // Fargate resources. + cluster *fargate.Cluster clusterName string capacity capacity assignPublicIPv4Address bool @@ -74,6 +76,23 @@ func NewFargateProvider( log.Printf("Loaded provider configuration file %s.", config) + // Find or create the configured Fargate cluster. + clusterConfig := fargate.ClusterConfig{ + Region: p.region, + Name: p.clusterName, + NodeName: nodeName, + Subnets: p.subnets, + SecurityGroups: p.securityGroups, + AssignPublicIPv4Address: p.assignPublicIPv4Address, + PlatformVersion: p.platformVersion, + } + + p.cluster, err = fargate.NewCluster(&clusterConfig) + if err != nil { + err = fmt.Errorf("failed to create Fargate cluster: %v", err) + return nil, err + } + p.lastTransitionTime = time.Now() log.Printf("Created Fargate provider: %+v.", p) @@ -84,7 +103,20 @@ func NewFargateProvider( // CreatePod takes a Kubernetes Pod and deploys it within the Fargate provider. func (p *FargateProvider) CreatePod(pod *corev1.Pod) error { log.Printf("Received CreatePod request for %+v.\n", pod) - return errNotImplemented + + fgPod, err := fargate.NewPod(p.cluster, pod) + if err != nil { + log.Printf("Failed to create pod: %v.\n", err) + return err + } + + err = fgPod.Start() + if err != nil { + log.Printf("Failed to start pod: %v.\n", err) + return err + } + + return nil } // UpdatePod takes a Kubernetes Pod and updates it within the provider. @@ -96,13 +128,41 @@ func (p *FargateProvider) UpdatePod(pod *corev1.Pod) error { // DeletePod takes a Kubernetes Pod and deletes it from the provider. func (p *FargateProvider) DeletePod(pod *corev1.Pod) error { log.Printf("Received DeletePod request for %s/%s.\n", pod.Namespace, pod.Name) - return errNotImplemented + + fgPod, err := p.cluster.GetPod(pod.Namespace, pod.Name) + if err != nil { + log.Printf("Failed to get pod: %v.\n", err) + return err + } + + err = fgPod.Stop() + if err != nil { + log.Printf("Failed to stop pod: %v.\n", err) + return err + } + + return nil } // GetPod retrieves a pod by name from the provider (can be cached). func (p *FargateProvider) GetPod(namespace, name string) (*corev1.Pod, error) { log.Printf("Received GetPod request for %s/%s.\n", namespace, name) - return nil, errNotImplemented + + pod, err := p.cluster.GetPod(namespace, name) + if err != nil { + log.Printf("Failed to get pod: %v.\n", err) + return nil, err + } + + spec, err := pod.GetSpec() + if err != nil { + log.Printf("Failed to get pod spec: %v.\n", err) + return nil, err + } + + log.Printf("Responding to GetPod: %+v.\n", spec) + + return spec, nil } // GetContainerLogs retrieves the logs of a container by name from the provider. @@ -114,13 +174,45 @@ func (p *FargateProvider) GetContainerLogs(namespace, podName, containerName str // GetPodStatus retrieves the status of a pod by name from the provider. func (p *FargateProvider) GetPodStatus(namespace, name string) (*corev1.PodStatus, error) { log.Printf("Received GetPodStatus request for %s/%s.\n", namespace, name) - return nil, errNotImplemented + + pod, err := p.cluster.GetPod(namespace, name) + if err != nil { + log.Printf("Failed to get pod: %v.\n", err) + return nil, err + } + + status := pod.GetStatus() + + log.Printf("Responding to GetPodStatus: %+v.\n", status) + + return &status, nil } // GetPods retrieves a list of all pods running on the provider (can be cached). func (p *FargateProvider) GetPods() ([]*corev1.Pod, error) { log.Println("Received GetPods request.") - return nil, errNotImplemented + + pods, err := p.cluster.GetPods() + if err != nil { + log.Printf("Failed to get pods: %v.\n", err) + return nil, err + } + + var result []*corev1.Pod + + for _, pod := range pods { + spec, err := pod.GetSpec() + if err != nil { + log.Printf("Failed to get pod spec: %v.\n", err) + continue + } + + result = append(result, spec) + } + + log.Printf("Responding to GetPods: %+v.\n", result) + + return result, nil } // Capacity returns a resource list with the capacity constraints of the provider.