Added clean exec functionality + ACI implementation - V2 (#244)

* Stubs and vkubelet changes

* added dependencies

* Azure provider exec implementation

* added missing dependencies

* added vkubelet imports

* added huawei exec stub

* Fixed exec tab functionality / stdin buffer length

* Removed unused import

* Added provider function GetPodFullName + ACI implementation

* Added error handling in ACI provider exec
This commit is contained in:
Eric Jadi
2018-07-06 23:12:05 +02:00
committed by Robbie Zhang
parent 3e8a1b9bb5
commit 89921a08c1
94 changed files with 23090 additions and 4 deletions

View File

@@ -2,6 +2,7 @@ package aws
import (
"fmt"
"io"
"log"
"time"
@@ -11,6 +12,8 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
// FargateProvider implements the virtual-kubelet provider interface.
@@ -175,6 +178,19 @@ func (p *FargateProvider) GetContainerLogs(namespace, podName, containerName str
return p.cluster.GetContainerLogs(namespace, podName, containerName, tail)
}
// Get full pod name as defined in the provider context
func (p *FargateProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *FargateProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}
// GetPodStatus retrieves the status of a pod by name from the provider.
func (p *FargateProvider) GetPodStatus(namespace, name string) (*corev1.PodStatus, error) {
log.Printf("Received GetPodStatus request for %s/%s.\n", namespace, name)

View File

@@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
@@ -13,6 +14,7 @@ import (
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/virtual-kubelet/virtual-kubelet/manager"
client "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
@@ -21,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
// The service account secret mount path.
@@ -188,7 +191,7 @@ func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operat
if podsQuota := os.Getenv("ACI_QUOTA_POD"); podsQuota != "" {
p.pods = podsQuota
}
p.operatingSystem = operatingSystem
p.nodeName = nodeName
p.internalIP = internalIP
@@ -329,6 +332,88 @@ func (p *ACIProvider) GetContainerLogs(namespace, podName, containerName string,
return logContent, err
}
// Get full pod name as defined in the provider context
func (p *ACIProvider) GetPodFullName(namespace string, pod string) string {
return fmt.Sprintf("%s-%s", namespace, pod)
}
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, errstream io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
// Cleanup on exit
if out != nil {
defer out.Close()
}
if errstream != nil {
defer errstream.Close()
}
cg, err, _ := p.aciClient.GetContainerGroup(p.resourceGroup, name)
if err != nil {
return err
}
// Set default terminal size
terminalSize := remotecommand.TerminalSize{
Height: 60,
Width: 120,
}
if resize != nil {
terminalSize = <-resize // Receive terminal resize event if resize stream is present
}
xcrsp, err := p.aciClient.LaunchExec(p.resourceGroup, cg.Name, container, cmd[0], terminalSize)
if err != nil {
return err
}
wsUri := xcrsp.WebSocketUri
password := xcrsp.Password
c, _, _ := websocket.DefaultDialer.Dial(wsUri, nil)
c.WriteMessage(websocket.TextMessage, []byte(password)) // Websocket password needs to be sent before WS terminal is active
// Cleanup on exit
defer c.Close()
defer out.Close()
if in != nil {
// Inputstream
go func() {
for {
var msg = make([]byte, 512)
n, err := in.Read(msg)
if err == io.EOF {
// Handle EOF
}
if err != nil {
// Handle errors
return
}
if n > 0 { // Only call WriteMessage if there is data to send
c.WriteMessage(websocket.BinaryMessage, msg[:n])
}
}
}()
}
if out != nil {
//Outputstream
for {
_, cr, err := c.NextReader()
if err != nil {
// Handle errors
break
}
io.Copy(out, cr)
}
}
return nil
}
// GetPodStatus returns the status of a pod by name that is running inside ACI
// returns nil if a pod by that name is not found.
func (p *ACIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {

View File

@@ -17,6 +17,7 @@ const (
containerGroupListURLPath = "subscriptions/{{.subscriptionId}}/providers/Microsoft.ContainerInstance/containerGroups"
containerGroupListByResourceGroupURLPath = "subscriptions/{{.subscriptionId}}/resourceGroups/{{.resourceGroup}}/providers/Microsoft.ContainerInstance/containerGroups"
containerLogsURLPath = containerGroupURLPath + "/containers/{{.containerName}}/logs"
containerExecURLPath = containerGroupURLPath + "/containers/{{.containerName}}/exec"
)
// Client is a client for interacting with Azure Container Instances.

View File

@@ -0,0 +1,84 @@
package aci
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/api"
"k8s.io/client-go/tools/remotecommand"
)
type TerminalSizeRequest struct {
Width int
Height int
}
// Starts the exec command for a specified container instance in a specified resource group and container group.
// From: https://docs.microsoft.com/en-us/rest/api/container-instances/startcontainer/launchexec
func (c *Client) LaunchExec(resourceGroup, containerGroupName, containerName, command string, terminalSize remotecommand.TerminalSize) (ExecResponse, error) {
urlParams := url.Values{
"api-version": []string{apiVersion},
}
// Create the url to call Azure REST API
uri := api.ResolveRelative(baseURI, containerExecURLPath)
uri += "?" + url.Values(urlParams).Encode()
var xc ExecRequest
xc.Command = command
xc.TerminalSize.Rows = int(terminalSize.Height)
xc.TerminalSize.Cols = int(terminalSize.Width)
var xcrsp ExecResponse
xcrsp.Password = ""
xcrsp.WebSocketUri = ""
b := new(bytes.Buffer)
if err := json.NewEncoder(b).Encode(xc); err != nil {
return xcrsp, fmt.Errorf("Encoding create launch exec body request failed: %v", err)
}
req, err := http.NewRequest("POST", uri, b)
if err != nil {
return xcrsp, fmt.Errorf("Creating launch exec uri request failed: %v", err)
}
// Add the parameters to the url.
if err := api.ExpandURL(req.URL, map[string]string{
"subscriptionId": c.auth.SubscriptionID,
"resourceGroup": resourceGroup,
"containerGroupName": containerGroupName,
"containerName": containerName,
}); err != nil {
return xcrsp, fmt.Errorf("Expanding URL with parameters failed: %v", err)
}
// Send the request.
resp, err := c.hc.Do(req)
if err != nil {
return xcrsp, fmt.Errorf("Sending launch exec request failed: %v", err)
}
defer resp.Body.Close()
// 200 (OK) is a success response.
if err := api.CheckResponse(resp); err != nil {
return xcrsp, err
}
// Decode the body from the response.
if resp.Body == nil {
return xcrsp, errors.New("Create launch exec returned an empty body in the response")
}
if err := json.NewDecoder(resp.Body).Decode(&xcrsp); err != nil {
return xcrsp, fmt.Errorf("Decoding create launch exec response body failed: %v", err)
}
return xcrsp, nil
}

View File

@@ -275,3 +275,21 @@ type VolumeMount struct {
MountPath string `json:"mountPath,omitempty"`
ReadOnly bool `json:"readOnly,omitempty"`
}
// TerminalSize is the size of the Launch Exec terminal
type TerminalSize struct {
Rows int `json:"rows,omitempty"`
Cols int `json:"cols,omitempty"`
}
// ExecRequest is a request for Launch Exec API response for ACI.
type ExecRequest struct {
Command string `json:"command,omitempty"`
TerminalSize TerminalSize `json:"terminalSize,omitempty"`
}
// ExecRequest is a request for Launch Exec API response for ACI.
type ExecResponse struct {
WebSocketUri string `json:"webSocketUri,omitempty"`
Password string `json:"password,omitempty"`
}

View File

@@ -5,12 +5,13 @@ import (
"encoding/json"
"fmt"
"github.com/Azure/go-autorest/autorest"
"os"
"strings"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
"time"
"github.com/Azure/go-autorest/autorest/azure"
@@ -22,6 +23,8 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
const (
@@ -300,6 +303,20 @@ func (p *Provider) GetContainerLogs(namespace, podName, containerName string, ta
return result, nil
}
// Get full pod name as defined in the provider context
// TODO: Implementation
func (p *Provider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *Provider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}
// GetPods retrieves a list of all pods scheduled to run.
func (p *Provider) GetPods() ([]*v1.Pod, error) {
log.Println("Getting pods...")

View File

@@ -3,6 +3,7 @@ package cri
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"net"
"os"
@@ -21,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
criapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)
@@ -642,6 +644,20 @@ func (p *CRIProvider) GetContainerLogs(namespace, podName, containerName string,
return readLogFile(container.LogPath, tail)
}
// Get full pod name as defined in the provider context
// TODO: Implementation
func (p *CRIProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *CRIProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}
// Find a pod by name and namespace. Pods are indexed by UID
func (p *CRIProvider) findPodByName(namespace, name string) *CRIPod {
var found *CRIPod

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"time"
@@ -18,6 +19,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
const (
@@ -278,6 +280,20 @@ func (p *CCIProvider) GetContainerLogs(namespace, podName, containerName string,
return "", nil
}
// Get full pod name as defined in the provider context
// TODO: Implementation
func (p *CCIProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *CCIProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}
// GetPodStatus retrieves the status of a pod by name from the huawei CCI provider.
func (p *CCIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
pod, err := p.GetPod(namespace, name)

View File

@@ -3,10 +3,12 @@ package hypersh
import (
"context"
"fmt"
"io"
"log"
"net/http"
"os"
"runtime"
"time"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
@@ -21,6 +23,8 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
var host = "tcp://*.hyper.sh:443"
@@ -318,6 +322,20 @@ func (p *HyperProvider) GetContainerLogs(namespace, podName, containerName strin
return "", nil
}
// Get full pod name as defined in the provider context
// TODO: Implementation
func (p *HyperProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *HyperProvider) ExecInContainer(name string, uid apitypes.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}
// GetPodStatus returns the status of a pod by name that is running inside hyper.sh
// returns nil if a pod by that name is not found.
func (p *HyperProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {

View File

@@ -3,6 +3,7 @@ package mock
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"time"
@@ -11,6 +12,8 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
const (
@@ -148,6 +151,19 @@ func (p *MockProvider) GetContainerLogs(namespace, podName, containerName string
return "", nil
}
// Get full pod name as defined in the provider context
// TODO: Implementation
func (p *MockProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *MockProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}
// GetPodStatus returns the status of a pod by name that is "running".
// returns nil if a pod by that name is not found.
func (p *MockProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {

View File

@@ -2,6 +2,7 @@ package vic
import (
"fmt"
"io"
"os"
"path"
"syscall"
@@ -33,6 +34,8 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
type VicProvider struct {
@@ -258,6 +261,19 @@ func (v *VicProvider) GetContainerLogs(namespace, podName, containerName string,
return "", nil
}
// Get full pod name as defined in the provider context
// TODO: Implementation
func (p *VicProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *VicProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}
// GetPodStatus retrieves the status of a pod by name from the provider.
// This function needs to return a status or the reconcile loop will stop running.
func (v *VicProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {

View File

@@ -21,6 +21,7 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
@@ -28,6 +29,8 @@ import (
"github.com/cenkalti/backoff"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
// BrokerProvider implements the virtual-kubelet provider interface by forwarding kubelet calls to a web endpoint.
@@ -122,6 +125,20 @@ func (p *BrokerProvider) GetContainerLogs(namespace, podName, containerName stri
return string(response), nil
}
// Get full pod name as defined in the provider context
// TODO: Implementation
func (p *BrokerProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *BrokerProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}
// GetPodStatus retrieves the status of a given pod by name.
func (p *BrokerProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
urlPathStr := fmt.Sprintf(