From 785d223eee527a76a3810ca68ad93550cb6f9dec Mon Sep 17 00:00:00 2001 From: Jimmy Xu Date: Wed, 20 Dec 2017 01:37:04 +0800 Subject: [PATCH] [hyper-provider] 1.fix start container 2.support DeletePod --- providers/hypersh/hypersh.go | 251 ++++++++++++++++++++++++++++++++--- 1 file changed, 232 insertions(+), 19 deletions(-) diff --git a/providers/hypersh/hypersh.go b/providers/hypersh/hypersh.go index f9c99ff36..654816edd 100755 --- a/providers/hypersh/hypersh.go +++ b/providers/hypersh/hypersh.go @@ -7,6 +7,8 @@ import ( "net/http" "os" "runtime" + "strings" + "time" "github.com/docker/go-connections/nat" "github.com/docker/go-connections/sockets" @@ -82,12 +84,12 @@ func NewHyperProvider(config string, rm *manager.ResourceManager, nodeName, oper p.instanceType = it } - host = fmt.Sprintf("tcp://%v.hyper.sh:443", p.region) + host = fmt.Sprintf("tcp://%s.hyper.sh:443", p.region) httpClient, err := newHTTPClient(host, &tlsconfig.Options{InsecureSkipVerify: false}) customHeaders := map[string]string{} ver := "0.1" - customHeaders["User-Agent"] = fmt.Sprintf("Virtual-Kubelet-Client/%v (%v)", ver, runtime.GOOS) + customHeaders["User-Agent"] = fmt.Sprintf("Virtual-Kubelet-Client/%s (%s)", ver, runtime.GOOS) p.operatingSystem = operatingSystem p.nodeName = nodeName @@ -128,8 +130,15 @@ func newHTTPClient(host string, tlsOptions *tlsconfig.Options) (*http.Client, er // CreatePod accepts a Pod definition and creates // a hyper.sh deployment func (p *HyperProvider) CreatePod(pod *v1.Pod) error { + log.Printf("receive CreatePod %q\n", pod.Name) - // get containers + //Ignore daemonSet Pod + if pod != nil && pod.OwnerReferences != nil && len(pod.OwnerReferences) != 0 && pod.OwnerReferences[0].Kind == "DaemonSet" { + log.Printf("Skip to create DaemonSet pod %q\n", pod.Name) + return nil + } + + // Get containers containers, hostConfigs, err := getContainers(pod) if err != nil { return err @@ -139,6 +148,7 @@ func (p *HyperProvider) CreatePod(pod *v1.Pod) error { // Iterate over the containers to create and start them. for k, ctr := range containers { + //one container in a Pod in hyper.sh currently containerName := fmt.Sprintf("pod-%s-%s", pod.Name, pod.Spec.Containers[k].Name) // Add labels to the pod containers. @@ -147,23 +157,26 @@ func (p *HyperProvider) CreatePod(pod *v1.Pod) error { nodeLabel: p.nodeName, instanceTypeLabel: p.instanceType, } + hostConfigs[k].NetworkMode = "bridge" // Create the container. resp, err := p.hyperClient.ContainerCreate(context.Background(), &ctr, &hostConfigs[k], &network.NetworkingConfig{}, containerName) if err != nil { - return fmt.Errorf("Creating container %q failed in pod %q: %v", containerName, pod.Name, err) + return err } + log.Printf("container %q for pod %q was created\n", resp.ID, pod.Name) + // Iterate throught the warnings. for _, warning := range resp.Warnings { - log.Printf("Warning while creating container %q for pod %q: %s", containerName, pod.Name, warning) + log.Printf("warning while creating container %q for pod %q: %s", containerName, pod.Name, warning) } // Start the container. if err := p.hyperClient.ContainerStart(context.Background(), resp.ID, ""); err != nil { - return fmt.Errorf("Starting container %q failed in pod %q: %v", containerName, pod.Name, err) + return err } + log.Printf("container %q for pod %q was started\n", resp.ID, pod.Name) } - return nil } @@ -173,40 +186,101 @@ func (p *HyperProvider) UpdatePod(pod *v1.Pod) error { } // DeletePod deletes the specified pod out of hyper.sh. -func (p *HyperProvider) DeletePod(pod *v1.Pod) error { +func (p *HyperProvider) DeletePod(pod *v1.Pod) (err error) { + log.Printf("receive DeletePod %q\n", pod.Name) + var ( + containerName = fmt.Sprintf("pod-%s-%s", pod.Name, pod.Name) + container types.ContainerJSON + ) + // Inspect hyper container + container, err = p.hyperClient.ContainerInspect(context.Background(), containerName) + if err != nil { + return err + } + // Check container label + if v, ok := container.Config.Labels[containerLabel]; ok { + // Check value of label + if v != pod.Name { + return fmt.Errorf("the label %q of hyper container %q should be %q, but it's %q currently", pod.Name, containerLabel, container.Name, pod.Name, v) + } + rmOptions := types.ContainerRemoveOptions{ + RemoveVolumes: true, + Force: true, + } + // Delete hyper container + resp, err := p.hyperClient.ContainerRemove(context.Background(), container.ID, rmOptions) + if err != nil { + return err + } + // Iterate throught the warnings. + for _, warning := range resp { + log.Printf("warning while deleting container %q for pod %q: %s", container.ID, pod.Name, warning) + } + log.Printf("container %q for pod %q was deleted\n", container.ID, pod.Name) + } else { + return fmt.Errorf("hyper container %q has no label %q", pod.Name, container.Name, containerLabel) + } return nil } // GetPod returns a pod by name that is running inside hyper.sh // returns nil if a pod by that name is not found. -func (p *HyperProvider) GetPod(namespace, name string) (*v1.Pod, error) { - return nil, nil +func (p *HyperProvider) GetPod(namespace, name string) (pod *v1.Pod, err error) { + var ( + containerName = fmt.Sprintf("pod-%s-%s", name, name) + container types.ContainerJSON + ) + // Inspect hyper container + container, err = p.hyperClient.ContainerInspect(context.Background(), containerName) + if err != nil { + return nil, err + } + // Convert hyper container into Pod + pod, err = containerJSONToPod(&container) + if err != nil { + return nil, err + } else { + return pod, nil + } } // GetPodStatus returns the status of a pod by name that is running inside hyper.sh // returns nil if a pod by that name is not found. func (p *HyperProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) { - return nil, nil + pod, err := p.GetPod(namespace, name) + if err != nil { + return nil, err + } + return &pod.Status, nil } // GetPods returns a list of all pods known to be running within hyper.sh. func (p *HyperProvider) GetPods() ([]*v1.Pod, error) { - filter, err := filters.FromParam(fmt.Sprintf("{\"%v\":{\"%v\":true}}", nodeLabel, p.nodeName)) + log.Printf("receive GetPods\n") + filter, err := filters.FromParam(fmt.Sprintf("{\"label\":{\"%s=%s\":true}}", nodeLabel, p.nodeName)) if err != nil { - return nil, fmt.Errorf("Creating filter to get containers by node name failed: %v", err) + return nil, err } // Filter by label. - _, err = p.hyperClient.ContainerList(context.Background(), types.ContainerListOptions{ + containers, err := p.hyperClient.ContainerList(context.Background(), types.ContainerListOptions{ Filter: filter, All: true, }) if err != nil { - return nil, fmt.Errorf("Listing containers failed: %v", err) + return nil, err } + log.Printf("found %d pods\n", len(containers)) - // TODO: convert containers into pods - - return nil, nil + var pods = []*v1.Pod{} + for _, container := range containers { + pod, err := containerToPod(&container) + if err != nil { + fmt.Errorf("convert container %q to pod error: %v\n", container.ID, err) + continue + } + pods = append(pods, pod) + } + return pods, nil } // Capacity returns a resource list containing the capacity limits set for hyper.sh. @@ -289,7 +363,7 @@ func getContainers(pod *v1.Pod) ([]container.Config, []container.HostConfig, err if p.HostPort == 0 { p.HostPort = p.ContainerPort } - port, err := nat.NewPort(string(p.Protocol), fmt.Sprintf("%d", p.HostPort)) + port, err := nat.NewPort(strings.ToLower(string(p.Protocol)), fmt.Sprintf("%d", p.HostPort)) if err != nil { return nil, nil, fmt.Errorf("creating new port in container conversion failed: %v", err) } @@ -297,6 +371,7 @@ func getContainers(pod *v1.Pod) ([]container.Config, []container.HostConfig, err portBindings[port] = []nat.PortBinding{ { + HostIP: "0.0.0.0", HostPort: fmt.Sprintf("%d", p.HostPort), }, } @@ -328,3 +403,141 @@ func getContainers(pod *v1.Pod) ([]container.Config, []container.HostConfig, err } return containers, hostConfigs, nil } + +func containerJSONToPod(container *types.ContainerJSON) (*v1.Pod, error) { + // TODO: convert containers into pods + podName, found := container.Config.Labels[containerLabel] + if !found { + return nil, fmt.Errorf("can not found podName: key %q not found in container label", containerLabel) + } + + nodeName, found := container.Config.Labels[nodeLabel] + if !found { + return nil, fmt.Errorf("can not found nodeName: key %q not found in container label", containerLabel) + } + + created, err := time.Parse(time.RFC3339, container.Created) + if err != nil { + return nil, fmt.Errorf("parse Created time failed:%v", container.Created) + } + + p := v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: "default", + CreationTimestamp: metav1.NewTime(created), + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + Volumes: []v1.Volume{}, + Containers: []v1.Container{ + { + Name: podName, + Image: container.Config.Image, + Command: container.Config.Cmd, + }, + }, + }, + Status: v1.PodStatus{ + Phase: hyperStateToPodPhase(container.State.Status), + Conditions: []v1.PodCondition{}, + Message: "", + Reason: "", + HostIP: "", + PodIP: container.NetworkSettings.IPAddress, + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "", + State: v1.ContainerState{}, + Ready: container.State.Running, + RestartCount: int32(container.RestartCount), + Image: container.Config.Image, + ImageID: container.Image, + ContainerID: container.ID, + }, + }, + }, + } + + return &p, nil +} + +func containerToPod(container *types.Container) (*v1.Pod, error) { + // TODO: convert containers into pods + podName, found := container.Labels[containerLabel] + if !found { + return nil, fmt.Errorf("can not found podName: key %q not found in container label", containerLabel) + } + + nodeName, found := container.Labels[nodeLabel] + if !found { + return nil, fmt.Errorf("can not found nodeName: key %q not found in container label", containerLabel) + } + + p := v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: "default", + ClusterName: "", + UID: "", + CreationTimestamp: metav1.NewTime(time.Unix(container.Created, 0)), + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + Volumes: []v1.Volume{}, + Containers: []v1.Container{ + { + Name: podName, + Image: container.Image, + Command: strings.Split(container.Command, " "), + Resources: v1.ResourceRequirements{}, + }, + }, + }, + Status: v1.PodStatus{ + //Phase: "", + Conditions: []v1.PodCondition{}, + Message: "", + Reason: "", + HostIP: "", + PodIP: "", + ContainerStatuses: []v1.ContainerStatus{ + { + Name: container.Names[0], + Ready: string(container.State) == string(v1.PodRunning), + Image: container.Image, + ImageID: container.ImageID, + ContainerID: container.ID, + }, + }, + }, + } + + return &p, nil +} + +func hyperStateToPodPhase(state string) v1.PodPhase { + switch strings.ToLower(state) { + case "running": + return v1.PodRunning + case "paused": + return v1.PodSucceeded + case "restarting": + return v1.PodPending + case "created": + return v1.PodPending + case "dead": + return v1.PodFailed + case "exited": + return v1.PodFailed + } + return v1.PodUnknown +}