Implement Fargate pod and container objects

This commit is contained in:
Onur Filiz
2018-04-04 14:02:58 -07:00
committed by Robbie Zhang
parent 64864ffdab
commit fcbff0320c
5 changed files with 998 additions and 5 deletions

View File

@@ -5,9 +5,11 @@ import (
"log"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ecs"
k8sTypes "k8s.io/apimachinery/pkg/types"
)
// ClusterConfig contains a Fargate cluster's configurable parameters.
@@ -31,6 +33,7 @@ type Cluster struct {
securityGroups []string
assignPublicIPv4Address bool
platformVersion string
pods map[string]*Pod
sync.RWMutex
}
@@ -63,6 +66,7 @@ func NewCluster(config *ClusterConfig) (*Cluster, error) {
securityGroups: config.SecurityGroups,
assignPublicIPv4Address: config.AssignPublicIPv4Address,
platformVersion: config.PlatformVersion,
pods: make(map[string]*Pod),
}
// Check if the cluster already exists.
@@ -80,6 +84,12 @@ func NewCluster(config *ClusterConfig) (*Cluster, error) {
}
}
// Load existing pod state from Fargate to the local cache.
err = cluster.loadPodState()
if err != nil {
return nil, err
}
return cluster, nil
}
@@ -132,3 +142,150 @@ func (c *Cluster) describe() error {
return nil
}
// LoadPodState rebuilds pod and container objects in this cluster by loading existing tasks from
// Fargate. This is done during startup and whenever the local state is suspected to be out of sync
// with the actual state in Fargate. Caching state locally minimizes the number of service calls.
func (c *Cluster) loadPodState() error {
api := client.api
log.Printf("Loading pod state from cluster %s.", c.name)
taskArns := make([]*string, 0)
// Get a list of all Fargate tasks running on this cluster.
err := api.ListTasksPages(
&ecs.ListTasksInput{
Cluster: aws.String(c.name),
DesiredStatus: aws.String(ecs.DesiredStatusRunning),
LaunchType: aws.String(ecs.LaunchTypeFargate),
},
func(page *ecs.ListTasksOutput, lastPage bool) bool {
taskArns = append(taskArns, page.TaskArns...)
return !lastPage
},
)
if err != nil {
err := fmt.Errorf("failed to load pod state: %v", err)
log.Println(err)
return err
}
log.Printf("Found %d tasks on cluster %s.", len(taskArns), c.name)
pods := make(map[string]*Pod)
// For each task running on this Fargate cluster...
for _, taskArn := range taskArns {
// Describe the task.
describeTasksOutput, err := api.DescribeTasks(
&ecs.DescribeTasksInput{
Cluster: aws.String(c.name),
Tasks: []*string{taskArn},
},
)
if err != nil || len(describeTasksOutput.Tasks) != 1 {
log.Printf("Failed to describe task %s. Skipping.", *taskArn)
continue
}
task := describeTasksOutput.Tasks[0]
// Describe the task definition.
describeTaskDefinitionOutput, err := api.DescribeTaskDefinition(
&ecs.DescribeTaskDefinitionInput{
TaskDefinition: task.TaskDefinitionArn,
},
)
if err != nil {
log.Printf("Failed to describe task definition %s. Skipping.", *task.TaskDefinitionArn)
continue
}
taskDef := describeTaskDefinitionOutput.TaskDefinition
// A pod's tag is stored in its task definition's Family field.
tag := *taskDef.Family
// Rebuild the pod object.
// Not all tasks are necessarily pods. Skip tasks that do not have a valid tag.
pod, err := NewPodFromTag(c, tag)
if err != nil {
log.Printf("Skipping unknown task %s: %v", *taskArn, err)
continue
}
pod.uid = k8sTypes.UID(*task.StartedBy)
pod.taskDefArn = *task.TaskDefinitionArn
pod.taskArn = *task.TaskArn
pod.taskStatus = *task.LastStatus
pod.taskRefreshTime = time.Now()
// Rebuild the container objects.
for _, cntrDef := range taskDef.ContainerDefinitions {
cntr, _ := newContainerFromDefinition(cntrDef, task.CreatedAt)
pod.taskCPU += *cntr.definition.Cpu
pod.taskMemory += *cntr.definition.Memory
pod.containers[*cntrDef.Name] = cntr
log.Printf("Found pod %s/%s on cluster %s.", pod.namespace, pod.name, c.name)
}
pods[tag] = pod
}
// Update local state.
c.Lock()
c.pods = pods
c.Unlock()
return nil
}
// GetPod returns a Kubernetes pod deployed on this cluster.
func (c *Cluster) GetPod(namespace string, name string) (*Pod, error) {
c.RLock()
defer c.RUnlock()
tag := buildTaskDefinitionTag(c.name, namespace, name)
pod, ok := c.pods[tag]
if !ok {
return nil, fmt.Errorf("pod %s/%s is not found", namespace, name)
}
return pod, nil
}
// GetPods returns all Kubernetes pods deployed on this cluster.
func (c *Cluster) GetPods() ([]*Pod, error) {
c.RLock()
defer c.RUnlock()
pods := make([]*Pod, 0, len(c.pods))
for _, pod := range c.pods {
pods = append(pods, pod)
}
return pods, nil
}
// InsertPod inserts a Kubernetes pod to this cluster.
func (c *Cluster) InsertPod(pod *Pod, tag string) {
c.Lock()
defer c.Unlock()
c.pods[tag] = pod
}
// RemovePod removes a Kubernetes pod from this cluster.
func (c *Cluster) RemovePod(tag string) {
c.Lock()
defer c.Unlock()
delete(c.pods, tag)
}