Refactor exec interface (#578)

This removes the dependence on remotecommand in providers as well as the
need to expose provider ID's for the sake of the ExecInContainer API.
This commit is contained in:
Brian Goff
2019-04-26 12:57:56 -07:00
committed by GitHub
parent 449eb3bb7d
commit d809dff289
13 changed files with 174 additions and 81 deletions

View File

@@ -9,7 +9,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"strconv"
"strings"
@@ -19,13 +18,13 @@ import (
"github.com/cpuguy83/strongerrors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"github.com/virtual-kubelet/virtual-kubelet/providers/alibabacloud/eci"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
k8serr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
// The service account secret mount path.
@@ -318,9 +317,9 @@ func (p *ECIProvider) GetPodFullName(namespace string, pod string) string {
return fmt.Sprintf("%s-%s", namespace, pod)
}
// ExecInContainer executes a command in a container in the pod, copying data
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *ECIProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, errstream io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
func (p *ECIProvider) RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach providers.AttachIO) error {
return nil
}

View File

@@ -3,18 +3,16 @@ package aws
import (
"context"
"fmt"
"io"
"log"
"time"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"github.com/virtual-kubelet/virtual-kubelet/providers/aws/fargate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
// FargateProvider implements the virtual-kubelet provider interface.
@@ -184,12 +182,9 @@ func (p *FargateProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *FargateProvider) ExecInContainer(
name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser,
tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("Received ExecInContainer request for %s.\n", container)
func (p *FargateProvider) RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach providers.AttachIO) error {
return errNotImplemented
}

View File

@@ -25,6 +25,7 @@ import (
"github.com/virtual-kubelet/azure-aci/client/network"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"github.com/virtual-kubelet/virtual-kubelet/trace"
v1 "k8s.io/api/core/v1"
k8serr "k8s.io/apimachinery/pkg/api/errors"
@@ -33,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
clientcmdv1 "k8s.io/client-go/tools/clientcmd/api/v1"
"k8s.io/client-go/tools/remotecommand"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
@@ -800,33 +800,35 @@ func (p *ACIProvider) GetPodFullName(namespace string, pod string) string {
return fmt.Sprintf("%s-%s", namespace, pod)
}
// ExecInContainer executes a command in a container in the pod, copying data
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, errstream io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
// Cleanup on exit
func (p *ACIProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error {
out := attach.Stdout()
if out != nil {
defer out.Close()
}
if errstream != nil {
defer errstream.Close()
}
cg, _, err := p.aciClient.GetContainerGroup(context.TODO(), p.resourceGroup, name)
cg, _, err := p.aciClient.GetContainerGroup(ctx, p.resourceGroup, p.GetPodFullName(namespace, name))
if err != nil {
return err
}
// Set default terminal size
terminalSize := remotecommand.TerminalSize{
size := providers.TermSize{
Height: 60,
Width: 120,
}
resize := attach.Resize()
if resize != nil {
terminalSize = <-resize // Receive terminal resize event if resize stream is present
select {
case size = <-resize:
case <-ctx.Done():
return ctx.Err()
}
}
ts := aci.TerminalSizeRequest{Height: int(terminalSize.Height), Width: int(terminalSize.Width)}
ts := aci.TerminalSizeRequest{Height: int(size.Height), Width: int(size.Width)}
xcrsp, err := p.aciClient.LaunchExec(p.resourceGroup, cg.Name, container, cmd[0], ts)
if err != nil {
return err
@@ -840,12 +842,17 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri
// Cleanup on exit
defer c.Close()
defer out.Close()
in := attach.Stdin()
if in != nil {
// Inputstream
go func() {
for {
select {
case <-ctx.Done():
return
default:
}
var msg = make([]byte, 512)
n, err := in.Read(msg)
if err == io.EOF {
@@ -858,14 +865,18 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri
if n > 0 { // Only call WriteMessage if there is data to send
c.WriteMessage(websocket.BinaryMessage, msg[:n])
}
}
}()
}
if out != nil {
//Outputstream
for {
select {
case <-ctx.Done():
break
default:
}
_, cr, err := c.NextReader()
if err != nil {
// Handle errors
@@ -875,7 +886,7 @@ func (p *ACIProvider) ExecInContainer(name string, uid types.UID, container stri
}
}
return nil
return ctx.Err()
}
// GetPodStatus returns the status of a pod by name that is running inside ACI

View File

@@ -4,13 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
"time"
"github.com/Azure/go-autorest/autorest"
@@ -20,12 +18,11 @@ import (
"github.com/Azure/go-autorest/autorest/to"
"github.com/lawrencegripper/pod2docker"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
azureCreds "github.com/virtual-kubelet/virtual-kubelet/providers/azure"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
const (
@@ -310,10 +307,10 @@ func (p *Provider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *Provider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
func (p *Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}

View File

@@ -6,7 +6,6 @@ import (
"bufio"
"context"
"fmt"
"io"
"io/ioutil"
"net"
"os"
@@ -21,12 +20,11 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"google.golang.org/grpc"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
k8serr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
criapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)
@@ -654,10 +652,10 @@ func (p *CRIProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *CRIProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
func (p *CRIProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}

View File

@@ -16,12 +16,12 @@ import (
"github.com/cpuguy83/strongerrors"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"github.com/virtual-kubelet/virtual-kubelet/providers/huawei/auth"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
const (
@@ -308,10 +308,10 @@ func (p *CCIProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *CCIProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
func (p *CCIProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/rand"
"time"
@@ -16,8 +15,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"github.com/virtual-kubelet/virtual-kubelet/providers"
@@ -215,9 +212,9 @@ func (p *MockProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *MockProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
func (p *MockProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error {
log.G(context.TODO()).Info("receive ExecInContainer %q", container)
return nil
}

View File

@@ -3,11 +3,9 @@ package nomad
import (
"context"
"fmt"
"io"
"log"
"os"
"strings"
"time"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
@@ -16,8 +14,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
// Nomad provider constants
@@ -159,10 +155,10 @@ func (p *Provider) GetPodFullName(ctx context.Context, namespace string, pod str
return fmt.Sprintf("%s-%s", jobNamePrefix, pod)
}
// ExecInContainer executes a command in a container in the pod, copying data
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *Provider) ExecInContainer(name string, uid apitypes.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
func (p *Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error {
log.Printf("ExecInContainer %q\n", container)
return nil
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
"strconv"
@@ -16,11 +15,10 @@ import (
"github.com/gophercloud/gophercloud/pagination"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
// ZunProvider implements the virtual-kubelet provider interface and communicates with OpenStack's Zun APIs.
@@ -212,9 +210,9 @@ func (p *ZunProvider) getContainers(ctx context.Context, pod *v1.Pod) ([]Contain
return containers, nil
}
// ExecInContainer executes a command in a container in the pod, copying data
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *ZunProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
func (p *ZunProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}

View File

@@ -3,11 +3,8 @@ package providers
import (
"context"
"io"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
v1 "k8s.io/api/core/v1"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
@@ -28,9 +25,9 @@ type Provider interface {
// GetContainerLogs retrieves the logs of a container by name from the provider.
GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error)
// ExecInContainer executes a command in a container in the pod, copying data
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error
RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach AttachIO) error
// GetPodStatus retrieves the status of a pod by name from the provider.
GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error)
@@ -72,3 +69,18 @@ type PodNotifier interface {
// NotifyPods should not block callers.
NotifyPods(context.Context, func(*v1.Pod))
}
// AttachIO is used to pass in streams to attach to a container process
type AttachIO interface {
Stdin() io.Reader
Stdout() io.WriteCloser
Stderr() io.WriteCloser
TTY() bool
Resize() <-chan TermSize
}
// TermSize is used to set the terminal size from attached clients.
type TermSize struct {
Width uint16
Height uint16
}

View File

@@ -2,7 +2,6 @@ package vic
import (
"fmt"
"io"
"os"
"path"
"syscall"
@@ -24,6 +23,7 @@ import (
"github.com/vmware/vic/pkg/trace"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"github.com/virtual-kubelet/virtual-kubelet/providers/vic/cache"
"github.com/virtual-kubelet/virtual-kubelet/providers/vic/operations"
"github.com/virtual-kubelet/virtual-kubelet/providers/vic/proxy"
@@ -31,11 +31,9 @@ import (
"net"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
type VicProvider struct {
@@ -267,9 +265,9 @@ func (p *VicProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *VicProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
func (p *VicProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}

View File

@@ -30,9 +30,8 @@ import (
"github.com/cenkalti/backoff"
"github.com/cpuguy83/strongerrors"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
"github.com/virtual-kubelet/virtual-kubelet/providers"
v1 "k8s.io/api/core/v1"
)
// BrokerProvider implements the virtual-kubelet provider interface by forwarding kubelet calls to a web endpoint.
@@ -133,10 +132,10 @@ func (p *BrokerProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *BrokerProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
func (p *BrokerProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach providers.AttachIO) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}

View File

@@ -1,7 +1,8 @@
package api
import (
"fmt"
"context"
"io"
"net/http"
"strings"
"time"
@@ -9,14 +10,21 @@ import (
"github.com/cpuguy83/strongerrors"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"k8s.io/apimachinery/pkg/types"
remoteutils "k8s.io/client-go/tools/remotecommand"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
)
type ExecBackend interface {
RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach providers.AttachIO) error
}
// PodExecHandlerFunc makes an http handler func from a Provider which execs a command in a pod's container
// Note that this handler currently depends on gorrilla/mux to get url parts as variables.
// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function
func PodExecHandlerFunc(backend remotecommand.Executor) http.HandlerFunc {
func PodExecHandlerFunc(backend ExecBackend) http.HandlerFunc {
return handleError(func(w http.ResponseWriter, req *http.Request) error {
vars := mux.Vars(req)
@@ -37,7 +45,12 @@ func PodExecHandlerFunc(backend remotecommand.Executor) http.HandlerFunc {
idleTimeout := time.Second * 30
streamCreationTimeout := time.Second * 30
remotecommand.ServeExec(w, req, backend, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
exec := &containerExecContext{ctx: ctx, b: backend, pod: pod, namespace: namespace, container: container}
remotecommand.ServeExec(w, req, exec, "", "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
return nil
})
}
@@ -62,3 +75,83 @@ func getExecOptions(req *http.Request) (*remotecommand.Options, error) {
}, nil
}
type containerExecContext struct {
b ExecBackend
eio *execIO
namespace, pod, container string
ctx context.Context
}
// ExecInContainer Implements remotecommand.Executor
// This is called by remotecommand.ServeExec
func (c *containerExecContext) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remoteutils.TerminalSize, timeout time.Duration) error {
eio := &execIO{
tty: tty,
stdin: in,
stdout: out,
stderr: err,
}
if tty {
eio.chResize = make(chan providers.TermSize)
}
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
if tty {
go func() {
send := func(s remoteutils.TerminalSize) bool {
select {
case eio.chResize <- providers.TermSize{Width: s.Width, Height: s.Height}:
return false
case <-ctx.Done():
return true
}
}
for {
select {
case s := <-resize:
if send(s) {
return
}
case <-ctx.Done():
return
}
}
}()
}
return c.b.RunInContainer(c.ctx, c.namespace, c.pod, c.container, cmd, eio)
}
type execIO struct {
tty bool
stdin io.Reader
stdout io.WriteCloser
stderr io.WriteCloser
chResize chan providers.TermSize
}
func (e *execIO) TTY() bool {
return e.tty
}
func (e *execIO) Stdin() io.Reader {
return e.stdin
}
func (e *execIO) Stdout() io.WriteCloser {
return e.stdout
}
func (e *execIO) Stderr() io.WriteCloser {
return e.stderr
}
func (e *execIO) Resize() <-chan providers.TermSize {
return e.chResize
}