package azure import ( "bytes" "context" "crypto/sha256" "encoding/base64" "encoding/hex" "encoding/json" "errors" "fmt" "io" "io/ioutil" "net" "net/http" "os" "reflect" "strings" "sync" "time" "github.com/Sirupsen/logrus" "github.com/gorilla/websocket" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" client "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client" "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci" "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/network" "go.opencensus.io/trace" "k8s.io/api/core/v1" k8serr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" clientcmdv1 "k8s.io/client-go/tools/clientcmd/api/v1" "k8s.io/client-go/tools/remotecommand" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" ) const ( // The service account secret mount path. serviceAccountSecretMountPath = "/var/run/secrets/kubernetes.io/serviceaccount" virtualKubeletDNSNameLabel = "virtualkubelet.io/dnsnamelabel" subnetsAction = "Microsoft.Network/virtualNetworks/subnets/action" subnetDelegationService = "Microsoft.ContainerInstance/containerGroups" ) // DNS configuration settings const ( maxDNSNameservers = 3 maxDNSSearchPaths = 6 maxDNSSearchListChars = 256 ) // ACIProvider implements the virtual-kubelet provider interface and communicates with Azure's ACI APIs. type ACIProvider struct { aciClient *aci.Client resourceManager *manager.ResourceManager resourceGroup string region string nodeName string operatingSystem string cpu string memory string pods string internalIP string daemonEndpointPort int32 diagnostics *aci.ContainerGroupDiagnostics subnetName string subnetCIDR string vnetName string vnetResourceGroup string networkProfile string kubeProxyExtension *aci.Extension kubeDNSIP string extraUserAgent string metricsSync sync.Mutex metricsSyncTime time.Time lastMetric *stats.Summary } // AuthConfig is the secret returned from an ImageRegistryCredential type AuthConfig struct { Username string `json:"username,omitempty"` Password string `json:"password,omitempty"` Auth string `json:"auth,omitempty"` Email string `json:"email,omitempty"` ServerAddress string `json:"serveraddress,omitempty"` IdentityToken string `json:"identitytoken,omitempty"` RegistryToken string `json:"registrytoken,omitempty"` } // See https://azure.microsoft.com/en-us/status/ for valid regions. var validAciRegions = []string{ "australiaeast", "canadacentral", "centralindia", "centralus", "eastasia", "eastus", "eastus2", "eastus2euap", "japaneast", "northcentralus", "northeurope", "southcentralus", "southeastasia", "southindia", "uksouth", "westcentralus", "westus", "westus2", "westeurope", } // isValidACIRegion checks to make sure we're using a valid ACI region func isValidACIRegion(region string) bool { regionLower := strings.ToLower(region) regionTrimmed := strings.Replace(regionLower, " ", "", -1) for _, validRegion := range validAciRegions { if regionTrimmed == validRegion { return true } } return false } // NewACIProvider creates a new ACIProvider. func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*ACIProvider, error) { var p ACIProvider var err error p.resourceManager = rm if config != "" { f, err := os.Open(config) if err != nil { return nil, err } defer f.Close() if err := p.loadConfig(f); err != nil { return nil, err } } var azAuth *client.Authentication if authFilepath := os.Getenv("AZURE_AUTH_LOCATION"); authFilepath != "" { auth, err := client.NewAuthenticationFromFile(authFilepath) if err != nil { return nil, err } azAuth = auth } if acsFilepath := os.Getenv("ACS_CREDENTIAL_LOCATION"); acsFilepath != "" { acsCredential, err := NewAcsCredential(acsFilepath) if err != nil { return nil, err } if acsCredential != nil { if acsCredential.Cloud != client.PublicCloud.Name { return nil, fmt.Errorf("ACI only supports Public Azure. '%v' is not supported", acsCredential.Cloud) } azAuth = client.NewAuthentication( acsCredential.Cloud, acsCredential.ClientID, acsCredential.ClientSecret, acsCredential.SubscriptionID, acsCredential.TenantID) p.resourceGroup = acsCredential.ResourceGroup p.region = acsCredential.Region p.vnetName = acsCredential.VNetName p.vnetResourceGroup = acsCredential.VNetResourceGroup if p.vnetResourceGroup == "" { p.vnetResourceGroup = p.resourceGroup } } } if clientID := os.Getenv("AZURE_CLIENT_ID"); clientID != "" { azAuth.ClientID = clientID } if clientSecret := os.Getenv("AZURE_CLIENT_SECRET"); clientSecret != "" { azAuth.ClientSecret = clientSecret } if tenantID := os.Getenv("AZURE_TENANT_ID"); tenantID != "" { azAuth.TenantID = tenantID } if subscriptionID := os.Getenv("AZURE_SUBSCRIPTION_ID"); subscriptionID != "" { azAuth.SubscriptionID = subscriptionID } p.extraUserAgent = os.Getenv("ACI_EXTRA_USER_AGENT") p.aciClient, err = aci.NewClient(azAuth, p.extraUserAgent) if err != nil { return nil, err } // If the log analytics file has been specified, load workspace credentials from the file if logAnalyticsAuthFile := os.Getenv("LOG_ANALYTICS_AUTH_LOCATION"); logAnalyticsAuthFile != "" { p.diagnostics, err = aci.NewContainerGroupDiagnosticsFromFile(logAnalyticsAuthFile) if err != nil { return nil, err } } // If we have both the log analytics workspace id and key, add them to the provider // Environment variables overwrite the values provided in the file if logAnalyticsID := os.Getenv("LOG_ANALYTICS_ID"); logAnalyticsID != "" { if logAnalyticsKey := os.Getenv("LOG_ANALYTICS_KEY"); logAnalyticsKey != "" { p.diagnostics, err = aci.NewContainerGroupDiagnostics(logAnalyticsID, logAnalyticsKey) if err != nil { return nil, err } } } if clusterResourceID := os.Getenv("CLUSTER_RESOURCE_ID"); clusterResourceID != "" { if p.diagnostics != nil && p.diagnostics.LogAnalytics != nil { p.diagnostics.LogAnalytics.LogType = aci.LogAnlyticsLogTypeContainerInsights p.diagnostics.LogAnalytics.Metadata = map[string]string{ aci.LogAnalyticsMetadataKeyClusterResourceID: clusterResourceID, aci.LogAnalyticsMetadataKeyNodeName: nodeName, } } } if rg := os.Getenv("ACI_RESOURCE_GROUP"); rg != "" { p.resourceGroup = rg } if p.resourceGroup == "" { return nil, errors.New("Resource group can not be empty please set ACI_RESOURCE_GROUP") } if r := os.Getenv("ACI_REGION"); r != "" { p.region = r } if p.region == "" { return nil, errors.New("Region can not be empty please set ACI_REGION") } if r := p.region; !isValidACIRegion(r) { unsupportedRegionMessage := fmt.Sprintf("Region %s is invalid. Current supported regions are: %s", r, strings.Join(validAciRegions, ", ")) return nil, errors.New(unsupportedRegionMessage) } // Set sane defaults for Capacity in case config is not supplied p.cpu = "800" p.memory = "4Ti" p.pods = "800" if cpuQuota := os.Getenv("ACI_QUOTA_CPU"); cpuQuota != "" { p.cpu = cpuQuota } if memoryQuota := os.Getenv("ACI_QUOTA_MEMORY"); memoryQuota != "" { p.memory = memoryQuota } if podsQuota := os.Getenv("ACI_QUOTA_POD"); podsQuota != "" { p.pods = podsQuota } p.operatingSystem = operatingSystem p.nodeName = nodeName p.internalIP = internalIP p.daemonEndpointPort = daemonEndpointPort if subnetName := os.Getenv("ACI_SUBNET_NAME"); p.vnetName != "" && subnetName != "" { p.subnetName = subnetName } if subnetCIDR := os.Getenv("ACI_SUBNET_CIDR"); subnetCIDR != "" { if p.subnetName == "" { return nil, fmt.Errorf("subnet CIDR defined but no subnet name, subnet name is required to set a subnet CIDR") } if _, _, err := net.ParseCIDR(subnetCIDR); err != nil { return nil, fmt.Errorf("error parsing provided subnet range: %v", err) } p.subnetCIDR = subnetCIDR } if p.subnetName != "" { if err := p.setupNetworkProfile(azAuth); err != nil { return nil, fmt.Errorf("error setting up network profile: %v", err) } masterURI := os.Getenv("MASTER_URI") if masterURI == "" { masterURI = "10.0.0.1" } clusterCIDR := os.Getenv("CLUSTER_CIDR") if clusterCIDR == "" { clusterCIDR = "10.240.0.0/16" } p.kubeProxyExtension, err = getKubeProxyExtension(serviceAccountSecretMountPath, masterURI, clusterCIDR) if err != nil { return nil, fmt.Errorf("error creating kube proxy extension: %v", err) } p.kubeDNSIP = "10.0.0.10" if kubeDNSIP := os.Getenv("KUBE_DNS_IP"); kubeDNSIP != "" { p.kubeDNSIP = kubeDNSIP } } return &p, err } func (p *ACIProvider) setupNetworkProfile(auth *client.Authentication) error { c, err := network.NewClient(auth, p.extraUserAgent) if err != nil { return fmt.Errorf("error creating azure networking client: %v", err) } createSubnet := true subnet, err := c.GetSubnet(p.vnetResourceGroup, p.vnetName, p.subnetName) if err != nil && !network.IsNotFound(err) { return fmt.Errorf("error while looking up subnet: %v", err) } if network.IsNotFound(err) && p.subnetCIDR == "" { return fmt.Errorf("subnet '%s' is not found in vnet '%s' in resource group '%s' and subnet CIDR is not specified", p.subnetName, p.vnetName, p.vnetResourceGroup) } if err == nil { if p.subnetCIDR == "" { p.subnetCIDR = *subnet.SubnetPropertiesFormat.AddressPrefix } if p.subnetCIDR != *subnet.SubnetPropertiesFormat.AddressPrefix { return fmt.Errorf("found subnet '%s' using different CIDR: '%s'. desired: '%s'", p.subnetName, *subnet.SubnetPropertiesFormat.AddressPrefix, p.subnetCIDR) } if subnet.SubnetPropertiesFormat.RouteTable != nil { return fmt.Errorf("unable to delegate subnet '%s' to Azure Container Instance since it references the route table '%s'.", p.subnetName, *subnet.SubnetPropertiesFormat.RouteTable.ID) } if subnet.SubnetPropertiesFormat.ServiceAssociationLinks != nil { for _, l := range *subnet.SubnetPropertiesFormat.ServiceAssociationLinks { if l.ServiceAssociationLinkPropertiesFormat != nil && *l.ServiceAssociationLinkPropertiesFormat.LinkedResourceType == subnetDelegationService { createSubnet = false break } return fmt.Errorf("unable to delegate subnet '%s' to Azure Container Instance as it is used by other Azure resource: '%v'.", p.subnetName, l) } } else { for _, d := range *subnet.SubnetPropertiesFormat.Delegations { if d.ServiceDelegationPropertiesFormat != nil && *d.ServiceDelegationPropertiesFormat.ServiceName == subnetDelegationService { createSubnet = false break } } } } if createSubnet { subnet = network.NewSubnetWithContainerInstanceDelegation(p.subnetName, p.subnetCIDR) subnet, err = c.CreateOrUpdateSubnet(p.vnetResourceGroup, p.vnetName, subnet) if err != nil { return fmt.Errorf("error creating subnet: %v", err) } } networkProfileName := getNetworkProfileName(*subnet.ID) profile, err := c.GetProfile(p.resourceGroup, networkProfileName) if err != nil && !network.IsNotFound(err) { return fmt.Errorf("error while looking up network profile: %v", err) } if err == nil { for _, config := range *profile.ProfilePropertiesFormat.ContainerNetworkInterfaceConfigurations { for _, ipConfig := range *config.ContainerNetworkInterfaceConfigurationPropertiesFormat.IPConfigurations { if *ipConfig.IPConfigurationProfilePropertiesFormat.Subnet.ID == *subnet.ID { p.networkProfile = *profile.ID return nil } } } } // at this point, profile should be nil profile = network.NewNetworkProfile(networkProfileName, p.region, *subnet.ID) profile, err = c.CreateOrUpdateProfile(p.resourceGroup, profile) if err != nil { return err } p.networkProfile = *profile.ID return nil } func getNetworkProfileName(subnetID string) string { h := sha256.New() h.Write([]byte(strings.ToUpper(subnetID))) hashBytes := h.Sum(nil) return fmt.Sprintf("vk-%s", hex.EncodeToString(hashBytes)) } func getKubeProxyExtension(secretPath, masterURI, clusterCIDR string) (*aci.Extension, error) { ca, err := ioutil.ReadFile(secretPath + "/ca.crt") if err != nil { return nil, fmt.Errorf("failed to read ca.crt file: %v", err) } var token []byte token, err = ioutil.ReadFile(secretPath + "/token") if err != nil { return nil, fmt.Errorf("failed to read token file: %v", err) } name := "virtual-kubelet" config := clientcmdv1.Config{ APIVersion: "v1", Kind: "Config", Clusters: []clientcmdv1.NamedCluster{ clientcmdv1.NamedCluster{ Name: name, Cluster: clientcmdv1.Cluster{ Server: masterURI, CertificateAuthorityData: ca, }, }, }, AuthInfos: []clientcmdv1.NamedAuthInfo{ clientcmdv1.NamedAuthInfo{ Name: name, AuthInfo: clientcmdv1.AuthInfo{ Token: string(token), }, }, }, Contexts: []clientcmdv1.NamedContext{ clientcmdv1.NamedContext{ Name: name, Context: clientcmdv1.Context{ Cluster: name, AuthInfo: name, }, }, }, CurrentContext: name, } b := new(bytes.Buffer) if err := json.NewEncoder(b).Encode(config); err != nil { return nil, fmt.Errorf("failed to encode the kubeconfig: %v", err) } extension := aci.Extension{ Name: "kube-proxy", Properties: &aci.ExtensionProperties{ Type: aci.ExtensionTypeKubeProxy, Version: aci.ExtensionVersion1_0, Settings: map[string]string{ aci.KubeProxyExtensionSettingClusterCIDR: clusterCIDR, aci.KubeProxyExtensionSettingKubeVersion: aci.KubeProxyExtensionKubeVersion, }, ProtectedSettings: map[string]string{ aci.KubeProxyExtensionSettingKubeConfig: base64.StdEncoding.EncodeToString(b.Bytes()), }, }, } return &extension, nil } func addAzureAttributes(span *trace.Span, p *ACIProvider) { span.AddAttributes( trace.StringAttribute("azure.resourceGroup", p.resourceGroup), trace.StringAttribute("azure.region", p.region), ) } // CreatePod accepts a Pod definition and creates // an ACI deployment func (p *ACIProvider) CreatePod(ctx context.Context, pod *v1.Pod) error { ctx, span := trace.StartSpan(ctx, "aci.CreatePod") defer span.End() addAzureAttributes(span, p) var containerGroup aci.ContainerGroup containerGroup.Location = p.region containerGroup.RestartPolicy = aci.ContainerGroupRestartPolicy(pod.Spec.RestartPolicy) containerGroup.ContainerGroupProperties.OsType = aci.OperatingSystemTypes(p.OperatingSystem()) // get containers containers, err := p.getContainers(pod) if err != nil { return err } // get registry creds creds, err := p.getImagePullSecrets(pod) if err != nil { return err } // get volumes volumes, err := p.getVolumes(pod) if err != nil { return err } // assign all the things containerGroup.ContainerGroupProperties.Containers = containers containerGroup.ContainerGroupProperties.Volumes = volumes containerGroup.ContainerGroupProperties.ImageRegistryCredentials = creds containerGroup.ContainerGroupProperties.Diagnostics = p.getDiagnostics(pod) filterServiceAccountSecretVolume(p.operatingSystem, &containerGroup) // create ipaddress if containerPort is used count := 0 for _, container := range containers { count = count + len(container.Ports) } ports := make([]aci.Port, 0, count) for _, container := range containers { for _, containerPort := range container.Ports { ports = append(ports, aci.Port{ Port: containerPort.Port, Protocol: aci.ContainerGroupNetworkProtocol("TCP"), }) } } if len(ports) > 0 && p.subnetName == "" { containerGroup.ContainerGroupProperties.IPAddress = &aci.IPAddress{ Ports: ports, Type: "Public", } if dnsNameLabel := pod.Annotations[virtualKubeletDNSNameLabel]; dnsNameLabel != "" { containerGroup.ContainerGroupProperties.IPAddress.DNSNameLabel = dnsNameLabel } } podUID := string(pod.UID) podCreationTimestamp := pod.CreationTimestamp.String() containerGroup.Tags = map[string]string{ "PodName": pod.Name, "ClusterName": pod.ClusterName, "NodeName": pod.Spec.NodeName, "Namespace": pod.Namespace, "UID": podUID, "CreationTimestamp": podCreationTimestamp, } p.amendVnetResources(&containerGroup, pod) _, err = p.aciClient.CreateContainerGroup( ctx, p.resourceGroup, containerGroupName(pod), containerGroup, ) return err } func (p *ACIProvider) amendVnetResources(containerGroup *aci.ContainerGroup, pod *v1.Pod) { if p.networkProfile == "" { return } containerGroup.NetworkProfile = &aci.NetworkProfileDefinition{ID: p.networkProfile} containerGroup.ContainerGroupProperties.Extensions = []*aci.Extension{p.kubeProxyExtension} containerGroup.ContainerGroupProperties.DNSConfig = p.getDNSConfig(pod.Spec.DNSPolicy, pod.Spec.DNSConfig) } func (p *ACIProvider) getDNSConfig(dnsPolicy v1.DNSPolicy, dnsConfig *v1.PodDNSConfig) *aci.DNSConfig { nameServers := make([]string, 0) if dnsPolicy == v1.DNSClusterFirst || dnsPolicy == v1.DNSClusterFirstWithHostNet { nameServers = append(nameServers, p.kubeDNSIP) } searchDomains := []string{} options := []string{} if dnsConfig != nil { nameServers = omitDuplicates(append(nameServers, dnsConfig.Nameservers...)) searchDomains = omitDuplicates(dnsConfig.Searches) for _, option := range dnsConfig.Options { op := option.Name if option.Value != nil && *(option.Value) != "" { op = op + ":" + *(option.Value) } options = append(options, op) } } if len(nameServers) == 0 { return nil } result := aci.DNSConfig{ NameServers: formDNSNameserversFitsLimits(nameServers), SearchDomains: formDNSSearchFitsLimits(searchDomains), Options: strings.Join(options, " "), } return &result } func omitDuplicates(strs []string) []string { uniqueStrs := make(map[string]bool) var ret []string for _, str := range strs { if !uniqueStrs[str] { ret = append(ret, str) uniqueStrs[str] = true } } return ret } func formDNSNameserversFitsLimits(nameservers []string) []string { if len(nameservers) > maxDNSNameservers { nameservers = nameservers[:maxDNSNameservers] msg := fmt.Sprintf("Nameserver limits were exceeded, some nameservers have been omitted, the applied nameserver line is: %s", strings.Join(nameservers, ";")) log.G(context.TODO()).WithField("method", "formDNSNameserversFitsLimits").Warn(msg) } return nameservers } func formDNSSearchFitsLimits(searches []string) string { limitsExceeded := false if len(searches) > maxDNSSearchPaths { searches = searches[:maxDNSSearchPaths] limitsExceeded = true } if resolvSearchLineStrLen := len(strings.Join(searches, " ")); resolvSearchLineStrLen > maxDNSSearchListChars { cutDomainsNum := 0 cutDomainsLen := 0 for i := len(searches) - 1; i >= 0; i-- { cutDomainsLen += len(searches[i]) + 1 cutDomainsNum++ if (resolvSearchLineStrLen - cutDomainsLen) <= maxDNSSearchListChars { break } } searches = searches[:(len(searches) - cutDomainsNum)] limitsExceeded = true } if limitsExceeded { msg := fmt.Sprintf("Search Line limits were exceeded, some search paths have been omitted, the applied search line is: %s", strings.Join(searches, ";")) log.G(context.TODO()).WithField("method", "formDNSSearchFitsLimits").Warn(msg) } return strings.Join(searches, " ") } func (p *ACIProvider) getDiagnostics(pod *v1.Pod) *aci.ContainerGroupDiagnostics { if p.diagnostics != nil && p.diagnostics.LogAnalytics != nil && p.diagnostics.LogAnalytics.LogType == aci.LogAnlyticsLogTypeContainerInsights { d := *p.diagnostics d.LogAnalytics.Metadata[aci.LogAnalyticsMetadataKeyPodUUID] = string(pod.ObjectMeta.UID) return &d } return p.diagnostics } func containerGroupName(pod *v1.Pod) string { return fmt.Sprintf("%s-%s", pod.Namespace, pod.Name) } // UpdatePod is a noop, ACI currently does not support live updates of a pod. func (p *ACIProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error { return nil } // DeletePod deletes the specified pod out of ACI. func (p *ACIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error { ctx, span := trace.StartSpan(ctx, "aci.DeletePod") defer span.End() addAzureAttributes(span, p) err := p.aciClient.DeleteContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", pod.Namespace, pod.Name)) return wrapError(err) } // GetPod returns a pod by name that is running inside ACI // returns nil if a pod by that name is not found. func (p *ACIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) { ctx, span := trace.StartSpan(ctx, "aci.GetPod") defer span.End() addAzureAttributes(span, p) cg, err, status := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, name)) if err != nil { if status != nil && *status == http.StatusNotFound { return nil, nil } return nil, err } if cg.Tags["NodeName"] != p.nodeName { return nil, nil } return containerGroupToPod(cg) } // GetContainerLogs returns the logs of a pod by name that is running inside ACI. func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { ctx, span := trace.StartSpan(ctx, "aci.GetContainerLogs") defer span.End() addAzureAttributes(span, p) logContent := "" cg, err, _ := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName)) if err != nil { return logContent, err } if cg.Tags["NodeName"] != p.nodeName { return logContent, nil } // get logs from cg retry := 10 var retries int for retries = 0; retries < retry; retries++ { cLogs, err := p.aciClient.GetContainerLogs(ctx, p.resourceGroup, cg.Name, containerName, tail) if err != nil { log.G(ctx).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying") span.Annotate(nil, "Error getting container logs, retrying") time.Sleep(5000 * time.Millisecond) } else { logContent = cLogs.Content break } } return logContent, err } // GetPodFullName as defined in the provider context func (p *ACIProvider) GetPodFullName(namespace string, pod string) string { return fmt.Sprintf("%s-%s", namespace, pod) } // ExecInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, errstream io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { // Cleanup on exit if out != nil { defer out.Close() } if errstream != nil { defer errstream.Close() } cg, err, _ := p.aciClient.GetContainerGroup(context.TODO(), p.resourceGroup, name) if err != nil { return err } // Set default terminal size terminalSize := remotecommand.TerminalSize{ Height: 60, Width: 120, } if resize != nil { terminalSize = <-resize // Receive terminal resize event if resize stream is present } xcrsp, err := p.aciClient.LaunchExec(p.resourceGroup, cg.Name, container, cmd[0], terminalSize) if err != nil { return err } wsUri := xcrsp.WebSocketUri password := xcrsp.Password c, _, _ := websocket.DefaultDialer.Dial(wsUri, nil) c.WriteMessage(websocket.TextMessage, []byte(password)) // Websocket password needs to be sent before WS terminal is active // Cleanup on exit defer c.Close() defer out.Close() if in != nil { // Inputstream go func() { for { var msg = make([]byte, 512) n, err := in.Read(msg) if err == io.EOF { // Handle EOF } if err != nil { // Handle errors return } if n > 0 { // Only call WriteMessage if there is data to send c.WriteMessage(websocket.BinaryMessage, msg[:n]) } } }() } if out != nil { //Outputstream for { _, cr, err := c.NextReader() if err != nil { // Handle errors break } io.Copy(out, cr) } } return nil } // GetPodStatus returns the status of a pod by name that is running inside ACI // returns nil if a pod by that name is not found. func (p *ACIProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { ctx, span := trace.StartSpan(ctx, "aci.GetPodStatus") defer span.End() addAzureAttributes(span, p) pod, err := p.GetPod(ctx, namespace, name) if err != nil { return nil, err } if pod == nil { return nil, nil } return &pod.Status, nil } // GetPods returns a list of all pods known to be running within ACI. func (p *ACIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { ctx, span := trace.StartSpan(ctx, "aci.GetPods") defer span.End() addAzureAttributes(span, p) cgs, err := p.aciClient.ListContainerGroups(ctx, p.resourceGroup) if err != nil { return nil, err } pods := make([]*v1.Pod, 0, len(cgs.Value)) for _, cg := range cgs.Value { c := cg if cg.Tags["NodeName"] != p.nodeName { continue } p, err := containerGroupToPod(&c) if err != nil { log.G(context.TODO()).WithFields(logrus.Fields{ "name": c.Name, "id": c.ID, }).WithError(err).Error("error converting container group to pod") continue } pods = append(pods, p) } return pods, nil } // Capacity returns a resource list containing the capacity limits set for ACI. func (p *ACIProvider) Capacity(ctx context.Context) v1.ResourceList { return v1.ResourceList{ "cpu": resource.MustParse(p.cpu), "memory": resource.MustParse(p.memory), "pods": resource.MustParse(p.pods), } } // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status // within Kubernetes. func (p *ACIProvider) NodeConditions(ctx context.Context) []v1.NodeCondition { // TODO: Make these dynamic and augment with custom ACI specific conditions of interest return []v1.NodeCondition{ { Type: "Ready", Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Now(), LastTransitionTime: metav1.Now(), Reason: "KubeletReady", Message: "kubelet is ready.", }, { Type: "OutOfDisk", Status: v1.ConditionFalse, LastHeartbeatTime: metav1.Now(), LastTransitionTime: metav1.Now(), Reason: "KubeletHasSufficientDisk", Message: "kubelet has sufficient disk space available", }, { Type: "MemoryPressure", Status: v1.ConditionFalse, LastHeartbeatTime: metav1.Now(), LastTransitionTime: metav1.Now(), Reason: "KubeletHasSufficientMemory", Message: "kubelet has sufficient memory available", }, { Type: "DiskPressure", Status: v1.ConditionFalse, LastHeartbeatTime: metav1.Now(), LastTransitionTime: metav1.Now(), Reason: "KubeletHasNoDiskPressure", Message: "kubelet has no disk pressure", }, { Type: "NetworkUnavailable", Status: v1.ConditionFalse, LastHeartbeatTime: metav1.Now(), LastTransitionTime: metav1.Now(), Reason: "RouteCreated", Message: "RouteController created a route", }, } } // NodeAddresses returns a list of addresses for the node status // within Kubernetes. func (p *ACIProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress { // TODO: Make these dynamic and augment with custom ACI specific conditions of interest return []v1.NodeAddress{ { Type: "InternalIP", Address: p.internalIP, }, } } // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status // within Kubernetes. func (p *ACIProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints { return &v1.NodeDaemonEndpoints{ KubeletEndpoint: v1.DaemonEndpoint{ Port: p.daemonEndpointPort, }, } } // OperatingSystem returns the operating system that was provided by the config. func (p *ACIProvider) OperatingSystem() string { return p.operatingSystem } func (p *ACIProvider) getImagePullSecrets(pod *v1.Pod) ([]aci.ImageRegistryCredential, error) { ips := make([]aci.ImageRegistryCredential, 0, len(pod.Spec.ImagePullSecrets)) for _, ref := range pod.Spec.ImagePullSecrets { secret, err := p.resourceManager.GetSecret(ref.Name, pod.Namespace) if err != nil { return ips, err } if secret == nil { return nil, fmt.Errorf("error getting image pull secret") } // TODO: Check if secret type is v1.SecretTypeDockercfg and use DockerConfigKey instead of hardcoded value // TODO: Check if secret type is v1.SecretTypeDockerConfigJson and use DockerConfigJsonKey to determine if it's in json format // TODO: Return error if it's not one of these two types switch secret.Type { case v1.SecretTypeDockercfg: ips, err = readDockerCfgSecret(secret, ips) case v1.SecretTypeDockerConfigJson: ips, err = readDockerConfigJSONSecret(secret, ips) default: return nil, fmt.Errorf("image pull secret type is not one of kubernetes.io/dockercfg or kubernetes.io/dockerconfigjson") } if err != nil { return ips, err } } return ips, nil } func makeRegistryCredential(server string, authConfig AuthConfig) (*aci.ImageRegistryCredential, error) { username := authConfig.Username password := authConfig.Password if username == "" { if authConfig.Auth == "" { return nil, fmt.Errorf("no username present in auth config for server: %s", server) } decoded, err := base64.StdEncoding.DecodeString(authConfig.Auth) if err != nil { return nil, fmt.Errorf("error decoding the auth for server: %s Error: %v", server, err) } parts := strings.Split(string(decoded), ":") if len(parts) != 2 { return nil, fmt.Errorf("malformed auth for server: %s", server) } username = parts[0] password = parts[1] } cred := aci.ImageRegistryCredential{ Server: server, Username: username, Password: password, } return &cred, nil } func readDockerCfgSecret(secret *v1.Secret, ips []aci.ImageRegistryCredential) ([]aci.ImageRegistryCredential, error) { var err error var authConfigs map[string]AuthConfig repoData, ok := secret.Data[string(v1.DockerConfigKey)] if !ok { return ips, fmt.Errorf("no dockercfg present in secret") } err = json.Unmarshal(repoData, &authConfigs) if err != nil { return ips, err } for server := range authConfigs { cred, err := makeRegistryCredential(server, authConfigs[server]) if err != nil { return ips, err } ips = append(ips, *cred) } return ips, err } func readDockerConfigJSONSecret(secret *v1.Secret, ips []aci.ImageRegistryCredential) ([]aci.ImageRegistryCredential, error) { var err error repoData, ok := secret.Data[string(v1.DockerConfigJsonKey)] if !ok { return ips, fmt.Errorf("no dockerconfigjson present in secret") } var authConfigs map[string]map[string]AuthConfig err = json.Unmarshal(repoData, &authConfigs) if err != nil { return ips, err } auths, ok := authConfigs["auths"] if !ok { return ips, fmt.Errorf("malformed dockerconfigjson in secret") } for server := range auths { cred, err := makeRegistryCredential(server, auths[server]) if err != nil { return ips, err } ips = append(ips, *cred) } return ips, err } func (p *ACIProvider) getContainers(pod *v1.Pod) ([]aci.Container, error) { containers := make([]aci.Container, 0, len(pod.Spec.Containers)) for _, container := range pod.Spec.Containers { c := aci.Container{ Name: container.Name, ContainerProperties: aci.ContainerProperties{ Image: container.Image, Command: append(container.Command, container.Args...), Ports: make([]aci.ContainerPort, 0, len(container.Ports)), }, } for _, p := range container.Ports { c.Ports = append(c.Ports, aci.ContainerPort{ Port: p.ContainerPort, Protocol: getProtocol(p.Protocol), }) } c.VolumeMounts = make([]aci.VolumeMount, 0, len(container.VolumeMounts)) for _, v := range container.VolumeMounts { c.VolumeMounts = append(c.VolumeMounts, aci.VolumeMount{ Name: v.Name, MountPath: v.MountPath, ReadOnly: v.ReadOnly, }) } c.EnvironmentVariables = make([]aci.EnvironmentVariable, 0, len(container.Env)) for _, e := range container.Env { if e.Value != "" { envVar := getACIEnvVar(e) c.EnvironmentVariables = append(c.EnvironmentVariables, envVar) } } // NOTE(robbiezhang): ACI CPU request must be times of 10m cpuRequest := 1.00 if _, ok := container.Resources.Requests[v1.ResourceCPU]; ok { cpuRequest = float64(container.Resources.Requests.Cpu().MilliValue()/10.00) / 100.00 if cpuRequest < 0.01 { cpuRequest = 0.01 } } // NOTE(robbiezhang): ACI memory request must be times of 0.1 GB memoryRequest := 1.50 if _, ok := container.Resources.Requests[v1.ResourceMemory]; ok { memoryRequest = float64(container.Resources.Requests.Memory().Value()/100000000.00) / 10.00 if memoryRequest < 0.10 { memoryRequest = 0.10 } } c.Resources = aci.ResourceRequirements{ Requests: &aci.ResourceRequests{ CPU: cpuRequest, MemoryInGB: memoryRequest, }, } if container.Resources.Limits != nil { cpuLimit := cpuRequest if _, ok := container.Resources.Limits[v1.ResourceCPU]; ok { cpuLimit = float64(container.Resources.Limits.Cpu().MilliValue()) / 1000.00 } memoryLimit := memoryRequest if _, ok := container.Resources.Limits[v1.ResourceMemory]; ok { memoryLimit = float64(container.Resources.Limits.Memory().Value()) / 1000000000.00 } c.Resources.Limits = &aci.ResourceLimits{ CPU: cpuLimit, MemoryInGB: memoryLimit, } } if container.LivenessProbe != nil { probe, err := getProbe(container.LivenessProbe) if err != nil { return nil, err } c.LivenessProbe = probe } if container.ReadinessProbe != nil { probe, err := getProbe(container.ReadinessProbe) if err != nil { return nil, err } c.ReadinessProbe = probe } containers = append(containers, c) } return containers, nil } func getProbe(probe *v1.Probe) (*aci.ContainerProbe, error) { if probe.Handler.Exec != nil && probe.Handler.HTTPGet != nil { return nil, fmt.Errorf("probe may not specify more than one of \"exec\" and \"httpGet\"") } if probe.Handler.Exec == nil && probe.Handler.HTTPGet == nil { return nil, fmt.Errorf("probe must specify one of \"exec\" and \"httpGet\"") } // Probes have can have a Exec or HTTP Get Handler. // Create those if they exist, then add to the // ContainerProbe struct var exec *aci.ContainerExecProbe if probe.Handler.Exec != nil { exec = &aci.ContainerExecProbe{ Command: probe.Handler.Exec.Command, } } var httpGET *aci.ContainerHTTPGetProbe if probe.Handler.HTTPGet != nil { httpGET = &aci.ContainerHTTPGetProbe{ Port: probe.Handler.HTTPGet.Port.IntValue(), Path: probe.Handler.HTTPGet.Path, Scheme: string(probe.Handler.HTTPGet.Scheme), } } return &aci.ContainerProbe{ Exec: exec, HTTPGet: httpGET, InitialDelaySeconds: probe.InitialDelaySeconds, Period: probe.PeriodSeconds, FailureThreshold: probe.FailureThreshold, SuccessThreshold: probe.SuccessThreshold, TimeoutSeconds: probe.TimeoutSeconds, }, nil } func (p *ACIProvider) getVolumes(pod *v1.Pod) ([]aci.Volume, error) { volumes := make([]aci.Volume, 0, len(pod.Spec.Volumes)) for _, v := range pod.Spec.Volumes { // Handle the case for the AzureFile volume. if v.AzureFile != nil { secret, err := p.resourceManager.GetSecret(v.AzureFile.SecretName, pod.Namespace) if err != nil { return volumes, err } if secret == nil { return nil, fmt.Errorf("Getting secret for AzureFile volume returned an empty secret") } volumes = append(volumes, aci.Volume{ Name: v.Name, AzureFile: &aci.AzureFileVolume{ ShareName: v.AzureFile.ShareName, ReadOnly: v.AzureFile.ReadOnly, StorageAccountName: string(secret.Data["azurestorageaccountname"]), StorageAccountKey: string(secret.Data["azurestorageaccountkey"]), }, }) continue } // Handle the case for the EmptyDir. if v.EmptyDir != nil { volumes = append(volumes, aci.Volume{ Name: v.Name, EmptyDir: map[string]interface{}{}, }) continue } // Handle the case for GitRepo volume. if v.GitRepo != nil { volumes = append(volumes, aci.Volume{ Name: v.Name, GitRepo: &aci.GitRepoVolume{ Directory: v.GitRepo.Directory, Repository: v.GitRepo.Repository, Revision: v.GitRepo.Revision, }, }) continue } // Handle the case for Secret volume. if v.Secret != nil { paths := make(map[string]string) secret, err := p.resourceManager.GetSecret(v.Secret.SecretName, pod.Namespace) if v.Secret.Optional != nil && !*v.Secret.Optional && k8serr.IsNotFound(err) { return nil, fmt.Errorf("Secret %s is required by Pod %s and does not exist", v.Secret.SecretName, pod.Name) } if secret == nil { continue } for k, v := range secret.Data { paths[k] = base64.StdEncoding.EncodeToString(v) } if len(paths) != 0 { volumes = append(volumes, aci.Volume{ Name: v.Name, Secret: paths, }) } continue } // Handle the case for ConfigMap volume. if v.ConfigMap != nil { paths := make(map[string]string) configMap, err := p.resourceManager.GetConfigMap(v.ConfigMap.Name, pod.Namespace) if v.ConfigMap.Optional != nil && !*v.ConfigMap.Optional && k8serr.IsNotFound(err) { return nil, fmt.Errorf("ConfigMap %s is required by Pod %s and does not exist", v.ConfigMap.Name, pod.Name) } if configMap == nil { continue } for k, v := range configMap.BinaryData { paths[k] = base64.StdEncoding.EncodeToString(v) } if len(paths) != 0 { volumes = append(volumes, aci.Volume{ Name: v.Name, Secret: paths, }) } continue } // If we've made it this far we have found a volume type that isn't supported return nil, fmt.Errorf("Pod %s requires volume %s which is of an unsupported type", pod.Name, v.Name) } return volumes, nil } func getProtocol(pro v1.Protocol) aci.ContainerNetworkProtocol { switch pro { case v1.ProtocolUDP: return aci.ContainerNetworkProtocolUDP default: return aci.ContainerNetworkProtocolTCP } } func containerGroupToPod(cg *aci.ContainerGroup) (*v1.Pod, error) { var podCreationTimestamp metav1.Time if cg.Tags["CreationTimestamp"] != "" { t, err := time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", cg.Tags["CreationTimestamp"]) if err != nil { return nil, err } podCreationTimestamp = metav1.NewTime(t) } containerStartTime := metav1.NewTime(time.Time(cg.Containers[0].ContainerProperties.InstanceView.CurrentState.StartTime)) // Use the Provisioning State if it's not Succeeded, // otherwise use the state of the instance. aciState := cg.ContainerGroupProperties.ProvisioningState if aciState == "Succeeded" { aciState = cg.ContainerGroupProperties.InstanceView.State } containers := make([]v1.Container, 0, len(cg.Containers)) containerStatuses := make([]v1.ContainerStatus, 0, len(cg.Containers)) for _, c := range cg.Containers { container := v1.Container{ Name: c.Name, Image: c.Image, Command: c.Command, Resources: v1.ResourceRequirements{ Requests: v1.ResourceList{ v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%g", c.Resources.Requests.CPU)), v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%gG", c.Resources.Requests.MemoryInGB)), }, }, } if c.Resources.Limits != nil { container.Resources.Limits = v1.ResourceList{ v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%g", c.Resources.Limits.CPU)), v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%gG", c.Resources.Limits.MemoryInGB)), } } containers = append(containers, container) containerStatus := v1.ContainerStatus{ Name: c.Name, State: aciContainerStateToContainerState(c.InstanceView.CurrentState), LastTerminationState: aciContainerStateToContainerState(c.InstanceView.PreviousState), Ready: aciStateToPodPhase(c.InstanceView.CurrentState.State) == v1.PodRunning, RestartCount: c.InstanceView.RestartCount, Image: c.Image, ImageID: "", ContainerID: getContainerID(cg.ID, c.Name), } // Add to containerStatuses containerStatuses = append(containerStatuses, containerStatus) } ip := "" if cg.IPAddress != nil { ip = cg.IPAddress.IP } p := v1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ Name: cg.Tags["PodName"], Namespace: cg.Tags["Namespace"], ClusterName: cg.Tags["ClusterName"], UID: types.UID(cg.Tags["UID"]), CreationTimestamp: podCreationTimestamp, }, Spec: v1.PodSpec{ NodeName: cg.Tags["NodeName"], Volumes: []v1.Volume{}, Containers: containers, }, Status: v1.PodStatus{ Phase: aciStateToPodPhase(aciState), Conditions: aciStateToPodConditions(aciState, podCreationTimestamp), Message: "", Reason: "", HostIP: "", PodIP: ip, StartTime: &containerStartTime, ContainerStatuses: containerStatuses, }, } return &p, nil } func getContainerID(cgID, containerName string) string { if cgID == "" { return "" } containerResourceID := fmt.Sprintf("%s/containers/%s", cgID, containerName) h := sha256.New() h.Write([]byte(strings.ToUpper(containerResourceID))) hashBytes := h.Sum(nil) return fmt.Sprintf("aci://%s", hex.EncodeToString(hashBytes)) } func aciStateToPodPhase(state string) v1.PodPhase { switch state { case "Running": return v1.PodRunning case "Succeeded": return v1.PodSucceeded case "Failed": return v1.PodFailed case "Canceled": return v1.PodFailed case "Creating": return v1.PodPending case "Repairing": return v1.PodPending case "Pending": return v1.PodPending case "Accepted": return v1.PodPending } return v1.PodUnknown } func aciStateToPodConditions(state string, transitiontime metav1.Time) []v1.PodCondition { switch state { case "Running", "Succeeded": return []v1.PodCondition{ v1.PodCondition{ Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: transitiontime, }, v1.PodCondition{ Type: v1.PodInitialized, Status: v1.ConditionTrue, LastTransitionTime: transitiontime, }, v1.PodCondition{ Type: v1.PodScheduled, Status: v1.ConditionTrue, LastTransitionTime: transitiontime, }, } } return []v1.PodCondition{} } func aciContainerStateToContainerState(cs aci.ContainerState) v1.ContainerState { startTime := metav1.NewTime(time.Time(cs.StartTime)) // Handle the case where the container is running. if cs.State == "Running" || cs.State == "Succeeded" { return v1.ContainerState{ Running: &v1.ContainerStateRunning{ StartedAt: startTime, }, } } // Handle the case where the container failed. if cs.State == "Failed" || cs.State == "Canceled" { return v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ ExitCode: cs.ExitCode, Reason: cs.State, Message: cs.DetailStatus, StartedAt: startTime, FinishedAt: metav1.NewTime(time.Time(cs.FinishTime)), }, } } state := cs.State if state == "" { state = "Creating" } // Handle the case where the container is pending. // Which should be all other aci states. return v1.ContainerState{ Waiting: &v1.ContainerStateWaiting{ Reason: state, Message: cs.DetailStatus, }, } } // Filters service account secret volume for Windows. // Service account secret volume gets automatically turned on if not specified otherwise. // ACI doesn't support secret volume for Windows, so we need to filter it. func filterServiceAccountSecretVolume(osType string, containerGroup *aci.ContainerGroup) { if strings.EqualFold(osType, "Windows") { serviceAccountSecretVolumeName := make(map[string]bool) for index, container := range containerGroup.ContainerGroupProperties.Containers { volumeMounts := make([]aci.VolumeMount, 0, len(container.VolumeMounts)) for _, volumeMount := range container.VolumeMounts { if !strings.EqualFold(serviceAccountSecretMountPath, volumeMount.MountPath) { volumeMounts = append(volumeMounts, volumeMount) } else { serviceAccountSecretVolumeName[volumeMount.Name] = true } } containerGroup.ContainerGroupProperties.Containers[index].VolumeMounts = volumeMounts } if len(serviceAccountSecretVolumeName) == 0 { return } l := log.G(context.TODO()).WithField("containerGroup", containerGroup.Name) l.Infof("Ignoring service account secret volumes '%v' for Windows", reflect.ValueOf(serviceAccountSecretVolumeName).MapKeys()) volumes := make([]aci.Volume, 0, len(containerGroup.ContainerGroupProperties.Volumes)) for _, volume := range containerGroup.ContainerGroupProperties.Volumes { if _, ok := serviceAccountSecretVolumeName[volume.Name]; !ok { volumes = append(volumes, volume) } } containerGroup.ContainerGroupProperties.Volumes = volumes } } func getACIEnvVar(e v1.EnvVar) aci.EnvironmentVariable { var envVar aci.EnvironmentVariable // If the variable is a secret, use SecureValue if e.ValueFrom != nil && e.ValueFrom.SecretKeyRef != nil { envVar = aci.EnvironmentVariable{ Name: e.Name, SecureValue: e.Value, } } else { envVar = aci.EnvironmentVariable{ Name: e.Name, Value: e.Value, } } return envVar }