Files
virtual-kubelet/providers/huawei/cci.go
Eric Jadi 89921a08c1 Added clean exec functionality + ACI implementation - V2 (#244)
* Stubs and vkubelet changes

* added dependencies

* Azure provider exec implementation

* added missing dependencies

* added vkubelet imports

* added huawei exec stub

* Fixed exec tab functionality / stdin buffer length

* Removed unused import

* Added provider function GetPodFullName + ACI implementation

* Added error handling in ACI provider exec
2018-07-06 14:12:05 -07:00

429 lines
12 KiB
Go

package huawei
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"time"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers/huawei/auth"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
const (
podAnnotationNamespaceKey = "virtual-kubelet-namespace"
podAnnotationPodNameKey = "virtual-kubelet-podname"
podAnnotationClusterNameKey = "virtual-kubelet-clustername"
podAnnotationUIDkey = "virtual-kubelet-uid"
podAnnotationNodeName = "virtual-kubelet-nodename"
podAnnotationCreationTimestamp = "virtual-kubelet-creationtimestamp"
)
var defaultApiEndpoint string = "https://cciback.cn-north-1.huaweicloud.com"
// CCIProvider implements the virtual-kubelet provider interface and communicates with Huawei's CCI APIs.
type CCIProvider struct {
appKey string
appSecret string
apiEndpoint string
region string
service string
project string
internalIP string
daemonEndpointPort int32
nodeName string
operatingSystem string
client *Client
resourceManager *manager.ResourceManager
cpu string
memory string
pods string
}
// Client represents the client config for Huawei.
type Client struct {
Signer auth.Signer
HTTPClient http.Client
}
// NewCCIProvider creates a new CCI provider.
func NewCCIProvider(config string, rm *manager.ResourceManager, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*CCIProvider, error) {
p := CCIProvider{}
if config != "" {
f, err := os.Open(config)
if err != nil {
return nil, err
}
defer f.Close()
if err := p.loadConfig(f); err != nil {
return nil, err
}
}
if appKey := os.Getenv("CCI_APP_KEP"); appKey != "" {
p.appKey = appKey
}
if p.appKey == "" {
return nil, errors.New("AppKey can not be empty please set CCI_APP_KEP")
}
if appSecret := os.Getenv("CCI_APP_SECRET"); appSecret != "" {
p.appSecret = appSecret
}
if p.appSecret == "" {
return nil, errors.New("AppSecret can not be empty please set CCI_APP_SECRET")
}
p.client = new(Client)
p.client.Signer = &auth.SignerHws{
AppKey: p.appKey,
AppSecret: p.appSecret,
Region: p.region,
Service: p.service,
}
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
p.client.HTTPClient = http.Client{
Transport: tr,
}
p.resourceManager = rm
p.apiEndpoint = defaultApiEndpoint
p.nodeName = nodeName
p.operatingSystem = operatingSystem
p.internalIP = internalIP
p.daemonEndpointPort = daemonEndpointPort
if err := p.createProject(); err != nil {
return nil, err
}
return &p, nil
}
func (p *CCIProvider) createProject() error {
// Create the createProject request url
uri := p.apiEndpoint + "/api/v1/namespaces"
// build the request
project := &v1.Namespace{
TypeMeta: metav1.TypeMeta{
Kind: "Namespace",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: p.project,
},
}
var bodyReader io.Reader
body, err := json.Marshal(project)
if err != nil {
return err
}
if body != nil {
bodyReader = bytes.NewReader(body)
}
r, err := http.NewRequest("POST", uri, bodyReader)
if err != nil {
return err
}
if err = p.signRequest(r); err != nil {
return fmt.Errorf("Sign the request failed: %v", err)
}
_, err = p.client.HTTPClient.Do(r)
return err
}
func (p *CCIProvider) signRequest(r *http.Request) error {
r.Header.Add("content-type", "application/json; charset=utf-8")
if err := p.client.Signer.Sign(r); err != nil {
return fmt.Errorf("Sign the request failed: %v", err)
}
return nil
}
func (p *CCIProvider) setPodAnnotations(pod *v1.Pod) {
metav1.SetMetaDataAnnotation(&pod.ObjectMeta, podAnnotationNamespaceKey, pod.Namespace)
metav1.SetMetaDataAnnotation(&pod.ObjectMeta, podAnnotationClusterNameKey, pod.ClusterName)
metav1.SetMetaDataAnnotation(&pod.ObjectMeta, podAnnotationPodNameKey, pod.Name)
metav1.SetMetaDataAnnotation(&pod.ObjectMeta, podAnnotationUIDkey, string(pod.UID))
metav1.SetMetaDataAnnotation(&pod.ObjectMeta, podAnnotationNodeName, pod.Spec.NodeName)
metav1.SetMetaDataAnnotation(&pod.ObjectMeta, podAnnotationCreationTimestamp, pod.CreationTimestamp.String())
pod.Namespace = p.project
pod.Name = pod.Namespace + "-" + pod.Name
pod.UID = ""
pod.Spec.NodeName = ""
pod.CreationTimestamp = metav1.Time{}
}
func (p *CCIProvider) deletePodAnnotations(pod *v1.Pod) error {
pod.Name = pod.Annotations[podAnnotationPodNameKey]
pod.Namespace = pod.Annotations[podAnnotationNamespaceKey]
pod.UID = types.UID(pod.Annotations[podAnnotationUIDkey])
pod.ClusterName = pod.Annotations[podAnnotationClusterNameKey]
pod.Spec.NodeName = pod.Annotations[podAnnotationNodeName]
if pod.Annotations[podAnnotationCreationTimestamp] != "" {
t, err := time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", pod.Annotations[podAnnotationCreationTimestamp])
if err != nil {
return err
}
podCreationTimestamp := metav1.NewTime(t)
pod.CreationTimestamp = podCreationTimestamp
}
delete(pod.Annotations, podAnnotationPodNameKey)
delete(pod.Annotations, podAnnotationNamespaceKey)
delete(pod.Annotations, podAnnotationUIDkey)
delete(pod.Annotations, podAnnotationClusterNameKey)
delete(pod.Annotations, podAnnotationNodeName)
delete(pod.Annotations, podAnnotationCreationTimestamp)
pod.Annotations = nil
return nil
}
// CreatePod takes a Kubernetes Pod and deploys it within the huawei CCI provider.
func (p *CCIProvider) CreatePod(pod *v1.Pod) error {
// Create the createPod request url
p.setPodAnnotations(pod)
uri := p.apiEndpoint + "/api/v1/namespaces/" + p.project + "/pods"
// build the request
var bodyReader io.Reader
body, err := json.Marshal(pod)
if err != nil {
return err
}
if body != nil {
bodyReader = bytes.NewReader(body)
}
r, err := http.NewRequest("POST", uri, bodyReader)
if err != nil {
return err
}
if err = p.signRequest(r); err != nil {
return fmt.Errorf("Sign the request failed: %v", err)
}
_, err = p.client.HTTPClient.Do(r)
return err
}
// UpdatePod takes a Kubernetes Pod and updates it within the huawei CCI provider.
func (p *CCIProvider) UpdatePod(pod *v1.Pod) error {
return nil
}
// DeletePod takes a Kubernetes Pod and deletes it from the huawei CCI provider.
func (p *CCIProvider) DeletePod(pod *v1.Pod) error {
// Create the deletePod request url
podName := pod.Namespace + "-" + pod.Name
uri := p.apiEndpoint + "/api/v1/namespaces/" + p.project + "/pods/" + podName
// build the request
r, err := http.NewRequest("DELETE", uri, nil)
if err != nil {
return err
}
if err = p.signRequest(r); err != nil {
return fmt.Errorf("Sign the request failed: %v", err)
}
_, err = p.client.HTTPClient.Do(r)
return err
}
// GetPod retrieves a pod by name from the huawei CCI provider.
func (p *CCIProvider) GetPod(namespace, name string) (*v1.Pod, error) {
// Create the getPod request url
podName := namespace + "-" + name
uri := p.apiEndpoint + "/api/v1/namespaces/" + p.project + "/pods/" + podName
r, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, fmt.Errorf("Create get POD request failed: %v", err)
}
if err = p.signRequest(r); err != nil {
return nil, fmt.Errorf("Sign the request failed: %v", err)
}
resp, err := p.client.HTTPClient.Do(r)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var pod v1.Pod
if err = json.Unmarshal(body, &pod); err != nil {
return nil, err
}
if err := p.deletePodAnnotations(&pod); err != nil {
return nil, err
}
return &pod, nil
}
// GetContainerLogs retrieves the logs of a container by name from the huawei CCI provider.
func (p *CCIProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
return "", nil
}
// Get full pod name as defined in the provider context
// TODO: Implementation
func (p *CCIProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *CCIProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}
// GetPodStatus retrieves the status of a pod by name from the huawei CCI provider.
func (p *CCIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
pod, err := p.GetPod(namespace, name)
if err != nil {
return nil, err
}
if pod == nil {
return nil, nil
}
return &pod.Status, nil
}
// GetPods retrieves a list of all pods running on the huawei CCI provider.
func (p *CCIProvider) GetPods() ([]*v1.Pod, error) {
// Create the getPod request url
uri := p.apiEndpoint + "/api/v1/namespaces/" + p.project + "/pods"
r, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, fmt.Errorf("Create get POD request failed: %v", err)
}
if err = p.signRequest(r); err != nil {
return nil, fmt.Errorf("Sign the request failed: %v", err)
}
resp, err := p.client.HTTPClient.Do(r)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var pods []*v1.Pod
if err = json.Unmarshal(body, &pods); err != nil {
return nil, err
}
for _, pod := range pods {
if err := p.deletePodAnnotations(pod); err != nil {
return nil, err
}
}
return pods, nil
}
// Capacity returns a resource list with the capacity constraints of the huawei CCI provider.
func (p *CCIProvider) Capacity() v1.ResourceList {
return v1.ResourceList{
"cpu": resource.MustParse(p.cpu),
"memory": resource.MustParse(p.memory),
"pods": resource.MustParse(p.pods),
}
}
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), which is
// polled periodically to update the node status within Kubernetes.
func (p *CCIProvider) NodeConditions() []v1.NodeCondition {
// TODO: Make these dynamic and augment with custom CCI specific conditions of interest
return []v1.NodeCondition{
{
Type: "Ready",
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: "KubeletReady",
Message: "kubelet is ready.",
},
{
Type: "OutOfDisk",
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: "KubeletHasSufficientDisk",
Message: "kubelet has sufficient disk space available",
},
{
Type: "MemoryPressure",
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: "KubeletHasSufficientMemory",
Message: "kubelet has sufficient memory available",
},
{
Type: "DiskPressure",
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: "KubeletHasNoDiskPressure",
Message: "kubelet has no disk pressure",
},
{
Type: "NetworkUnavailable",
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: "RouteCreated",
Message: "RouteController created a route",
},
}
}
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *CCIProvider) NodeAddresses() []v1.NodeAddress {
// TODO: Make these dynamic and augment with custom CCI specific conditions of interest
return []v1.NodeAddress{
{
Type: "InternalIP",
Address: p.internalIP,
},
}
}
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *CCIProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
return &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{
Port: p.daemonEndpointPort,
},
}
}
// OperatingSystem returns the operating system the huawei CCI provider is for.
func (p *CCIProvider) OperatingSystem() string {
return p.operatingSystem
}