* Started work on provider * WIP Adding batch provider * Working basic call into pool client. Need to parameterize the baseurl * Fixed job creation by manipulating the content-type * WIP Kicking off containers. Dirty * [wip] More meat around scheduling simple containers. * Working on basic task wrapper to co-schedule pods * WIP on task wrapper * WIP * Working pod minimal wrapper for batch * Integrate pod template code into provider * Cleaning up * Move to docker without gpu * WIP batch integration * partially working * Working logs * Tidy code * WIP: Testing and readme * Added readme and terraform deployment for GPU Azure Batch pool. * Update to enable low priority nodes for gpu * Fix log formatting bug. Return node logs when container not yet started * Moved to golang v1.10 * Fix cri test * Fix up minor docs Issue. Add provider to readme. Add var for vk image.
142 lines
3.4 KiB
Go
142 lines
3.4 KiB
Go
package azurebatch
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch"
|
|
apiv1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
)
|
|
|
|
func getPodFromTask(task *batch.CloudTask) (pod *apiv1.Pod, err error) {
|
|
if task == nil || task.EnvironmentSettings == nil {
|
|
return nil, fmt.Errorf("invalid input task: %v", task)
|
|
}
|
|
|
|
ok := false
|
|
jsonData := ""
|
|
settings := *task.EnvironmentSettings
|
|
for _, s := range settings {
|
|
if *s.Name == podJSONKey && s.Value != nil {
|
|
ok = true
|
|
jsonData = *s.Value
|
|
}
|
|
}
|
|
if !ok {
|
|
return nil, fmt.Errorf("task doesn't have pod json stored in it: %v", task.EnvironmentSettings)
|
|
}
|
|
|
|
pod = &apiv1.Pod{}
|
|
err = json.Unmarshal([]byte(jsonData), pod)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return
|
|
}
|
|
|
|
func convertTaskToPodStatus(task *batch.CloudTask) (status *apiv1.PodStatus, err error) {
|
|
|
|
pod, err := getPodFromTask(task)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Todo: Review indivudal container status response
|
|
status = &apiv1.PodStatus{
|
|
Phase: convertTaskStatusToPodPhase(task),
|
|
Conditions: []apiv1.PodCondition{},
|
|
Message: "",
|
|
Reason: "",
|
|
HostIP: "",
|
|
PodIP: "127.0.0.1",
|
|
StartTime: &pod.CreationTimestamp,
|
|
}
|
|
|
|
for _, container := range pod.Spec.Containers {
|
|
containerStatus := apiv1.ContainerStatus{
|
|
Name: container.Name,
|
|
State: convertTaskStatusToContainerState(task),
|
|
Ready: true,
|
|
RestartCount: 0,
|
|
Image: container.Image,
|
|
ImageID: "",
|
|
ContainerID: "",
|
|
}
|
|
status.ContainerStatuses = append(status.ContainerStatuses, containerStatus)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func convertTaskStatusToPodPhase(t *batch.CloudTask) (podPhase apiv1.PodPhase) {
|
|
switch t.State {
|
|
case batch.TaskStatePreparing:
|
|
podPhase = apiv1.PodPending
|
|
case batch.TaskStateActive:
|
|
podPhase = apiv1.PodPending
|
|
case batch.TaskStateRunning:
|
|
podPhase = apiv1.PodRunning
|
|
case batch.TaskStateCompleted:
|
|
podPhase = apiv1.PodFailed
|
|
|
|
if t.ExecutionInfo != nil && t.ExecutionInfo.ExitCode != nil && *t.ExecutionInfo.ExitCode == 0 {
|
|
podPhase = apiv1.PodSucceeded
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func convertTaskStatusToContainerState(t *batch.CloudTask) (containerState apiv1.ContainerState) {
|
|
|
|
startTime := metav1.Time{}
|
|
if t.ExecutionInfo != nil {
|
|
if t.ExecutionInfo.StartTime != nil {
|
|
startTime.Time = t.ExecutionInfo.StartTime.Time
|
|
}
|
|
}
|
|
|
|
switch t.State {
|
|
case batch.TaskStatePreparing:
|
|
containerState = apiv1.ContainerState{
|
|
Waiting: &apiv1.ContainerStateWaiting{
|
|
Message: "Waiting for machine in AzureBatch",
|
|
Reason: "Preparing",
|
|
},
|
|
}
|
|
case batch.TaskStateActive:
|
|
containerState = apiv1.ContainerState{
|
|
Waiting: &apiv1.ContainerStateWaiting{
|
|
Message: "Waiting for machine in AzureBatch",
|
|
Reason: "Queued",
|
|
},
|
|
}
|
|
case batch.TaskStateRunning:
|
|
containerState = apiv1.ContainerState{
|
|
Running: &apiv1.ContainerStateRunning{
|
|
StartedAt: startTime,
|
|
},
|
|
}
|
|
case batch.TaskStateCompleted:
|
|
termStatus := apiv1.ContainerState{
|
|
Terminated: &apiv1.ContainerStateTerminated{
|
|
FinishedAt: metav1.Time{
|
|
Time: t.StateTransitionTime.Time,
|
|
},
|
|
StartedAt: startTime,
|
|
},
|
|
}
|
|
|
|
if t.ExecutionInfo != nil && t.ExecutionInfo.ExitCode != nil {
|
|
exitCode := *t.ExecutionInfo.ExitCode
|
|
termStatus.Terminated.ExitCode = exitCode
|
|
if exitCode != 0 {
|
|
termStatus.Terminated.Message = *t.ExecutionInfo.FailureInfo.Message
|
|
}
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|