@@ -81,6 +81,11 @@ func installFlags(flags *pflag.FlagSet, c *Opts) {
|
|||||||
|
|
||||||
flags.DurationVar(&c.InformerResyncPeriod, "full-resync-period", c.InformerResyncPeriod, "how often to perform a full resync of pods between kubernetes and the provider")
|
flags.DurationVar(&c.InformerResyncPeriod, "full-resync-period", c.InformerResyncPeriod, "how often to perform a full resync of pods between kubernetes and the provider")
|
||||||
flags.DurationVar(&c.StartupTimeout, "startup-timeout", c.StartupTimeout, "How long to wait for the virtual-kubelet to start")
|
flags.DurationVar(&c.StartupTimeout, "startup-timeout", c.StartupTimeout, "How long to wait for the virtual-kubelet to start")
|
||||||
|
flags.DurationVar(&c.StreamIdleTimeout, "stream-idle-timeout", c.StreamIdleTimeout,
|
||||||
|
"stream-idle-timeout is the maximum time a streaming connection can be idle before the connection is"+
|
||||||
|
" automatically closed, default 30s.")
|
||||||
|
flags.DurationVar(&c.StreamCreationTimeout, "stream-creation-timeout", c.StreamCreationTimeout,
|
||||||
|
"stream-creation-timeout is the maximum time for streaming connection, default 30s.")
|
||||||
|
|
||||||
flagset := flag.NewFlagSet("klog", flag.PanicOnError)
|
flagset := flag.NewFlagSet("klog", flag.PanicOnError)
|
||||||
klog.InitFlags(flagset)
|
klog.InitFlags(flagset)
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
|
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
|
||||||
@@ -88,10 +89,13 @@ func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerCon
|
|||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
|
|
||||||
podRoutes := api.PodHandlerConfig{
|
podRoutes := api.PodHandlerConfig{
|
||||||
RunInContainer: p.RunInContainer,
|
RunInContainer: p.RunInContainer,
|
||||||
GetContainerLogs: p.GetContainerLogs,
|
GetContainerLogs: p.GetContainerLogs,
|
||||||
GetPods: p.GetPods,
|
GetPods: p.GetPods,
|
||||||
|
StreamIdleTimeout: cfg.StreamIdleTimeout,
|
||||||
|
StreamCreationTimeout: cfg.StreamCreationTimeout,
|
||||||
}
|
}
|
||||||
|
|
||||||
api.AttachPodRoutes(podRoutes, mux, true)
|
api.AttachPodRoutes(podRoutes, mux, true)
|
||||||
|
|
||||||
s := &http.Server{
|
s := &http.Server{
|
||||||
@@ -142,10 +146,12 @@ func serveHTTP(ctx context.Context, s *http.Server, l net.Listener, name string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
type apiServerConfig struct {
|
type apiServerConfig struct {
|
||||||
CertPath string
|
CertPath string
|
||||||
KeyPath string
|
KeyPath string
|
||||||
Addr string
|
Addr string
|
||||||
MetricsAddr string
|
MetricsAddr string
|
||||||
|
StreamIdleTimeout time.Duration
|
||||||
|
StreamCreationTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func getAPIConfig(c Opts) (*apiServerConfig, error) {
|
func getAPIConfig(c Opts) (*apiServerConfig, error) {
|
||||||
@@ -156,6 +162,8 @@ func getAPIConfig(c Opts) (*apiServerConfig, error) {
|
|||||||
|
|
||||||
config.Addr = fmt.Sprintf(":%d", c.ListenPort)
|
config.Addr = fmt.Sprintf(":%d", c.ListenPort)
|
||||||
config.MetricsAddr = c.MetricsAddr
|
config.MetricsAddr = c.MetricsAddr
|
||||||
|
config.StreamIdleTimeout = c.StreamIdleTimeout
|
||||||
|
config.StreamCreationTimeout = c.StreamCreationTimeout
|
||||||
|
|
||||||
return &config, nil
|
return &config, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -36,8 +36,10 @@ const (
|
|||||||
DefaultKubeNamespace = corev1.NamespaceAll
|
DefaultKubeNamespace = corev1.NamespaceAll
|
||||||
DefaultKubeClusterDomain = "cluster.local"
|
DefaultKubeClusterDomain = "cluster.local"
|
||||||
|
|
||||||
DefaultTaintEffect = string(corev1.TaintEffectNoSchedule)
|
DefaultTaintEffect = string(corev1.TaintEffectNoSchedule)
|
||||||
DefaultTaintKey = "virtual-kubelet.io/provider"
|
DefaultTaintKey = "virtual-kubelet.io/provider"
|
||||||
|
DefaultStreamIdleTimeout = 30 * time.Second
|
||||||
|
DefaultStreamCreationTimeout = 30 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// Opts stores all the options for configuring the root virtual-kubelet command.
|
// Opts stores all the options for configuring the root virtual-kubelet command.
|
||||||
@@ -84,6 +86,11 @@ type Opts struct {
|
|||||||
|
|
||||||
// Startup Timeout is how long to wait for the kubelet to start
|
// Startup Timeout is how long to wait for the kubelet to start
|
||||||
StartupTimeout time.Duration
|
StartupTimeout time.Duration
|
||||||
|
// StreamIdleTimeout is the maximum time a streaming connection
|
||||||
|
// can be idle before the connection is automatically closed.
|
||||||
|
StreamIdleTimeout time.Duration
|
||||||
|
// StreamCreationTimeout is the maximum time for streaming connection
|
||||||
|
StreamCreationTimeout time.Duration
|
||||||
|
|
||||||
Version string
|
Version string
|
||||||
}
|
}
|
||||||
@@ -152,5 +159,13 @@ func SetDefaultOpts(c *Opts) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.StreamIdleTimeout == 0 {
|
||||||
|
c.StreamIdleTimeout = DefaultStreamIdleTimeout
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.StreamCreationTimeout == 0 {
|
||||||
|
c.StreamCreationTimeout = DefaultStreamCreationTimeout
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -52,10 +52,45 @@ type TermSize struct {
|
|||||||
// HandleContainerExec makes an http handler func from a Provider which execs a command in a pod's container
|
// HandleContainerExec makes an http handler func from a Provider which execs a command in a pod's container
|
||||||
// Note that this handler currently depends on gorrilla/mux to get url parts as variables.
|
// Note that this handler currently depends on gorrilla/mux to get url parts as variables.
|
||||||
// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function
|
// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function
|
||||||
func HandleContainerExec(h ContainerExecHandlerFunc) http.HandlerFunc {
|
// ContainerExecHandlerConfig is used to pass options to options to the container exec handler.
|
||||||
|
type ContainerExecHandlerConfig struct {
|
||||||
|
// StreamIdleTimeout is the maximum time a streaming connection
|
||||||
|
// can be idle before the connection is automatically closed.
|
||||||
|
StreamIdleTimeout time.Duration
|
||||||
|
// StreamCreationTimeout is the maximum time for streaming connection
|
||||||
|
StreamCreationTimeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContainerExecHandlerOption configures a ContainerExecHandlerConfig
|
||||||
|
// It is used as functional options passed to `HandleContainerExec`
|
||||||
|
type ContainerExecHandlerOption func(*ContainerExecHandlerConfig)
|
||||||
|
|
||||||
|
// WithExecStreamIdleTimeout sets the idle timeout for a container exec stream
|
||||||
|
func WithExecStreamIdleTimeout(dur time.Duration) ContainerExecHandlerOption {
|
||||||
|
return func(cfg *ContainerExecHandlerConfig) {
|
||||||
|
cfg.StreamIdleTimeout = dur
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithExecStreamIdleTimeout sets the idle timeout for a container exec stream
|
||||||
|
func WithExecStreamCreationTimeout(dur time.Duration) ContainerExecHandlerOption {
|
||||||
|
return func(cfg *ContainerExecHandlerConfig) {
|
||||||
|
cfg.StreamCreationTimeout = dur
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleContainerExec makes an http handler func from a Provider which execs a command in a pod's container
|
||||||
|
// Note that this handler currently depends on gorrilla/mux to get url parts as variables.
|
||||||
|
// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function
|
||||||
|
func HandleContainerExec(h ContainerExecHandlerFunc, opts ...ContainerExecHandlerOption) http.HandlerFunc {
|
||||||
if h == nil {
|
if h == nil {
|
||||||
return NotImplemented
|
return NotImplemented
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var cfg ContainerExecHandlerConfig
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&cfg)
|
||||||
|
}
|
||||||
return handleError(func(w http.ResponseWriter, req *http.Request) error {
|
return handleError(func(w http.ResponseWriter, req *http.Request) error {
|
||||||
vars := mux.Vars(req)
|
vars := mux.Vars(req)
|
||||||
|
|
||||||
@@ -73,14 +108,23 @@ func HandleContainerExec(h ContainerExecHandlerFunc) http.HandlerFunc {
|
|||||||
return errdefs.AsInvalidInput(err)
|
return errdefs.AsInvalidInput(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
idleTimeout := time.Second * 30
|
|
||||||
streamCreationTimeout := time.Second * 30
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.TODO())
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
exec := &containerExecContext{ctx: ctx, h: h, pod: pod, namespace: namespace, container: container}
|
exec := &containerExecContext{ctx: ctx, h: h, pod: pod, namespace: namespace, container: container}
|
||||||
remotecommand.ServeExec(w, req, exec, "", "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
|
remotecommand.ServeExec(
|
||||||
|
w,
|
||||||
|
req,
|
||||||
|
exec,
|
||||||
|
"",
|
||||||
|
"",
|
||||||
|
container,
|
||||||
|
command,
|
||||||
|
streamOpts,
|
||||||
|
cfg.StreamIdleTimeout,
|
||||||
|
cfg.StreamCreationTimeout,
|
||||||
|
supportedStreamProtocols,
|
||||||
|
)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ package api
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||||
@@ -33,9 +34,11 @@ type ServeMux interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type PodHandlerConfig struct {
|
type PodHandlerConfig struct {
|
||||||
RunInContainer ContainerExecHandlerFunc
|
RunInContainer ContainerExecHandlerFunc
|
||||||
GetContainerLogs ContainerLogsHandlerFunc
|
GetContainerLogs ContainerLogsHandlerFunc
|
||||||
GetPods PodListerFunc
|
GetPods PodListerFunc
|
||||||
|
StreamIdleTimeout time.Duration
|
||||||
|
StreamCreationTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// PodHandler creates an http handler for interacting with pods/containers.
|
// PodHandler creates an http handler for interacting with pods/containers.
|
||||||
@@ -47,8 +50,16 @@ func PodHandler(p PodHandlerConfig, debug bool) http.Handler {
|
|||||||
if debug {
|
if debug {
|
||||||
r.HandleFunc("/runningpods/", HandleRunningPods(p.GetPods)).Methods("GET")
|
r.HandleFunc("/runningpods/", HandleRunningPods(p.GetPods)).Methods("GET")
|
||||||
}
|
}
|
||||||
|
|
||||||
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", HandleContainerLogs(p.GetContainerLogs)).Methods("GET")
|
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", HandleContainerLogs(p.GetContainerLogs)).Methods("GET")
|
||||||
r.HandleFunc("/exec/{namespace}/{pod}/{container}", HandleContainerExec(p.RunInContainer)).Methods("POST")
|
r.HandleFunc(
|
||||||
|
"/exec/{namespace}/{pod}/{container}",
|
||||||
|
HandleContainerExec(
|
||||||
|
p.RunInContainer,
|
||||||
|
WithExecStreamCreationTimeout(p.StreamCreationTimeout),
|
||||||
|
WithExecStreamIdleTimeout(p.StreamIdleTimeout),
|
||||||
|
),
|
||||||
|
).Methods("POST")
|
||||||
r.NotFoundHandler = http.HandlerFunc(NotFound)
|
r.NotFoundHandler = http.HandlerFunc(NotFound)
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user