Files
virtual-kubelet/providers/azurebatch/batch_helpers.go
Lawrence Gripper d6e8b3daf7 Create a provider to use Azure Batch (#133)
* Started work on provider

* WIP Adding batch provider

* Working basic call into pool client. Need to parameterize the baseurl

* Fixed job creation by manipulating the content-type

* WIP Kicking off containers. Dirty

* [wip] More meat around scheduling simple containers.

* Working on basic task wrapper to co-schedule pods

* WIP on task wrapper

* WIP

* Working pod minimal wrapper for batch

* Integrate pod template code into provider

* Cleaning up

* Move to docker without gpu

* WIP batch integration

* partially working

* Working logs

* Tidy code

* WIP: Testing and readme

* Added readme and terraform deployment for GPU Azure Batch pool.

* Update to enable low priority nodes for gpu

* Fix log formatting bug. Return node logs when container not yet started

* Moved to golang v1.10

* Fix cri test

* Fix up minor docs Issue. Add provider to readme. Add var for vk image.
2018-06-22 16:33:49 -07:00

217 lines
6.0 KiB
Go

package azurebatch
import (
"bufio"
"context"
"crypto/md5"
"encoding/json"
"fmt"
"log"
"os"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
)
// mustWriteString appends s to builder and panics if the write reports an
// error.
func mustWriteString(builder *strings.Builder, s string) {
	if _, writeErr := builder.WriteString(s); writeErr != nil {
		panic(writeErr)
	}
}
// mustWrite appends the bytes in b to builder and panics if the write reports
// an error.
func mustWrite(builder *strings.Builder, b []byte) {
	if _, writeErr := builder.Write(b); writeErr != nil {
		panic(writeErr)
	}
}
// newServicePrincipalTokenFromCredentials creates a service principal token
// for scope from the tenant ID, client ID and client secret held in c, using
// the Azure public cloud Active Directory endpoint.
//
// Every failure, including one raised while building the OAuth configuration,
// is returned to the caller instead of being raised as a panic.
func newServicePrincipalTokenFromCredentials(c *Config, scope string) (*adal.ServicePrincipalToken, error) {
	oauthConfig, err := adal.NewOAuthConfig(azure.PublicCloud.ActiveDirectoryEndpoint, c.TenantID)
	if err != nil {
		return nil, err
	}
	return adal.NewServicePrincipalToken(*oauthConfig, c.ClientID, c.ClientSecret, scope)
}
// getAzureADAuthorizer builds a bearer-token authorizer for the service
// principal described by c, passing azureEndpoint as the token scope.
// It panics if the service principal token cannot be created.
func getAzureADAuthorizer(c *Config, azureEndpoint string) autorest.Authorizer {
	token, tokenErr := newServicePrincipalTokenFromCredentials(c, azureEndpoint)
	if tokenErr != nil {
		panic(fmt.Sprintf("Failed to create authorizer: %v", tokenErr))
	}
	return autorest.NewBearerAuthorizer(token)
}
// getPool fetches the pool identified by poolID from the Batch service at
// batchBaseURL and returns the pool client used for the lookup once the pool
// is known to be active.
//
// Any error from the lookup is logged and returned unchanged; a 404 is the
// expected outcome on a first run, before the pool has been created. A pool
// that exists but is not in the active state yields an error as well.
func getPool(ctx context.Context, batchBaseURL, poolID string, auth autorest.Authorizer) (*batch.PoolClient, error) {
	client := batch.NewPoolClientWithBaseURI(batchBaseURL)
	client.Authorizer = auth
	// NOTE(review): presumably turns off the client's automatic retries so a
	// missing pool is reported immediately — confirm against go-autorest.
	client.RetryAttempts = 0

	pool, err := client.Get(ctx, poolID, "*", "", nil, nil, nil, nil, "", "", nil, nil)
	if err != nil {
		// All failures are handed back to the caller; only the log line differs.
		switch {
		case pool.Response.Response == nil:
			log.Printf("Failed to get pool. nil response %v", poolID)
		case pool.StatusCode == 404:
			log.Printf("Pool doesn't exist 404 received Error: %v PoolID: %v", err, poolID)
		default:
			log.Printf("Failed to get pool. Response:%v", pool.Response)
		}
		return nil, err
	}

	if pool.State != batch.PoolStateActive {
		return nil, fmt.Errorf("Pool not in active state: %v", pool.State)
	}
	log.Println("Pool active and running...")
	return &client, nil
}
// createOrGetJob returns a job client for the Batch service at batchBaseURL
// after making sure the wrapper job jobID exists and is usable.
//
//   - If the job exists and is active, the client is returned as is.
//   - If the lookup answers 404, the job is created on pool poolID.
//   - If the job is being deleted, the function waits a minute and looks again,
//     giving up early with ctx.Err() when ctx is cancelled.
//   - Any other lookup error is returned unchanged, and a job found in any
//     other state produces an error instead of a nil client with a nil error.
func createOrGetJob(ctx context.Context, batchBaseURL, jobID, poolID string, auth autorest.Authorizer) (*batch.JobClient, error) {
	jobClient := batch.NewJobClientWithBaseURI(batchBaseURL)
	jobClient.Authorizer = auth

	for {
		// Check whether the job exists already.
		currentJob, err := jobClient.Get(ctx, jobID, "", "", nil, nil, nil, nil, "", "", nil, nil)

		switch {
		case err == nil && currentJob.State == batch.JobStateActive:
			log.Println("Wrapper job already exists...")
			return &jobClient, nil

		// The embedded HTTP response is nil when the request never got an
		// answer (for example on a network failure), so guard before reading
		// the status code.
		case currentJob.Response.Response != nil && currentJob.Response.StatusCode == 404:
			log.Println("Wrapper job missing... creating...")
			wrapperJob := batch.JobAddParameter{
				ID: &jobID,
				PoolInfo: &batch.PoolInformation{
					PoolID: &poolID,
				},
			}
			if _, err := jobClient.Add(ctx, wrapperJob, nil, nil, nil, nil); err != nil {
				return nil, err
			}
			return &jobClient, nil

		case err != nil:
			return nil, err

		case currentJob.State == batch.JobStateDeleting:
			log.Printf("Job is being deleted... Waiting then will retry")
			// Wait before polling again, but stop as soon as the caller
			// cancels the context.
			timer := time.NewTimer(time.Minute)
			select {
			case <-ctx.Done():
				timer.Stop()
				return nil, ctx.Err()
			case <-timer.C:
			}

		default:
			return nil, fmt.Errorf("job %q is in unexpected state: %v", jobID, currentJob.State)
		}
	}
}
// getBatchBaseURL builds the HTTPS base URL of a Batch account from its name
// and location, in the form https://<name>.<location>.batch.azure.com.
func getBatchBaseURL(batchAccountName, batchAccountLocation string) string {
	const urlFormat = "https://%s.%s.batch.azure.com"
	return fmt.Sprintf(urlFormat, batchAccountName, batchAccountLocation)
}
// envHasValue reports whether the environment variable named env is set to a
// non-empty value.
func envHasValue(env string) bool {
	return os.Getenv(env) != ""
}
// getAzureConfigFromEnv overlays Azure settings read from environment
// variables onto config and then validates the result.
//
// Each variable in the table below replaces the matching field only when it is
// set to a non-empty value, so values already present in config act as
// defaults. When no job ID is configured, the machine hostname is used.
//
// It returns a *ConfigError when the client ID, client secret, resource group,
// subscription ID, pool ID or tenant ID is still empty afterwards, and a plain
// error when the hostname needed for the default job ID cannot be determined.
func getAzureConfigFromEnv(config *Config) error {
	overrides := []struct {
		envVar string
		field  *string
	}{
		{"AZURE_CLIENT_ID", &config.ClientID},
		{"AZURE_CLIENT_SECRET", &config.ClientSecret},
		{"AZURE_RESOURCE_GROUP", &config.ResourceGroup},
		{"AZURE_SUBSCRIPTION_ID", &config.SubscriptionID},
		{"AZURE_TENANT_ID", &config.TenantID},
		{"AZURE_BATCH_POOLID", &config.PoolID},
		{"AZURE_BATCH_JOBID", &config.JobID},
		{"AZURE_BATCH_ACCOUNT_LOCATION", &config.AccountLocation},
		{"AZURE_BATCH_ACCOUNT_NAME", &config.AccountName},
	}
	for _, o := range overrides {
		if value := os.Getenv(o.envVar); value != "" {
			*o.field = value
		}
	}

	if config.JobID == "" {
		hostname, err := os.Hostname()
		if err != nil {
			// Report the failure through the error return rather than
			// panicking inside a function that already returns an error.
			return fmt.Errorf("determining default job ID from hostname: %v", err)
		}
		config.JobID = hostname
	}

	required := []string{
		config.ClientID,
		config.ClientSecret,
		config.ResourceGroup,
		config.SubscriptionID,
		config.PoolID,
		config.TenantID,
	}
	for _, value := range required {
		if value == "" {
			return &ConfigError{CurrentConfig: config, ErrorDetails: "Missing configuration"}
		}
	}
	return nil
}
// getTaskIDForPod derives the task ID for a pod: the lowercase hexadecimal MD5
// digest of "<namespace>-<name>".
//
// Because the two parts are joined with a plain "-", different pairs can give
// the same ID (for example "a-b"/"c" and "a"/"b-c").
func getTaskIDForPod(namespace, name string) string {
	digest := md5.Sum([]byte(fmt.Sprintf("%s-%s", namespace, name)))
	return fmt.Sprintf("%x", digest)
}
// jsonLog is the decoding target for one line of the log stream processed by
// formatLogJSON. Only the "log" key of each JSON object is read; any other
// keys in the object are ignored when decoding.
type jsonLog struct {
Log string `json:"log"`
}
// formatLogJSON turns a stream of newline-delimited JSON log records into
// plain text by concatenating the "log" value of every record.
//
// The stream as a whole is not a valid JSON document, so it is decoded one
// line at a time. An error is returned when the reader is missing, when a line
// is not valid JSON, or when reading the stream fails (for example because a
// line exceeds bufio.Scanner's maximum token size); without that last check a
// read failure would silently yield truncated logs.
//
// The reader is not closed here. NOTE(review): confirm the caller closes it.
func formatLogJSON(readCloser batch.ReadCloser) (string, error) {
	if readCloser.Value == nil {
		return "", fmt.Errorf("log stream has no reader")
	}

	scanner := bufio.NewScanner(*readCloser.Value)
	var b strings.Builder
	for scanner.Scan() {
		var entry jsonLog
		if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil {
			return "", err
		}
		mustWriteString(&b, entry.Log)
	}
	// Scan returns false both at end of input and on failure; Err tells the
	// two apart.
	if err := scanner.Err(); err != nil {
		return "", err
	}
	return b.String(), nil
}
// ConfigError is the error reported when the provider configuration cannot be
// used, for example when required values are missing.
// CurrentConfig is the configuration that was being validated when the problem
// was found, and ErrorDetails is a short description of the problem.
type ConfigError struct {
CurrentConfig *Config
ErrorDetails string
}
// Error implements the error interface. The message is ErrorDetails followed
// by the JSON encoding of the configuration, so the reader can see which
// values are missing.
//
// A non-empty ClientSecret is masked on a copy of the configuration before
// encoding so the credential does not end up in logs; an empty one is left
// empty so that it still shows up as missing. When there is no configuration,
// or it cannot be encoded, only ErrorDetails is returned.
func (e *ConfigError) Error() string {
	if e.CurrentConfig == nil {
		return e.ErrorDetails
	}

	// Work on a copy so the caller's configuration is left untouched.
	sanitized := *e.CurrentConfig
	if sanitized.ClientSecret != "" {
		sanitized.ClientSecret = "[redacted]"
	}

	configJSON, err := json.Marshal(&sanitized)
	if err != nil {
		return e.ErrorDetails
	}
	return e.ErrorDetails + ": " + string(configJSON)
}