@@ -53,6 +53,12 @@ const (
|
||||
maxDNSSearchListChars = 256
|
||||
)
|
||||
|
||||
const (
|
||||
gpuResourceName v1.ResourceName = "nvidia.com/gpu"
|
||||
gpuTypeAnnotation = "virtual-kubelet.io/gpu-type"
|
||||
)
|
||||
|
||||
|
||||
// ACIProvider implements the virtual-kubelet provider interface and communicates with Azure's ACI APIs.
|
||||
type ACIProvider struct {
|
||||
aciClient *aci.Client
|
||||
@@ -64,6 +70,8 @@ type ACIProvider struct {
|
||||
cpu string
|
||||
memory string
|
||||
pods string
|
||||
gpu string
|
||||
gpuSKUs []aci.GPUSKU
|
||||
internalIP string
|
||||
daemonEndpointPort int32
|
||||
diagnostics *aci.ContainerGroupDiagnostics
|
||||
@@ -260,21 +268,8 @@ func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operat
|
||||
return nil, errors.New(unsupportedRegionMessage)
|
||||
}
|
||||
|
||||
// Set sane defaults for Capacity in case config is not supplied
|
||||
p.cpu = "800"
|
||||
p.memory = "4Ti"
|
||||
p.pods = "800"
|
||||
|
||||
if cpuQuota := os.Getenv("ACI_QUOTA_CPU"); cpuQuota != "" {
|
||||
p.cpu = cpuQuota
|
||||
}
|
||||
|
||||
if memoryQuota := os.Getenv("ACI_QUOTA_MEMORY"); memoryQuota != "" {
|
||||
p.memory = memoryQuota
|
||||
}
|
||||
|
||||
if podsQuota := os.Getenv("ACI_QUOTA_POD"); podsQuota != "" {
|
||||
p.pods = podsQuota
|
||||
if err := p.setupCapacity(context.TODO()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.operatingSystem = operatingSystem
|
||||
@@ -324,6 +319,54 @@ func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operat
|
||||
return &p, err
|
||||
}
|
||||
|
||||
func (p *ACIProvider) setupCapacity(ctx context.Context) error {
|
||||
ctx, span := trace.StartSpan(ctx, "setupCapacity")
|
||||
defer span.End()
|
||||
logger := log.G(ctx).WithField("method", "setupCapacity")
|
||||
|
||||
// Set sane defaults for Capacity in case config is not supplied
|
||||
p.cpu = "800"
|
||||
p.memory = "4Ti"
|
||||
p.pods = "800"
|
||||
|
||||
if cpuQuota := os.Getenv("ACI_QUOTA_CPU"); cpuQuota != "" {
|
||||
p.cpu = cpuQuota
|
||||
}
|
||||
|
||||
if memoryQuota := os.Getenv("ACI_QUOTA_MEMORY"); memoryQuota != "" {
|
||||
p.memory = memoryQuota
|
||||
}
|
||||
|
||||
if podsQuota := os.Getenv("ACI_QUOTA_POD"); podsQuota != "" {
|
||||
p.pods = podsQuota
|
||||
}
|
||||
|
||||
metadata, err := p.aciClient.GetResourceProviderMetadata(ctx)
|
||||
|
||||
if err != nil {
|
||||
msg := "Unable to fetch the ACI metadata"
|
||||
logger.WithError(err).Error(msg)
|
||||
return err
|
||||
}
|
||||
|
||||
if metadata == nil || metadata.GPURegionalSKUs == nil {
|
||||
logger.Warn("ACI GPU capacity is not enabled. GPU capacity will be disabled")
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, regionalSKU := range metadata.GPURegionalSKUs {
|
||||
if strings.EqualFold(regionalSKU.Location, p.region) && len(regionalSKU.SKUs) != 0 {
|
||||
p.gpu = "100"
|
||||
if gpu := os.Getenv("ACI_QUOTA_GPU"); gpu != "" {
|
||||
p.gpu = gpu
|
||||
}
|
||||
p.gpuSKUs = regionalSKU.SKUs
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ACIProvider) setupNetworkProfile(auth *client.Authentication) error {
|
||||
c, err := network.NewClient(auth, p.extraUserAgent)
|
||||
if err != nil {
|
||||
@@ -706,7 +749,7 @@ func (p *ACIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.P
|
||||
defer span.End()
|
||||
ctx = addAzureAttributes(ctx, span, p)
|
||||
|
||||
cg, err, status := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, name))
|
||||
cg, status, err := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, name))
|
||||
if err != nil {
|
||||
if status != nil && *status == http.StatusNotFound {
|
||||
return nil, nil
|
||||
@@ -728,7 +771,7 @@ func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName,
|
||||
ctx = addAzureAttributes(ctx, span, p)
|
||||
|
||||
logContent := ""
|
||||
cg, err, _ := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName))
|
||||
cg, _, err := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName))
|
||||
if err != nil {
|
||||
return logContent, err
|
||||
}
|
||||
@@ -768,7 +811,7 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri
|
||||
defer errstream.Close()
|
||||
}
|
||||
|
||||
cg, err, _ := p.aciClient.GetContainerGroup(context.TODO(), p.resourceGroup, name)
|
||||
cg, _, err := p.aciClient.GetContainerGroup(context.TODO(), p.resourceGroup, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -789,10 +832,10 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri
|
||||
return err
|
||||
}
|
||||
|
||||
wsUri := xcrsp.WebSocketUri
|
||||
wsURI := xcrsp.WebSocketURI
|
||||
password := xcrsp.Password
|
||||
|
||||
c, _, _ := websocket.DefaultDialer.Dial(wsUri, nil)
|
||||
c, _, _ := websocket.DefaultDialer.Dial(wsURI, nil)
|
||||
c.WriteMessage(websocket.TextMessage, []byte(password)) // Websocket password needs to be sent before WS terminal is active
|
||||
|
||||
// Cleanup on exit
|
||||
@@ -889,11 +932,17 @@ func (p *ACIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
|
||||
|
||||
// Capacity returns a resource list containing the capacity limits set for ACI.
|
||||
func (p *ACIProvider) Capacity(ctx context.Context) v1.ResourceList {
|
||||
return v1.ResourceList{
|
||||
"cpu": resource.MustParse(p.cpu),
|
||||
"memory": resource.MustParse(p.memory),
|
||||
"pods": resource.MustParse(p.pods),
|
||||
resourceList := v1.ResourceList{
|
||||
v1.ResourceCPU: resource.MustParse(p.cpu),
|
||||
v1.ResourceMemory: resource.MustParse(p.memory),
|
||||
v1.ResourcePods: resource.MustParse(p.pods),
|
||||
}
|
||||
|
||||
if p.gpu != "" {
|
||||
resourceList[gpuResourceName] = resource.MustParse(p.gpu)
|
||||
}
|
||||
|
||||
return resourceList
|
||||
}
|
||||
|
||||
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
|
||||
@@ -1146,7 +1195,7 @@ func (p *ACIProvider) getContainers(pod *v1.Pod) ([]aci.Container, error) {
|
||||
}
|
||||
|
||||
c.Resources = aci.ResourceRequirements{
|
||||
Requests: &aci.ResourceRequests{
|
||||
Requests: &aci.ComputeResources{
|
||||
CPU: cpuRequest,
|
||||
MemoryInGB: memoryRequest,
|
||||
},
|
||||
@@ -1163,10 +1212,29 @@ func (p *ACIProvider) getContainers(pod *v1.Pod) ([]aci.Container, error) {
|
||||
memoryLimit = float64(container.Resources.Limits.Memory().Value()) / 1000000000.00
|
||||
}
|
||||
|
||||
c.Resources.Limits = &aci.ResourceLimits{
|
||||
c.Resources.Limits = &aci.ComputeResources{
|
||||
CPU: cpuLimit,
|
||||
MemoryInGB: memoryLimit,
|
||||
}
|
||||
|
||||
if gpu, ok := container.Resources.Limits[gpuResourceName]; ok {
|
||||
sku, err := p.getGPUSKU(pod)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if gpu.Value() == 0 {
|
||||
return nil, errors.New("GPU must be a integer number")
|
||||
}
|
||||
|
||||
gpuResource := &aci.GPUResource{
|
||||
Count: int32(gpu.Value()),
|
||||
SKU: sku,
|
||||
}
|
||||
|
||||
c.Resources.Requests.GPU = gpuResource
|
||||
c.Resources.Limits.GPU = gpuResource
|
||||
}
|
||||
}
|
||||
|
||||
if container.LivenessProbe != nil {
|
||||
@@ -1190,6 +1258,24 @@ func (p *ACIProvider) getContainers(pod *v1.Pod) ([]aci.Container, error) {
|
||||
return containers, nil
|
||||
}
|
||||
|
||||
func (p *ACIProvider) getGPUSKU(pod *v1.Pod) (aci.GPUSKU, error) {
|
||||
if len(p.gpuSKUs) == 0 {
|
||||
return "", fmt.Errorf("The pod requires GPU resource, but ACI doesn't provide GPU enabled container group in region %s", p.region)
|
||||
}
|
||||
|
||||
if desiredSKU, ok := pod.Annotations[gpuTypeAnnotation]; ok {
|
||||
for _, supportedSKU := range p.gpuSKUs {
|
||||
if strings.EqualFold(string(desiredSKU), string(supportedSKU)) {
|
||||
return supportedSKU, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("The pod requires GPU SKU %s, but ACI only supports SKUs %v in region %s", desiredSKU, p.region, p.gpuSKUs)
|
||||
}
|
||||
|
||||
return p.gpuSKUs[0], nil
|
||||
}
|
||||
|
||||
func getProbe(probe *v1.Probe) (*aci.ContainerProbe, error) {
|
||||
|
||||
if probe.Handler.Exec != nil && probe.Handler.HTTPGet != nil {
|
||||
@@ -1376,11 +1462,19 @@ func containerGroupToPod(cg *aci.ContainerGroup) (*v1.Pod, error) {
|
||||
},
|
||||
}
|
||||
|
||||
if c.Resources.Requests.GPU != nil {
|
||||
container.Resources.Requests[gpuResourceName] = resource.MustParse(fmt.Sprintf("%d", c.Resources.Requests.GPU.Count))
|
||||
}
|
||||
|
||||
if c.Resources.Limits != nil {
|
||||
container.Resources.Limits = v1.ResourceList{
|
||||
v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%g", c.Resources.Limits.CPU)),
|
||||
v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%gG", c.Resources.Limits.MemoryInGB)),
|
||||
}
|
||||
|
||||
if c.Resources.Limits.GPU != nil {
|
||||
container.Resources.Limits[gpuResourceName] = resource.MustParse(fmt.Sprintf("%d", c.Resources.Requests.GPU.Count))
|
||||
}
|
||||
}
|
||||
|
||||
containers = append(containers, container)
|
||||
|
||||
Reference in New Issue
Block a user