feat: support SPDY subprotocol version v5 (K8s API >= v1.29.0) (#1185)

This commit is contained in:
Francesco Torta
2024-01-19 19:31:38 +01:00
committed by GitHub
parent d6a4e8ee09
commit a9ce60e19d
5 changed files with 209 additions and 159 deletions

View File

@@ -148,6 +148,8 @@ func createHTTPStreamStreams(req *http.Request, w http.ResponseWriter, opts *Opt
var handler protocolHandler
switch protocol {
case remotecommandconsts.StreamProtocolV5Name:
handler = &v5ProtocolHandler{}
case remotecommandconsts.StreamProtocolV4Name:
handler = &v4ProtocolHandler{}
case remotecommandconsts.StreamProtocolV3Name:
@@ -159,6 +161,9 @@ func createHTTPStreamStreams(req *http.Request, w http.ResponseWriter, opts *Opt
fallthrough
case remotecommandconsts.StreamProtocolV1Name:
handler = &v1ProtocolHandler{}
default:
klog.Errorf("unable to create HTTP stream: unknown protocol %q", protocol)
return nil, false
}
// count the streams client asked for, starting with 1
@@ -199,6 +204,57 @@ type protocolHandler interface {
supportsTerminalResizing() bool
}
// v5ProtocolHandler implements the V5 protocol version for streaming command execution.
type v5ProtocolHandler struct{}
func (*v5ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
ctx := &context{}
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
WaitForStreams:
for {
select {
case stream := <-streams:
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
ctx.writeStatus = v4WriteStatusFunc(stream) // write json errors
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdin:
ctx.stdinStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdout:
ctx.stdoutStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStderr:
ctx.stderrStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeResize:
ctx.resizeStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
default:
runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
}
case <-expired:
// TODO find a way to return the error to the user. Maybe use a separate
// stream to report errors?
return nil, errors.New("timed out waiting for client to create streams")
}
}
return ctx, nil
}
// supportsTerminalResizing returns true because v5ProtocolHandler supports it
func (*v5ProtocolHandler) supportsTerminalResizing() bool { return true }
// v4ProtocolHandler implements the V4 protocol version for streaming command execution. It only differs
// in from v3 in the error stream format using an json-marshaled metav1.Status which carries
// the process' exit code.

View File

@@ -29,7 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
apivalidation "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
)
const (
@@ -334,7 +334,7 @@ func getEnvironmentVariableValue(ctx context.Context, env *corev1.EnvVar, mappin
return getEnvironmentVariableValueWithValueFrom(ctx, env, mappingFunc, pod, container, rm, recorder)
}
// Handle values that have been directly provided after expanding variable references.
return pointer.String(expansion.Expand(env.Value, mappingFunc)), nil
return ptr.To(expansion.Expand(env.Value, mappingFunc)), nil
}
func getEnvironmentVariableValueWithValueFrom(ctx context.Context, env *corev1.EnvVar, mappingFunc func(string) string, pod *corev1.Pod, container *corev1.Container, rm *manager.ResourceManager, recorder record.EventRecorder) (*string, error) {
@@ -411,7 +411,7 @@ func getEnvironmentVariableValueWithValueFromConfigMapKeyRef(ctx context.Context
return nil, fmt.Errorf("configmap %q doesn't contain the %q key required by pod %s", vf.Name, vf.Key, pod.Name)
}
// Populate the environment variable and continue on to the next reference.
return pointer.String(keyValue), nil
return ptr.To(keyValue), nil
}
func getEnvironmentVariableValueWithValueFromSecretKeyRef(ctx context.Context, env *corev1.EnvVar, mappingFunc func(string) string, pod *corev1.Pod, container *corev1.Container, rm *manager.ResourceManager, recorder record.EventRecorder) (*string, error) {
@@ -463,7 +463,7 @@ func getEnvironmentVariableValueWithValueFromSecretKeyRef(ctx context.Context, e
return nil, fmt.Errorf("secret %q doesn't contain the %q key required by pod %s", vf.Name, vf.Key, pod.Name)
}
// Populate the environment variable and continue on to the next reference.
return pointer.String(string(keyValue)), nil
return ptr.To(string(keyValue)), nil
}
// Handle population from a field (downward API).
@@ -476,7 +476,7 @@ func getEnvironmentVariableValueWithValueFromFieldRef(ctx context.Context, env *
return nil, err
}
return pointer.String(runtimeVal), nil
return ptr.To(runtimeVal), nil
}
// podFieldSelectorRuntimeValue returns the runtime value of the given