Fix several bugs in the VNET (#287)

* Add more support regions

* Add kube-proxy sidecar container

* Kube-proxy

* Fix several bugs

* indent
This commit is contained in:
Robbie Zhang
2018-08-02 13:58:31 -07:00
parent f3578bfc9b
commit bac3a585da
6 changed files with 192 additions and 58 deletions

View File

@@ -44,6 +44,13 @@ const (
subnetsAction = "Microsoft.Network/virtualNetworks/subnets/action"
subnetDelegationService = "Microsoft.ContainerInstance/containerGroups"
networkProfileType = "Microsoft.Network/networkProfiles"
// KubeProxy SideCar Container
kubeProxyContainerName = "vk-side-car-kube-proxy"
kubeProxyImageName = "k8s-gcrio.azureedge.net/hyperkube-amd64:v1.8.2"
kubeConfigDir = "/etc/kube-proxy"
kubeConfigFile = "kubeconfig"
kubeConfigSecretVolume = "vk-side-car-kubeconfig-secret-volume"
)
// ACIProvider implements the virtual-kubelet provider interface and communicates with Azure's ACI APIs.
@@ -63,8 +70,11 @@ type ACIProvider struct {
subnetName string
subnetCIDR string
vnetName string
vnetResourceGroup string
networkProfile string
masterURI string
clusterCIDR string
metricsSync sync.Mutex
metricsSyncTime time.Time
lastMetric *stats.Summary
@@ -92,6 +102,8 @@ var validAciRegions = []string{
"westeurope",
"southeastasia",
"australiaeast",
"eastus2euap",
"westcentralus",
}
// isValidACIRegion checks to make sure we're using a valid ACI region
@@ -158,9 +170,13 @@ func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operat
p.resourceGroup = acsCredential.ResourceGroup
p.region = acsCredential.Region
}
p.vnetName = acsCredential.VNetName
p.vnetName = acsCredential.VNetName
p.vnetResourceGroup = acsCredential.VNetResourceGroup
if p.vnetResourceGroup == "" {
p.vnetResourceGroup = p.resourceGroup
}
}
}
if clientID := os.Getenv("AZURE_CLIENT_ID"); clientID != "" {
@@ -240,7 +256,12 @@ func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operat
p.pods = podsQuota
}
if subnetName := os.Getenv("ACI_SUBNET_NAME"); subnetName != "" {
p.operatingSystem = operatingSystem
p.nodeName = nodeName
p.internalIP = internalIP
p.daemonEndpointPort = daemonEndpointPort
if subnetName := os.Getenv("ACI_SUBNET_NAME"); p.vnetName != "" && subnetName != "" {
p.subnetName = subnetName
}
if subnetCIDR := os.Getenv("ACI_SUBNET_CIDR"); subnetCIDR != "" {
@@ -259,10 +280,15 @@ func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operat
}
}
p.operatingSystem = operatingSystem
p.nodeName = nodeName
p.internalIP = internalIP
p.daemonEndpointPort = daemonEndpointPort
p.masterURI = "10.0.0.1"
if masterURI := os.Getenv("MASTER_URI"); masterURI != "" {
p.masterURI = masterURI
}
p.clusterCIDR = "10.240.0.0/16"
if clusterCIDR := os.Getenv("CLUSTER_CIDR"); clusterCIDR != "" {
p.clusterCIDR = clusterCIDR
}
return &p, err
}
@@ -274,7 +300,7 @@ func (p *ACIProvider) setupNetworkProfile(auth *client.Authentication) error {
}
createSubnet := true
subnet, err := c.GetSubnet(p.resourceGroup, p.vnetName, p.subnetName)
subnet, err := c.GetSubnet(p.vnetResourceGroup, p.vnetName, p.subnetName)
if err != nil && !network.IsNotFound(err) {
return fmt.Errorf("error while looking up subnet: %v", err)
}
@@ -295,7 +321,8 @@ func (p *ACIProvider) setupNetworkProfile(auth *client.Authentication) error {
subnet = &network.Subnet{Name: p.subnetName}
}
populateSubnet(subnet, p.subnetCIDR)
if err = c.CreateOrUpdateSubnet(p.resourceGroup, p.vnetName, subnet); err != nil {
subnet, err = c.CreateOrUpdateSubnet(p.resourceGroup, p.vnetName, subnet)
if err != nil {
return fmt.Errorf("error creating subnet: %v", err)
}
}
@@ -308,25 +335,28 @@ func (p *ACIProvider) setupNetworkProfile(auth *client.Authentication) error {
for _, config := range profile.Properties.ContainerNetworkInterfaceConfigurations {
for _, ipConfig := range config.Properties.IPConfigurations {
if ipConfig.Properties.Subnet.ID == subnet.ID {
p.networkProfile = profile.ID
return nil
}
}
}
return fmt.Errorf("found existing network profile but the profile is not linked to the subnet")
return fmt.Errorf("found existing network profile but the profile is not linked to the subnet: %v, %v", profile, err)
}
// at this point, profile should be nil
profile = &network.Profile{
Name: p.nodeName,
Location: p.region,
Type: networkProfileType,
}
populateNetworkProfile(profile, subnet)
if err := c.CreateOrUpdateProfile(p.resourceGroup, profile); err != nil {
profile, err = c.CreateOrUpdateProfile(p.resourceGroup, profile)
if err != nil {
return err
}
p.networkProfile = profile.ID
p.networkProfile = profile.ID
return nil
}
@@ -371,7 +401,6 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error {
containerGroup.Location = p.region
containerGroup.RestartPolicy = aci.ContainerGroupRestartPolicy(pod.Spec.RestartPolicy)
containerGroup.ContainerGroupProperties.OsType = aci.OperatingSystemTypes(p.OperatingSystem())
containerGroup.NetworkProfile = &aci.NetworkProfileDefinition{ID: p.networkProfile}
// get containers
containers, err := p.getContainers(pod)
@@ -411,7 +440,7 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error {
})
}
}
if len(ports) > 0 {
if len(ports) > 0 && p.subnetName == "" {
containerGroup.ContainerGroupProperties.IPAddress = &aci.IPAddress{
Ports: ports,
Type: "Public",
@@ -433,7 +462,8 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error {
"CreationTimestamp": podCreationTimestamp,
}
// TODO(BJK) containergrouprestartpolicy??
p.amendVnetResources(&containerGroup)
_, err = p.aciClient.CreateContainerGroup(
p.resourceGroup,
containerGroupName(pod),
@@ -447,6 +477,61 @@ func containerGroupName(pod *v1.Pod) string {
return fmt.Sprintf("%s-%s", pod.Namespace, pod.Name)
}
func (p *ACIProvider) amendVnetResources(containerGroup *aci.ContainerGroup) {
if p.networkProfile == "" {
return
}
containerGroup.NetworkProfile = &aci.NetworkProfileDefinition{ID: p.networkProfile}
//containerGroup.ContainerGroupProperties.Containers = append(containerGroup.ContainerGroupProperties.Containers, *(getKubeProxyContainerSpec(p.clusterCIDR)))
//containerGroup.ContainerGroupProperties.Volumes = append(containerGroup.ContainerGroupProperties.Volumes, *(getKubeProxyVolumeSpec(p.masterURI)))
}
func getKubeProxyContainerSpec(clusterCIDR string) *aci.Container {
container := aci.Container{
Name: kubeProxyContainerName,
ContainerProperties: aci.ContainerProperties{
Image: kubeProxyImageName,
Command: []string{
"/hyperkube",
"proxy",
"--kubeconfig="+kubeConfigDir+"/"+kubeConfigFile,
"--cluster-cidr="+clusterCIDR,
},
},
}
container.VolumeMounts = []aci.VolumeMount{
aci.VolumeMount{
Name: kubeConfigSecretVolume,
MountPath: kubeConfigDir,
ReadOnly: true,
},
}
container.Resources = aci.ResourceRequirements{
Requests: &aci.ResourceRequests{
CPU: 0.1,
MemoryInGB: 0.10,
},
}
return &container
}
func getKubeProxyVolumeSpec(masterURI string) *aci.Volume {
paths := make(map[string]string)
volume := aci.Volume{
Name: kubeConfigSecretVolume,
Secret: paths,
}
return &volume
}
// UpdatePod is a noop, ACI currently does not support live updates of a pod.
func (p *ACIProvider) UpdatePod(pod *v1.Pod) error {
return nil

View File

@@ -11,14 +11,15 @@ import (
// AcsCredential represents the credential file for ACS
type AcsCredential struct {
Cloud string `json:"cloud"`
TenantID string `json:"tenantId"`
SubscriptionID string `json:"subscriptionId"`
ClientID string `json:"aadClientId"`
ClientSecret string `json:"aadClientSecret"`
ResourceGroup string `json:"resourceGroup"`
Region string `json:"location"`
VNetName string `json:"vnetName"`
Cloud string `json:"cloud"`
TenantID string `json:"tenantId"`
SubscriptionID string `json:"subscriptionId"`
ClientID string `json:"aadClientId"`
ClientSecret string `json:"aadClientSecret"`
ResourceGroup string `json:"resourceGroup"`
Region string `json:"location"`
VNetName string `json:"vnetName"`
VNetResourceGroup string `json:"vnetResourceGroup"`
}
// NewAcsCredential returns an AcsCredential struct from file path

View File

@@ -105,7 +105,7 @@ func (c *Client) GetProfile(resourceGroup, name string) (*Profile, error) {
}
// CreateOrUpdateProfile creates or updates an Azure network profile
func (c *Client) CreateOrUpdateProfile(resourceGroup string, p *Profile) error {
func (c *Client) CreateOrUpdateProfile(resourceGroup string, p *Profile) (*Profile, error) {
urlParams := url.Values{
"api-version": []string{apiVersion},
}
@@ -117,12 +117,12 @@ func (c *Client) CreateOrUpdateProfile(resourceGroup string, p *Profile) error {
// Create the request.
b, err := json.Marshal(p)
if err != nil {
return errors.Wrap(err, "marshalling networking profile failed")
return nil, errors.Wrap(err, "marshalling networking profile failed")
}
req, err := http.NewRequest("PUT", uri, bytes.NewReader(b))
if err != nil {
return errors.Wrap(err, "creating network profile create uri request failed")
return nil, errors.Wrap(err, "creating network profile create uri request failed")
}
// Add the parameters to the url.
@@ -131,19 +131,30 @@ func (c *Client) CreateOrUpdateProfile(resourceGroup string, p *Profile) error {
"resourceGroupName": resourceGroup,
"profileName": p.Name,
}); err != nil {
return errors.Wrap(err, "expanding URL with parameters failed")
return nil, errors.Wrap(err, "expanding URL with parameters failed")
}
// Send the request.
resp, err := c.hc.Do(req)
if err != nil {
return errors.Wrap(err, "sending get network profile request failed")
return nil, errors.Wrap(err, "sending get network profile request failed")
}
defer resp.Body.Close()
// 200 (OK) is a success response.
if err := api.CheckResponse(resp); err != nil {
return err
}
return errors.Wrap(json.NewDecoder(resp.Body).Decode(p), "error decoding network profile create response")
// 200 (OK) is a success response.
if err := api.CheckResponse(resp); err != nil {
return nil, err
}
// Decode the body from the response.
if resp.Body == nil {
return nil, errors.New("create network profile returned an empty body in the response")
}
var profile Profile
if err := json.NewDecoder(resp.Body).Decode(&profile); err != nil {
return nil, errors.Wrap(err, "decoding create network profile response body failed")
}
return &profile, nil
}

View File

@@ -5,6 +5,20 @@ import (
"testing"
)
func TestGetProfileNotFound(t *testing.T) {
c := newTestClient(t)
p, err := c.GetProfile(resourceGroup, "someprofile")
if err == nil {
t.Fatalf("expect error when getting the non-exist profile: %v", p)
}
if !IsNotFound(err) {
t.Fatal("expect NotFound error")
}
if p != nil {
t.Fatal("unexpected profile")
}
}
func TestCreateGetProfile(t *testing.T) {
c := newTestClient(t)
ensureVnet(t, t.Name())
@@ -15,9 +29,12 @@ func TestCreateGetProfile(t *testing.T) {
AddressPrefix: "10.0.0.0/24",
},
}
if err := c.CreateOrUpdateSubnet(resourceGroup, t.Name(), subnet); err != nil {
subnet, err := c.CreateOrUpdateSubnet(resourceGroup, t.Name(), subnet)
if err != nil {
t.Fatal(err)
}
p := &Profile{
Name: t.Name(),
Type: "Microsoft.Network/networkProfiles",
@@ -43,11 +60,19 @@ func TestCreateGetProfile(t *testing.T) {
},
}
if err := c.CreateOrUpdateProfile(resourceGroup, p); err != nil {
p1, err := c.CreateOrUpdateProfile(resourceGroup, p)
if err != nil {
t.Fatal(err)
}
if p1 == nil {
t.Fatal("create profile should return profile")
}
if p1.ID == "" {
t.Fatal("create profile should return profile.ID")
}
p2, err := c.GetProfile(resourceGroup, p.Name)
var p2 *Profile
p2, err = c.GetProfile(resourceGroup, p.Name)
if err != nil {
t.Fatal(err)
}

View File

@@ -108,7 +108,7 @@ func (c *Client) GetSubnet(resourceGroup, vnet, name string) (*Subnet, error) {
}
// CreateOrUpdateSubnet creates a new or updates an existing subnet in the defined resourcegroup/vnet
func (c *Client) CreateOrUpdateSubnet(resourceGroup, vnet string, subnet *Subnet) error {
func (c *Client) CreateOrUpdateSubnet(resourceGroup, vnet string, s *Subnet) (*Subnet, error) {
urlParams := url.Values{
"api-version": []string{apiVersion},
}
@@ -118,37 +118,41 @@ func (c *Client) CreateOrUpdateSubnet(resourceGroup, vnet string, subnet *Subnet
uri += "?" + url.Values(urlParams).Encode()
// Create the request.
b, err := json.Marshal(subnet)
b, err := json.Marshal(s)
if err != nil {
return errors.Wrap(err, "marshallig networking profile failed")
return nil, errors.Wrap(err, "marshallig networking profile failed")
}
req, err := http.NewRequest("PUT", uri, bytes.NewReader(b))
if err != nil {
return errors.New("creating subnet create uri request failed")
return nil, errors.New("creating subnet create uri request failed")
}
// Add the parameters to the url.
if err := api.ExpandURL(req.URL, map[string]string{
"subscriptionId": c.auth.SubscriptionID,
"resourceGroupName": resourceGroup,
"subnetName": subnet.Name,
"subnetName": s.Name,
"vnetName": vnet,
}); err != nil {
return errors.Wrap(err, "expanding URL with parameters failed")
return nil, errors.Wrap(err, "expanding URL with parameters failed")
}
// Send the request.
resp, err := c.hc.Do(req)
if err != nil {
return errors.Wrap(err, "sending create subnet request failed")
return nil, errors.Wrap(err, "sending create subnet request failed")
}
defer resp.Body.Close()
// 200 (OK) is a success response.
if err := api.CheckResponse(resp); err != nil {
return err
return nil, err
}
return errors.Wrap(json.NewDecoder(resp.Body).Decode(subnet), "error decoding create subnet response")
var subnet Subnet
if err := json.NewDecoder(resp.Body).Decode(&subnet); err != nil {
return nil, err
}
return &subnet, nil
}

View File

@@ -19,24 +19,32 @@ func TestCreateGetSubnet(t *testing.T) {
}
ensureVnet(t, t.Name())
if err := c.CreateOrUpdateSubnet(resourceGroup, t.Name(), subnet); err != nil {
t.Fatal(err)
}
s, err := c.GetSubnet(resourceGroup, t.Name(), subnet.Name)
s1, err := c.CreateOrUpdateSubnet(resourceGroup, t.Name(), subnet)
if err != nil {
t.Fatal(err)
}
if s.Name != subnet.Name {
if s1 == nil {
t.Fatal("create subnet should return subnet")
}
if s1.ID == "" {
t.Fatal("create subnet should return subnet.ID")
}
var s2 *Subnet
s2, err = c.GetSubnet(resourceGroup, t.Name(), subnet.Name)
if err != nil {
t.Fatal(err)
}
if s2.Name != subnet.Name {
t.Fatal("got unexpected subnet")
}
if s.Properties.AddressPrefix != subnet.Properties.AddressPrefix {
t.Fatalf("got unexpected address prefix: %s", s.Properties.AddressPrefix)
if s2.Properties.AddressPrefix != subnet.Properties.AddressPrefix {
t.Fatalf("got unexpected address prefix: %s", s2.Properties.AddressPrefix)
}
if len(s.Properties.Delegations) != 1 {
t.Fatalf("got unexpected delgations: %v", s.Properties.Delegations)
if len(s2.Properties.Delegations) != 1 {
t.Fatalf("got unexpected delgations: %v", s2.Properties.Delegations)
}
if s.Properties.Delegations[0].Name != subnet.Properties.Delegations[0].Name {
t.Fatalf("got unexpected delegation: %v", s.Properties.Delegations[0])
if s2.Properties.Delegations[0].Name != subnet.Properties.Delegations[0].Name {
t.Fatalf("got unexpected delegation: %v", s2.Properties.Delegations[0])
}
}