From 8091b089a2489dde21522928b58fdbf58254d146 Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Mon, 20 Aug 2018 14:56:39 -0700
Subject: [PATCH] Plumb context to providers

---
 providers/aws/provider.go                 |  23 ++--
 providers/aws/provider_test.go            |  14 +--
 providers/azure/aci.go                    |  39 +++----
 providers/azure/aci_test.go               |  25 +++--
 providers/azure/client/aci/client_test.go | 129 +++++++++++-----------
 providers/azure/client/aci/create.go      |   4 +-
 providers/azure/client/aci/delete.go      |   4 +-
 providers/azure/client/aci/get.go         |   4 +-
 providers/azure/client/aci/list.go        |   4 +-
 providers/azure/client/aci/logs.go        |   4 +-
 providers/azure/client/aci/update.go      |   6 +-
 providers/azurebatch/batch.go             |  27 ++---
 providers/azurebatch/batch_test.go        |  18 +--
 providers/cri/cri.go                      |  23 ++--
 providers/huawei/cci.go                   |  25 +++--
 providers/huawei/cci_test.go              |   7 +-
 providers/hypersh/hypersh.go              |  24 ++--
 providers/mock/mock.go                    |  25 +++--
 providers/sfmesh/sfmesh.go                |  30 ++---
 providers/vic/vic_provider.go             |  24 ++--
 providers/web/broker.go                   |  23 ++--
 vkubelet/apiserver.go                     |   2 +-
 vkubelet/provider.go                      |  22 ++--
 vkubelet/vkubelet.go                      |  24 ++--
 24 files changed, 277 insertions(+), 253 deletions(-)

diff --git a/providers/aws/provider.go b/providers/aws/provider.go
index ed76f5621..ca67429a8 100644
--- a/providers/aws/provider.go
+++ b/providers/aws/provider.go
@@ -1,6 +1,7 @@
 package aws

 import (
+    "context"
     "fmt"
     "io"
     "log"
@@ -108,7 +109,7 @@ func NewFargateProvider(
 }

 // CreatePod takes a Kubernetes Pod and deploys it within the Fargate provider.
-func (p *FargateProvider) CreatePod(pod *corev1.Pod) error {
+func (p *FargateProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
     log.Printf("Received CreatePod request for %+v.\n", pod)

     fgPod, err := fargate.NewPod(p.cluster, pod)
@@ -127,13 +128,13 @@ func (p *FargateProvider) CreatePod(pod *corev1.Pod) error {
 }

 // UpdatePod takes a Kubernetes Pod and updates it within the provider.
-func (p *FargateProvider) UpdatePod(pod *corev1.Pod) error {
+func (p *FargateProvider) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
     log.Printf("Received UpdatePod request for %s/%s.\n", pod.Namespace, pod.Name)
     return errNotImplemented
 }

 // DeletePod takes a Kubernetes Pod and deletes it from the provider.
-func (p *FargateProvider) DeletePod(pod *corev1.Pod) error {
+func (p *FargateProvider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
     log.Printf("Received DeletePod request for %s/%s.\n", pod.Namespace, pod.Name)

     fgPod, err := p.cluster.GetPod(pod.Namespace, pod.Name)
@@ -152,7 +153,7 @@ func (p *FargateProvider) DeletePod(pod *corev1.Pod) error {
 }

 // GetPod retrieves a pod by name from the provider (can be cached).
-func (p *FargateProvider) GetPod(namespace, name string) (*corev1.Pod, error) {
+func (p *FargateProvider) GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) {
     log.Printf("Received GetPod request for %s/%s.\n", namespace, name)

     pod, err := p.cluster.GetPod(namespace, name)
@@ -173,7 +174,7 @@ func (p *FargateProvider) GetPod(namespace, name string) (*corev1.Pod, error) {
 }

 // GetContainerLogs retrieves the logs of a container by name from the provider.
-func (p *FargateProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
+func (p *FargateProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
     log.Printf("Received GetContainerLogs request for %s/%s/%s.\n", namespace, podName, containerName)
     return p.cluster.GetContainerLogs(namespace, podName, containerName, tail)
 }
@@ -193,7 +194,7 @@ func (p *FargateProvider) ExecInContainer(
 }

 // GetPodStatus retrieves the status of a pod by name from the provider.
-func (p *FargateProvider) GetPodStatus(namespace, name string) (*corev1.PodStatus, error) {
+func (p *FargateProvider) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) {
     log.Printf("Received GetPodStatus request for %s/%s.\n", namespace, name)

     pod, err := p.cluster.GetPod(namespace, name)
@@ -210,7 +211,7 @@ func (p *FargateProvider) GetPodStatus(namespace, name string) (*corev1.PodStatu
 }

 // GetPods retrieves a list of all pods running on the provider (can be cached).
-func (p *FargateProvider) GetPods() ([]*corev1.Pod, error) {
+func (p *FargateProvider) GetPods(ctx context.Context) ([]*corev1.Pod, error) {
     log.Println("Received GetPods request.")

     pods, err := p.cluster.GetPods()
@@ -237,7 +238,7 @@ func (p *FargateProvider) GetPods() ([]*corev1.Pod, error) {
 }

 // Capacity returns a resource list with the capacity constraints of the provider.
-func (p *FargateProvider) Capacity() corev1.ResourceList {
+func (p *FargateProvider) Capacity(ctx context.Context) corev1.ResourceList {
     log.Println("Received Capacity request.")

     return corev1.ResourceList{
@@ -250,7 +251,7 @@ func (p *FargateProvider) Capacity() corev1.ResourceList {

 // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), which is polled
 // periodically to update the node status within Kubernetes.
-func (p *FargateProvider) NodeConditions() []corev1.NodeCondition {
+func (p *FargateProvider) NodeConditions(ctx context.Context) []corev1.NodeCondition {
     log.Println("Received NodeConditions request.")

     lastHeartbeatTime := metav1.Now()
@@ -312,7 +313,7 @@ func (p *FargateProvider) NodeConditions() []corev1.NodeCondition {
 }

 // NodeAddresses returns a list of addresses for the node status within Kubernetes.
-func (p *FargateProvider) NodeAddresses() []corev1.NodeAddress {
+func (p *FargateProvider) NodeAddresses(ctx context.Context) []corev1.NodeAddress {
     log.Println("Received NodeAddresses request.")

     return []corev1.NodeAddress{
@@ -324,7 +325,7 @@ func (p *FargateProvider) NodeAddresses() []corev1.NodeAddress {
 }

 // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status within Kubernetes.
-func (p *FargateProvider) NodeDaemonEndpoints() *corev1.NodeDaemonEndpoints {
+func (p *FargateProvider) NodeDaemonEndpoints(ctx context.Context) *corev1.NodeDaemonEndpoints {
     log.Println("Received NodeDaemonEndpoints request.")

     return &corev1.NodeDaemonEndpoints{
diff --git a/providers/aws/provider_test.go b/providers/aws/provider_test.go
index b38a9f0e6..d95b9bbe4 100644
--- a/providers/aws/provider_test.go
+++ b/providers/aws/provider_test.go
@@ -212,7 +212,7 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) {
     }

     // Confirm that there are no pods on the cluster.
-    pods, err := provider.GetPods()
+    pods, err := provider.GetPods(context.Background())
     if err != nil {
         t.Error(err)
     }
@@ -259,13 +259,13 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) {
         },
     }

-    err = provider.CreatePod(pod)
+    err = provider.CreatePod(context.Background(), pod)
     if err != nil {
         t.Fatal(err)
     }

     // Now there should be exactly one pod.
-    pods, err = provider.GetPods()
+    pods, err = provider.GetPods(context.Background())
     if err != nil {
         t.Error(err)
     }
@@ -282,7 +282,7 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) {
     // Wait a few seconds for the logs to settle.
     time.Sleep(10 * time.Second)

-    logs, err := provider.GetContainerLogs("default", podName, "echo-container", 100)
+    logs, err := provider.GetContainerLogs(context.Background(), "default", podName, "echo-container", 100)
     if err != nil {
         t.Error(err)
     }
@@ -302,7 +302,7 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) {
     }

     // Delete the pod.
-    err = provider.DeletePod(pod)
+    err = provider.DeletePod(context.Background(), pod)
     if err != nil {
         t.Fatal(err)
     }
@@ -313,7 +313,7 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) {
     }

     // The cluster should be empty again.
-    pods, err = provider.GetPods()
+    pods, err = provider.GetPods(context.Background())
     if err != nil {
         t.Error(err)
     }
@@ -340,7 +340,7 @@ func waitUntilPodStatus(provider *vkAWS.FargateProvider, podName string, desired
         case <-ctx.Done():
             return ctx.Err()
         default:
-            status, err := provider.GetPodStatus("default", podName)
+            status, err := provider.GetPodStatus(context.Background(), "default", podName)
             if err != nil {
                 if strings.Contains(err.Error(), "is not found") {
                     return nil
diff --git a/providers/azure/aci.go b/providers/azure/aci.go
index 20d9488a1..bd7dc4686 100644
--- a/providers/azure/aci.go
+++ b/providers/azure/aci.go
@@ -492,7 +492,7 @@ func getKubeProxyExtension(secretPath, masterURI, clusterCIDR string) (*aci.Exte

 // CreatePod accepts a Pod definition and creates
 // an ACI deployment
-func (p *ACIProvider) CreatePod(pod *v1.Pod) error {
+func (p *ACIProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
     var containerGroup aci.ContainerGroup
     containerGroup.Location = p.region
     containerGroup.RestartPolicy = aci.ContainerGroupRestartPolicy(pod.Spec.RestartPolicy)
@@ -561,6 +561,7 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error {
     p.amendVnetResources(&containerGroup, pod)

     _, err = p.aciClient.CreateContainerGroup(
+        ctx,
         p.resourceGroup,
         containerGroupName(pod),
         containerGroup,
@@ -684,19 +685,19 @@ func containerGroupName(pod *v1.Pod) string {
 }

 // UpdatePod is a noop, ACI currently does not support live updates of a pod.
-func (p *ACIProvider) UpdatePod(pod *v1.Pod) error {
+func (p *ACIProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
     return nil
 }

 // DeletePod deletes the specified pod out of ACI.
-func (p *ACIProvider) DeletePod(pod *v1.Pod) error {
-    return p.aciClient.DeleteContainerGroup(p.resourceGroup, fmt.Sprintf("%s-%s", pod.Namespace, pod.Name))
+func (p *ACIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error {
+    return p.aciClient.DeleteContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", pod.Namespace, pod.Name))
 }

 // GetPod returns a pod by name that is running inside ACI
 // returns nil if a pod by that name is not found.
-func (p *ACIProvider) GetPod(namespace, name string) (*v1.Pod, error) {
-    cg, err, status := p.aciClient.GetContainerGroup(p.resourceGroup, fmt.Sprintf("%s-%s", namespace, name))
+func (p *ACIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) {
+    cg, err, status := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, name))
     if err != nil {
         if *status == http.StatusNotFound {
             return nil, nil
@@ -712,9 +713,9 @@ func (p *ACIProvider) GetPod(namespace, name string) (*v1.Pod, error) {
 }

 // GetContainerLogs returns the logs of a pod by name that is running inside ACI.
-func (p *ACIProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
+func (p *ACIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
     logContent := ""
-    cg, err, _ := p.aciClient.GetContainerGroup(p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName))
+    cg, err, _ := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName))
     if err != nil {
         return logContent, err
     }
@@ -725,9 +726,9 @@ func (p *ACIProvider) GetContainerLogs(namespace, podName, containerName string,
     // get logs from cg
     retry := 10
     for i := 0; i < retry; i++ {
-        cLogs, err := p.aciClient.GetContainerLogs(p.resourceGroup, cg.Name, containerName, tail)
+        cLogs, err := p.aciClient.GetContainerLogs(ctx, p.resourceGroup, cg.Name, containerName, tail)
         if err != nil {
-            log.G(context.TODO()).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying")
+            log.G(ctx).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying")
             time.Sleep(5000 * time.Millisecond)
         } else {
             logContent = cLogs.Content
@@ -754,7 +755,7 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri
         defer errstream.Close()
     }

-    cg, err, _ := p.aciClient.GetContainerGroup(p.resourceGroup, name)
+    cg, err, _ := p.aciClient.GetContainerGroup(context.TODO(), p.resourceGroup, name)
     if err != nil {
         return err
     }
@@ -822,8 +823,8 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri

 // GetPodStatus returns the status of a pod by name that is running inside ACI
 // returns nil if a pod by that name is not found.
-func (p *ACIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
-    pod, err := p.GetPod(namespace, name)
+func (p *ACIProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
+    pod, err := p.GetPod(ctx, namespace, name)
     if err != nil {
         return nil, err
     }
@@ -836,8 +837,8 @@ func (p *ACIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error
 }

 // GetPods returns a list of all pods known to be running within ACI.
-func (p *ACIProvider) GetPods() ([]*v1.Pod, error) {
-    cgs, err := p.aciClient.ListContainerGroups(p.resourceGroup)
+func (p *ACIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
+    cgs, err := p.aciClient.ListContainerGroups(ctx, p.resourceGroup)
     if err != nil {
         return nil, err
     }
@@ -865,7 +866,7 @@ func (p *ACIProvider) GetPods() ([]*v1.Pod, error) {
 }

 // Capacity returns a resource list containing the capacity limits set for ACI.
-func (p *ACIProvider) Capacity(ctx context.Context) v1.ResourceList {
+func (p *ACIProvider) Capacity(ctx context.Context) v1.ResourceList {
     return v1.ResourceList{
         "cpu":    resource.MustParse(p.cpu),
         "memory": resource.MustParse(p.memory),
@@ -875,7 +876,7 @@ func (p *ACIProvider) Capacity() v1.ResourceList {

 // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
 // within Kubernetes.
-func (p *ACIProvider) NodeConditions() []v1.NodeCondition {
+func (p *ACIProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
     // TODO: Make these dynamic and augment with custom ACI specific conditions of interest
     return []v1.NodeCondition{
         {
@@ -923,7 +924,7 @@ func (p *ACIProvider) NodeConditions() []v1.NodeCondition {

 // NodeAddresses returns a list of addresses for the node status
 // within Kubernetes.
-func (p *ACIProvider) NodeAddresses() []v1.NodeAddress {
+func (p *ACIProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
     // TODO: Make these dynamic and augment with custom ACI specific conditions of interest
     return []v1.NodeAddress{
         {
@@ -935,7 +936,7 @@ func (p *ACIProvider) NodeAddresses() []v1.NodeAddress {

 // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
 // within Kubernetes.
-func (p *ACIProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
+func (p *ACIProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
     return &v1.NodeDaemonEndpoints{
         KubeletEndpoint: v1.DaemonEndpoint{
             Port: p.daemonEndpointPort,
diff --git a/providers/azure/aci_test.go b/providers/azure/aci_test.go
index dc4b73848..13f354ee6 100644
--- a/providers/azure/aci_test.go
+++ b/providers/azure/aci_test.go
@@ -6,6 +6,7 @@ package azure

 import (
     "bytes"
+    "context"
     "encoding/json"
     "fmt"
     "io/ioutil"
@@ -77,7 +78,7 @@ func TestCreatePodWithoutResourceSpec(t *testing.T) {
         },
     }

-    if err := provider.CreatePod(pod); err != nil {
+    if err := provider.CreatePod(context.Background(), pod); err != nil {
         t.Fatal("Failed to create pod", err)
     }
 }
@@ -131,7 +132,7 @@ func TestCreatePodWithResourceRequestOnly(t *testing.T) {
         },
     }

-    if err := provider.CreatePod(pod); err != nil {
+    if err := provider.CreatePod(context.Background(), pod); err != nil {
         t.Fatal("Failed to create pod", err)
     }
 }
@@ -190,7 +191,7 @@ func TestCreatePodWithResourceRequestAndLimit(t *testing.T) {
         },
     }

-    if err := provider.CreatePod(pod); err != nil {
+    if err := provider.CreatePod(context.Background(), pod); err != nil {
         t.Fatal("Failed to create pod", err)
     }
 }
@@ -212,7 +213,7 @@ func TestGetPodsWithEmptyList(t *testing.T) {
         }
     }

-    pods, err := provider.GetPods()
+    pods, err := provider.GetPods(context.Background())
     if err != nil {
         t.Fatal("Failed to get pods", err)
     }
@@ -269,7 +270,7 @@ func TestGetPodsWithoutResourceRequestsLimits(t *testing.T) {
         }
     }

-    pods, err := provider.GetPods()
+    pods, err := provider.GetPods(context.Background())
     if err != nil {
         t.Fatal("Failed to get pods", err)
     }
@@ -343,7 +344,7 @@ func TestGetPodWithoutResourceRequestsLimits(t *testing.T) {
         }
     }

-    pod, err := provider.GetPod(podNamespace, podName)
+    pod, err := provider.GetPod(context.Background(), podNamespace, podName)
     if err != nil {
         t.Fatal("Failed to get pod", err)
     }
@@ -385,7 +386,7 @@ func TestGetPodWithContainerID(t *testing.T) {
         assert.Equal(t, podNamespace+"-"+podName, containerGroup, "Container group name is not expected")

         return http.StatusOK, aci.ContainerGroup{
-            ID: cgID,
+            ID:   cgID,
             Tags: map[string]string{
                 "NodeName": fakeNodeName,
             },
@@ -405,7 +406,7 @@ func TestGetPodWithContainerID(t *testing.T) {
                 },
                 Resources: aci.ResourceRequirements{
                     Requests: &aci.ResourceRequests{
-                        CPU: 0.99,
+                        CPU:        0.99,
                         MemoryInGB: 1.5,
                     },
                 },
@@ -416,7 +417,7 @@ func TestGetPodWithContainerID(t *testing.T) {
         }
     }

-    pod, err := provider.GetPod(podNamespace, podName)
+    pod, err := provider.GetPod(context.Background(), podNamespace, podName)
     if err != nil {
         t.Fatal("Failed to get pod", err)
     }
@@ -586,7 +587,7 @@ func TestCreatePodWithLivenessProbe(t *testing.T) {
         },
     }

-    if err := provider.CreatePod(pod); err != nil {
+    if err := provider.CreatePod(context.Background(), pod); err != nil {
         t.Fatal("Failed to create pod", err)
     }
 }
@@ -648,7 +649,7 @@ func TestCreatePodWithReadinessProbe(t *testing.T) {
         },
     }

-    if err := provider.CreatePod(pod); err != nil {
+    if err := provider.CreatePod(context.Background(), pod); err != nil {
         t.Fatal("Failed to create pod", err)
     }
-}
\ No newline at end of file
+}
diff --git a/providers/azure/client/aci/client_test.go b/providers/azure/client/aci/client_test.go
index 5420e336e..b88ebf9f6 100644
--- a/providers/azure/client/aci/client_test.go
+++ b/providers/azure/client/aci/client_test.go
@@ -1,6 +1,7 @@
 package aci

 import (
+    "context"
     "encoding/base64"
     "fmt"
     "log"
@@ -90,7 +91,7 @@ func TestNewClient(t *testing.T) {
 }

 func TestCreateContainerGroupFails(t *testing.T) {
-    _, err := client.CreateContainerGroup(resourceGroup, containerGroup, ContainerGroup{
+    _, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroup, ContainerGroup{
         Location: location,
         ContainerGroupProperties: ContainerGroupProperties{
             OsType: Linux,
@@ -121,7 +122,7 @@ func TestCreateContainerGroupFails(t *testing.T) {
 }

 func TestCreateContainerGroupWithoutResourceLimit(t *testing.T) {
-    cg, err := client.CreateContainerGroup(resourceGroup, containerGroup, ContainerGroup{
+    cg, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroup, ContainerGroup{
         Location: location,
         ContainerGroupProperties: ContainerGroupProperties{
             OsType: Linux,
@@ -155,13 +156,13 @@ func TestCreateContainerGroupWithoutResourceLimit(t *testing.T) {
         t.Fatalf("resource group name is %s, expected %s", cg.Name, containerGroup)
     }

-    if err := client.DeleteContainerGroup(resourceGroup, containerGroup); err != nil {
+    if err := client.DeleteContainerGroup(context.Background(), resourceGroup, containerGroup); err != nil {
         t.Fatal(err)
     }
 }

 func TestCreateContainerGroup(t *testing.T) {
-    cg, err := client.CreateContainerGroup(resourceGroup, containerGroup, ContainerGroup{
+    cg, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroup, ContainerGroup{
         Location: location,
         ContainerGroupProperties: ContainerGroupProperties{
             OsType: Linux,
@@ -201,7 +202,7 @@ func TestCreateContainerGroup(t *testing.T) {
 }

 func TestCreateContainerGroupWithBadVNetFails(t *testing.T) {
-    _, err := client.CreateContainerGroup(resourceGroup, containerGroup, ContainerGroup{
+    _, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroup, ContainerGroup{
         Location: location,
         ContainerGroupProperties: ContainerGroupProperties{
             OsType: Linux,
@@ -250,7 +251,7 @@ func TestCreateContainerGroupWithBadVNetFails(t *testing.T) {
 }

 func TestGetContainerGroup(t *testing.T) {
-    cg, err, _ := client.GetContainerGroup(resourceGroup, containerGroup)
+    cg, err, _ := client.GetContainerGroup(context.Background(), resourceGroup, containerGroup)
     if err != nil {
         t.Fatal(err)
     }
@@ -260,7 +261,7 @@
 }

 func TestListContainerGroup(t *testing.T) {
-    list, err := client.ListContainerGroups(resourceGroup)
+    list, err := client.ListContainerGroups(context.Background(), resourceGroup)
     if err != nil {
         t.Fatal(err)
     }
@@ -274,7 +275,7 @@ func TestListContainerGroup(t *testing.T) {
 func TestCreateContainerGroupWithLivenessProbe(t *testing.T) {
     uid := uuid.New()
     containerGroupName := containerGroup + "-" + uid.String()[0:6]
-    cg, err := client.CreateContainerGroup(resourceGroup, containerGroupName, ContainerGroup{
+    cg, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroupName, ContainerGroup{
         Location: location,
         ContainerGroupProperties: ContainerGroupProperties{
             OsType: Linux,
@@ -321,7 +322,7 @@ func TestCreateContainerGroupWithLivenessProbe(t *testing.T) {
 func TestCreateContainerGroupFailsWithLivenessProbeMissingPort(t *testing.T) {
     uid := uuid.New()
     containerGroupName := containerGroup + "-" + uid.String()[0:6]
-    _, err := client.CreateContainerGroup(resourceGroup, containerGroupName, ContainerGroup{
+    _, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroupName, ContainerGroup{
         Location: location,
         ContainerGroupProperties: ContainerGroupProperties{
             OsType: Linux,
@@ -365,7 +366,7 @@ func TestCreateContainerGroupFailsWithLivenessProbeMissingPort(t *testing.T) {
 func TestCreateContainerGroupWithReadinessProbe(t *testing.T) {
     uid := uuid.New()
     containerGroupName := containerGroup + "-" + uid.String()[0:6]
-    cg, err := client.CreateContainerGroup(resourceGroup, containerGroupName, ContainerGroup{
+    cg, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroupName, ContainerGroup{
         Location: location,
         ContainerGroupProperties: ContainerGroupProperties{
             OsType: Linux,
@@ -420,7 +421,7 @@ func TestCreateContainerGroupWithLogAnalytics(t *testing.T) {
         t.Fatal(err)
     }
     cgname := "cgla"
-    cg, err := client.CreateContainerGroup(resourceGroup, cgname, ContainerGroup{
+    cg, err := client.CreateContainerGroup(context.Background(), resourceGroup, cgname, ContainerGroup{
         Location: location,
         ContainerGroupProperties: ContainerGroupProperties{
             OsType: Linux,
@@ -458,14 +459,14 @@ func TestCreateContainerGroupWithLogAnalytics(t *testing.T) {
     if cg.Name != cgname {
         t.Fatalf("resource group name is %s, expected %s", cg.Name, cgname)
     }
-    if err := client.DeleteContainerGroup(resourceGroup, cgname); err != nil {
+    if err := client.DeleteContainerGroup(context.Background(), resourceGroup, cgname); err != nil {
         t.Fatalf("Delete Container Group failed: %s", err.Error())
     }
 }

 func TestCreateContainerGroupWithInvalidLogAnalytics(t *testing.T) {
     law := &LogAnalyticsWorkspace{}
-    _, err := client.CreateContainerGroup(resourceGroup, containerGroup, ContainerGroup{
+    _, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroup, ContainerGroup{
         Location: location,
         ContainerGroupProperties: ContainerGroupProperties{
             OsType: Linux,
@@ -506,52 +507,52 @@ func TestCreateContainerGroupWithInvalidLogAnalytics(t *testing.T) {

 func TestCreateContainerGroupWithVNet(t *testing.T) {
     uid := uuid.New()
-    containerGroupName := containerGroup + "-" + uid.String()[0:6]
+    containerGroupName := containerGroup + "-" + uid.String()[0:6]
     fakeKubeConfig := base64.StdEncoding.EncodeToString([]byte(uid.String()))
     networkProfileId := "/subscriptions/ae43b1e3-c35d-4c8c-bc0d-f148b4c52b78/resourceGroups/aci-connector/providers/Microsoft.Network/networkprofiles/aci-connector-network-profile-westus"
-    diagnostics, err := NewContainerGroupDiagnosticsFromFile("../../../../loganalytics.json")
-    if err != nil {
-        t.Fatal(err)
-    }
+    diagnostics, err := NewContainerGroupDiagnosticsFromFile("../../../../loganalytics.json")
+    if err != nil {
+        t.Fatal(err)
+    }
     diagnostics.LogAnalytics.LogType = LogAnlyticsLogTypeContainerInsights
-    cg, err := client.CreateContainerGroup(resourceGroup, containerGroupName, ContainerGroup{
-        Location: location,
-        ContainerGroupProperties: ContainerGroupProperties{
-            OsType: Linux,
-            Containers: []Container{
-                {
-                    Name: "nginx",
-                    ContainerProperties: ContainerProperties{
-                        Image:   "nginx",
-                        Command: []string{"nginx", "-g", "daemon off;"},
-                        Ports: []ContainerPort{
-                            {
-                                Protocol: ContainerNetworkProtocolTCP,
-                                Port:     80,
-                            },
-                        },
-                        Resources: ResourceRequirements{
-                            Requests: &ResourceRequests{
-                                CPU:        1,
-                                MemoryInGB: 1,
-                            },
-                            Limits: &ResourceLimits{
-                                CPU:        1,
-                                MemoryInGB: 1,
-                            },
-                        },
-                    },
-                },
-            },
-            NetworkProfile: &NetworkProfileDefinition{
-                ID: networkProfileId,
-            },
+    cg, err := client.CreateContainerGroup(context.Background(), resourceGroup, containerGroupName, ContainerGroup{
+        Location: location,
+        ContainerGroupProperties: ContainerGroupProperties{
+            OsType: Linux,
+            Containers: []Container{
+                {
+                    Name: "nginx",
+                    ContainerProperties: ContainerProperties{
+                        Image:   "nginx",
+                        Command: []string{"nginx", "-g", "daemon off;"},
+                        Ports: []ContainerPort{
+                            {
+                                Protocol: ContainerNetworkProtocolTCP,
+                                Port:     80,
+                            },
+                        },
+                        Resources: ResourceRequirements{
+                            Requests: &ResourceRequests{
+                                CPU:        1,
+                                MemoryInGB: 1,
+                            },
+                            Limits: &ResourceLimits{
+                                CPU:        1,
+                                MemoryInGB: 1,
+                            },
+                        },
+                    },
+                },
+            },
+            NetworkProfile: &NetworkProfileDefinition{
+                ID: networkProfileId,
+            },
             Extensions: []*Extension{
                 &Extension{
                     Name: "kube-proxy",
-                    Properties: &ExtensionProperties{
+                    Properties: &ExtensionProperties{
                         Type:    ExtensionTypeKubeProxy,
                         Version: ExtensionVersion1_0,
                         Settings: map[string]string{
@@ -567,23 +568,23 @@ func TestCreateContainerGroupWithVNet(t *testing.T) {
             DNSConfig: &DNSConfig{
                 NameServers: []string{"1.1.1.1"},
             },
-            Diagnostics: diagnostics,
-        },
-    })
+            Diagnostics: diagnostics,
+        },
+    })

-    if err != nil {
-        t.Fatal(err)
-    }
-    if cg.Name != containerGroupName {
-        t.Fatalf("resource group name is %s, expected %s", cg.Name, containerGroupName)
-    }
-    if err := client.DeleteContainerGroup(resourceGroup, containerGroupName); err != nil {
-        t.Fatalf("Delete Container Group failed: %s", err.Error())
-    }
+    if err != nil {
+        t.Fatal(err)
+    }
+    if cg.Name != containerGroupName {
+        t.Fatalf("resource group name is %s, expected %s", cg.Name, containerGroupName)
+    }
+    if err := client.DeleteContainerGroup(context.Background(), resourceGroup, containerGroupName); err != nil {
+        t.Fatalf("Delete Container Group failed: %s", err.Error())
+    }
 }

 func TestDeleteContainerGroup(t *testing.T) {
-    err := client.DeleteContainerGroup(resourceGroup, containerGroup)
+    err := client.DeleteContainerGroup(context.Background(), resourceGroup, containerGroup)
     if err != nil {
         t.Fatal(err)
     }
diff --git a/providers/azure/client/aci/create.go b/providers/azure/client/aci/create.go
index 0d456e832..dc01f1f8a 100644
--- a/providers/azure/client/aci/create.go
+++ b/providers/azure/client/aci/create.go
@@ -2,6 +2,7 @@ package aci

 import (
     "bytes"
+    "context"
     "encoding/json"
     "errors"
     "fmt"
@@ -14,7 +15,7 @@ import (

 // CreateContainerGroup creates a new Azure Container Instance with the
 // provided properties.
 // From: https://docs.microsoft.com/en-us/rest/api/container-instances/containergroups/createorupdate
-func (c *Client) CreateContainerGroup(resourceGroup, containerGroupName string, containerGroup ContainerGroup) (*ContainerGroup, error) {
+func (c *Client) CreateContainerGroup(ctx context.Context, resourceGroup, containerGroupName string, containerGroup ContainerGroup) (*ContainerGroup, error) {
     urlParams := url.Values{
         "api-version": []string{apiVersion},
     }
@@ -34,6 +35,7 @@ func (c *Client) CreateContainerGroup(resourceGroup, containerGroupName string,
     if err != nil {
         return nil, fmt.Errorf("Creating create/update container group uri request failed: %v", err)
     }
+    req = req.WithContext(ctx)

     // Add the parameters to the url.
     if err := api.ExpandURL(req.URL, map[string]string{
diff --git a/providers/azure/client/aci/delete.go b/providers/azure/client/aci/delete.go
index de23a7e7c..5336a0753 100644
--- a/providers/azure/client/aci/delete.go
+++ b/providers/azure/client/aci/delete.go
@@ -1,6 +1,7 @@
 package aci

 import (
+    "context"
     "fmt"
     "net/http"
     "net/url"
@@ -11,7 +12,7 @@ import (
 // DeleteContainerGroup deletes an Azure Container Instance in the provided
 // resource group with the given container group name.
 // From: https://docs.microsoft.com/en-us/rest/api/container-instances/containergroups/delete
-func (c *Client) DeleteContainerGroup(resourceGroup, containerGroupName string) error {
+func (c *Client) DeleteContainerGroup(ctx context.Context, resourceGroup, containerGroupName string) error {
     urlParams := url.Values{
         "api-version": []string{apiVersion},
     }
@@ -25,6 +26,7 @@ func (c *Client) DeleteContainerGroup(resourceGroup, containerGroupName string)
     if err != nil {
         return fmt.Errorf("Creating delete container group uri request failed: %v", err)
     }
+    req = req.WithContext(ctx)

     // Add the parameters to the url.
     if err := api.ExpandURL(req.URL, map[string]string{
diff --git a/providers/azure/client/aci/get.go b/providers/azure/client/aci/get.go
index a60e9535d..f7f22fcbb 100644
--- a/providers/azure/client/aci/get.go
+++ b/providers/azure/client/aci/get.go
@@ -1,6 +1,7 @@
 package aci

 import (
+    "context"
     "encoding/json"
     "errors"
     "fmt"
@@ -13,7 +14,7 @@ import (
 // GetContainerGroup gets an Azure Container Instance in the provided
 // resource group with the given container group name.
 // From: https://docs.microsoft.com/en-us/rest/api/container-instances/containergroups/get
-func (c *Client) GetContainerGroup(resourceGroup, containerGroupName string) (*ContainerGroup, error, *int) {
+func (c *Client) GetContainerGroup(ctx context.Context, resourceGroup, containerGroupName string) (*ContainerGroup, error, *int) {
     urlParams := url.Values{
         "api-version": []string{apiVersion},
     }
@@ -27,6 +28,7 @@ func (c *Client) GetContainerGroup(resourceGroup, containerGroupName string) (*C
     if err != nil {
         return nil, fmt.Errorf("Creating get container group uri request failed: %v", err), nil
     }
+    req = req.WithContext(ctx)

     // Add the parameters to the url.
     if err := api.ExpandURL(req.URL, map[string]string{
diff --git a/providers/azure/client/aci/list.go b/providers/azure/client/aci/list.go
index 5cb1cfa54..14dc8eeda 100644
--- a/providers/azure/client/aci/list.go
+++ b/providers/azure/client/aci/list.go
@@ -1,6 +1,7 @@
 package aci

 import (
+    "context"
     "encoding/json"
     "errors"
     "fmt"
@@ -16,7 +17,7 @@ import (
 // if it is not empty.
 // From: https://docs.microsoft.com/en-us/rest/api/container-instances/containergroups/list
 // From: https://docs.microsoft.com/en-us/rest/api/container-instances/containergroups/listbyresourcegroup
-func (c *Client) ListContainerGroups(resourceGroup string) (*ContainerGroupListResult, error) {
+func (c *Client) ListContainerGroups(ctx context.Context, resourceGroup string) (*ContainerGroupListResult, error) {
     urlParams := url.Values{
         "api-version": []string{apiVersion},
     }
@@ -35,6 +36,7 @@ func (c *Client) ListContainerGroups(resourceGroup string) (*ContainerGroupListR
     if err != nil {
         return nil, fmt.Errorf("Creating get container group list uri request failed: %v", err)
     }
+    req = req.WithContext(ctx)

     // Add the parameters to the url.
     if err := api.ExpandURL(req.URL, map[string]string{
diff --git a/providers/azure/client/aci/logs.go b/providers/azure/client/aci/logs.go
index 36a2539f8..09411a209 100644
--- a/providers/azure/client/aci/logs.go
+++ b/providers/azure/client/aci/logs.go
@@ -1,6 +1,7 @@
 package aci

 import (
+    "context"
     "encoding/json"
     "errors"
     "fmt"
@@ -13,7 +14,7 @@ import (
 // GetContainerLogs returns the logs from an Azure Container Instance
 // in the provided resource group with the given container group name.
 // From: https://docs.microsoft.com/en-us/rest/api/container-instances/ContainerLogs/List
-func (c *Client) GetContainerLogs(resourceGroup, containerGroupName, containerName string, tail int) (*Logs, error) {
+func (c *Client) GetContainerLogs(ctx context.Context, resourceGroup, containerGroupName, containerName string, tail int) (*Logs, error) {
     urlParams := url.Values{
         "api-version": []string{apiVersion},
         "tail":        []string{fmt.Sprintf("%d", tail)},
     }
@@ -28,6 +29,7 @@ func (c *Client) GetContainerLogs(resourceGroup, containerGroupName, containerNa
     if err != nil {
         return nil, fmt.Errorf("Creating get container logs uri request failed: %v", err)
     }
+    req = req.WithContext(ctx)

     // Add the parameters to the url.
     if err := api.ExpandURL(req.URL, map[string]string{
diff --git a/providers/azure/client/aci/update.go b/providers/azure/client/aci/update.go
index 6c54745bf..849a1c23a 100644
--- a/providers/azure/client/aci/update.go
+++ b/providers/azure/client/aci/update.go
@@ -1,8 +1,10 @@
 package aci

+import "context"
+
 // UpdateContainerGroup updates an Azure Container Instance with the
 // provided properties.
 // From: https://docs.microsoft.com/en-us/rest/api/container-instances/containergroups/createorupdate
-func (c *Client) UpdateContainerGroup(resourceGroup, containerGroupName string, containerGroup ContainerGroup) (*ContainerGroup, error) {
-    return c.CreateContainerGroup(resourceGroup, containerGroupName, containerGroup)
+func (c *Client) UpdateContainerGroup(ctx context.Context, resourceGroup, containerGroupName string, containerGroup ContainerGroup) (*ContainerGroup, error) {
+    return c.CreateContainerGroup(ctx, resourceGroup, containerGroupName, containerGroup)
 }
diff --git a/providers/azurebatch/batch.go b/providers/azurebatch/batch.go
index 472b4ba10..cbf9f2f53 100644
--- a/providers/azurebatch/batch.go
+++ b/providers/azurebatch/batch.go
@@ -4,7 +4,6 @@ import (
     "context"
     "encoding/json"
     "fmt"
-    "github.com/Azure/go-autorest/autorest"
     "io"
     "io/ioutil"
     "log"
@@ -13,6 +12,8 @@ import (
     "strings"
     "time"

+    "github.com/Azure/go-autorest/autorest"
+
     "github.com/Azure/go-autorest/autorest/azure"

     "github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch"
@@ -157,7 +158,7 @@ func NewBatchProviderFromConfig(config *Config, rm *manager.ResourceManager, nod
 }

 // CreatePod accepts a Pod definition
-func (p *Provider) CreatePod(pod *v1.Pod) error {
+func (p *Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
     log.Println("Creating pod...")
     podCommand, err := pod2docker.GetBashCommand(pod2docker.PodComponents{
         InitContainers: pod.Spec.InitContainers,
@@ -200,9 +201,9 @@ func (p *Provider) CreatePod(pod *v1.Pod) error {
 }

 // GetPodStatus retrieves the status of a given pod by name.
-func (p *Provider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
+func (p *Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
     log.Println("Getting pod status ....")
-    pod, err := p.GetPod(namespace, name)
+    pod, err := p.GetPod(ctx, namespace, name)

     if err != nil {
         return nil, err
@@ -214,13 +215,13 @@ func (p *Provider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
 }

 // UpdatePod accepts a Pod definition
-func (p *Provider) UpdatePod(pod *v1.Pod) error {
+func (p *Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
     log.Println("Pod Update called: No-op as not implemented")
     return nil
 }

 // DeletePod accepts a Pod definition
-func (p *Provider) DeletePod(pod *v1.Pod) error {
+func (p *Provider) DeletePod(ctx context.Context, pod *v1.Pod) error {
     taskID := getTaskIDForPod(pod.Namespace, pod.Name)
     task, err := p.deleteTask(taskID)
     if err != nil {
@@ -234,7 +235,7 @@ func (p *Provider) DeletePod(pod *v1.Pod) error {
 }

 // GetPod returns a pod by name
-func (p *Provider) GetPod(namespace, name string) (*v1.Pod, error) {
+func (p *Provider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) {
     log.Println("Getting Pod ...")
     task, err := p.getTask(getTaskIDForPod(namespace, name))
     if err != nil {
@@ -257,7 +258,7 @@ func (p *Provider) GetPod(namespace, name string) (*v1.Pod, error) {
 }

 // GetContainerLogs returns the logs of a container running in a pod by name.
-func (p *Provider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
+func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
     log.Println("Getting pod logs ....")

     taskID := getTaskIDForPod(namespace, podName)
@@ -318,7 +319,7 @@ func (p *Provider) ExecInContainer(name string, uid types.UID, container string,
 }

 // GetPods retrieves a list of all pods scheduled to run.
-func (p *Provider) GetPods() ([]*v1.Pod, error) {
+func (p *Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
     log.Println("Getting pods...")
     tasksPtr, err := p.listTasks()
     if err != nil {
@@ -342,7 +343,7 @@ func (p *Provider) GetPods() ([]*v1.Pod, error) {
 }

 // Capacity returns a resource list containing the capacity limits
-func (p *Provider) Capacity() v1.ResourceList {
+func (p *Provider) Capacity(ctx context.Context) v1.ResourceList {
     return v1.ResourceList{
         "cpu":    resource.MustParse(p.cpu),
         "memory": resource.MustParse(p.memory),
@@ -353,7 +354,7 @@ func (p *Provider) Capacity() v1.ResourceList {

 // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
 // within Kubernetes.
-func (p *Provider) NodeConditions() []v1.NodeCondition {
+func (p *Provider) NodeConditions(ctx context.Context) []v1.NodeCondition {
     return []v1.NodeCondition{
         {
             Type: "Ready",
@@ -400,7 +401,7 @@ func (p *Provider) NodeConditions() []v1.NodeCondition {

 // NodeAddresses returns a list of addresses for the node status
 // within Kubernetes.
-func (p *Provider) NodeAddresses() []v1.NodeAddress {
+func (p *Provider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
     // TODO: Make these dynamic and augment with custom ACI specific conditions of interest
     return []v1.NodeAddress{
         {
@@ -412,7 +413,7 @@ func (p *Provider) NodeAddresses() []v1.NodeAddress {

 // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
 // within Kubernetes.
-func (p *Provider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
+func (p *Provider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
     return &v1.NodeDaemonEndpoints{
         KubeletEndpoint: v1.DaemonEndpoint{
             Port: p.daemonEndpointPort,
diff --git a/providers/azurebatch/batch_test.go b/providers/azurebatch/batch_test.go
index b919b1c7a..6040e8003 100644
--- a/providers/azurebatch/batch_test.go
+++ b/providers/azurebatch/batch_test.go
@@ -1,16 +1,18 @@
 package azurebatch

 import (
+    "context"
     "crypto/md5"
     "fmt"
-    "github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch"
-    "github.com/Azure/go-autorest/autorest"
     "io/ioutil"
     "net/http"
     "os"
     "strings"
     "testing"

+    "github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch"
+    "github.com/Azure/go-autorest/autorest"
+
     apiv1 "k8s.io/api/core/v1"
 )

@@ -32,7 +34,7 @@ func Test_deletePod(t *testing.T) {
     pod.Name = podName
     pod.Namespace = podNamespace

-    err := provider.DeletePod(pod)
+    err := provider.DeletePod(context.Background(), pod)
     if err != nil {
         t.Error(err)
     }
@@ -48,7 +50,7 @@ func Test_deletePod_doesntExist(t *testing.T) {
         return autorest.Response{}, fmt.Errorf("Task doesn't exist")
     }

-    err := provider.DeletePod(pod)
+    err := provider.DeletePod(context.Background(), pod)
     if err == nil {
         t.Error("Expected error but none seen")
     }
@@ -72,7 +74,7 @@ func Test_createPod(t *testing.T) {
         return autorest.Response{}, nil
     }

-    err := provider.CreatePod(pod)
+    err := provider.CreatePod(context.Background(), pod)
     if err != nil {
         t.Errorf("Unexpected error creating pod %v", err)
     }
@@ -88,7 +90,7 @@ func Test_createPod_errorResponse(t *testing.T) {
         return autorest.Response{}, fmt.Errorf("Failed creating task")
     }

-    err := provider.CreatePod(pod)
+    err := provider.CreatePod(context.Background(), pod)
     if err == nil {
         t.Error("Expected error but none seen")
     }
@@ -118,7 +120,7 @@ func Test_readLogs_404Response_expectReturnStartupLogs(t *testing.T) {
         return batch.ReadCloser{}, fmt.Errorf("Failed in test mock of getFileFromTask")
     }

-    result, err := provider.GetContainerLogs(pod.Namespace, pod.Name, containerName, 0)
+    result, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, 0)
     if err != nil {
         t.Errorf("GetContainerLogs return error: %v", err)
     }
@@ -152,7 +154,7 @@ func Test_readLogs_JsonResponse_expectFormattedLogs(t *testing.T) {
         return batch.ReadCloser{}, fmt.Errorf("Failed in test mock of getFileFromTask")
     }

-    result, err := provider.GetContainerLogs(pod.Namespace, pod.Name, containerName, 0)
+    result, err := provider.GetContainerLogs(context.Background(), pod.Namespace, pod.Name, containerName, 0)
     if err != nil {
         t.Errorf("GetContainerLogs return error: %v", err)
     }
diff --git a/providers/cri/cri.go b/providers/cri/cri.go
index 4c0b2336b..d14422b24 100644
--- a/providers/cri/cri.go
+++ b/providers/cri/cri.go
@@ -2,6 +2,7 @@ package cri

 import (
     "bufio"
+    "context"
     "fmt"
     "io"
     "io/ioutil"
@@ -486,7 +487,7 @@ func generateContainerConfig(container *v1.Container, pod *v1.Pod, imageRef, pod
 }

 // Provider function to create a Pod
-func (p *CRIProvider) CreatePod(pod *v1.Pod) error {
+func (p *CRIProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
     log.Printf("receive CreatePod %q", pod.Name)

     var attempt uint32 // TODO: Track attempts. Currently always 0
@@ -548,14 +549,14 @@ func (p *CRIProvider) CreatePod(pod *v1.Pod) error {
 }

 // Update is currently not required or even called by VK, so not implemented
-func (p *CRIProvider) UpdatePod(pod *v1.Pod) error {
+func (p *CRIProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
     log.Printf("receive UpdatePod %q", pod.Name)

     return nil
 }

 // Provider function to delete a pod and its containers
-func (p *CRIProvider) DeletePod(pod *v1.Pod) error {
+func (p *CRIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error {
     log.Printf("receive DeletePod %q", pod.Name)

     err := p.refreshNodeState()
@@ -587,7 +588,7 @@ func (p *CRIProvider) DeletePod(pod *v1.Pod) error {
 }

 // Provider function to return a Pod spec - mostly used for its status
-func (p *CRIProvider) GetPod(namespace, name string) (*v1.Pod, error) {
+func (p *CRIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) {
     log.Printf("receive GetPod %q", name)

     err := p.refreshNodeState()
@@ -624,7 +625,7 @@ func readLogFile(filename string, tail int) (string, error) {
 }

 // Provider function to read the logs of a container
-func (p *CRIProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
+func (p *CRIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
     log.Printf("receive GetContainerLogs %q", containerName)

     err := p.refreshNodeState()
@@ -672,7 +673,7 @@ func (p *CRIProvider) findPodByName(namespace, name string) *CRIPod {
 }

 // Provider function to return the status of a Pod
-func (p *CRIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
+func (p *CRIProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
     log.Printf("receive GetPodStatus %q", name)

     err := p.refreshNodeState()
@@ -803,7 +804,7 @@ func createPodSpecFromCRI(p *CRIPod, nodeName string) *v1.Pod {

 // Provider function to return all known pods
 // TODO: Should this be all pods or just running pods?
-func (p *CRIProvider) GetPods() ([]*v1.Pod, error) {
+func (p *CRIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
     log.Printf("receive GetPods")

     var pods []*v1.Pod
@@ -831,7 +832,7 @@ func getSystemTotalMemory() uint64 {
 }

 // Provider function to return the capacity of the node
-func (p *CRIProvider) Capacity() v1.ResourceList {
+func (p *CRIProvider) Capacity(ctx context.Context) v1.ResourceList {
     log.Printf("receive Capacity")

     err := p.refreshNodeState()
@@ -853,7 +854,7 @@ func (p *CRIProvider) Capacity() v1.ResourceList {

 // Provider function to return node conditions
 // TODO: For now, use the same node conditions as the MockProvider
-func (p *CRIProvider) NodeConditions() []v1.NodeCondition {
+func (p *CRIProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
     // TODO: Make this configurable
     return []v1.NodeCondition{
         {
@@ -901,7 +902,7 @@ func (p *CRIProvider) NodeConditions() []v1.NodeCondition {
 }

 // Provider function to return a list of node addresses
-func (p *CRIProvider) NodeAddresses() []v1.NodeAddress {
+func (p *CRIProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
     log.Printf("receive NodeAddresses - returning %s", p.internalIP)

     return []v1.NodeAddress{
@@ -913,7 +914,7 @@ func (p *CRIProvider) NodeAddresses() []v1.NodeAddress {
 }

 // Provider function to return the daemon endpoint
-func (p *CRIProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
+func (p *CRIProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
     log.Printf("receive NodeDaemonEndpoints - returning %v", p.daemonEndpointPort)

     return &v1.NodeDaemonEndpoints{
diff --git a/providers/huawei/cci.go b/providers/huawei/cci.go
index 2d958a773..3f6dacbbe 100644
--- a/providers/huawei/cci.go
+++ b/providers/huawei/cci.go
@@ -2,6 +2,7 @@ package huawei

 import (
     "bytes"
+    "context"
     "crypto/tls"
     "encoding/json"
     "errors"
@@ -191,7 +192,7 @@ func (p *CCIProvider) deletePodAnnotations(pod *v1.Pod) error {
 }

 // CreatePod takes a Kubernetes Pod and deploys it within the huawei CCI provider.
-func (p *CCIProvider) CreatePod(pod *v1.Pod) error {
+func (p *CCIProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
     // Create the createPod request url
     p.setPodAnnotations(pod)
     uri := p.apiEndpoint + "/api/v1/namespaces/" + p.project + "/pods"
@@ -218,12 +219,12 @@ func (p *CCIProvider) CreatePod(pod *v1.Pod) error {
 }

 // UpdatePod takes a Kubernetes Pod and updates it within the huawei CCI provider.
-func (p *CCIProvider) UpdatePod(pod *v1.Pod) error {
+func (p *CCIProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
     return nil
 }

 // DeletePod takes a Kubernetes Pod and deletes it from the huawei CCI provider.
-func (p *CCIProvider) DeletePod(pod *v1.Pod) error {
+func (p *CCIProvider) DeletePod(ctx context.Context, pod *v1.Pod) error {
     // Create the deletePod request url
     podName := pod.Namespace + "-" + pod.Name
     uri := p.apiEndpoint + "/api/v1/namespaces/" + p.project + "/pods/" + podName
@@ -241,7 +242,7 @@ func (p *CCIProvider) DeletePod(pod *v1.Pod) error {
 }

 // GetPod retrieves a pod by name from the huawei CCI provider.
-func (p *CCIProvider) GetPod(namespace, name string) (*v1.Pod, error) {
+func (p *CCIProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) {
     // Create the getPod request url
     podName := namespace + "-" + name
     uri := p.apiEndpoint + "/api/v1/namespaces/" + p.project + "/pods/" + podName
@@ -276,7 +277,7 @@ func (p *CCIProvider) GetPod(namespace, name string) (*v1.Pod, error) {
 }

 // GetContainerLogs retrieves the logs of a container by name from the huawei CCI provider.
-func (p *CCIProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
+func (p *CCIProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
     return "", nil
 }
@@ -295,8 +296,8 @@ func (p *CCIProvider) ExecInContainer(name string, uid types.UID, container stri
 }

 // GetPodStatus retrieves the status of a pod by name from the huawei CCI provider.
-func (p *CCIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
-    pod, err := p.GetPod(namespace, name)
+func (p *CCIProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
+    pod, err := p.GetPod(ctx, namespace, name)
     if err != nil {
         return nil, err
     }
@@ -309,7 +310,7 @@ func (p *CCIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error
 }

 // GetPods retrieves a list of all pods running on the huawei CCI provider.
-func (p *CCIProvider) GetPods() ([]*v1.Pod, error) {
+func (p *CCIProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
     // Create the getPod request url
     uri := p.apiEndpoint + "/api/v1/namespaces/" + p.project + "/pods"
     r, err := http.NewRequest("GET", uri, nil)
@@ -344,7 +345,7 @@ func (p *CCIProvider) GetPods() ([]*v1.Pod, error) {
 }

 // Capacity returns a resource list with the capacity constraints of the huawei CCI provider.
-func (p *CCIProvider) Capacity() v1.ResourceList {
+func (p *CCIProvider) Capacity(ctx context.Context) v1.ResourceList {
     return v1.ResourceList{
         "cpu":    resource.MustParse(p.cpu),
         "memory": resource.MustParse(p.memory),
@@ -354,7 +355,7 @@ func (p *CCIProvider) Capacity() v1.ResourceList {

 // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), which is
 // polled periodically to update the node status within Kubernetes.
-func (p *CCIProvider) NodeConditions() []v1.NodeCondition {
+func (p *CCIProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
     // TODO: Make these dynamic and augment with custom CCI specific conditions of interest
     return []v1.NodeCondition{
         {
@@ -402,7 +403,7 @@ func (p *CCIProvider) NodeConditions() []v1.NodeCondition {

 // NodeAddresses returns a list of addresses for the node status
 // within Kubernetes.
-func (p *CCIProvider) NodeAddresses() []v1.NodeAddress {
+func (p *CCIProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
     // TODO: Make these dynamic and augment with custom CCI specific conditions of interest
     return []v1.NodeAddress{
         {
@@ -414,7 +415,7 @@ func (p *CCIProvider) NodeAddresses() []v1.NodeAddress {

 // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
 // within Kubernetes.
-func (p *CCIProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
+func (p *CCIProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
     return &v1.NodeDaemonEndpoints{
         KubeletEndpoint: v1.DaemonEndpoint{
             Port: p.daemonEndpointPort,
diff --git a/providers/huawei/cci_test.go b/providers/huawei/cci_test.go
index 873a1cc1b..57893cf75 100644
--- a/providers/huawei/cci_test.go
+++ b/providers/huawei/cci_test.go
@@ -1,6 +1,7 @@
 package huawei

 import (
+    "context"
     "net/http"
     "os"
     "testing"
@@ -81,7 +82,7 @@ func TestCreatePod(t *testing.T) {
         },
     }

-    if err := provider.CreatePod(pod); err != nil {
+    if err := provider.CreatePod(context.Background(), pod); err != nil {
         t.Fatal("Failed to create pod", err)
     }
 }
@@ -121,7 +122,7 @@ func TestGetPod(t *testing.T) {
             },
         }
     }
-    pod, err := provider.GetPod(podNamespace, podName)
+    pod, err := provider.GetPod(context.Background(), podNamespace, podName)
     if err != nil {
         t.Fatal("Failed to get pod", err)
     }
@@ -172,7 +173,7 @@ func TestGetPods(t *testing.T) {
         }
         return http.StatusOK, []v1.Pod{pod}
     }
-    pods, err := provider.GetPods()
+    pods, err := provider.GetPods(context.Background())
     if err != nil {
         t.Fatal("Failed to get pods", err)
     }
diff --git a/providers/hypersh/hypersh.go b/providers/hypersh/hypersh.go
index dba97fea3..b312e4c5b 100755
--- a/providers/hypersh/hypersh.go
+++ b/providers/hypersh/hypersh.go
@@ -198,7 +198,7 @@ func newHTTPClient(host string, tlsOptions *tlsconfig.Options) (*http.Client, er

 // CreatePod accepts a Pod definition and creates
 // a hyper.sh deployment
-func (p *HyperProvider) CreatePod(pod *v1.Pod) error {
+func (p *HyperProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
     log.Printf("receive CreatePod %q\n", pod.Name)

     //Ignore daemonSet Pod
@@ -254,12 +254,12 @@ func (p *HyperProvider) CreatePod(pod *v1.Pod) error {
 }

 // UpdatePod is a noop, hyper.sh currently does not support live updates of a pod.
-func (p *HyperProvider) UpdatePod(pod *v1.Pod) error {
+func (p *HyperProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
     return nil
 }

 // DeletePod deletes the specified pod out of hyper.sh.
-func (p *HyperProvider) DeletePod(pod *v1.Pod) (err error) {
+func (p *HyperProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
     log.Printf("receive DeletePod %q\n", pod.Name)
     var (
         containerName = fmt.Sprintf("pod-%s-%s", pod.Name, pod.Name)
@@ -298,7 +298,7 @@ func (p *HyperProvider) DeletePod(pod *v1.Pod) (err error) {

 // GetPod returns a pod by name that is running inside hyper.sh
 // returns nil if a pod by that name is not found.
-func (p *HyperProvider) GetPod(namespace, name string) (pod *v1.Pod, err error) {
+func (p *HyperProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
     var (
         containerName = fmt.Sprintf("pod-%s-%s", name, name)
         container     types.ContainerJSON
@@ -318,7 +318,7 @@ func (p *HyperProvider) GetPod(namespace, name string) (pod *v1.Pod, err error)
 }

 // GetContainerLogs retrieves the logs of a container by name from the provider.
-func (p *HyperProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
+func (p *HyperProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
     return "", nil
 }
@@ -338,8 +338,8 @@ func (p *HyperProvider) ExecInContainer(name string, uid apitypes.UID, container

 // GetPodStatus returns the status of a pod by name that is running inside hyper.sh
 // returns nil if a pod by that name is not found.
-func (p *HyperProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
-    pod, err := p.GetPod(namespace, name)
+func (p *HyperProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
+    pod, err := p.GetPod(ctx, namespace, name)
     if err != nil {
         return nil, err
     }
@@ -347,7 +347,7 @@ func (p *HyperProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, err
 }

 // GetPods returns a list of all pods known to be running within hyper.sh.
-func (p *HyperProvider) GetPods() ([]*v1.Pod, error) {
+func (p *HyperProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
     log.Printf("receive GetPods\n")
     filter, err := filters.FromParam(fmt.Sprintf("{\"label\":{\"%s=%s\":true}}", nodeLabel, p.nodeName))
     if err != nil {
@@ -376,7 +376,7 @@ func (p *HyperProvider) GetPods() ([]*v1.Pod, error) {
 }

 // Capacity returns a resource list containing the capacity limits set for hyper.sh.
-func (p *HyperProvider) Capacity() v1.ResourceList {
+func (p *HyperProvider) Capacity(ctx context.Context) v1.ResourceList {
     // TODO: These should be configurable
     return v1.ResourceList{
         "cpu":    resource.MustParse("20"),
@@ -387,7 +387,7 @@ func (p *HyperProvider) Capacity() v1.ResourceList {

 // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
 // within Kubernetes.
-func (p *HyperProvider) NodeConditions() []v1.NodeCondition {
+func (p *HyperProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
     // TODO: Make these dynamic and augment with custom hyper.sh specific conditions of interest
     return []v1.NodeCondition{
         {
@@ -436,13 +436,13 @@ func (p *HyperProvider) NodeConditions() []v1.NodeCondition {

 // NodeAddresses returns a list of addresses for the node status
 // within Kubernetes.
-func (p *HyperProvider) NodeAddresses() []v1.NodeAddress {
+func (p *HyperProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
     return nil
 }

 // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
 // within Kubernetes.
-func (p *HyperProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
+func (p *HyperProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
     return &v1.NodeDaemonEndpoints{}
 }
diff --git a/providers/mock/mock.go b/providers/mock/mock.go
index f84eed032..58803119a 100644
--- a/providers/mock/mock.go
+++ b/providers/mock/mock.go
@@ -1,6 +1,7 @@
 package mock

 import (
+    "context"
     "encoding/json"
     "fmt"
     "io"
@@ -96,7 +97,7 @@ func loadConfig(providerConfig, nodeName string) (config MockConfig, err error)
 }

 // CreatePod accepts a Pod definition and stores it in memory.
-func (p *MockProvider) CreatePod(pod *v1.Pod) error {
+func (p *MockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
     log.Printf("receive CreatePod %q\n", pod.Name)

     key, err := buildKey(pod)
@@ -110,7 +111,7 @@ func (p *MockProvider) CreatePod(pod *v1.Pod) error {
 }

 // UpdatePod accepts a Pod definition and updates its reference.
-func (p *MockProvider) UpdatePod(pod *v1.Pod) error {
+func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
     log.Printf("receive UpdatePod %q\n", pod.Name)

     key, err := buildKey(pod)
@@ -124,7 +125,7 @@ func (p *MockProvider) UpdatePod(pod *v1.Pod) error {
 }

 // DeletePod deletes the specified pod out of memory.
-func (p *MockProvider) DeletePod(pod *v1.Pod) (err error) {
+func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
     log.Printf("receive DeletePod %q\n", pod.Name)

     key, err := buildKey(pod)
@@ -138,7 +139,7 @@ func (p *MockProvider) DeletePod(pod *v1.Pod) (err error) {
 }

 // GetPod returns a pod by name that is stored in memory.
-func (p *MockProvider) GetPod(namespace, name string) (pod *v1.Pod, err error) {
+func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
     log.Printf("receive GetPod %q\n", name)

     key, err := buildKeyFromNames(namespace, name)
@@ -154,7 +155,7 @@ func (p *MockProvider) GetPod(namespace, name string) (pod *v1.Pod, err error) {
 }

 // GetContainerLogs retrieves the logs of a container by name from the provider.
-func (p *MockProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
+func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
     log.Printf("receive GetContainerLogs %q\n", podName)
     return "", nil
 }
@@ -174,7 +175,7 @@ func (p *MockProvider) ExecInContainer(name string, uid types.UID, container str

 // GetPodStatus returns the status of a pod by name that is "running".
 // returns nil if a pod by that name is not found.
-func (p *MockProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
+func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
     log.Printf("receive GetPodStatus %q\n", name)

     now := metav1.NewTime(time.Now())
@@ -200,7 +201,7 @@ func (p *MockProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, erro
         },
     }

-    pod, err := p.GetPod(namespace, name)
+    pod, err := p.GetPod(ctx, namespace, name)
     if err != nil {
         return status, err
     }
@@ -223,7 +224,7 @@ func (p *MockProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, erro
 }

 // GetPods returns a list of all pods known to be "running".
-func (p *MockProvider) GetPods() ([]*v1.Pod, error) {
+func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
     log.Printf("receive GetPods\n")

     var pods []*v1.Pod
@@ -236,7 +237,7 @@ func (p *MockProvider) GetPods() ([]*v1.Pod, error) {
 }

 // Capacity returns a resource list containing the capacity limits.
-func (p *MockProvider) Capacity() v1.ResourceList {
+func (p *MockProvider) Capacity(ctx context.Context) v1.ResourceList {
     return v1.ResourceList{
         "cpu":    resource.MustParse(p.config.CPU),
         "memory": resource.MustParse(p.config.Memory),
@@ -246,7 +247,7 @@ func (p *MockProvider) Capacity() v1.ResourceList {

 // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
 // within Kubernetes.
-func (p *MockProvider) NodeConditions() []v1.NodeCondition {
+func (p *MockProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
     // TODO: Make this configurable
     return []v1.NodeCondition{
         {
@@ -295,7 +296,7 @@ func (p *MockProvider) NodeConditions() []v1.NodeCondition {

 // NodeAddresses returns a list of addresses for the node status
 // within Kubernetes.
-func (p *MockProvider) NodeAddresses() []v1.NodeAddress {
+func (p *MockProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
     return []v1.NodeAddress{
         {
             Type:    "InternalIP",
             Address: p.internalIP,
         },
     }
 }

 // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
 // within Kubernetes.
-func (p *MockProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints { +func (p *MockProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints { return &v1.NodeDaemonEndpoints{ KubeletEndpoint: v1.DaemonEndpoint{ Port: p.daemonEndpointPort, diff --git a/providers/sfmesh/sfmesh.go b/providers/sfmesh/sfmesh.go index 8d656a2d6..3e590a246 100644 --- a/providers/sfmesh/sfmesh.go +++ b/providers/sfmesh/sfmesh.go @@ -404,7 +404,7 @@ func (p *SFMeshProvider) getEndpointFromContainerPort(port v1.ContainerPort) ser } // CreatePod accepts a Pod definition and creates a SF Mesh App. -func (p *SFMeshProvider) CreatePod(pod *v1.Pod) error { +func (p *SFMeshProvider) CreatePod(ctx context.Context, pod *v1.Pod) error { log.Printf("receive CreatePod %q\n", pod.Name) meshApp, err := p.getMeshApplication(pod) @@ -440,7 +440,7 @@ func (p *SFMeshProvider) CreatePod(pod *v1.Pod) error { } // UpdatePod updates the pod running inside SF Mesh. -func (p *SFMeshProvider) UpdatePod(pod *v1.Pod) error { +func (p *SFMeshProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error { log.Printf("receive UpdatePod %q\n", pod.Name) app, err := p.getMeshApplication(pod) @@ -457,10 +457,10 @@ func (p *SFMeshProvider) UpdatePod(pod *v1.Pod) error { } // DeletePod deletes the specified pod out of SF Mesh. -func (p *SFMeshProvider) DeletePod(pod *v1.Pod) (err error) { +func (p *SFMeshProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { log.Printf("receive DeletePod %q\n", pod.Name) - _, err = p.appClient.Delete(context.Background(), p.resourceGroup, pod.Name) + _, err = p.appClient.Delete(ctx, p.resourceGroup, pod.Name) if err != nil { return err } @@ -470,10 +470,10 @@ func (p *SFMeshProvider) DeletePod(pod *v1.Pod) (err error) { // GetPod returns a pod by name that is running inside SF Mesh. // returns nil if a pod by that name is not found. -func (p *SFMeshProvider) GetPod(namespace, name string) (pod *v1.Pod, err error) { +func (p *SFMeshProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) { log.Printf("receive GetPod %q\n", name) - resp, err := p.appClient.Get(context.Background(), p.resourceGroup, name) + resp, err := p.appClient.Get(ctx, p.resourceGroup, name) httpResponse := resp.Response.Response if err != nil { @@ -694,7 +694,7 @@ func (p *SFMeshProvider) applicationDescriptionToPod(app servicefabricmesh.Appli } // GetContainerLogs retrieves the logs of a container by name. -func (p *SFMeshProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) { +func (p *SFMeshProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { log.Printf("receive GetContainerLogs %q\n", podName) return "", nil } @@ -713,8 +713,8 @@ func (p *SFMeshProvider) ExecInContainer(name string, uid types.UID, container s // GetPodStatus returns the status of a pod by name that is "running". // returns nil if a pod by that name is not found. -func (p *SFMeshProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) { - pod, err := p.GetPod(namespace, name) +func (p *SFMeshProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { + pod, err := p.GetPod(ctx, namespace, name) if err != nil { return nil, err } @@ -727,12 +727,12 @@ func (p *SFMeshProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, er } // GetPods returns a list of all pods known to be running within SF Mesh. 
-func (p *SFMeshProvider) GetPods() ([]*v1.Pod, error) { +func (p *SFMeshProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { log.Printf("receive GetPods\n") var pods []*v1.Pod - list, err := p.appClient.ListByResourceGroup(context.Background(), p.resourceGroup) + list, err := p.appClient.ListByResourceGroup(ctx, p.resourceGroup) if err != nil { return pods, err } @@ -766,7 +766,7 @@ func (p *SFMeshProvider) GetPods() ([]*v1.Pod, error) { } // Capacity returns a resource list containing the capacity limits set for SF Mesh. -func (p *SFMeshProvider) Capacity() v1.ResourceList { +func (p *SFMeshProvider) Capacity(ctx context.Context) v1.ResourceList { return v1.ResourceList{ "cpu": resource.MustParse(defaultCPUCapacity), "memory": resource.MustParse(defaultMemoryCapacity), @@ -776,7 +776,7 @@ func (p *SFMeshProvider) Capacity() v1.ResourceList { // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status // within Kubernetes. -func (p *SFMeshProvider) NodeConditions() []v1.NodeCondition { +func (p *SFMeshProvider) NodeConditions(ctx context.Context) []v1.NodeCondition { // TODO: Make this configurable return []v1.NodeCondition{ { @@ -825,7 +825,7 @@ func (p *SFMeshProvider) NodeConditions() []v1.NodeCondition { // NodeAddresses returns a list of addresses for the node status // within Kubernetes. -func (p *SFMeshProvider) NodeAddresses() []v1.NodeAddress { +func (p *SFMeshProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress { return []v1.NodeAddress{ { Type: "InternalIP", @@ -836,7 +836,7 @@ func (p *SFMeshProvider) NodeAddresses() []v1.NodeAddress { // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status // within Kubernetes. -func (p *SFMeshProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints { +func (p *SFMeshProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints { return &v1.NodeDaemonEndpoints{ KubeletEndpoint: v1.DaemonEndpoint{ Port: p.daemonEndpointPort, diff --git a/providers/vic/vic_provider.go b/providers/vic/vic_provider.go index 75498e7d3..f66522b8f 100644 --- a/providers/vic/vic_provider.go +++ b/providers/vic/vic_provider.go @@ -193,7 +193,7 @@ func initLogger() { } // CreatePod takes a Kubernetes Pod and deploys it within the provider. -func (v *VicProvider) CreatePod(pod *v1.Pod) error { +func (v *VicProvider) CreatePod(ctx context.Context, pod *v1.Pod) error { op := trace.NewOperation(context.Background(), "CreatePod - %s", pod.Name) defer trace.End(trace.Begin(pod.Name, op)) @@ -216,14 +216,14 @@ func (v *VicProvider) CreatePod(pod *v1.Pod) error { } // UpdatePod takes a Kubernetes Pod and updates it within the provider. -func (v *VicProvider) UpdatePod(pod *v1.Pod) error { +func (v *VicProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error { op := trace.NewOperation(context.Background(), "UpdatePod - %s", pod.Name) defer trace.End(trace.Begin(pod.Name, op)) return nil } // DeletePod takes a Kubernetes Pod and deletes it from the provider. -func (v *VicProvider) DeletePod(pod *v1.Pod) error { +func (v *VicProvider) DeletePod(ctx context.Context, pod *v1.Pod) error { op := trace.NewOperation(context.Background(), "DeletePod - %s", pod.Name) defer trace.End(trace.Begin(pod.Name, op)) @@ -240,7 +240,7 @@ func (v *VicProvider) DeletePod(pod *v1.Pod) error { } // GetPod retrieves a pod by name from the provider (can be cached). 
-func (v *VicProvider) GetPod(namespace, name string) (*v1.Pod, error) { +func (v *VicProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) { op := trace.NewOperation(context.Background(), "GetPod - %s", name) defer trace.End(trace.Begin(name, op)) @@ -254,7 +254,7 @@ func (v *VicProvider) GetPod(namespace, name string) (*v1.Pod, error) { } // GetContainerLogs retrieves the logs of a container by name from the provider. -func (v *VicProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) { +func (v *VicProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { op := trace.NewOperation(context.Background(), "GetContainerLogs - pod[%s], container[%s]", podName, containerName) defer trace.End(trace.Begin("", op)) @@ -276,7 +276,7 @@ func (p *VicProvider) ExecInContainer(name string, uid types.UID, container stri // GetPodStatus retrieves the status of a pod by name from the provider. // This function needs to return a status or the reconcile loop will stop running. -func (v *VicProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) { +func (v *VicProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { op := trace.NewOperation(context.Background(), "GetPodStatus - pod[%s], namespace", name, namespace) defer trace.End(trace.Begin("GetPodStatus", op)) @@ -313,7 +313,7 @@ func (v *VicProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error } var nodeAddress string - nodeAddresses := v.NodeAddresses() + nodeAddresses := v.NodeAddresses(ctx) if len(nodeAddresses) > 0 { nodeAddress = nodeAddresses[0].Address } else { @@ -348,7 +348,7 @@ func (v *VicProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error } // GetPods retrieves a list of all pods running on the provider (can be cached). -func (v *VicProvider) GetPods() ([]*v1.Pod, error) { +func (v *VicProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { op := trace.NewOperation(context.Background(), "GetPods") defer trace.End(trace.Begin("GetPods", op)) @@ -362,7 +362,7 @@ func (v *VicProvider) GetPods() ([]*v1.Pod, error) { } // Capacity returns a resource list with the capacity constraints of the provider. -func (v *VicProvider) Capacity() v1.ResourceList { +func (v *VicProvider) Capacity(ctx context.Context) v1.ResourceList { op := trace.NewOperation(context.Background(), "VicProvider.Capacity") defer trace.End(trace.Begin("", op)) @@ -382,7 +382,7 @@ func (v *VicProvider) Capacity() v1.ResourceList { // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), which is polled periodically to update the node status // within Kubernetes. -func (v *VicProvider) NodeConditions() []v1.NodeCondition { +func (v *VicProvider) NodeConditions(ctx context.Context) []v1.NodeCondition { // TODO: Make these dynamic and augment with custom ACI specific conditions of interest return []v1.NodeCondition{ { @@ -430,7 +430,7 @@ func (v *VicProvider) NodeConditions() []v1.NodeCondition { // NodeAddresses returns a list of addresses for the node status // within Kubernetes. -func (v *VicProvider) NodeAddresses() []v1.NodeAddress { +func (v *VicProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress { addrs, err := net.InterfaceAddrs() if err != nil { return []v1.NodeAddress{} @@ -454,7 +454,7 @@ func (v *VicProvider) NodeAddresses() []v1.NodeAddress { // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status // within Kubernetes. 
-func (v *VicProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints { +func (v *VicProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints { return &v1.NodeDaemonEndpoints{ KubeletEndpoint: v1.DaemonEndpoint{ Port: 80, diff --git a/providers/web/broker.go b/providers/web/broker.go index d12260825..2db56334b 100644 --- a/providers/web/broker.go +++ b/providers/web/broker.go @@ -16,6 +16,7 @@ package web import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -63,17 +64,17 @@ func NewBrokerProvider(nodeName, operatingSystem string, daemonEndpointPort int3 } // CreatePod accepts a Pod definition and forwards the call to the web endpoint -func (p *BrokerProvider) CreatePod(pod *v1.Pod) error { +func (p *BrokerProvider) CreatePod(ctx context.Context, pod *v1.Pod) error { return p.createUpdatePod(pod, "POST", "/createPod") } // UpdatePod accepts a Pod definition and forwards the call to the web endpoint -func (p *BrokerProvider) UpdatePod(pod *v1.Pod) error { +func (p *BrokerProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error { return p.createUpdatePod(pod, "PUT", "/updatePod") } // DeletePod accepts a Pod definition and forwards the call to the web endpoint -func (p *BrokerProvider) DeletePod(pod *v1.Pod) error { +func (p *BrokerProvider) DeletePod(ctx context.Context, pod *v1.Pod) error { urlPath, err := url.Parse("/deletePod") if err != nil { return err @@ -90,7 +91,7 @@ func (p *BrokerProvider) DeletePod(pod *v1.Pod) error { } // GetPod returns a pod by name that is being managed by the web server -func (p *BrokerProvider) GetPod(namespace, name string) (*v1.Pod, error) { +func (p *BrokerProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) { urlPathStr := fmt.Sprintf( "/getPod?namespace=%s&name=%s", url.QueryEscape(namespace), @@ -109,7 +110,7 @@ func (p *BrokerProvider) GetPod(namespace, name string) (*v1.Pod, error) { } // GetContainerLogs returns the logs of a container running in a pod by name. -func (p *BrokerProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) { +func (p *BrokerProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) { urlPathStr := fmt.Sprintf( "/getContainerLogs?namespace=%s&podName=%s&containerName=%s&tail=%d", url.QueryEscape(namespace), @@ -140,7 +141,7 @@ func (p *BrokerProvider) ExecInContainer(name string, uid types.UID, container s } // GetPodStatus retrieves the status of a given pod by name. -func (p *BrokerProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) { +func (p *BrokerProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { urlPathStr := fmt.Sprintf( "/getPodStatus?namespace=%s&name=%s", url.QueryEscape(namespace), @@ -159,7 +160,7 @@ func (p *BrokerProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, er } // GetPods retrieves a list of all pods scheduled to run. 
-func (p *BrokerProvider) GetPods() ([]*v1.Pod, error) { +func (p *BrokerProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { var pods []*v1.Pod err := p.doGetRequest("/getPods", &pods) @@ -167,7 +168,7 @@ func (p *BrokerProvider) GetPods() ([]*v1.Pod, error) { } // Capacity returns a resource list containing the capacity limits -func (p *BrokerProvider) Capacity() v1.ResourceList { +func (p *BrokerProvider) Capacity(ctx context.Context) v1.ResourceList { var resourceList v1.ResourceList err := p.doGetRequest("/capacity", &resourceList) @@ -180,7 +181,7 @@ func (p *BrokerProvider) Capacity() v1.ResourceList { } // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status -func (p *BrokerProvider) NodeConditions() []v1.NodeCondition { +func (p *BrokerProvider) NodeConditions(ctx context.Context) []v1.NodeCondition { var nodeConditions []v1.NodeCondition err := p.doGetRequest("/nodeConditions", &nodeConditions) @@ -194,7 +195,7 @@ func (p *BrokerProvider) NodeConditions() []v1.NodeCondition { // NodeAddresses returns a list of addresses for the node status // within Kubernetes. -func (p *BrokerProvider) NodeAddresses() []v1.NodeAddress { +func (p *BrokerProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress { var nodeAddresses []v1.NodeAddress err := p.doGetRequest("/nodeAddresses", &nodeAddresses) @@ -208,7 +209,7 @@ func (p *BrokerProvider) NodeAddresses() []v1.NodeAddress { // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status // within Kubernetes. -func (p *BrokerProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints { +func (p *BrokerProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints { return &v1.NodeDaemonEndpoints{ KubeletEndpoint: v1.DaemonEndpoint{ Port: p.daemonEndpointPort, diff --git a/vkubelet/apiserver.go b/vkubelet/apiserver.go index b415742b1..0b971bae9 100644 --- a/vkubelet/apiserver.go +++ b/vkubelet/apiserver.go @@ -134,7 +134,7 @@ func (s *KubeletServer) ApiServerHandler(w http.ResponseWriter, req *http.Reques tail = t } - podsLogs, err := s.p.GetContainerLogs(namespace, pod, container, tail) + podsLogs, err := s.p.GetContainerLogs(ctx, namespace, pod, container, tail) if err != nil { log.G(ctx).WithError(err).Error("error getting container logs") http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError) diff --git a/vkubelet/provider.go b/vkubelet/provider.go index 5f932f891..6ace5e839 100644 --- a/vkubelet/provider.go +++ b/vkubelet/provider.go @@ -14,44 +14,44 @@ import ( // Provider contains the methods required to implement a virtual-kubelet provider. type Provider interface { // CreatePod takes a Kubernetes Pod and deploys it within the provider. - CreatePod(pod *v1.Pod) error + CreatePod(ctx context.Context, pod *v1.Pod) error // UpdatePod takes a Kubernetes Pod and updates it within the provider. - UpdatePod(pod *v1.Pod) error + UpdatePod(ctx context.Context, pod *v1.Pod) error // DeletePod takes a Kubernetes Pod and deletes it from the provider. - DeletePod(pod *v1.Pod) error + DeletePod(ctx context.Context, pod *v1.Pod) error // GetPod retrieves a pod by name from the provider (can be cached). - GetPod(namespace, name string) (*v1.Pod, error) + GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) // GetContainerLogs retrieves the logs of a container by name from the provider. 
- GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) + GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) // ExecInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error // GetPodStatus retrieves the status of a pod by name from the provider. - GetPodStatus(namespace, name string) (*v1.PodStatus, error) + GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) // GetPods retrieves a list of all pods running on the provider (can be cached). - GetPods() ([]*v1.Pod, error) + GetPods(context.Context) ([]*v1.Pod, error) // Capacity returns a resource list with the capacity constraints of the provider. - Capacity() v1.ResourceList + Capacity(context.Context) v1.ResourceList // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), which is // polled periodically to update the node status within Kubernetes. - NodeConditions() []v1.NodeCondition + NodeConditions(context.Context) []v1.NodeCondition // NodeAddresses returns a list of addresses for the node status // within Kubernetes. - NodeAddresses() []v1.NodeAddress + NodeAddresses(context.Context) []v1.NodeAddress // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status // within Kubernetes. - NodeDaemonEndpoints() *v1.NodeDaemonEndpoints + NodeDaemonEndpoints(context.Context) *v1.NodeDaemonEndpoints // OperatingSystem returns the operating system the provider is for. OperatingSystem() string diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index 19ffedfb6..83838469a 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -189,11 +189,11 @@ func (s *Server) registerNode(ctx context.Context) error { Architecture: "amd64", KubeletVersion: "v1.11.2", }, - Capacity: s.provider.Capacity(), - Allocatable: s.provider.Capacity(), - Conditions: s.provider.NodeConditions(), - Addresses: s.provider.NodeAddresses(), - DaemonEndpoints: *s.provider.NodeDaemonEndpoints(), + Capacity: s.provider.Capacity(ctx), + Allocatable: s.provider.Capacity(ctx), + Conditions: s.provider.NodeConditions(ctx), + Addresses: s.provider.NodeAddresses(ctx), + DaemonEndpoints: *s.provider.NodeDaemonEndpoints(ctx), }, } @@ -297,13 +297,13 @@ func (s *Server) updateNode(ctx context.Context) { } n.ResourceVersion = "" // Blank out resource version to prevent object has been modified error - n.Status.Conditions = s.provider.NodeConditions() + n.Status.Conditions = s.provider.NodeConditions(ctx) - capacity := s.provider.Capacity() + capacity := s.provider.Capacity(ctx) n.Status.Capacity = capacity n.Status.Allocatable = capacity - n.Status.Addresses = s.provider.NodeAddresses() + n.Status.Addresses = s.provider.NodeAddresses(ctx) n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n) if err != nil { @@ -319,7 +319,7 @@ func (s *Server) reconcile(ctx context.Context) { logger.Debug("Start reconcile") defer logger.Debug("End reconcile") - providerPods, err := s.provider.GetPods() + providerPods, err := s.provider.GetPods(ctx) if err != nil { logger.WithError(err).Error("Error getting pod list from provider") return @@ -377,7 +377,7 @@ func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error { logger := log.G(ctx).WithField("pod", pod.Name) - if origErr 
:= s.provider.CreatePod(pod); origErr != nil { + if origErr := s.provider.CreatePod(ctx, pod); origErr != nil { pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error pod.Status.Phase = corev1.PodFailed pod.Status.Reason = PodStatusReason_ProviderFailed @@ -398,7 +398,7 @@ func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error { func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error { var delErr error - if delErr = s.provider.DeletePod(pod); delErr != nil && errors.IsNotFound(delErr) { + if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) { return delErr } @@ -430,7 +430,7 @@ func (s *Server) updatePodStatuses(ctx context.Context) { continue } - status, err := s.provider.GetPodStatus(pod.Namespace, pod.Name) + status, err := s.provider.GetPodStatus(ctx, pod.Namespace, pod.Name) if err != nil { log.G(ctx).WithField("pod", pod.Name).Error("Error retrieving pod status") return
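
For illustration only (not part of the patch): with the Provider interface above now taking a context.Context on every method, an implementation simply accepts and forwards the extra argument, and may use it for cancellation or per-request deadlines. The sketch below is a hypothetical in-memory provider showing a few of the updated signatures; the type name, map storage, and error text are invented for this example, and only a subset of the interface's methods is shown.

package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// InMemoryProvider is a hypothetical provider used only to illustrate the
// context-aware signatures introduced by this patch.
type InMemoryProvider struct {
	pods map[string]*v1.Pod
}

// CreatePod stores the pod, returning early if the caller's context is
// already cancelled.
func (p *InMemoryProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	p.pods[pod.Namespace+"/"+pod.Name] = pod
	return nil
}

// GetPod looks a pod up by namespace/name; ctx would be forwarded to any
// remote backend call in a real provider.
func (p *InMemoryProvider) GetPod(ctx context.Context, namespace, name string) (*v1.Pod, error) {
	pod, ok := p.pods[namespace+"/"+name]
	if !ok {
		return nil, fmt.Errorf("pod %s/%s not found", namespace, name)
	}
	return pod, nil
}

// GetPods returns every pod the provider knows about.
func (p *InMemoryProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
	pods := make([]*v1.Pod, 0, len(p.pods))
	for _, pod := range p.pods {
		pods = append(pods, pod)
	}
	return pods, nil
}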