Plumb context to providers

This commit is contained in:
Brian Goff
2018-08-20 14:56:39 -07:00
parent 4e20fc40ca
commit 8091b089a2
24 changed files with 277 additions and 253 deletions

View File

@@ -1,6 +1,7 @@
package aws
import (
"context"
"fmt"
"io"
"log"
@@ -108,7 +109,7 @@ func NewFargateProvider(
}
// CreatePod takes a Kubernetes Pod and deploys it within the Fargate provider.
func (p *FargateProvider) CreatePod(pod *corev1.Pod) error {
func (p *FargateProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
log.Printf("Received CreatePod request for %+v.\n", pod)
fgPod, err := fargate.NewPod(p.cluster, pod)
@@ -127,13 +128,13 @@ func (p *FargateProvider) CreatePod(pod *corev1.Pod) error {
}
// UpdatePod takes a Kubernetes Pod and updates it within the provider.
func (p *FargateProvider) UpdatePod(pod *corev1.Pod) error {
func (p *FargateProvider) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
log.Printf("Received UpdatePod request for %s/%s.\n", pod.Namespace, pod.Name)
return errNotImplemented
}
// DeletePod takes a Kubernetes Pod and deletes it from the provider.
func (p *FargateProvider) DeletePod(pod *corev1.Pod) error {
func (p *FargateProvider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
log.Printf("Received DeletePod request for %s/%s.\n", pod.Namespace, pod.Name)
fgPod, err := p.cluster.GetPod(pod.Namespace, pod.Name)
@@ -152,7 +153,7 @@ func (p *FargateProvider) DeletePod(pod *corev1.Pod) error {
}
// GetPod retrieves a pod by name from the provider (can be cached).
func (p *FargateProvider) GetPod(namespace, name string) (*corev1.Pod, error) {
func (p *FargateProvider) GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) {
log.Printf("Received GetPod request for %s/%s.\n", namespace, name)
pod, err := p.cluster.GetPod(namespace, name)
@@ -173,7 +174,7 @@ func (p *FargateProvider) GetPod(namespace, name string) (*corev1.Pod, error) {
}
// GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *FargateProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
func (p *FargateProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
log.Printf("Received GetContainerLogs request for %s/%s/%s.\n", namespace, podName, containerName)
return p.cluster.GetContainerLogs(namespace, podName, containerName, tail)
}
@@ -193,7 +194,7 @@ func (p *FargateProvider) ExecInContainer(
}
// GetPodStatus retrieves the status of a pod by name from the provider.
func (p *FargateProvider) GetPodStatus(namespace, name string) (*corev1.PodStatus, error) {
func (p *FargateProvider) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) {
log.Printf("Received GetPodStatus request for %s/%s.\n", namespace, name)
pod, err := p.cluster.GetPod(namespace, name)
@@ -210,7 +211,7 @@ func (p *FargateProvider) GetPodStatus(namespace, name string) (*corev1.PodStatu
}
// GetPods retrieves a list of all pods running on the provider (can be cached).
func (p *FargateProvider) GetPods() ([]*corev1.Pod, error) {
func (p *FargateProvider) GetPods(ctx context.Context) ([]*corev1.Pod, error) {
log.Println("Received GetPods request.")
pods, err := p.cluster.GetPods()
@@ -237,7 +238,7 @@ func (p *FargateProvider) GetPods() ([]*corev1.Pod, error) {
}
// Capacity returns a resource list with the capacity constraints of the provider.
func (p *FargateProvider) Capacity() corev1.ResourceList {
func (p *FargateProvider) Capacity(ctx context.Context) corev1.ResourceList {
log.Println("Received Capacity request.")
return corev1.ResourceList{
@@ -250,7 +251,7 @@ func (p *FargateProvider) Capacity() corev1.ResourceList {
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), which is polled
// periodically to update the node status within Kubernetes.
func (p *FargateProvider) NodeConditions() []corev1.NodeCondition {
func (p *FargateProvider) NodeConditions(ctx context.Context) []corev1.NodeCondition {
log.Println("Received NodeConditions request.")
lastHeartbeatTime := metav1.Now()
@@ -312,7 +313,7 @@ func (p *FargateProvider) NodeConditions() []corev1.NodeCondition {
}
// NodeAddresses returns a list of addresses for the node status within Kubernetes.
func (p *FargateProvider) NodeAddresses() []corev1.NodeAddress {
func (p *FargateProvider) NodeAddresses(ctx context.Context) []corev1.NodeAddress {
log.Println("Received NodeAddresses request.")
return []corev1.NodeAddress{
@@ -324,7 +325,7 @@ func (p *FargateProvider) NodeAddresses() []corev1.NodeAddress {
}
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status within Kubernetes.
func (p *FargateProvider) NodeDaemonEndpoints() *corev1.NodeDaemonEndpoints {
func (p *FargateProvider) NodeDaemonEndpoints(ctx context.Context) *corev1.NodeDaemonEndpoints {
log.Println("Received NodeDaemonEndpoints request.")
return &corev1.NodeDaemonEndpoints{

View File

@@ -212,7 +212,7 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) {
}
// Confirm that there are no pods on the cluster.
pods, err := provider.GetPods()
pods, err := provider.GetPods(context.Background())
if err != nil {
t.Error(err)
}
@@ -259,13 +259,13 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) {
},
}
err = provider.CreatePod(pod)
err = provider.CreatePod(context.Background(), pod)
if err != nil {
t.Fatal(err)
}
// Now there should be exactly one pod.
pods, err = provider.GetPods()
pods, err = provider.GetPods(context.Background())
if err != nil {
t.Error(err)
}
@@ -282,7 +282,7 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) {
// Wait a few seconds for the logs to settle.
time.Sleep(10 * time.Second)
logs, err := provider.GetContainerLogs("default", podName, "echo-container", 100)
logs, err := provider.GetContainerLogs(context.Background(), "default", podName, "echo-container", 100)
if err != nil {
t.Error(err)
}
@@ -302,7 +302,7 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) {
}
// Delete the pod.
err = provider.DeletePod(pod)
err = provider.DeletePod(context.Background(), pod)
if err != nil {
t.Fatal(err)
}
@@ -313,7 +313,7 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) {
}
// The cluster should be empty again.
pods, err = provider.GetPods()
pods, err = provider.GetPods(context.Background())
if err != nil {
t.Error(err)
}
@@ -340,7 +340,7 @@ func waitUntilPodStatus(provider *vkAWS.FargateProvider, podName string, desired
case <-ctx.Done():
return ctx.Err()
default:
status, err := provider.GetPodStatus("default", podName)
status, err := provider.GetPodStatus(context.Background(), "default", podName)
if err != nil {
if strings.Contains(err.Error(), "is not found") {
return nil

View File

@@ -492,7 +492,7 @@ func getKubeProxyExtension(secretPath, masterURI, clusterCIDR string) (*aci.Exte
// CreatePod accepts a Pod definition and creates
// an ACI deployment
func (p *ACIProvider) CreatePod(pod *v1.Pod) error {
func (p *ACIProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
var containerGroup aci.ContainerGroup
containerGroup.Location = p.region
containerGroup.RestartPolicy = aci.ContainerGroupRestartPolicy(pod.Spec.RestartPolicy)
@@ -561,6 +561,7 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error {
p.amendVnetResources(&containerGroup, pod)
_, err = p.aciClient.CreateContainerGroup(
ctx,
p.resourceGroup,
containerGroupName(pod),
containerGroup,
@@ -684,19 +685,19 @@ func containerGroupName(pod *v1.Pod) string {
}
// UpdatePod is a noop, ACI currently does not support live updates of a pod.
func (p *ACIProvider) UpdatePod(pod *v1.Pod) error {
func (p *ACIProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
return nil
}
// DeletePod deletes the specified pod out of ACI.
func (p *ACIProvider) DeletePod(pod *v1.Pod) error {
return p.aciClient.DeleteContainerGroup(p.resourceGroup, fmt.Sprintf("%s-%s", pod.Namespace, pod.Name))
func (p *ACIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error {
return p.aciClient.DeleteContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", pod.Namespace, pod.Name))
}
// GetPod returns a pod by name that is running inside ACI
// returns nil if a pod by that name is not found.
func (p *ACIProvider) GetPod(namespace, name string) (*v1.Pod, error) {
cg, err, status := p.aciClient.GetContainerGroup(p.resourceGroup, fmt.Sprintf("%s-%s", namespace, name))
func (p *ACIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) {
cg, err, status := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, name))
if err != nil {
if *status == http.StatusNotFound {
return nil, nil
@@ -712,9 +713,9 @@ func (p *ACIProvider) GetPod(namespace, name string) (*v1.Pod, error) {
}
// GetContainerLogs returns the logs of a pod by name that is running inside ACI.
func (p *ACIProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
logContent := ""
cg, err, _ := p.aciClient.GetContainerGroup(p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName))
cg, err, _ := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName))
if err != nil {
return logContent, err
}
@@ -725,9 +726,9 @@ func (p *ACIProvider) GetContainerLogs(namespace, podName, containerName string,
// get logs from cg
retry := 10
for i := 0; i < retry; i++ {
cLogs, err := p.aciClient.GetContainerLogs(p.resourceGroup, cg.Name, containerName, tail)
cLogs, err := p.aciClient.GetContainerLogs(ctx, p.resourceGroup, cg.Name, containerName, tail)
if err != nil {
log.G(context.TODO()).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying")
log.G(ctx).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying")
time.Sleep(5000 * time.Millisecond)
} else {
logContent = cLogs.Content
@@ -754,7 +755,7 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri
defer errstream.Close()
}
cg, err, _ := p.aciClient.GetContainerGroup(p.resourceGroup, name)
cg, err, _ := p.aciClient.GetContainerGroup(context.TODO(), p.resourceGroup, name)
if err != nil {
return err
}
@@ -822,8 +823,8 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri
// GetPodStatus returns the status of a pod by name that is running inside ACI
// returns nil if a pod by that name is not found.
func (p *ACIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
pod, err := p.GetPod(namespace, name)
func (p *ACIProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
pod, err := p.GetPod(ctx, namespace, name)
if err != nil {
return nil, err
}
@@ -836,8 +837,8 @@ func (p *ACIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error
}
// GetPods returns a list of all pods known to be running within ACI.
func (p *ACIProvider) GetPods() ([]*v1.Pod, error) {
cgs, err := p.aciClient.ListContainerGroups(p.resourceGroup)
func (p *ACIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
cgs, err := p.aciClient.ListContainerGroups(ctx, p.resourceGroup)
if err != nil {
return nil, err
}
@@ -865,7 +866,7 @@ func (p *ACIProvider) GetPods() ([]*v1.Pod, error) {
}
// Capacity returns a resource list containing the capacity limits set for ACI.
func (p *ACIProvider) Capacity() v1.ResourceList {
func (p *ACIProvider) Capacity(ctx context.Context) v1.ResourceList {
return v1.ResourceList{
"cpu": resource.MustParse(p.cpu),
"memory": resource.MustParse(p.memory),
@@ -875,7 +876,7 @@ func (p *ACIProvider) Capacity() v1.ResourceList {
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
// within Kubernetes.
func (p *ACIProvider) NodeConditions() []v1.NodeCondition {
func (p *ACIProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
// TODO: Make these dynamic and augment with custom ACI specific conditions of interest
return []v1.NodeCondition{
{
@@ -923,7 +924,7 @@ func (p *ACIProvider) NodeConditions() []v1.NodeCondition {
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *ACIProvider) NodeAddresses() []v1.NodeAddress {
func (p *ACIProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
// TODO: Make these dynamic and augment with custom ACI specific conditions of interest
return []v1.NodeAddress{
{
@@ -935,7 +936,7 @@ func (p *ACIProvider) NodeAddresses() []v1.NodeAddress {
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *ACIProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
func (p *ACIProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
return &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{
Port: p.daemonEndpointPort,

View File

@@ -6,6 +6,7 @@ package azure
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
@@ -77,7 +78,7 @@ func TestCreatePodWithoutResourceSpec(t *testing.T) {
},
}
if err := provider.CreatePod(pod); err != nil {
if err := provider.CreatePod(context.Background(), pod); err != nil {
t.Fatal("Failed to create pod", err)
}
}
@@ -131,7 +132,7 @@ func TestCreatePodWithResourceRequestOnly(t *testing.T) {
},
}
if err := provider.CreatePod(pod); err != nil {
if err := provider.CreatePod(context.Background(), pod); err != nil {
t.Fatal("Failed to create pod", err)
}
}
@@ -190,7 +191,7 @@ func TestCreatePodWithResourceRequestAndLimit(t *testing.T) {
},
}
if err := provider.CreatePod(pod); err != nil {
if err := provider.CreatePod(context.Background(), pod); err != nil {
t.Fatal("Failed to create pod", err)
}
}
@@ -212,7 +213,7 @@ func TestGetPodsWithEmptyList(t *testing.T) {
}
}
pods, err := provider.GetPods()
pods, err := provider.GetPods(context.Background())
if err != nil {
t.Fatal("Failed to get pods", err)
}
@@ -269,7 +270,7 @@ func TestGetPodsWithoutResourceRequestsLimits(t *testing.T) {
}
}
pods, err := provider.GetPods()
pods, err := provider.GetPods(context.Background())
if err != nil {
t.Fatal("Failed to get pods", err)
}
@@ -343,7 +344,7 @@ func TestGetPodWithoutResourceRequestsLimits(t *testing.T) {
}
}
pod, err := provider.GetPod(podNamespace, podName)
pod, err := provider.GetPod(context.Background(), podNamespace, podName)
if err != nil {
t.Fatal("Failed to get pod", err)
}
@@ -385,7 +386,7 @@ func TestGetPodWithContainerID(t *testing.T) {
assert.Equal(t, podNamespace+"-"+podName, containerGroup, "Container group name is not expected")
return http.StatusOK, aci.ContainerGroup{
ID: cgID,
ID: cgID,
Tags: map[string]string{
"NodeName": fakeNodeName,
},
@@ -405,7 +406,7 @@ func TestGetPodWithContainerID(t *testing.T) {
},
Resources: aci.ResourceRequirements{
Requests: &aci.ResourceRequests{
CPU: 0.99,
CPU: 0.99,
MemoryInGB: 1.5,
},
},
@@ -416,7 +417,7 @@ func TestGetPodWithContainerID(t *testing.T) {
}
}
pod, err := provider.GetPod(podNamespace, podName)
pod, err := provider.GetPod(context.Background(), podNamespace, podName)
if err != nil {
t.Fatal("Failed to get pod", err)
}
@@ -586,7 +587,7 @@ func TestCreatePodWithLivenessProbe(t *testing.T) {
},
}
if err := provider.CreatePod(pod); err != nil {
if err := provider.CreatePod(context.Background(), pod); err != nil {
t.Fatal("Failed to create pod", err)
}
}
@@ -648,7 +649,7 @@ func TestCreatePodWithReadinessProbe(t *testing.T) {
},
}
if err := provider.CreatePod(pod); err != nil {
if err := provider.CreatePod(context.Background(), pod); err != nil {
t.Fatal("Failed to create pod", err)
}
}
}

View File

@@ -1,6 +1,7 @@
package aci
import (
"context"
"encoding/base64"
"fmt"
"log"
@@ -90,7 +91,7 @@ func TestNewClient(t *testing.T) {
}
func TestCreateContainerGroupFails(t *testing.T) {
_, err := client.CreateContainerGroup(resourceGroup, containerGroup, ContainerGroup{
_, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroup, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
@@ -121,7 +122,7 @@ func TestCreateContainerGroupFails(t *testing.T) {
}
func TestCreateContainerGroupWithoutResourceLimit(t *testing.T) {
cg, err := client.CreateContainerGroup(resourceGroup, containerGroup, ContainerGroup{
cg, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroup, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
@@ -155,13 +156,13 @@ func TestCreateContainerGroupWithoutResourceLimit(t *testing.T) {
t.Fatalf("resource group name is %s, expected %s", cg.Name, containerGroup)
}
if err := client.DeleteContainerGroup(resourceGroup, containerGroup); err != nil {
if err := client.DeleteContainerGroup(context.Background(), resourceGroup, containerGroup); err != nil {
t.Fatal(err)
}
}
func TestCreateContainerGroup(t *testing.T) {
cg, err := client.CreateContainerGroup(resourceGroup, containerGroup, ContainerGroup{
cg, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroup, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
@@ -201,7 +202,7 @@ func TestCreateContainerGroup(t *testing.T) {
}
func TestCreateContainerGroupWithBadVNetFails(t *testing.T) {
_, err := client.CreateContainerGroup(resourceGroup, containerGroup, ContainerGroup{
_, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroup, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
@@ -250,7 +251,7 @@ func TestCreateContainerGroupWithBadVNetFails(t *testing.T) {
}
func TestGetContainerGroup(t *testing.T) {
cg, err, _ := client.GetContainerGroup(resourceGroup, containerGroup)
cg, err, _ := client.GetContainerGroup(context.Background(), resourceGroup, containerGroup)
if err != nil {
t.Fatal(err)
}
@@ -260,7 +261,7 @@ func TestGetContainerGroup(t *testing.T) {
}
func TestListContainerGroup(t *testing.T) {
list, err := client.ListContainerGroups(resourceGroup)
list, err := client.ListContainerGroups(context.Background(), resourceGroup)
if err != nil {
t.Fatal(err)
}
@@ -274,7 +275,7 @@ func TestListContainerGroup(t *testing.T) {
func TestCreateContainerGroupWithLivenessProbe(t *testing.T) {
uid := uuid.New()
containerGroupName := containerGroup + "-" + uid.String()[0:6]
cg, err := client.CreateContainerGroup(resourceGroup, containerGroupName, ContainerGroup{
cg, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroupName, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
@@ -321,7 +322,7 @@ func TestCreateContainerGroupWithLivenessProbe(t *testing.T) {
func TestCreateContainerGroupFailsWithLivenessProbeMissingPort(t *testing.T) {
uid := uuid.New()
containerGroupName := containerGroup + "-" + uid.String()[0:6]
_, err := client.CreateContainerGroup(resourceGroup, containerGroupName, ContainerGroup{
_, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroupName, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
@@ -365,7 +366,7 @@ func TestCreateContainerGroupFailsWithLivenessProbeMissingPort(t *testing.T) {
func TestCreateContainerGroupWithReadinessProbe(t *testing.T) {
uid := uuid.New()
containerGroupName := containerGroup + "-" + uid.String()[0:6]
cg, err := client.CreateContainerGroup(resourceGroup, containerGroupName, ContainerGroup{
cg, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroupName, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
@@ -420,7 +421,7 @@ func TestCreateContainerGroupWithLogAnalytics(t *testing.T) {
t.Fatal(err)
}
cgname := "cgla"
cg, err := client.CreateContainerGroup(resourceGroup, cgname, ContainerGroup{
cg, err := client.CreateContainerGroup(context.Background(), resourceGroup, cgname, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
@@ -458,14 +459,14 @@ func TestCreateContainerGroupWithLogAnalytics(t *testing.T) {
if cg.Name != cgname {
t.Fatalf("resource group name is %s, expected %s", cg.Name, cgname)
}
if err := client.DeleteContainerGroup(resourceGroup, cgname); err != nil {
if err := client.DeleteContainerGroup(context.Background(), resourceGroup, cgname); err != nil {
t.Fatalf("Delete Container Group failed: %s", err.Error())
}
}
func TestCreateContainerGroupWithInvalidLogAnalytics(t *testing.T) {
law := &LogAnalyticsWorkspace{}
_, err := client.CreateContainerGroup(resourceGroup, containerGroup, ContainerGroup{
_, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroup, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
@@ -506,52 +507,52 @@ func TestCreateContainerGroupWithInvalidLogAnalytics(t *testing.T) {
func TestCreateContainerGroupWithVNet(t *testing.T) {
uid := uuid.New()
containerGroupName := containerGroup + "-" + uid.String()[0:6]
containerGroupName := containerGroup + "-" + uid.String()[0:6]
fakeKubeConfig := base64.StdEncoding.EncodeToString([]byte(uid.String()))
networkProfileId := "/subscriptions/ae43b1e3-c35d-4c8c-bc0d-f148b4c52b78/resourceGroups/aci-connector/providers/Microsoft.Network/networkprofiles/aci-connector-network-profile-westus"
diagnostics, err := NewContainerGroupDiagnosticsFromFile("../../../../loganalytics.json")
if err != nil {
t.Fatal(err)
}
diagnostics, err := NewContainerGroupDiagnosticsFromFile("../../../../loganalytics.json")
if err != nil {
t.Fatal(err)
}
diagnostics.LogAnalytics.LogType = LogAnlyticsLogTypeContainerInsights
cg, err := client.CreateContainerGroup(resourceGroup, containerGroupName, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
Containers: []Container{
{
Name: "nginx",
ContainerProperties: ContainerProperties{
Image: "nginx",
Command: []string{"nginx", "-g", "daemon off;"},
Ports: []ContainerPort{
{
Protocol: ContainerNetworkProtocolTCP,
Port: 80,
},
},
Resources: ResourceRequirements{
Requests: &ResourceRequests{
CPU: 1,
MemoryInGB: 1,
},
Limits: &ResourceLimits{
CPU: 1,
MemoryInGB: 1,
},
},
},
},
},
NetworkProfile: &NetworkProfileDefinition{
ID: networkProfileId,
},
cg, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroupName, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
Containers: []Container{
{
Name: "nginx",
ContainerProperties: ContainerProperties{
Image: "nginx",
Command: []string{"nginx", "-g", "daemon off;"},
Ports: []ContainerPort{
{
Protocol: ContainerNetworkProtocolTCP,
Port: 80,
},
},
Resources: ResourceRequirements{
Requests: &ResourceRequests{
CPU: 1,
MemoryInGB: 1,
},
Limits: &ResourceLimits{
CPU: 1,
MemoryInGB: 1,
},
},
},
},
},
NetworkProfile: &NetworkProfileDefinition{
ID: networkProfileId,
},
Extensions: []*Extension{
&Extension{
Name: "kube-proxy",
Properties: &ExtensionProperties{
Properties: &ExtensionProperties{
Type: ExtensionTypeKubeProxy,
Version: ExtensionVersion1_0,
Settings: map[string]string{
@@ -567,23 +568,23 @@ func TestCreateContainerGroupWithVNet(t *testing.T) {
DNSConfig: &DNSConfig{
NameServers: []string{"1.1.1.1"},
},
Diagnostics: diagnostics,
},
})
Diagnostics: diagnostics,
},
})
if err != nil {
t.Fatal(err)
}
if cg.Name != containerGroupName {
t.Fatalf("resource group name is %s, expected %s", cg.Name, containerGroupName)
}
if err := client.DeleteContainerGroup(resourceGroup, containerGroupName); err != nil {
t.Fatalf("Delete Container Group failed: %s", err.Error())
}
if err != nil {
t.Fatal(err)
}
if cg.Name != containerGroupName {
t.Fatalf("resource group name is %s, expected %s", cg.Name, containerGroupName)
}
if err := client.DeleteContainerGroup(context.Background(), resourceGroup, containerGroupName); err != nil {
t.Fatalf("Delete Container Group failed: %s", err.Error())
}
}
func TestDeleteContainerGroup(t *testing.T) {
err := client.DeleteContainerGroup(resourceGroup, containerGroup)
err := client.DeleteContainerGroup(context.Background(), resourceGroup, containerGroup)
if err != nil {
t.Fatal(err)
}

View File

@@ -2,6 +2,7 @@ package aci
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -14,7 +15,7 @@ import (
// CreateContainerGroup creates a new Azure Container Instance with the
// provided properties.
// From: https://docs.microsoft.com/en-us/rest/api/container-instances/containergroups/createorupdate
func (c *Client) CreateContainerGroup(resourceGroup, containerGroupName string, containerGroup ContainerGroup) (*ContainerGroup, error) {
func (c *Client) CreateContainerGroup(ctx context.Context, resourceGroup, containerGroupName string, containerGroup ContainerGroup) (*ContainerGroup, error) {
urlParams := url.Values{
"api-version": []string{apiVersion},
}
@@ -34,6 +35,7 @@ func (c *Client) CreateContainerGroup(resourceGroup, containerGroupName string,
if err != nil {
return nil, fmt.Errorf("Creating create/update container group uri request failed: %v", err)
}
req = req.WithContext(ctx)
// Add the parameters to the url.
if err := api.ExpandURL(req.URL, map[string]string{

View File

@@ -1,6 +1,7 @@
package aci
import (
"context"
"fmt"
"net/http"
"net/url"
@@ -11,7 +12,7 @@ import (
// DeleteContainerGroup deletes an Azure Container Instance in the provided
// resource group with the given container group name.
// From: https://docs.microsoft.com/en-us/rest/api/container-instances/containergroups/delete
func (c *Client) DeleteContainerGroup(resourceGroup, containerGroupName string) error {
func (c *Client) DeleteContainerGroup(ctx context.Context, resourceGroup, containerGroupName string) error {
urlParams := url.Values{
"api-version": []string{apiVersion},
}
@@ -25,6 +26,7 @@ func (c *Client) DeleteContainerGroup(resourceGroup, containerGroupName string)
if err != nil {
return fmt.Errorf("Creating delete container group uri request failed: %v", err)
}
req = req.WithContext(ctx)
// Add the parameters to the url.
if err := api.ExpandURL(req.URL, map[string]string{

View File

@@ -1,6 +1,7 @@
package aci
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -13,7 +14,7 @@ import (
// GetContainerGroup gets an Azure Container Instance in the provided
// resource group with the given container group name.
// From: https://docs.microsoft.com/en-us/rest/api/container-instances/containergroups/get
func (c *Client) GetContainerGroup(resourceGroup, containerGroupName string) (*ContainerGroup, error, *int) {
func (c *Client) GetContainerGroup(ctx context.Context, resourceGroup, containerGroupName string) (*ContainerGroup, error, *int) {
urlParams := url.Values{
"api-version": []string{apiVersion},
}
@@ -27,6 +28,7 @@ func (c *Client) GetContainerGroup(resourceGroup, containerGroupName string) (*C
if err != nil {
return nil, fmt.Errorf("Creating get container group uri request failed: %v", err), nil
}
req = req.WithContext(ctx)
// Add the parameters to the url.
if err := api.ExpandURL(req.URL, map[string]string{

View File

@@ -1,6 +1,7 @@
package aci
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -16,7 +17,7 @@ import (
// if it is not empty.
// From: https://docs.microsoft.com/en-us/rest/api/container-instances/containergroups/list
// From: https://docs.microsoft.com/en-us/rest/api/container-instances/containergroups/listbyresourcegroup
func (c *Client) ListContainerGroups(resourceGroup string) (*ContainerGroupListResult, error) {
func (c *Client) ListContainerGroups(ctx context.Context, resourceGroup string) (*ContainerGroupListResult, error) {
urlParams := url.Values{
"api-version": []string{apiVersion},
}
@@ -35,6 +36,7 @@ func (c *Client) ListContainerGroups(resourceGroup string) (*ContainerGroupListR
if err != nil {
return nil, fmt.Errorf("Creating get container group list uri request failed: %v", err)
}
req = req.WithContext(ctx)
// Add the parameters to the url.
if err := api.ExpandURL(req.URL, map[string]string{

View File

@@ -1,6 +1,7 @@
package aci
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -13,7 +14,7 @@ import (
// GetContainerLogs returns the logs from an Azure Container Instance
// in the provided resource group with the given container group name.
// From: https://docs.microsoft.com/en-us/rest/api/container-instances/ContainerLogs/List
func (c *Client) GetContainerLogs(resourceGroup, containerGroupName, containerName string, tail int) (*Logs, error) {
func (c *Client) GetContainerLogs(ctx context.Context, resourceGroup, containerGroupName, containerName string, tail int) (*Logs, error) {
urlParams := url.Values{
"api-version": []string{apiVersion},
"tail": []string{fmt.Sprintf("%d", tail)},
@@ -28,6 +29,7 @@ func (c *Client) GetContainerLogs(resourceGroup, containerGroupName, containerNa
if err != nil {
return nil, fmt.Errorf("Creating get container logs uri request failed: %v", err)
}
req = req.WithContext(ctx)
// Add the parameters to the url.
if err := api.ExpandURL(req.URL, map[string]string{

View File

@@ -1,8 +1,10 @@
package aci
import "context"
// UpdateContainerGroup updates an Azure Container Instance with the
// provided properties.
// From: https://docs.microsoft.com/en-us/rest/api/container-instances/containergroups/createorupdate
func (c *Client) UpdateContainerGroup(resourceGroup, containerGroupName string, containerGroup ContainerGroup) (*ContainerGroup, error) {
return c.CreateContainerGroup(resourceGroup, containerGroupName, containerGroup)
func (c *Client) UpdateContainerGroup(ctx context.Context, resourceGroup, containerGroupName string, containerGroup ContainerGroup) (*ContainerGroup, error) {
return c.CreateContainerGroup(ctx, resourceGroup, containerGroupName, containerGroup)
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/Azure/go-autorest/autorest"
"io"
"io/ioutil"
"log"
@@ -13,6 +12,8 @@ import (
"strings"
"time"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch"
@@ -157,7 +158,7 @@ func NewBatchProviderFromConfig(config *Config, rm *manager.ResourceManager, nod
}
// CreatePod accepts a Pod definition
func (p *Provider) CreatePod(pod *v1.Pod) error {
func (p *Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
log.Println("Creating pod...")
podCommand, err := pod2docker.GetBashCommand(pod2docker.PodComponents{
InitContainers: pod.Spec.InitContainers,
@@ -200,9 +201,9 @@ func (p *Provider) CreatePod(pod *v1.Pod) error {
}
// GetPodStatus retrieves the status of a given pod by name.
func (p *Provider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
func (p *Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
log.Println("Getting pod status ....")
pod, err := p.GetPod(namespace, name)
pod, err := p.GetPod(ctx, namespace, name)
if err != nil {
return nil, err
@@ -214,13 +215,13 @@ func (p *Provider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
}
// UpdatePod accepts a Pod definition
func (p *Provider) UpdatePod(pod *v1.Pod) error {
func (p *Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
log.Println("Pod Update called: No-op as not implemented")
return nil
}
// DeletePod accepts a Pod definition
func (p *Provider) DeletePod(pod *v1.Pod) error {
func (p *Provider) DeletePod(ctx context.Context, pod *v1.Pod) error {
taskID := getTaskIDForPod(pod.Namespace, pod.Name)
task, err := p.deleteTask(taskID)
if err != nil {
@@ -234,7 +235,7 @@ func (p *Provider) DeletePod(pod *v1.Pod) error {
}
// GetPod returns a pod by name
func (p *Provider) GetPod(namespace, name string) (*v1.Pod, error) {
func (p *Provider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) {
log.Println("Getting Pod ...")
task, err := p.getTask(getTaskIDForPod(namespace, name))
if err != nil {
@@ -257,7 +258,7 @@ func (p *Provider) GetPod(namespace, name string) (*v1.Pod, error) {
}
// GetContainerLogs returns the logs of a container running in a pod by name.
func (p *Provider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
log.Println("Getting pod logs ....")
taskID := getTaskIDForPod(namespace, podName)
@@ -318,7 +319,7 @@ func (p *Provider) ExecInContainer(name string, uid types.UID, container string,
}
// GetPods retrieves a list of all pods scheduled to run.
func (p *Provider) GetPods() ([]*v1.Pod, error) {
func (p *Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
log.Println("Getting pods...")
tasksPtr, err := p.listTasks()
if err != nil {
@@ -342,7 +343,7 @@ func (p *Provider) GetPods() ([]*v1.Pod, error) {
}
// Capacity returns a resource list containing the capacity limits
func (p *Provider) Capacity() v1.ResourceList {
func (p *Provider) Capacity(ctx context.Context) v1.ResourceList {
return v1.ResourceList{
"cpu": resource.MustParse(p.cpu),
"memory": resource.MustParse(p.memory),
@@ -353,7 +354,7 @@ func (p *Provider) Capacity() v1.ResourceList {
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
// within Kubernetes.
func (p *Provider) NodeConditions() []v1.NodeCondition {
func (p *Provider) NodeConditions(ctx context.Context) []v1.NodeCondition {
return []v1.NodeCondition{
{
Type: "Ready",
@@ -400,7 +401,7 @@ func (p *Provider) NodeConditions() []v1.NodeCondition {
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *Provider) NodeAddresses() []v1.NodeAddress {
func (p *Provider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
// TODO: Make these dynamic and augment with custom ACI specific conditions of interest
return []v1.NodeAddress{
{
@@ -412,7 +413,7 @@ func (p *Provider) NodeAddresses() []v1.NodeAddress {
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *Provider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
func (p *Provider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
return &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{
Port: p.daemonEndpointPort,

View File

@@ -1,16 +1,18 @@
package azurebatch
import (
"context"
"crypto/md5"
"fmt"
"github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch"
"github.com/Azure/go-autorest/autorest"
"io/ioutil"
"net/http"
"os"
"strings"
"testing"
"github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch"
"github.com/Azure/go-autorest/autorest"
apiv1 "k8s.io/api/core/v1"
)
@@ -32,7 +34,7 @@ func Test_deletePod(t *testing.T) {
pod.Name = podName
pod.Namespace = podNamespace
err := provider.DeletePod(pod)
err := provider.DeletePod(context.Background(), pod)
if err != nil {
t.Error(err)
}
@@ -48,7 +50,7 @@ func Test_deletePod_doesntExist(t *testing.T) {
return autorest.Response{}, fmt.Errorf("Task doesn't exist")
}
err := provider.DeletePod(pod)
err := provider.DeletePod(context.Background(), pod)
if err == nil {
t.Error("Expected error but none seen")
}
@@ -72,7 +74,7 @@ func Test_createPod(t *testing.T) {
return autorest.Response{}, nil
}
err := provider.CreatePod(pod)
err := provider.CreatePod(context.Background(), pod)
if err != nil {
t.Errorf("Unexpected error creating pod %v", err)
}
@@ -88,7 +90,7 @@ func Test_createPod_errorResponse(t *testing.T) {
return autorest.Response{}, fmt.Errorf("Failed creating task")
}
err := provider.CreatePod(pod)
err := provider.CreatePod(context.Background(), pod)
if err == nil {
t.Error("Expected error but none seen")
}
@@ -118,7 +120,7 @@ func Test_readLogs_404Response_expectReturnStartupLogs(t *testing.T) {
return batch.ReadCloser{}, fmt.Errorf("Failed in test mock of getFileFromTask")
}
result, err := provider.GetContainerLogs(pod.Namespace, pod.Name, containerName, 0)
result, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, 0)
if err != nil {
t.Errorf("GetContainerLogs return error: %v", err)
}
@@ -152,7 +154,7 @@ func Test_readLogs_JsonResponse_expectFormattedLogs(t *testing.T) {
return batch.ReadCloser{}, fmt.Errorf("Failed in test mock of getFileFromTask")
}
result, err := provider.GetContainerLogs(pod.Namespace, pod.Name, containerName, 0)
result, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, 0)
if err != nil {
t.Errorf("GetContainerLogs return error: %v", err)
}

View File

@@ -2,6 +2,7 @@ package cri
import (
"bufio"
"context"
"fmt"
"io"
"io/ioutil"
@@ -486,7 +487,7 @@ func generateContainerConfig(container *v1.Container, pod *v1.Pod, imageRef, pod
}
// Provider function to create a Pod
func (p *CRIProvider) CreatePod(pod *v1.Pod) error {
func (p *CRIProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
log.Printf("receive CreatePod %q", pod.Name)
var attempt uint32 // TODO: Track attempts. Currently always 0
@@ -548,14 +549,14 @@ func (p *CRIProvider) CreatePod(pod *v1.Pod) error {
}
// Update is currently not required or even called by VK, so not implemented
func (p *CRIProvider) UpdatePod(pod *v1.Pod) error {
func (p *CRIProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
log.Printf("receive UpdatePod %q", pod.Name)
return nil
}
// Provider function to delete a pod and its containers
func (p *CRIProvider) DeletePod(pod *v1.Pod) error {
func (p *CRIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error {
log.Printf("receive DeletePod %q", pod.Name)
err := p.refreshNodeState()
@@ -587,7 +588,7 @@ func (p *CRIProvider) DeletePod(pod *v1.Pod) error {
}
// Provider function to return a Pod spec - mostly used for its status
func (p *CRIProvider) GetPod(namespace, name string) (*v1.Pod, error) {
func (p *CRIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) {
log.Printf("receive GetPod %q", name)
err := p.refreshNodeState()
@@ -624,7 +625,7 @@ func readLogFile(filename string, tail int) (string, error) {
}
// Provider function to read the logs of a container
func (p *CRIProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
func (p *CRIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
log.Printf("receive GetContainerLogs %q", containerName)
err := p.refreshNodeState()
@@ -672,7 +673,7 @@ func (p *CRIProvider) findPodByName(namespace, name string) *CRIPod {
}
// Provider function to return the status of a Pod
func (p *CRIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
func (p *CRIProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
log.Printf("receive GetPodStatus %q", name)
err := p.refreshNodeState()
@@ -803,7 +804,7 @@ func createPodSpecFromCRI(p *CRIPod, nodeName string) *v1.Pod {
// Provider function to return all known pods
// TODO: Should this be all pods or just running pods?
func (p *CRIProvider) GetPods() ([]*v1.Pod, error) {
func (p *CRIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
log.Printf("receive GetPods")
var pods []*v1.Pod
@@ -831,7 +832,7 @@ func getSystemTotalMemory() uint64 {
}
// Provider function to return the capacity of the node
func (p *CRIProvider) Capacity() v1.ResourceList {
func (p *CRIProvider) Capacity(ctx context.Context) v1.ResourceList {
log.Printf("receive Capacity")
err := p.refreshNodeState()
@@ -853,7 +854,7 @@ func (p *CRIProvider) Capacity() v1.ResourceList {
// Provider function to return node conditions
// TODO: For now, use the same node conditions as the MockProvider
func (p *CRIProvider) NodeConditions() []v1.NodeCondition {
func (p *CRIProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
// TODO: Make this configurable
return []v1.NodeCondition{
{
@@ -901,7 +902,7 @@ func (p *CRIProvider) NodeConditions() []v1.NodeCondition {
}
// Provider function to return a list of node addresses
func (p *CRIProvider) NodeAddresses() []v1.NodeAddress {
func (p *CRIProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
log.Printf("receive NodeAddresses - returning %s", p.internalIP)
return []v1.NodeAddress{
@@ -913,7 +914,7 @@ func (p *CRIProvider) NodeAddresses() []v1.NodeAddress {
}
// Provider function to return the daemon endpoint
func (p *CRIProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
func (p *CRIProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
log.Printf("receive NodeDaemonEndpoints - returning %v", p.daemonEndpointPort)
return &v1.NodeDaemonEndpoints{

View File

@@ -2,6 +2,7 @@ package huawei
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
@@ -191,7 +192,7 @@ func (p *CCIProvider) deletePodAnnotations(pod *v1.Pod) error {
}
// CreatePod takes a Kubernetes Pod and deploys it within the huawei CCI provider.
func (p *CCIProvider) CreatePod(pod *v1.Pod) error {
func (p *CCIProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
// Create the createPod request url
p.setPodAnnotations(pod)
uri := p.apiEndpoint + "/api/v1/namespaces/" + p.project + "/pods"
@@ -218,12 +219,12 @@ func (p *CCIProvider) CreatePod(pod *v1.Pod) error {
}
// UpdatePod takes a Kubernetes Pod and updates it within the huawei CCI provider.
func (p *CCIProvider) UpdatePod(pod *v1.Pod) error {
func (p *CCIProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
return nil
}
// DeletePod takes a Kubernetes Pod and deletes it from the huawei CCI provider.
func (p *CCIProvider) DeletePod(pod *v1.Pod) error {
func (p *CCIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error {
// Create the deletePod request url
podName := pod.Namespace + "-" + pod.Name
uri := p.apiEndpoint + "/api/v1/namespaces/" + p.project + "/pods/" + podName
@@ -241,7 +242,7 @@ func (p *CCIProvider) DeletePod(pod *v1.Pod) error {
}
// GetPod retrieves a pod by name from the huawei CCI provider.
func (p *CCIProvider) GetPod(namespace, name string) (*v1.Pod, error) {
func (p *CCIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) {
// Create the getPod request url
podName := namespace + "-" + name
uri := p.apiEndpoint + "/api/v1/namespaces/" + p.project + "/pods/" + podName
@@ -276,7 +277,7 @@ func (p *CCIProvider) GetPod(namespace, name string) (*v1.Pod, error) {
}
// GetContainerLogs retrieves the logs of a container by name from the huawei CCI provider.
func (p *CCIProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
func (p *CCIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
return "", nil
}
@@ -295,8 +296,8 @@ func (p *CCIProvider) ExecInContainer(name string, uid types.UID, container stri
}
// GetPodStatus retrieves the status of a pod by name from the huawei CCI provider.
func (p *CCIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
pod, err := p.GetPod(namespace, name)
func (p *CCIProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
pod, err := p.GetPod(ctx, namespace, name)
if err != nil {
return nil, err
}
@@ -309,7 +310,7 @@ func (p *CCIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error
}
// GetPods retrieves a list of all pods running on the huawei CCI provider.
func (p *CCIProvider) GetPods() ([]*v1.Pod, error) {
func (p *CCIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
// Create the getPod request url
uri := p.apiEndpoint + "/api/v1/namespaces/" + p.project + "/pods"
r, err := http.NewRequest("GET", uri, nil)
@@ -344,7 +345,7 @@ func (p *CCIProvider) GetPods() ([]*v1.Pod, error) {
}
// Capacity returns a resource list with the capacity constraints of the huawei CCI provider.
func (p *CCIProvider) Capacity() v1.ResourceList {
func (p *CCIProvider) Capacity(ctx context.Context) v1.ResourceList {
return v1.ResourceList{
"cpu": resource.MustParse(p.cpu),
"memory": resource.MustParse(p.memory),
@@ -354,7 +355,7 @@ func (p *CCIProvider) Capacity() v1.ResourceList {
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), which is
// polled periodically to update the node status within Kubernetes.
func (p *CCIProvider) NodeConditions() []v1.NodeCondition {
func (p *CCIProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
// TODO: Make these dynamic and augment with custom CCI specific conditions of interest
return []v1.NodeCondition{
{
@@ -402,7 +403,7 @@ func (p *CCIProvider) NodeConditions() []v1.NodeCondition {
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *CCIProvider) NodeAddresses() []v1.NodeAddress {
func (p *CCIProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
// TODO: Make these dynamic and augment with custom CCI specific conditions of interest
return []v1.NodeAddress{
{
@@ -414,7 +415,7 @@ func (p *CCIProvider) NodeAddresses() []v1.NodeAddress {
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *CCIProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
func (p *CCIProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
return &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{
Port: p.daemonEndpointPort,

View File

@@ -1,6 +1,7 @@
package huawei
import (
"context"
"net/http"
"os"
"testing"
@@ -81,7 +82,7 @@ func TestCreatePod(t *testing.T) {
},
}
if err := provider.CreatePod(pod); err != nil {
if err := provider.CreatePod(context.Background(), pod); err != nil {
t.Fatal("Failed to create pod", err)
}
}
@@ -121,7 +122,7 @@ func TestGetPod(t *testing.T) {
},
}
}
pod, err := provider.GetPod(podNamespace, podName)
pod, err := provider.GetPod(context.Background(), podNamespace, podName)
if err != nil {
t.Fatal("Failed to get pod", err)
}
@@ -172,7 +173,7 @@ func TestGetPods(t *testing.T) {
}
return http.StatusOK, []v1.Pod{pod}
}
pods, err := provider.GetPods()
pods, err := provider.GetPods(context.Background())
if err != nil {
t.Fatal("Failed to get pods", err)
}

View File

@@ -198,7 +198,7 @@ func newHTTPClient(host string, tlsOptions *tlsconfig.Options) (*http.Client, er
// CreatePod accepts a Pod definition and creates
// a hyper.sh deployment
func (p *HyperProvider) CreatePod(pod *v1.Pod) error {
func (p *HyperProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
log.Printf("receive CreatePod %q\n", pod.Name)
//Ignore daemonSet Pod
@@ -254,12 +254,12 @@ func (p *HyperProvider) CreatePod(pod *v1.Pod) error {
}
// UpdatePod is a noop, hyper.sh currently does not support live updates of a pod.
func (p *HyperProvider) UpdatePod(pod *v1.Pod) error {
func (p *HyperProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
return nil
}
// DeletePod deletes the specified pod out of hyper.sh.
func (p *HyperProvider) DeletePod(pod *v1.Pod) (err error) {
func (p *HyperProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
log.Printf("receive DeletePod %q\n", pod.Name)
var (
containerName = fmt.Sprintf("pod-%s-%s", pod.Name, pod.Name)
@@ -298,7 +298,7 @@ func (p *HyperProvider) DeletePod(pod *v1.Pod) (err error) {
// GetPod returns a pod by name that is running inside hyper.sh
// returns nil if a pod by that name is not found.
func (p *HyperProvider) GetPod(namespace, name string) (pod *v1.Pod, err error) {
func (p *HyperProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
var (
containerName = fmt.Sprintf("pod-%s-%s", name, name)
container types.ContainerJSON
@@ -318,7 +318,7 @@ func (p *HyperProvider) GetPod(namespace, name string) (pod *v1.Pod, err error)
}
// GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *HyperProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
func (p *HyperProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
return "", nil
}
@@ -338,8 +338,8 @@ func (p *HyperProvider) ExecInContainer(name string, uid apitypes.UID, container
// GetPodStatus returns the status of a pod by name that is running inside hyper.sh
// returns nil if a pod by that name is not found.
func (p *HyperProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
pod, err := p.GetPod(namespace, name)
func (p *HyperProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
pod, err := p.GetPod(ctx, namespace, name)
if err != nil {
return nil, err
}
@@ -347,7 +347,7 @@ func (p *HyperProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, err
}
// GetPods returns a list of all pods known to be running within hyper.sh.
func (p *HyperProvider) GetPods() ([]*v1.Pod, error) {
func (p *HyperProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
log.Printf("receive GetPods\n")
filter, err := filters.FromParam(fmt.Sprintf("{\"label\":{\"%s=%s\":true}}", nodeLabel, p.nodeName))
if err != nil {
@@ -376,7 +376,7 @@ func (p *HyperProvider) GetPods() ([]*v1.Pod, error) {
}
// Capacity returns a resource list containing the capacity limits set for hyper.sh.
func (p *HyperProvider) Capacity() v1.ResourceList {
func (p *HyperProvider) Capacity(ctx context.Context) v1.ResourceList {
// TODO: These should be configurable
return v1.ResourceList{
"cpu": resource.MustParse("20"),
@@ -387,7 +387,7 @@ func (p *HyperProvider) Capacity() v1.ResourceList {
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
// within Kubernetes.
func (p *HyperProvider) NodeConditions() []v1.NodeCondition {
func (p *HyperProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
// TODO: Make these dynamic and augment with custom hyper.sh specific conditions of interest
return []v1.NodeCondition{
{
@@ -436,13 +436,13 @@ func (p *HyperProvider) NodeConditions() []v1.NodeCondition {
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *HyperProvider) NodeAddresses() []v1.NodeAddress {
func (p *HyperProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
return nil
}
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *HyperProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
func (p *HyperProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
return &v1.NodeDaemonEndpoints{}
}

View File

@@ -1,6 +1,7 @@
package mock
import (
"context"
"encoding/json"
"fmt"
"io"
@@ -96,7 +97,7 @@ func loadConfig(providerConfig, nodeName string) (config MockConfig, err error)
}
// CreatePod accepts a Pod definition and stores it in memory.
func (p *MockProvider) CreatePod(pod *v1.Pod) error {
func (p *MockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
log.Printf("receive CreatePod %q\n", pod.Name)
key, err := buildKey(pod)
@@ -110,7 +111,7 @@ func (p *MockProvider) CreatePod(pod *v1.Pod) error {
}
// UpdatePod accepts a Pod definition and updates its reference.
func (p *MockProvider) UpdatePod(pod *v1.Pod) error {
func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
log.Printf("receive UpdatePod %q\n", pod.Name)
key, err := buildKey(pod)
@@ -124,7 +125,7 @@ func (p *MockProvider) UpdatePod(pod *v1.Pod) error {
}
// DeletePod deletes the specified pod out of memory.
func (p *MockProvider) DeletePod(pod *v1.Pod) (err error) {
func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
log.Printf("receive DeletePod %q\n", pod.Name)
key, err := buildKey(pod)
@@ -138,7 +139,7 @@ func (p *MockProvider) DeletePod(pod *v1.Pod) (err error) {
}
// GetPod returns a pod by name that is stored in memory.
func (p *MockProvider) GetPod(namespace, name string) (pod *v1.Pod, err error) {
func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
log.Printf("receive GetPod %q\n", name)
key, err := buildKeyFromNames(namespace, name)
@@ -154,7 +155,7 @@ func (p *MockProvider) GetPod(namespace, name string) (pod *v1.Pod, err error) {
}
// GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *MockProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
log.Printf("receive GetContainerLogs %q\n", podName)
return "", nil
}
@@ -174,7 +175,7 @@ func (p *MockProvider) ExecInContainer(name string, uid types.UID, container str
// GetPodStatus returns the status of a pod by name that is "running".
// returns nil if a pod by that name is not found.
func (p *MockProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
log.Printf("receive GetPodStatus %q\n", name)
now := metav1.NewTime(time.Now())
@@ -200,7 +201,7 @@ func (p *MockProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, erro
},
}
pod, err := p.GetPod(namespace, name)
pod, err := p.GetPod(ctx, namespace, name)
if err != nil {
return status, err
}
@@ -223,7 +224,7 @@ func (p *MockProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, erro
}
// GetPods returns a list of all pods known to be "running".
func (p *MockProvider) GetPods() ([]*v1.Pod, error) {
func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
log.Printf("receive GetPods\n")
var pods []*v1.Pod
@@ -236,7 +237,7 @@ func (p *MockProvider) GetPods() ([]*v1.Pod, error) {
}
// Capacity returns a resource list containing the capacity limits.
func (p *MockProvider) Capacity() v1.ResourceList {
func (p *MockProvider) Capacity(ctx context.Context) v1.ResourceList {
return v1.ResourceList{
"cpu": resource.MustParse(p.config.CPU),
"memory": resource.MustParse(p.config.Memory),
@@ -246,7 +247,7 @@ func (p *MockProvider) Capacity() v1.ResourceList {
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
// within Kubernetes.
func (p *MockProvider) NodeConditions() []v1.NodeCondition {
func (p *MockProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
// TODO: Make this configurable
return []v1.NodeCondition{
{
@@ -295,7 +296,7 @@ func (p *MockProvider) NodeConditions() []v1.NodeCondition {
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *MockProvider) NodeAddresses() []v1.NodeAddress {
func (p *MockProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
return []v1.NodeAddress{
{
Type: "InternalIP",
@@ -306,7 +307,7 @@ func (p *MockProvider) NodeAddresses() []v1.NodeAddress {
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *MockProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
func (p *MockProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
return &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{
Port: p.daemonEndpointPort,

View File

@@ -404,7 +404,7 @@ func (p *SFMeshProvider) getEndpointFromContainerPort(port v1.ContainerPort) ser
}
// CreatePod accepts a Pod definition and creates a SF Mesh App.
func (p *SFMeshProvider) CreatePod(pod *v1.Pod) error {
func (p *SFMeshProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
log.Printf("receive CreatePod %q\n", pod.Name)
meshApp, err := p.getMeshApplication(pod)
@@ -440,7 +440,7 @@ func (p *SFMeshProvider) CreatePod(pod *v1.Pod) error {
}
// UpdatePod updates the pod running inside SF Mesh.
func (p *SFMeshProvider) UpdatePod(pod *v1.Pod) error {
func (p *SFMeshProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
log.Printf("receive UpdatePod %q\n", pod.Name)
app, err := p.getMeshApplication(pod)
@@ -457,10 +457,10 @@ func (p *SFMeshProvider) UpdatePod(pod *v1.Pod) error {
}
// DeletePod deletes the specified pod out of SF Mesh.
func (p *SFMeshProvider) DeletePod(pod *v1.Pod) (err error) {
func (p *SFMeshProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
log.Printf("receive DeletePod %q\n", pod.Name)
_, err = p.appClient.Delete(context.Background(), p.resourceGroup, pod.Name)
_, err = p.appClient.Delete(ctx, p.resourceGroup, pod.Name)
if err != nil {
return err
}
@@ -470,10 +470,10 @@ func (p *SFMeshProvider) DeletePod(pod *v1.Pod) (err error) {
// GetPod returns a pod by name that is running inside SF Mesh.
// returns nil if a pod by that name is not found.
func (p *SFMeshProvider) GetPod(namespace, name string) (pod *v1.Pod, err error) {
func (p *SFMeshProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
log.Printf("receive GetPod %q\n", name)
resp, err := p.appClient.Get(context.Background(), p.resourceGroup, name)
resp, err := p.appClient.Get(ctx, p.resourceGroup, name)
httpResponse := resp.Response.Response
if err != nil {
@@ -694,7 +694,7 @@ func (p *SFMeshProvider) applicationDescriptionToPod(app servicefabricmesh.Appli
}
// GetContainerLogs retrieves the logs of a container by name.
func (p *SFMeshProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
func (p *SFMeshProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
log.Printf("receive GetContainerLogs %q\n", podName)
return "", nil
}
@@ -713,8 +713,8 @@ func (p *SFMeshProvider) ExecInContainer(name string, uid types.UID, container s
// GetPodStatus returns the status of a pod by name that is "running".
// returns nil if a pod by that name is not found.
func (p *SFMeshProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
pod, err := p.GetPod(namespace, name)
func (p *SFMeshProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
pod, err := p.GetPod(ctx, namespace, name)
if err != nil {
return nil, err
}
@@ -727,12 +727,12 @@ func (p *SFMeshProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, er
}
// GetPods returns a list of all pods known to be running within SF Mesh.
func (p *SFMeshProvider) GetPods() ([]*v1.Pod, error) {
func (p *SFMeshProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
log.Printf("receive GetPods\n")
var pods []*v1.Pod
list, err := p.appClient.ListByResourceGroup(context.Background(), p.resourceGroup)
list, err := p.appClient.ListByResourceGroup(ctx, p.resourceGroup)
if err != nil {
return pods, err
}
@@ -766,7 +766,7 @@ func (p *SFMeshProvider) GetPods() ([]*v1.Pod, error) {
}
// Capacity returns a resource list containing the capacity limits set for SF Mesh.
func (p *SFMeshProvider) Capacity() v1.ResourceList {
func (p *SFMeshProvider) Capacity(ctx context.Context) v1.ResourceList {
return v1.ResourceList{
"cpu": resource.MustParse(defaultCPUCapacity),
"memory": resource.MustParse(defaultMemoryCapacity),
@@ -776,7 +776,7 @@ func (p *SFMeshProvider) Capacity() v1.ResourceList {
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
// within Kubernetes.
func (p *SFMeshProvider) NodeConditions() []v1.NodeCondition {
func (p *SFMeshProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
// TODO: Make this configurable
return []v1.NodeCondition{
{
@@ -825,7 +825,7 @@ func (p *SFMeshProvider) NodeConditions() []v1.NodeCondition {
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *SFMeshProvider) NodeAddresses() []v1.NodeAddress {
func (p *SFMeshProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
return []v1.NodeAddress{
{
Type: "InternalIP",
@@ -836,7 +836,7 @@ func (p *SFMeshProvider) NodeAddresses() []v1.NodeAddress {
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *SFMeshProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
func (p *SFMeshProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
return &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{
Port: p.daemonEndpointPort,

View File

@@ -193,7 +193,7 @@ func initLogger() {
}
// CreatePod takes a Kubernetes Pod and deploys it within the provider.
func (v *VicProvider) CreatePod(pod *v1.Pod) error {
func (v *VicProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
op := trace.NewOperation(context.Background(), "CreatePod - %s", pod.Name)
defer trace.End(trace.Begin(pod.Name, op))
@@ -216,14 +216,14 @@ func (v *VicProvider) CreatePod(pod *v1.Pod) error {
}
// UpdatePod takes a Kubernetes Pod and updates it within the provider.
func (v *VicProvider) UpdatePod(pod *v1.Pod) error {
func (v *VicProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
op := trace.NewOperation(context.Background(), "UpdatePod - %s", pod.Name)
defer trace.End(trace.Begin(pod.Name, op))
return nil
}
// DeletePod takes a Kubernetes Pod and deletes it from the provider.
func (v *VicProvider) DeletePod(pod *v1.Pod) error {
func (v *VicProvider) DeletePod(ctx context.Context, pod *v1.Pod) error {
op := trace.NewOperation(context.Background(), "DeletePod - %s", pod.Name)
defer trace.End(trace.Begin(pod.Name, op))
@@ -240,7 +240,7 @@ func (v *VicProvider) DeletePod(pod *v1.Pod) error {
}
// GetPod retrieves a pod by name from the provider (can be cached).
func (v *VicProvider) GetPod(namespace, name string) (*v1.Pod, error) {
func (v *VicProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) {
op := trace.NewOperation(context.Background(), "GetPod - %s", name)
defer trace.End(trace.Begin(name, op))
@@ -254,7 +254,7 @@ func (v *VicProvider) GetPod(namespace, name string) (*v1.Pod, error) {
}
// GetContainerLogs retrieves the logs of a container by name from the provider.
func (v *VicProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
func (v *VicProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
op := trace.NewOperation(context.Background(), "GetContainerLogs - pod[%s], container[%s]", podName, containerName)
defer trace.End(trace.Begin("", op))
@@ -276,7 +276,7 @@ func (p *VicProvider) ExecInContainer(name string, uid types.UID, container stri
// GetPodStatus retrieves the status of a pod by name from the provider.
// This function needs to return a status or the reconcile loop will stop running.
func (v *VicProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
func (v *VicProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
op := trace.NewOperation(context.Background(), "GetPodStatus - pod[%s], namespace", name, namespace)
defer trace.End(trace.Begin("GetPodStatus", op))
@@ -313,7 +313,7 @@ func (v *VicProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error
}
var nodeAddress string
nodeAddresses := v.NodeAddresses()
nodeAddresses := v.NodeAddresses(ctx)
if len(nodeAddresses) > 0 {
nodeAddress = nodeAddresses[0].Address
} else {
@@ -348,7 +348,7 @@ func (v *VicProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error
}
// GetPods retrieves a list of all pods running on the provider (can be cached).
func (v *VicProvider) GetPods() ([]*v1.Pod, error) {
func (v *VicProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
op := trace.NewOperation(context.Background(), "GetPods")
defer trace.End(trace.Begin("GetPods", op))
@@ -362,7 +362,7 @@ func (v *VicProvider) GetPods() ([]*v1.Pod, error) {
}
// Capacity returns a resource list with the capacity constraints of the provider.
func (v *VicProvider) Capacity() v1.ResourceList {
func (v *VicProvider) Capacity(ctx context.Context) v1.ResourceList {
op := trace.NewOperation(context.Background(), "VicProvider.Capacity")
defer trace.End(trace.Begin("", op))
@@ -382,7 +382,7 @@ func (v *VicProvider) Capacity() v1.ResourceList {
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), which is polled periodically to update the node status
// within Kubernetes.
func (v *VicProvider) NodeConditions() []v1.NodeCondition {
func (v *VicProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
// TODO: Make these dynamic and augment with custom ACI specific conditions of interest
return []v1.NodeCondition{
{
@@ -430,7 +430,7 @@ func (v *VicProvider) NodeConditions() []v1.NodeCondition {
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (v *VicProvider) NodeAddresses() []v1.NodeAddress {
func (v *VicProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
addrs, err := net.InterfaceAddrs()
if err != nil {
return []v1.NodeAddress{}
@@ -454,7 +454,7 @@ func (v *VicProvider) NodeAddresses() []v1.NodeAddress {
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (v *VicProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
func (v *VicProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
return &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{
Port: 80,

View File

@@ -16,6 +16,7 @@ package web
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -63,17 +64,17 @@ func NewBrokerProvider(nodeName, operatingSystem string, daemonEndpointPort int3
}
// CreatePod accepts a Pod definition and forwards the call to the web endpoint
func (p *BrokerProvider) CreatePod(pod *v1.Pod) error {
func (p *BrokerProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
return p.createUpdatePod(pod, "POST", "/createPod")
}
// UpdatePod accepts a Pod definition and forwards the call to the web endpoint
func (p *BrokerProvider) UpdatePod(pod *v1.Pod) error {
func (p *BrokerProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
return p.createUpdatePod(pod, "PUT", "/updatePod")
}
// DeletePod accepts a Pod definition and forwards the call to the web endpoint
func (p *BrokerProvider) DeletePod(pod *v1.Pod) error {
func (p *BrokerProvider) DeletePod(ctx context.Context, pod *v1.Pod) error {
urlPath, err := url.Parse("/deletePod")
if err != nil {
return err
@@ -90,7 +91,7 @@ func (p *BrokerProvider) DeletePod(pod *v1.Pod) error {
}
// GetPod returns a pod by name that is being managed by the web server
func (p *BrokerProvider) GetPod(namespace, name string) (*v1.Pod, error) {
func (p *BrokerProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) {
urlPathStr := fmt.Sprintf(
"/getPod?namespace=%s&name=%s",
url.QueryEscape(namespace),
@@ -109,7 +110,7 @@ func (p *BrokerProvider) GetPod(namespace, name string) (*v1.Pod, error) {
}
// GetContainerLogs returns the logs of a container running in a pod by name.
func (p *BrokerProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
func (p *BrokerProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
urlPathStr := fmt.Sprintf(
"/getContainerLogs?namespace=%s&podName=%s&containerName=%s&tail=%d",
url.QueryEscape(namespace),
@@ -140,7 +141,7 @@ func (p *BrokerProvider) ExecInContainer(name string, uid types.UID, container s
}
// GetPodStatus retrieves the status of a given pod by name.
func (p *BrokerProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
func (p *BrokerProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
urlPathStr := fmt.Sprintf(
"/getPodStatus?namespace=%s&name=%s",
url.QueryEscape(namespace),
@@ -159,7 +160,7 @@ func (p *BrokerProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, er
}
// GetPods retrieves a list of all pods scheduled to run.
func (p *BrokerProvider) GetPods() ([]*v1.Pod, error) {
func (p *BrokerProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
var pods []*v1.Pod
err := p.doGetRequest("/getPods", &pods)
@@ -167,7 +168,7 @@ func (p *BrokerProvider) GetPods() ([]*v1.Pod, error) {
}
// Capacity returns a resource list containing the capacity limits
func (p *BrokerProvider) Capacity() v1.ResourceList {
func (p *BrokerProvider) Capacity(ctx context.Context) v1.ResourceList {
var resourceList v1.ResourceList
err := p.doGetRequest("/capacity", &resourceList)
@@ -180,7 +181,7 @@ func (p *BrokerProvider) Capacity() v1.ResourceList {
}
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
func (p *BrokerProvider) NodeConditions() []v1.NodeCondition {
func (p *BrokerProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
var nodeConditions []v1.NodeCondition
err := p.doGetRequest("/nodeConditions", &nodeConditions)
@@ -194,7 +195,7 @@ func (p *BrokerProvider) NodeConditions() []v1.NodeCondition {
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *BrokerProvider) NodeAddresses() []v1.NodeAddress {
func (p *BrokerProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
var nodeAddresses []v1.NodeAddress
err := p.doGetRequest("/nodeAddresses", &nodeAddresses)
@@ -208,7 +209,7 @@ func (p *BrokerProvider) NodeAddresses() []v1.NodeAddress {
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *BrokerProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
func (p *BrokerProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
return &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{
Port: p.daemonEndpointPort,

View File

@@ -134,7 +134,7 @@ func (s *KubeletServer) ApiServerHandler(w http.ResponseWriter, req *http.Reques
tail = t
}
podsLogs, err := s.p.GetContainerLogs(namespace, pod, container, tail)
podsLogs, err := s.p.GetContainerLogs(ctx, namespace, pod, container, tail)
if err != nil {
log.G(ctx).WithError(err).Error("error getting container logs")
http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError)

View File

@@ -14,44 +14,44 @@ import (
// Provider contains the methods required to implement a virtual-kubelet provider.
type Provider interface {
// CreatePod takes a Kubernetes Pod and deploys it within the provider.
CreatePod(pod *v1.Pod) error
CreatePod(ctx context.Context, pod *v1.Pod) error
// UpdatePod takes a Kubernetes Pod and updates it within the provider.
UpdatePod(pod *v1.Pod) error
UpdatePod(ctx context.Context, pod *v1.Pod) error
// DeletePod takes a Kubernetes Pod and deletes it from the provider.
DeletePod(pod *v1.Pod) error
DeletePod(ctx context.Context, pod *v1.Pod) error
// GetPod retrieves a pod by name from the provider (can be cached).
GetPod(namespace, name string) (*v1.Pod, error)
GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error)
// GetContainerLogs retrieves the logs of a container by name from the provider.
GetContainerLogs(namespace, podName, containerName string, tail int) (string, error)
GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error)
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error
// GetPodStatus retrieves the status of a pod by name from the provider.
GetPodStatus(namespace, name string) (*v1.PodStatus, error)
GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error)
// GetPods retrieves a list of all pods running on the provider (can be cached).
GetPods() ([]*v1.Pod, error)
GetPods(context.Context) ([]*v1.Pod, error)
// Capacity returns a resource list with the capacity constraints of the provider.
Capacity() v1.ResourceList
Capacity(context.Context) v1.ResourceList
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), which is
// polled periodically to update the node status within Kubernetes.
NodeConditions() []v1.NodeCondition
NodeConditions(context.Context) []v1.NodeCondition
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
NodeAddresses() []v1.NodeAddress
NodeAddresses(context.Context) []v1.NodeAddress
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
NodeDaemonEndpoints() *v1.NodeDaemonEndpoints
NodeDaemonEndpoints(context.Context) *v1.NodeDaemonEndpoints
// OperatingSystem returns the operating system the provider is for.
OperatingSystem() string

View File

@@ -189,11 +189,11 @@ func (s *Server) registerNode(ctx context.Context) error {
Architecture: "amd64",
KubeletVersion: "v1.11.2",
},
Capacity: s.provider.Capacity(),
Allocatable: s.provider.Capacity(),
Conditions: s.provider.NodeConditions(),
Addresses: s.provider.NodeAddresses(),
DaemonEndpoints: *s.provider.NodeDaemonEndpoints(),
Capacity: s.provider.Capacity(ctx),
Allocatable: s.provider.Capacity(ctx),
Conditions: s.provider.NodeConditions(ctx),
Addresses: s.provider.NodeAddresses(ctx),
DaemonEndpoints: *s.provider.NodeDaemonEndpoints(ctx),
},
}
@@ -297,13 +297,13 @@ func (s *Server) updateNode(ctx context.Context) {
}
n.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
n.Status.Conditions = s.provider.NodeConditions()
n.Status.Conditions = s.provider.NodeConditions(ctx)
capacity := s.provider.Capacity()
capacity := s.provider.Capacity(ctx)
n.Status.Capacity = capacity
n.Status.Allocatable = capacity
n.Status.Addresses = s.provider.NodeAddresses()
n.Status.Addresses = s.provider.NodeAddresses(ctx)
n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n)
if err != nil {
@@ -319,7 +319,7 @@ func (s *Server) reconcile(ctx context.Context) {
logger.Debug("Start reconcile")
defer logger.Debug("End reconcile")
providerPods, err := s.provider.GetPods()
providerPods, err := s.provider.GetPods(ctx)
if err != nil {
logger.WithError(err).Error("Error getting pod list from provider")
return
@@ -377,7 +377,7 @@ func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
logger := log.G(ctx).WithField("pod", pod.Name)
if origErr := s.provider.CreatePod(pod); origErr != nil {
if origErr := s.provider.CreatePod(ctx, pod); origErr != nil {
pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
pod.Status.Phase = corev1.PodFailed
pod.Status.Reason = PodStatusReason_ProviderFailed
@@ -398,7 +398,7 @@ func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error {
var delErr error
if delErr = s.provider.DeletePod(pod); delErr != nil && errors.IsNotFound(delErr) {
if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) {
return delErr
}
@@ -430,7 +430,7 @@ func (s *Server) updatePodStatuses(ctx context.Context) {
continue
}
status, err := s.provider.GetPodStatus(pod.Namespace, pod.Name)
status, err := s.provider.GetPodStatus(ctx, pod.Namespace, pod.Name)
if err != nil {
log.G(ctx).WithField("pod", pod.Name).Error("Error retrieving pod status")
return