* Started work on provider * WIP Adding batch provider * Working basic call into pool client. Need to parameterize the baseurl * Fixed job creation by manipulating the content-type * WIP Kicking off containers. Dirty * [wip] More meat around scheduling simple containers. * Working on basic task wrapper to co-schedule pods * WIP on task wrapper * WIP * Working pod minimal wrapper for batch * Integrate pod template code into provider * Cleaning up * Move to docker without gpu * WIP batch integration * partially working * Working logs * Tidy code * WIP: Testing and readme * Added readme and terraform deployment for GPU Azure Batch pool. * Update to enable low priority nodes for gpu * Fix log formatting bug. Return node logs when container not yet started * Moved to golang v1.10 * Fix cri test * Fix up minor docs Issue. Add provider to readme. Add var for vk image.
217 lines
6.0 KiB
Go
217 lines
6.0 KiB
Go
package azurebatch
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"crypto/md5"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch"
|
|
"github.com/Azure/go-autorest/autorest"
|
|
"github.com/Azure/go-autorest/autorest/adal"
|
|
"github.com/Azure/go-autorest/autorest/azure"
|
|
)
|
|
|
|
// mustWriteString appends s to builder and panics if the write reports an
// error.
func mustWriteString(builder *strings.Builder, s string) {
	if _, writeErr := builder.WriteString(s); writeErr != nil {
		panic(writeErr)
	}
}
|
|
|
|
// mustWrite appends the bytes in b to builder and panics if the write reports
// an error.
func mustWrite(builder *strings.Builder, b []byte) {
	if _, writeErr := builder.Write(b); writeErr != nil {
		panic(writeErr)
	}
}
|
|
|
|
// NewServicePrincipalTokenFromCredentials creates a new ServicePrincipalToken using values of the
|
|
// passed credentials map.
|
|
func newServicePrincipalTokenFromCredentials(c *Config, scope string) (*adal.ServicePrincipalToken, error) {
|
|
oauthConfig, err := adal.NewOAuthConfig(azure.PublicCloud.ActiveDirectoryEndpoint, c.TenantID)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return adal.NewServicePrincipalToken(*oauthConfig, c.ClientID, c.ClientSecret, scope)
|
|
}
|
|
|
|
// GetAzureADAuthorizer return an authorizor for Azure SP
|
|
func getAzureADAuthorizer(c *Config, azureEndpoint string) autorest.Authorizer {
|
|
spt, err := newServicePrincipalTokenFromCredentials(c, azureEndpoint)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("Failed to create authorizer: %v", err))
|
|
}
|
|
auth := autorest.NewBearerAuthorizer(spt)
|
|
return auth
|
|
}
|
|
|
|
func getPool(ctx context.Context, batchBaseURL, poolID string, auth autorest.Authorizer) (*batch.PoolClient, error) {
|
|
poolClient := batch.NewPoolClientWithBaseURI(batchBaseURL)
|
|
poolClient.Authorizer = auth
|
|
poolClient.RetryAttempts = 0
|
|
|
|
pool, err := poolClient.Get(ctx, poolID, "*", "", nil, nil, nil, nil, "", "", nil, nil)
|
|
|
|
// If we observe an error which isn't related to the pool not existing panic.
|
|
// 404 is expected if this is first run.
|
|
if err != nil && pool.Response.Response == nil {
|
|
log.Printf("Failed to get pool. nil response %v", poolID)
|
|
return nil, err
|
|
} else if err != nil && pool.StatusCode == 404 {
|
|
log.Printf("Pool doesn't exist 404 received Error: %v PoolID: %v", err, poolID)
|
|
return nil, err
|
|
} else if err != nil {
|
|
log.Printf("Failed to get pool. Response:%v", pool.Response)
|
|
return nil, err
|
|
}
|
|
|
|
if pool.State == batch.PoolStateActive {
|
|
log.Println("Pool active and running...")
|
|
return &poolClient, nil
|
|
}
|
|
return nil, fmt.Errorf("Pool not in active state: %v", pool.State)
|
|
}
|
|
|
|
func createOrGetJob(ctx context.Context, batchBaseURL, jobID, poolID string, auth autorest.Authorizer) (*batch.JobClient, error) {
|
|
jobClient := batch.NewJobClientWithBaseURI(batchBaseURL)
|
|
jobClient.Authorizer = auth
|
|
// check if job exists already
|
|
currentJob, err := jobClient.Get(ctx, jobID, "", "", nil, nil, nil, nil, "", "", nil, nil)
|
|
|
|
if err == nil && currentJob.State == batch.JobStateActive {
|
|
log.Println("Wrapper job already exists...")
|
|
return &jobClient, nil
|
|
} else if currentJob.Response.StatusCode == 404 {
|
|
|
|
log.Println("Wrapper job missing... creating...")
|
|
wrapperJob := batch.JobAddParameter{
|
|
ID: &jobID,
|
|
PoolInfo: &batch.PoolInformation{
|
|
PoolID: &poolID,
|
|
},
|
|
}
|
|
|
|
_, err := jobClient.Add(ctx, wrapperJob, nil, nil, nil, nil)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &jobClient, nil
|
|
|
|
} else if currentJob.State == batch.JobStateDeleting {
|
|
log.Printf("Job is being deleted... Waiting then will retry")
|
|
time.Sleep(time.Minute)
|
|
return createOrGetJob(ctx, batchBaseURL, jobID, poolID, auth)
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
// getBatchBaseURL builds the HTTPS base URL of a Batch account from its name
// and location, in the form https://<name>.<location>.batch.azure.com.
func getBatchBaseURL(batchAccountName, batchAccountLocation string) string {
	host := fmt.Sprintf("%s.%s.batch.azure.com", batchAccountName, batchAccountLocation)
	return "https://" + host
}
|
|
|
|
// envHasValue reports whether the environment variable env is set to a
// non-empty value.
func envHasValue(env string) bool {
	return os.Getenv(env) != ""
}
|
|
|
|
// GetConfigFromEnv - Retreives the azure configuration from environment variables.
|
|
func getAzureConfigFromEnv(config *Config) error {
|
|
if envHasValue("AZURE_CLIENT_ID") {
|
|
config.ClientID = os.Getenv("AZURE_CLIENT_ID")
|
|
}
|
|
if envHasValue("AZURE_CLIENT_SECRET") {
|
|
config.ClientSecret = os.Getenv("AZURE_CLIENT_SECRET")
|
|
}
|
|
if envHasValue("AZURE_RESOURCE_GROUP") {
|
|
config.ResourceGroup = os.Getenv("AZURE_RESOURCE_GROUP")
|
|
}
|
|
if envHasValue("AZURE_SUBSCRIPTION_ID") {
|
|
config.SubscriptionID = os.Getenv("AZURE_SUBSCRIPTION_ID")
|
|
}
|
|
if envHasValue("AZURE_TENANT_ID") {
|
|
config.TenantID = os.Getenv("AZURE_TENANT_ID")
|
|
}
|
|
if envHasValue("AZURE_BATCH_POOLID") {
|
|
config.PoolID = os.Getenv("AZURE_BATCH_POOLID")
|
|
}
|
|
if envHasValue("AZURE_BATCH_JOBID") {
|
|
config.JobID = os.Getenv("AZURE_BATCH_JOBID")
|
|
}
|
|
if envHasValue("AZURE_BATCH_ACCOUNT_LOCATION") {
|
|
config.AccountLocation = os.Getenv("AZURE_BATCH_ACCOUNT_LOCATION")
|
|
}
|
|
if envHasValue("AZURE_BATCH_ACCOUNT_NAME") {
|
|
config.AccountName = os.Getenv("AZURE_BATCH_ACCOUNT_NAME")
|
|
}
|
|
|
|
if config.JobID == "" {
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
log.Panic(err)
|
|
}
|
|
config.JobID = hostname
|
|
}
|
|
|
|
if config.ClientID == "" ||
|
|
config.ClientSecret == "" ||
|
|
config.ResourceGroup == "" ||
|
|
config.SubscriptionID == "" ||
|
|
config.PoolID == "" ||
|
|
config.TenantID == "" {
|
|
return &ConfigError{CurrentConfig: config, ErrorDetails: "Missing configuration"}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// getTaskIDForPod derives a task ID for a pod: the lowercase hex MD5 digest of
// "<namespace>-<name>".
func getTaskIDForPod(namespace, name string) string {
	digest := md5.Sum([]byte(namespace + "-" + name))
	return fmt.Sprintf("%x", digest)
}
|
|
|
|
// jsonLog is the target for decoding one line of a log file in which every
// line is a standalone JSON object.
type jsonLog struct {
	// Log is populated from the "log" key of the JSON object.
	Log string `json:"log"`
}
|
|
|
|
func formatLogJSON(readCloser batch.ReadCloser) (string, error) {
|
|
//Read line by line as file isn't valid json. Each line is a single valid json object.
|
|
scanner := bufio.NewScanner(*readCloser.Value)
|
|
var b strings.Builder
|
|
for scanner.Scan() {
|
|
result := jsonLog{}
|
|
err := json.Unmarshal(scanner.Bytes(), &result)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
mustWriteString(&b, result.Log)
|
|
}
|
|
|
|
return b.String(), nil
|
|
}
|
|
|
|
// ConfigError is the error returned when reading configuration values fails.
type ConfigError struct {
	// CurrentConfig is the configuration as it stood when the error was raised.
	CurrentConfig *Config
	// ErrorDetails is a short description of what is wrong.
	ErrorDetails string
}
|
|
|
|
func (e *ConfigError) Error() string {
|
|
configJSON, err := json.Marshal(e.CurrentConfig)
|
|
if err != nil {
|
|
return e.ErrorDetails
|
|
}
|
|
|
|
return e.ErrorDetails + ": " + string(configJSON)
|
|
}
|