From bac3a585daee564e28333d2dcb5e94fbb059cb44 Mon Sep 17 00:00:00 2001 From: Robbie Zhang Date: Thu, 2 Aug 2018 13:58:31 -0700 Subject: [PATCH] Fix several bugs in the VNET (#287) * Add more support regions * Add kube-proxy sidecar container * Kube-proxy * Fix several bugs * indent --- providers/azure/aci.go | 117 +++++++++++++++--- providers/azure/acsCredential.go | 17 +-- providers/azure/client/network/profile.go | 31 +++-- .../azure/client/network/profile_test.go | 31 ++++- providers/azure/client/network/subnet.go | 22 ++-- providers/azure/client/network/subnet_test.go | 32 +++-- 6 files changed, 192 insertions(+), 58 deletions(-) diff --git a/providers/azure/aci.go b/providers/azure/aci.go index 052ef7c62..9a75a6131 100644 --- a/providers/azure/aci.go +++ b/providers/azure/aci.go @@ -44,6 +44,13 @@ const ( subnetsAction = "Microsoft.Network/virtualNetworks/subnets/action" subnetDelegationService = "Microsoft.ContainerInstance/containerGroups" networkProfileType = "Microsoft.Network/networkProfiles" + + // KubeProxy SideCar Container + kubeProxyContainerName = "vk-side-car-kube-proxy" + kubeProxyImageName = "k8s-gcrio.azureedge.net/hyperkube-amd64:v1.8.2" + kubeConfigDir = "/etc/kube-proxy" + kubeConfigFile = "kubeconfig" + kubeConfigSecretVolume = "vk-side-car-kubeconfig-secret-volume" ) // ACIProvider implements the virtual-kubelet provider interface and communicates with Azure's ACI APIs. @@ -63,8 +70,11 @@ type ACIProvider struct { subnetName string subnetCIDR string vnetName string + vnetResourceGroup string networkProfile string - + masterURI string + clusterCIDR string + metricsSync sync.Mutex metricsSyncTime time.Time lastMetric *stats.Summary @@ -92,6 +102,8 @@ var validAciRegions = []string{ "westeurope", "southeastasia", "australiaeast", + "eastus2euap", + "westcentralus", } // isValidACIRegion checks to make sure we're using a valid ACI region @@ -158,9 +170,13 @@ func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operat p.resourceGroup = acsCredential.ResourceGroup p.region = acsCredential.Region - } - p.vnetName = acsCredential.VNetName + p.vnetName = acsCredential.VNetName + p.vnetResourceGroup = acsCredential.VNetResourceGroup + if p.vnetResourceGroup == "" { + p.vnetResourceGroup = p.resourceGroup + } + } } if clientID := os.Getenv("AZURE_CLIENT_ID"); clientID != "" { @@ -240,7 +256,12 @@ func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operat p.pods = podsQuota } - if subnetName := os.Getenv("ACI_SUBNET_NAME"); subnetName != "" { + p.operatingSystem = operatingSystem + p.nodeName = nodeName + p.internalIP = internalIP + p.daemonEndpointPort = daemonEndpointPort + + if subnetName := os.Getenv("ACI_SUBNET_NAME"); p.vnetName != "" && subnetName != "" { p.subnetName = subnetName } if subnetCIDR := os.Getenv("ACI_SUBNET_CIDR"); subnetCIDR != "" { @@ -259,10 +280,15 @@ func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operat } } - p.operatingSystem = operatingSystem - p.nodeName = nodeName - p.internalIP = internalIP - p.daemonEndpointPort = daemonEndpointPort + p.masterURI = "10.0.0.1" + if masterURI := os.Getenv("MASTER_URI"); masterURI != "" { + p.masterURI = masterURI + } + + p.clusterCIDR = "10.240.0.0/16" + if clusterCIDR := os.Getenv("CLUSTER_CIDR"); clusterCIDR != "" { + p.clusterCIDR = clusterCIDR + } return &p, err } @@ -274,7 +300,7 @@ func (p *ACIProvider) setupNetworkProfile(auth *client.Authentication) error { } createSubnet := true - subnet, err := c.GetSubnet(p.resourceGroup, p.vnetName, p.subnetName) + subnet, err := c.GetSubnet(p.vnetResourceGroup, p.vnetName, p.subnetName) if err != nil && !network.IsNotFound(err) { return fmt.Errorf("error while looking up subnet: %v", err) } @@ -295,7 +321,8 @@ func (p *ACIProvider) setupNetworkProfile(auth *client.Authentication) error { subnet = &network.Subnet{Name: p.subnetName} } populateSubnet(subnet, p.subnetCIDR) - if err = c.CreateOrUpdateSubnet(p.resourceGroup, p.vnetName, subnet); err != nil { + subnet, err = c.CreateOrUpdateSubnet(p.resourceGroup, p.vnetName, subnet) + if err != nil { return fmt.Errorf("error creating subnet: %v", err) } } @@ -308,25 +335,28 @@ func (p *ACIProvider) setupNetworkProfile(auth *client.Authentication) error { for _, config := range profile.Properties.ContainerNetworkInterfaceConfigurations { for _, ipConfig := range config.Properties.IPConfigurations { if ipConfig.Properties.Subnet.ID == subnet.ID { + p.networkProfile = profile.ID return nil } } } - return fmt.Errorf("found existing network profile but the profile is not linked to the subnet") + return fmt.Errorf("found existing network profile but the profile is not linked to the subnet: %v, %v", profile, err) } // at this point, profile should be nil profile = &network.Profile{ Name: p.nodeName, + Location: p.region, Type: networkProfileType, } populateNetworkProfile(profile, subnet) - if err := c.CreateOrUpdateProfile(p.resourceGroup, profile); err != nil { + profile, err = c.CreateOrUpdateProfile(p.resourceGroup, profile) + if err != nil { return err } - p.networkProfile = profile.ID + p.networkProfile = profile.ID return nil } @@ -371,7 +401,6 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error { containerGroup.Location = p.region containerGroup.RestartPolicy = aci.ContainerGroupRestartPolicy(pod.Spec.RestartPolicy) containerGroup.ContainerGroupProperties.OsType = aci.OperatingSystemTypes(p.OperatingSystem()) - containerGroup.NetworkProfile = &aci.NetworkProfileDefinition{ID: p.networkProfile} // get containers containers, err := p.getContainers(pod) @@ -411,7 +440,7 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error { }) } } - if len(ports) > 0 { + if len(ports) > 0 && p.subnetName == "" { containerGroup.ContainerGroupProperties.IPAddress = &aci.IPAddress{ Ports: ports, Type: "Public", @@ -433,7 +462,8 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error { "CreationTimestamp": podCreationTimestamp, } - // TODO(BJK) containergrouprestartpolicy?? + p.amendVnetResources(&containerGroup) + _, err = p.aciClient.CreateContainerGroup( p.resourceGroup, containerGroupName(pod), @@ -447,6 +477,61 @@ func containerGroupName(pod *v1.Pod) string { return fmt.Sprintf("%s-%s", pod.Namespace, pod.Name) } +func (p *ACIProvider) amendVnetResources(containerGroup *aci.ContainerGroup) { + if p.networkProfile == "" { + return + } + + containerGroup.NetworkProfile = &aci.NetworkProfileDefinition{ID: p.networkProfile} + + //containerGroup.ContainerGroupProperties.Containers = append(containerGroup.ContainerGroupProperties.Containers, *(getKubeProxyContainerSpec(p.clusterCIDR))) + //containerGroup.ContainerGroupProperties.Volumes = append(containerGroup.ContainerGroupProperties.Volumes, *(getKubeProxyVolumeSpec(p.masterURI))) +} + +func getKubeProxyContainerSpec(clusterCIDR string) *aci.Container { + container := aci.Container{ + Name: kubeProxyContainerName, + ContainerProperties: aci.ContainerProperties{ + Image: kubeProxyImageName, + Command: []string{ + "/hyperkube", + "proxy", + "--kubeconfig="+kubeConfigDir+"/"+kubeConfigFile, + "--cluster-cidr="+clusterCIDR, + }, + }, + } + + container.VolumeMounts = []aci.VolumeMount{ + aci.VolumeMount{ + Name: kubeConfigSecretVolume, + MountPath: kubeConfigDir, + ReadOnly: true, + }, + } + + container.Resources = aci.ResourceRequirements{ + Requests: &aci.ResourceRequests{ + CPU: 0.1, + MemoryInGB: 0.10, + }, + } + + return &container +} + +func getKubeProxyVolumeSpec(masterURI string) *aci.Volume { + paths := make(map[string]string) + + + volume := aci.Volume{ + Name: kubeConfigSecretVolume, + Secret: paths, + } + + return &volume +} + // UpdatePod is a noop, ACI currently does not support live updates of a pod. func (p *ACIProvider) UpdatePod(pod *v1.Pod) error { return nil diff --git a/providers/azure/acsCredential.go b/providers/azure/acsCredential.go index 66f1c0797..b8dde860e 100644 --- a/providers/azure/acsCredential.go +++ b/providers/azure/acsCredential.go @@ -11,14 +11,15 @@ import ( // AcsCredential represents the credential file for ACS type AcsCredential struct { - Cloud string `json:"cloud"` - TenantID string `json:"tenantId"` - SubscriptionID string `json:"subscriptionId"` - ClientID string `json:"aadClientId"` - ClientSecret string `json:"aadClientSecret"` - ResourceGroup string `json:"resourceGroup"` - Region string `json:"location"` - VNetName string `json:"vnetName"` + Cloud string `json:"cloud"` + TenantID string `json:"tenantId"` + SubscriptionID string `json:"subscriptionId"` + ClientID string `json:"aadClientId"` + ClientSecret string `json:"aadClientSecret"` + ResourceGroup string `json:"resourceGroup"` + Region string `json:"location"` + VNetName string `json:"vnetName"` + VNetResourceGroup string `json:"vnetResourceGroup"` } // NewAcsCredential returns an AcsCredential struct from file path diff --git a/providers/azure/client/network/profile.go b/providers/azure/client/network/profile.go index 4d700ce1b..6593627f2 100644 --- a/providers/azure/client/network/profile.go +++ b/providers/azure/client/network/profile.go @@ -105,7 +105,7 @@ func (c *Client) GetProfile(resourceGroup, name string) (*Profile, error) { } // CreateOrUpdateProfile creates or updates an Azure network profile -func (c *Client) CreateOrUpdateProfile(resourceGroup string, p *Profile) error { +func (c *Client) CreateOrUpdateProfile(resourceGroup string, p *Profile) (*Profile, error) { urlParams := url.Values{ "api-version": []string{apiVersion}, } @@ -117,12 +117,12 @@ func (c *Client) CreateOrUpdateProfile(resourceGroup string, p *Profile) error { // Create the request. b, err := json.Marshal(p) if err != nil { - return errors.Wrap(err, "marshalling networking profile failed") + return nil, errors.Wrap(err, "marshalling networking profile failed") } req, err := http.NewRequest("PUT", uri, bytes.NewReader(b)) if err != nil { - return errors.Wrap(err, "creating network profile create uri request failed") + return nil, errors.Wrap(err, "creating network profile create uri request failed") } // Add the parameters to the url. @@ -131,19 +131,30 @@ func (c *Client) CreateOrUpdateProfile(resourceGroup string, p *Profile) error { "resourceGroupName": resourceGroup, "profileName": p.Name, }); err != nil { - return errors.Wrap(err, "expanding URL with parameters failed") + return nil, errors.Wrap(err, "expanding URL with parameters failed") } // Send the request. resp, err := c.hc.Do(req) if err != nil { - return errors.Wrap(err, "sending get network profile request failed") + return nil, errors.Wrap(err, "sending get network profile request failed") } defer resp.Body.Close() - // 200 (OK) is a success response. - if err := api.CheckResponse(resp); err != nil { - return err - } - return errors.Wrap(json.NewDecoder(resp.Body).Decode(p), "error decoding network profile create response") + // 200 (OK) is a success response. + if err := api.CheckResponse(resp); err != nil { + return nil, err + } + + // Decode the body from the response. + if resp.Body == nil { + return nil, errors.New("create network profile returned an empty body in the response") + } + + var profile Profile + if err := json.NewDecoder(resp.Body).Decode(&profile); err != nil { + return nil, errors.Wrap(err, "decoding create network profile response body failed") + } + + return &profile, nil } diff --git a/providers/azure/client/network/profile_test.go b/providers/azure/client/network/profile_test.go index cb7ef332a..8eaf71d96 100644 --- a/providers/azure/client/network/profile_test.go +++ b/providers/azure/client/network/profile_test.go @@ -5,6 +5,20 @@ import ( "testing" ) +func TestGetProfileNotFound(t *testing.T) { + c := newTestClient(t) + p, err := c.GetProfile(resourceGroup, "someprofile") + if err == nil { + t.Fatalf("expect error when getting the non-exist profile: %v", p) + } + if !IsNotFound(err) { + t.Fatal("expect NotFound error") + } + if p != nil { + t.Fatal("unexpected profile") + } +} + func TestCreateGetProfile(t *testing.T) { c := newTestClient(t) ensureVnet(t, t.Name()) @@ -15,9 +29,12 @@ func TestCreateGetProfile(t *testing.T) { AddressPrefix: "10.0.0.0/24", }, } - if err := c.CreateOrUpdateSubnet(resourceGroup, t.Name(), subnet); err != nil { + + subnet, err := c.CreateOrUpdateSubnet(resourceGroup, t.Name(), subnet) + if err != nil { t.Fatal(err) } + p := &Profile{ Name: t.Name(), Type: "Microsoft.Network/networkProfiles", @@ -43,11 +60,19 @@ func TestCreateGetProfile(t *testing.T) { }, } - if err := c.CreateOrUpdateProfile(resourceGroup, p); err != nil { + p1, err := c.CreateOrUpdateProfile(resourceGroup, p) + if err != nil { t.Fatal(err) } + if p1 == nil { + t.Fatal("create profile should return profile") + } + if p1.ID == "" { + t.Fatal("create profile should return profile.ID") + } - p2, err := c.GetProfile(resourceGroup, p.Name) + var p2 *Profile + p2, err = c.GetProfile(resourceGroup, p.Name) if err != nil { t.Fatal(err) } diff --git a/providers/azure/client/network/subnet.go b/providers/azure/client/network/subnet.go index 4f729816c..f49ffeba2 100644 --- a/providers/azure/client/network/subnet.go +++ b/providers/azure/client/network/subnet.go @@ -108,7 +108,7 @@ func (c *Client) GetSubnet(resourceGroup, vnet, name string) (*Subnet, error) { } // CreateOrUpdateSubnet creates a new or updates an existing subnet in the defined resourcegroup/vnet -func (c *Client) CreateOrUpdateSubnet(resourceGroup, vnet string, subnet *Subnet) error { +func (c *Client) CreateOrUpdateSubnet(resourceGroup, vnet string, s *Subnet) (*Subnet, error) { urlParams := url.Values{ "api-version": []string{apiVersion}, } @@ -118,37 +118,41 @@ func (c *Client) CreateOrUpdateSubnet(resourceGroup, vnet string, subnet *Subnet uri += "?" + url.Values(urlParams).Encode() // Create the request. - b, err := json.Marshal(subnet) + b, err := json.Marshal(s) if err != nil { - return errors.Wrap(err, "marshallig networking profile failed") + return nil, errors.Wrap(err, "marshallig networking profile failed") } req, err := http.NewRequest("PUT", uri, bytes.NewReader(b)) if err != nil { - return errors.New("creating subnet create uri request failed") + return nil, errors.New("creating subnet create uri request failed") } // Add the parameters to the url. if err := api.ExpandURL(req.URL, map[string]string{ "subscriptionId": c.auth.SubscriptionID, "resourceGroupName": resourceGroup, - "subnetName": subnet.Name, + "subnetName": s.Name, "vnetName": vnet, }); err != nil { - return errors.Wrap(err, "expanding URL with parameters failed") + return nil, errors.Wrap(err, "expanding URL with parameters failed") } // Send the request. resp, err := c.hc.Do(req) if err != nil { - return errors.Wrap(err, "sending create subnet request failed") + return nil, errors.Wrap(err, "sending create subnet request failed") } defer resp.Body.Close() // 200 (OK) is a success response. if err := api.CheckResponse(resp); err != nil { - return err + return nil, err } - return errors.Wrap(json.NewDecoder(resp.Body).Decode(subnet), "error decoding create subnet response") + var subnet Subnet + if err := json.NewDecoder(resp.Body).Decode(&subnet); err != nil { + return nil, err + } + return &subnet, nil } diff --git a/providers/azure/client/network/subnet_test.go b/providers/azure/client/network/subnet_test.go index 32dcc674b..0b91c345d 100644 --- a/providers/azure/client/network/subnet_test.go +++ b/providers/azure/client/network/subnet_test.go @@ -19,24 +19,32 @@ func TestCreateGetSubnet(t *testing.T) { } ensureVnet(t, t.Name()) - if err := c.CreateOrUpdateSubnet(resourceGroup, t.Name(), subnet); err != nil { - t.Fatal(err) - } - - s, err := c.GetSubnet(resourceGroup, t.Name(), subnet.Name) + s1, err := c.CreateOrUpdateSubnet(resourceGroup, t.Name(), subnet) if err != nil { t.Fatal(err) } - if s.Name != subnet.Name { + if s1 == nil { + t.Fatal("create subnet should return subnet") + } + if s1.ID == "" { + t.Fatal("create subnet should return subnet.ID") + } + + var s2 *Subnet + s2, err = c.GetSubnet(resourceGroup, t.Name(), subnet.Name) + if err != nil { + t.Fatal(err) + } + if s2.Name != subnet.Name { t.Fatal("got unexpected subnet") } - if s.Properties.AddressPrefix != subnet.Properties.AddressPrefix { - t.Fatalf("got unexpected address prefix: %s", s.Properties.AddressPrefix) + if s2.Properties.AddressPrefix != subnet.Properties.AddressPrefix { + t.Fatalf("got unexpected address prefix: %s", s2.Properties.AddressPrefix) } - if len(s.Properties.Delegations) != 1 { - t.Fatalf("got unexpected delgations: %v", s.Properties.Delegations) + if len(s2.Properties.Delegations) != 1 { + t.Fatalf("got unexpected delgations: %v", s2.Properties.Delegations) } - if s.Properties.Delegations[0].Name != subnet.Properties.Delegations[0].Name { - t.Fatalf("got unexpected delegation: %v", s.Properties.Delegations[0]) + if s2.Properties.Delegations[0].Name != subnet.Properties.Delegations[0].Name { + t.Fatalf("got unexpected delegation: %v", s2.Properties.Delegations[0]) } }