From d809dff289e8d9cdcf5be3888a026507cae0c47e Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Fri, 26 Apr 2019 12:57:56 -0700 Subject: [PATCH] Refactor exec interface (#578) This removes the dependence on remotecommand in providers as well as the need to expose provider ID's for the sake of the ExecInContainer API. --- providers/alibabacloud/eci.go | 9 ++-- providers/aws/provider.go | 11 ++-- providers/azure/aci.go | 43 +++++++++------ providers/azurebatch/batch.go | 11 ++-- providers/cri/cri.go | 8 ++- providers/huawei/cci.go | 8 +-- providers/mock/mock.go | 7 +-- providers/nomad/nomad.go | 8 +-- providers/openstack/zun.go | 8 ++- providers/provider.go | 24 ++++++--- providers/vic/vic_provider.go | 10 ++-- providers/web/broker.go | 9 ++-- vkubelet/api/exec.go | 99 +++++++++++++++++++++++++++++++++-- 13 files changed, 174 insertions(+), 81 deletions(-) diff --git a/providers/alibabacloud/eci.go b/providers/alibabacloud/eci.go index 23d68e1ef..9382d4a8f 100644 --- a/providers/alibabacloud/eci.go +++ b/providers/alibabacloud/eci.go @@ -9,7 +9,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "os" "strconv" "strings" @@ -19,13 +18,13 @@ import ( "github.com/cpuguy83/strongerrors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers/alibabacloud/eci" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" k8serr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/remotecommand" ) // The service account secret mount path. @@ -318,9 +317,9 @@ func (p *ECIProvider) GetPodFullName(namespace string, pod string) string { return fmt.Sprintf("%s-%s", namespace, pod) } -// ExecInContainer executes a command in a container in the pod, copying data +// RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. -func (p *ECIProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, errstream io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { +func (p *ECIProvider) RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach providers.AttachIO) error { return nil } diff --git a/providers/aws/provider.go b/providers/aws/provider.go index 255afc283..0582c8bf7 100644 --- a/providers/aws/provider.go +++ b/providers/aws/provider.go @@ -3,18 +3,16 @@ package aws import ( "context" "fmt" - "io" "log" "time" "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers/aws/fargate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/remotecommand" ) // FargateProvider implements the virtual-kubelet provider interface. @@ -184,12 +182,9 @@ func (p *FargateProvider) GetPodFullName(namespace string, pod string) string { return "" } -// ExecInContainer executes a command in a container in the pod, copying data +// RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. -func (p *FargateProvider) ExecInContainer( - name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, - tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { - log.Printf("Received ExecInContainer request for %s.\n", container) +func (p *FargateProvider) RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach providers.AttachIO) error { return errNotImplemented } diff --git a/providers/azure/aci.go b/providers/azure/aci.go index 8267f86b4..1a99ae3c7 100644 --- a/providers/azure/aci.go +++ b/providers/azure/aci.go @@ -25,6 +25,7 @@ import ( "github.com/virtual-kubelet/azure-aci/client/network" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/trace" v1 "k8s.io/api/core/v1" k8serr "k8s.io/apimachinery/pkg/api/errors" @@ -33,7 +34,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" clientcmdv1 "k8s.io/client-go/tools/clientcmd/api/v1" - "k8s.io/client-go/tools/remotecommand" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" ) @@ -800,33 +800,35 @@ func (p *ACIProvider) GetPodFullName(namespace string, pod string) string { return fmt.Sprintf("%s-%s", namespace, pod) } -// ExecInContainer executes a command in a container in the pod, copying data +// RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. -func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, errstream io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { - // Cleanup on exit +func (p *ACIProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { + out := attach.Stdout() if out != nil { defer out.Close() } - if errstream != nil { - defer errstream.Close() - } - cg, _, err := p.aciClient.GetContainerGroup(context.TODO(), p.resourceGroup, name) + cg, _, err := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, p.GetPodFullName(namespace, name)) if err != nil { return err } // Set default terminal size - terminalSize := remotecommand.TerminalSize{ + size := providers.TermSize{ Height: 60, Width: 120, } + resize := attach.Resize() if resize != nil { - terminalSize = <-resize // Receive terminal resize event if resize stream is present + select { + case size = <-resize: + case <-ctx.Done(): + return ctx.Err() + } } - ts := aci.TerminalSizeRequest{Height: int(terminalSize.Height), Width: int(terminalSize.Width)} + ts := aci.TerminalSizeRequest{Height: int(size.Height), Width: int(size.Width)} xcrsp, err := p.aciClient.LaunchExec(p.resourceGroup, cg.Name, container, cmd[0], ts) if err != nil { return err @@ -840,12 +842,17 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri // Cleanup on exit defer c.Close() - defer out.Close() + in := attach.Stdin() if in != nil { - // Inputstream go func() { for { + select { + case <-ctx.Done(): + return + default: + } + var msg = make([]byte, 512) n, err := in.Read(msg) if err == io.EOF { @@ -858,14 +865,18 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri if n > 0 { // Only call WriteMessage if there is data to send c.WriteMessage(websocket.BinaryMessage, msg[:n]) } - } }() } if out != nil { - //Outputstream for { + select { + case <-ctx.Done(): + break + default: + } + _, cr, err := c.NextReader() if err != nil { // Handle errors @@ -875,7 +886,7 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri } } - return nil + return ctx.Err() } // GetPodStatus returns the status of a pod by name that is running inside ACI diff --git a/providers/azurebatch/batch.go b/providers/azurebatch/batch.go index 5577fa389..2d6ddc4ed 100644 --- a/providers/azurebatch/batch.go +++ b/providers/azurebatch/batch.go @@ -4,13 +4,11 @@ import ( "context" "encoding/json" "fmt" - "io" "io/ioutil" "log" "net/http" "os" "strings" - "time" "github.com/Azure/go-autorest/autorest" @@ -20,12 +18,11 @@ import ( "github.com/Azure/go-autorest/autorest/to" "github.com/lawrencegripper/pod2docker" "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers" azureCreds "github.com/virtual-kubelet/virtual-kubelet/providers/azure" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/remotecommand" ) const ( @@ -310,10 +307,10 @@ func (p *Provider) GetPodFullName(namespace string, pod string) string { return "" } -// ExecInContainer executes a command in a container in the pod, copying data +// RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. // TODO: Implementation -func (p *Provider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { +func (p *Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { log.Printf("receive ExecInContainer %q\n", container) return nil } diff --git a/providers/cri/cri.go b/providers/cri/cri.go index 15b65c4d5..835d63bd3 100644 --- a/providers/cri/cri.go +++ b/providers/cri/cri.go @@ -6,7 +6,6 @@ import ( "bufio" "context" "fmt" - "io" "io/ioutil" "net" "os" @@ -21,12 +20,11 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" "google.golang.org/grpc" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" k8serr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" types "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/remotecommand" criapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" ) @@ -654,10 +652,10 @@ func (p *CRIProvider) GetPodFullName(namespace string, pod string) string { return "" } -// ExecInContainer executes a command in a container in the pod, copying data +// RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. // TODO: Implementation -func (p *CRIProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { +func (p *CRIProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { log.Printf("receive ExecInContainer %q\n", container) return nil } diff --git a/providers/huawei/cci.go b/providers/huawei/cci.go index da6e4ac4d..89111abf9 100644 --- a/providers/huawei/cci.go +++ b/providers/huawei/cci.go @@ -16,12 +16,12 @@ import ( "github.com/cpuguy83/strongerrors" "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers/huawei/auth" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/remotecommand" ) const ( @@ -308,10 +308,10 @@ func (p *CCIProvider) GetPodFullName(namespace string, pod string) string { return "" } -// ExecInContainer executes a command in a container in the pod, copying data +// RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. // TODO: Implementation -func (p *CCIProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { +func (p *CCIProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { log.Printf("receive ExecInContainer %q\n", container) return nil } diff --git a/providers/mock/mock.go b/providers/mock/mock.go index 83fe86364..c0b726c30 100644 --- a/providers/mock/mock.go +++ b/providers/mock/mock.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "io/ioutil" "math/rand" "time" @@ -16,8 +15,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/remotecommand" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" "github.com/virtual-kubelet/virtual-kubelet/providers" @@ -215,9 +212,9 @@ func (p *MockProvider) GetPodFullName(namespace string, pod string) string { return "" } -// ExecInContainer executes a command in a container in the pod, copying data +// RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. -func (p *MockProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { +func (p *MockProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { log.G(context.TODO()).Info("receive ExecInContainer %q", container) return nil } diff --git a/providers/nomad/nomad.go b/providers/nomad/nomad.go index 38d73bc84..8fc14478d 100644 --- a/providers/nomad/nomad.go +++ b/providers/nomad/nomad.go @@ -3,11 +3,9 @@ package nomad import ( "context" "fmt" - "io" "log" "os" "strings" - "time" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" @@ -16,8 +14,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - apitypes "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/remotecommand" ) // Nomad provider constants @@ -159,10 +155,10 @@ func (p *Provider) GetPodFullName(ctx context.Context, namespace string, pod str return fmt.Sprintf("%s-%s", jobNamePrefix, pod) } -// ExecInContainer executes a command in a container in the pod, copying data +// RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. // TODO: Implementation -func (p *Provider) ExecInContainer(name string, uid apitypes.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { +func (p *Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { log.Printf("ExecInContainer %q\n", container) return nil } diff --git a/providers/openstack/zun.go b/providers/openstack/zun.go index 68ee8589d..6f3454872 100644 --- a/providers/openstack/zun.go +++ b/providers/openstack/zun.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "log" "os" "strconv" @@ -16,11 +15,10 @@ import ( "github.com/gophercloud/gophercloud/pagination" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/remotecommand" ) // ZunProvider implements the virtual-kubelet provider interface and communicates with OpenStack's Zun APIs. @@ -212,9 +210,9 @@ func (p *ZunProvider) getContainers(ctx context.Context, pod *v1.Pod) ([]Contain return containers, nil } -// ExecInContainer executes a command in a container in the pod, copying data +// RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. -func (p *ZunProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { +func (p *ZunProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { log.Printf("receive ExecInContainer %q\n", container) return nil } diff --git a/providers/provider.go b/providers/provider.go index 919814eb5..62fe4e919 100644 --- a/providers/provider.go +++ b/providers/provider.go @@ -3,11 +3,8 @@ package providers import ( "context" "io" - "time" - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/remotecommand" + v1 "k8s.io/api/core/v1" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" ) @@ -28,9 +25,9 @@ type Provider interface { // GetContainerLogs retrieves the logs of a container by name from the provider. GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) - // ExecInContainer executes a command in a container in the pod, copying data + // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. - ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error + RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach AttachIO) error // GetPodStatus retrieves the status of a pod by name from the provider. GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) @@ -72,3 +69,18 @@ type PodNotifier interface { // NotifyPods should not block callers. NotifyPods(context.Context, func(*v1.Pod)) } + +// AttachIO is used to pass in streams to attach to a container process +type AttachIO interface { + Stdin() io.Reader + Stdout() io.WriteCloser + Stderr() io.WriteCloser + TTY() bool + Resize() <-chan TermSize +} + +// TermSize is used to set the terminal size from attached clients. +type TermSize struct { + Width uint16 + Height uint16 +} diff --git a/providers/vic/vic_provider.go b/providers/vic/vic_provider.go index 559204748..836019c54 100644 --- a/providers/vic/vic_provider.go +++ b/providers/vic/vic_provider.go @@ -2,7 +2,6 @@ package vic import ( "fmt" - "io" "os" "path" "syscall" @@ -24,6 +23,7 @@ import ( "github.com/vmware/vic/pkg/trace" "github.com/virtual-kubelet/virtual-kubelet/manager" + "github.com/virtual-kubelet/virtual-kubelet/providers" "github.com/virtual-kubelet/virtual-kubelet/providers/vic/cache" "github.com/virtual-kubelet/virtual-kubelet/providers/vic/operations" "github.com/virtual-kubelet/virtual-kubelet/providers/vic/proxy" @@ -31,11 +31,9 @@ import ( "net" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/remotecommand" ) type VicProvider struct { @@ -267,9 +265,9 @@ func (p *VicProvider) GetPodFullName(namespace string, pod string) string { return "" } -// ExecInContainer executes a command in a container in the pod, copying data +// RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. -func (p *VicProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { +func (p *VicProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { log.Printf("receive ExecInContainer %q\n", container) return nil } diff --git a/providers/web/broker.go b/providers/web/broker.go index 6b9220f94..ca123d271 100644 --- a/providers/web/broker.go +++ b/providers/web/broker.go @@ -30,9 +30,8 @@ import ( "github.com/cenkalti/backoff" "github.com/cpuguy83/strongerrors" - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/remotecommand" + "github.com/virtual-kubelet/virtual-kubelet/providers" + v1 "k8s.io/api/core/v1" ) // BrokerProvider implements the virtual-kubelet provider interface by forwarding kubelet calls to a web endpoint. @@ -133,10 +132,10 @@ func (p *BrokerProvider) GetPodFullName(namespace string, pod string) string { return "" } -// ExecInContainer executes a command in a container in the pod, copying data +// RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. // TODO: Implementation -func (p *BrokerProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { +func (p *BrokerProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error { log.Printf("receive ExecInContainer %q\n", container) return nil } diff --git a/vkubelet/api/exec.go b/vkubelet/api/exec.go index 279a6dd74..021e330bf 100644 --- a/vkubelet/api/exec.go +++ b/vkubelet/api/exec.go @@ -1,7 +1,8 @@ package api import ( - "fmt" + "context" + "io" "net/http" "strings" "time" @@ -9,14 +10,21 @@ import ( "github.com/cpuguy83/strongerrors" "github.com/gorilla/mux" "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/providers" + "k8s.io/apimachinery/pkg/types" + remoteutils "k8s.io/client-go/tools/remotecommand" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" ) +type ExecBackend interface { + RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach providers.AttachIO) error +} + // PodExecHandlerFunc makes an http handler func from a Provider which execs a command in a pod's container // Note that this handler currently depends on gorrilla/mux to get url parts as variables. // TODO(@cpuguy83): don't force gorilla/mux on consumers of this function -func PodExecHandlerFunc(backend remotecommand.Executor) http.HandlerFunc { +func PodExecHandlerFunc(backend ExecBackend) http.HandlerFunc { return handleError(func(w http.ResponseWriter, req *http.Request) error { vars := mux.Vars(req) @@ -37,7 +45,12 @@ func PodExecHandlerFunc(backend remotecommand.Executor) http.HandlerFunc { idleTimeout := time.Second * 30 streamCreationTimeout := time.Second * 30 - remotecommand.ServeExec(w, req, backend, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + exec := &containerExecContext{ctx: ctx, b: backend, pod: pod, namespace: namespace, container: container} + remotecommand.ServeExec(w, req, exec, "", "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols) + return nil }) } @@ -62,3 +75,83 @@ func getExecOptions(req *http.Request) (*remotecommand.Options, error) { }, nil } + +type containerExecContext struct { + b ExecBackend + eio *execIO + namespace, pod, container string + ctx context.Context +} + +// ExecInContainer Implements remotecommand.Executor +// This is called by remotecommand.ServeExec +func (c *containerExecContext) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remoteutils.TerminalSize, timeout time.Duration) error { + + eio := &execIO{ + tty: tty, + stdin: in, + stdout: out, + stderr: err, + } + + if tty { + eio.chResize = make(chan providers.TermSize) + } + + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + + if tty { + go func() { + send := func(s remoteutils.TerminalSize) bool { + select { + case eio.chResize <- providers.TermSize{Width: s.Width, Height: s.Height}: + return false + case <-ctx.Done(): + return true + } + } + + for { + select { + case s := <-resize: + if send(s) { + return + } + case <-ctx.Done(): + return + } + } + }() + } + + return c.b.RunInContainer(c.ctx, c.namespace, c.pod, c.container, cmd, eio) +} + +type execIO struct { + tty bool + stdin io.Reader + stdout io.WriteCloser + stderr io.WriteCloser + chResize chan providers.TermSize +} + +func (e *execIO) TTY() bool { + return e.tty +} + +func (e *execIO) Stdin() io.Reader { + return e.stdin +} + +func (e *execIO) Stdout() io.WriteCloser { + return e.stdout +} + +func (e *execIO) Stderr() io.WriteCloser { + return e.stderr +} + +func (e *execIO) Resize() <-chan providers.TermSize { + return e.chResize +}