Merge pull request #975 from cpuguy83/node_manager
Make ControllerManager more useful
This commit is contained in:
@@ -59,7 +59,11 @@ func (mv mapVar) Type() string {
|
||||
|
||||
func installFlags(flags *pflag.FlagSet, c *Opts) {
|
||||
flags.StringVar(&c.KubeConfigPath, "kubeconfig", c.KubeConfigPath, "kube config file to use for connecting to the Kubernetes API server")
|
||||
|
||||
flags.StringVar(&c.KubeNamespace, "namespace", c.KubeNamespace, "kubernetes namespace (default is 'all')")
|
||||
flags.MarkDeprecated("namespace", "Nodes must watch for pods in all namespaces. This option is now ignored.") //nolint:errcheck
|
||||
flags.MarkHidden("namespace") //nolint:errcheck
|
||||
|
||||
flags.StringVar(&c.KubeClusterDomain, "cluster-domain", c.KubeClusterDomain, "kubernetes cluster-domain (default is 'cluster.local')")
|
||||
flags.StringVar(&c.NodeName, "nodename", c.NodeName, "kubernetes node name")
|
||||
flags.StringVar(&c.OperatingSystem, "os", c.OperatingSystem, "Operating System (Linux/Windows)")
|
||||
@@ -68,11 +72,15 @@ func installFlags(flags *pflag.FlagSet, c *Opts) {
|
||||
flags.StringVar(&c.MetricsAddr, "metrics-addr", c.MetricsAddr, "address to listen for metrics/stats requests")
|
||||
|
||||
flags.StringVar(&c.TaintKey, "taint", c.TaintKey, "Set node taint key")
|
||||
|
||||
flags.BoolVar(&c.DisableTaint, "disable-taint", c.DisableTaint, "disable the virtual-kubelet node taint")
|
||||
flags.MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable") //nolint:errcheck
|
||||
|
||||
flags.IntVar(&c.PodSyncWorkers, "pod-sync-workers", c.PodSyncWorkers, `set the number of pod synchronization workers`)
|
||||
|
||||
flags.BoolVar(&c.EnableNodeLease, "enable-node-lease", c.EnableNodeLease, `use node leases (1.13) for node heartbeats`)
|
||||
flags.MarkDeprecated("enable-node-lease", "leases are always enabled") //nolint:errcheck
|
||||
flags.MarkHidden("enable-node-lease") //nolint:errcheck
|
||||
|
||||
flags.StringSliceVar(&c.TraceExporters, "trace-exporter", c.TraceExporters, fmt.Sprintf("sets the tracing exporter to use, available exporters: %s", AvailableTraceExporters()))
|
||||
flags.StringVar(&c.TraceConfig.ServiceName, "trace-service-name", c.TraceConfig.ServiceName, "sets the name of the service used to register with the trace exporter")
|
||||
|
||||
@@ -15,140 +15,15 @@
|
||||
package root
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node/api"
|
||||
)
|
||||
|
||||
// AcceptedCiphers is the list of accepted TLS ciphers, with known weak ciphers elided
|
||||
// Note this list should be a moving target.
|
||||
var AcceptedCiphers = []uint16{
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
|
||||
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
|
||||
}
|
||||
|
||||
func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) {
|
||||
cert, err := tls.LoadX509KeyPair(certPath, keyPath)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error loading tls certs")
|
||||
}
|
||||
|
||||
return &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
MinVersion: tls.VersionTLS12,
|
||||
PreferServerCipherSuites: true,
|
||||
CipherSuites: AcceptedCiphers,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerConfig, getPodsFromKubernetes api.PodListerFunc) (_ func(), retErr error) {
|
||||
var closers []io.Closer
|
||||
cancel := func() {
|
||||
for _, c := range closers {
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
|
||||
if cfg.CertPath == "" || cfg.KeyPath == "" {
|
||||
log.G(ctx).
|
||||
WithField("certPath", cfg.CertPath).
|
||||
WithField("keyPath", cfg.KeyPath).
|
||||
Error("TLS certificates not provided, not setting up pod http server")
|
||||
} else {
|
||||
tlsCfg, err := loadTLSConfig(cfg.CertPath, cfg.KeyPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l, err := tls.Listen("tcp", cfg.Addr, tlsCfg)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error setting up listener for pod http server")
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
|
||||
podRoutes := api.PodHandlerConfig{
|
||||
RunInContainer: p.RunInContainer,
|
||||
GetContainerLogs: p.GetContainerLogs,
|
||||
GetPodsFromKubernetes: getPodsFromKubernetes,
|
||||
GetPods: p.GetPods,
|
||||
StreamIdleTimeout: cfg.StreamIdleTimeout,
|
||||
StreamCreationTimeout: cfg.StreamCreationTimeout,
|
||||
}
|
||||
|
||||
api.AttachPodRoutes(podRoutes, mux, true)
|
||||
|
||||
s := &http.Server{
|
||||
Handler: mux,
|
||||
TLSConfig: tlsCfg,
|
||||
}
|
||||
go serveHTTP(ctx, s, l, "pods")
|
||||
closers = append(closers, s)
|
||||
}
|
||||
|
||||
if cfg.MetricsAddr == "" {
|
||||
log.G(ctx).Info("Pod metrics server not setup due to empty metrics address")
|
||||
} else {
|
||||
l, err := net.Listen("tcp", cfg.MetricsAddr)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not setup listener for pod metrics http server")
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
|
||||
var summaryHandlerFunc api.PodStatsSummaryHandlerFunc
|
||||
if mp, ok := p.(provider.PodMetricsProvider); ok {
|
||||
summaryHandlerFunc = mp.GetStatsSummary
|
||||
}
|
||||
podMetricsRoutes := api.PodMetricsConfig{
|
||||
GetStatsSummary: summaryHandlerFunc,
|
||||
}
|
||||
api.AttachPodMetricsRoutes(podMetricsRoutes, mux)
|
||||
s := &http.Server{
|
||||
Handler: mux,
|
||||
}
|
||||
go serveHTTP(ctx, s, l, "pod metrics")
|
||||
closers = append(closers, s)
|
||||
}
|
||||
|
||||
return cancel, nil
|
||||
}
|
||||
|
||||
func serveHTTP(ctx context.Context, s *http.Server, l net.Listener, name string) {
|
||||
if err := s.Serve(l); err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
default:
|
||||
log.G(ctx).WithError(err).Errorf("Error setting up %s http server", name)
|
||||
}
|
||||
}
|
||||
l.Close()
|
||||
}
|
||||
|
||||
type apiServerConfig struct {
|
||||
CertPath string
|
||||
KeyPath string
|
||||
CACertPath string
|
||||
Addr string
|
||||
MetricsAddr string
|
||||
StreamIdleTimeout time.Duration
|
||||
@@ -157,8 +32,9 @@ type apiServerConfig struct {
|
||||
|
||||
func getAPIConfig(c Opts) (*apiServerConfig, error) {
|
||||
config := apiServerConfig{
|
||||
CertPath: os.Getenv("APISERVER_CERT_LOCATION"),
|
||||
KeyPath: os.Getenv("APISERVER_KEY_LOCATION"),
|
||||
CertPath: os.Getenv("APISERVER_CERT_LOCATION"),
|
||||
KeyPath: os.Getenv("APISERVER_KEY_LOCATION"),
|
||||
CACertPath: os.Getenv("APISERVER_CA_CERT_LOCATION"),
|
||||
}
|
||||
|
||||
config.Addr = fmt.Sprintf(":%d", c.ListenPort)
|
||||
|
||||
@@ -15,54 +15,10 @@
|
||||
package root
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
const osLabel = "beta.kubernetes.io/os"
|
||||
|
||||
// NodeFromProvider builds a kubernetes node object from a provider
|
||||
// This is a temporary solution until node stuff actually split off from the provider interface itself.
|
||||
func NodeFromProvider(ctx context.Context, name string, taint *v1.Taint, p provider.Provider, version string) *v1.Node {
|
||||
taints := make([]v1.Taint, 0)
|
||||
|
||||
if taint != nil {
|
||||
taints = append(taints, *taint)
|
||||
}
|
||||
|
||||
node := &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Labels: map[string]string{
|
||||
"type": "virtual-kubelet",
|
||||
"kubernetes.io/role": "agent",
|
||||
"kubernetes.io/hostname": name,
|
||||
},
|
||||
},
|
||||
Spec: v1.NodeSpec{
|
||||
Taints: taints,
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
NodeInfo: v1.NodeSystemInfo{
|
||||
Architecture: "amd64",
|
||||
KubeletVersion: version,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
p.ConfigureNode(ctx, node)
|
||||
if _, ok := node.ObjectMeta.Labels[osLabel]; !ok {
|
||||
node.ObjectMeta.Labels[osLabel] = strings.ToLower(node.Status.NodeInfo.OperatingSystem)
|
||||
}
|
||||
return node
|
||||
}
|
||||
|
||||
// getTaint creates a taint using the provided key/value.
|
||||
// Taint effect is read from the environment
|
||||
// The taint key/value may be overwritten by the environment.
|
||||
|
||||
@@ -28,7 +28,7 @@ import (
|
||||
// Defaults for root command options
|
||||
const (
|
||||
DefaultNodeName = "virtual-kubelet"
|
||||
DefaultOperatingSystem = "Linux"
|
||||
DefaultOperatingSystem = "linux"
|
||||
DefaultInformerResyncPeriod = 1 * time.Minute
|
||||
DefaultMetricsAddr = ":10255"
|
||||
DefaultListenPort = 10250 // TODO(cpuguy83)(VK1.0): Change this to an addr instead of just a port.. we should not be listening on all interfaces.
|
||||
|
||||
@@ -16,8 +16,10 @@ package root
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/spf13/cobra"
|
||||
@@ -26,14 +28,10 @@ import (
|
||||
"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node/api"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
||||
)
|
||||
|
||||
// NewCommand creates a new top-level command.
|
||||
@@ -75,30 +73,33 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
|
||||
}
|
||||
}
|
||||
|
||||
client, err := nodeutil.ClientsetFromEnv(c.KubeConfigPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mux := http.NewServeMux()
|
||||
newProvider := func(cfg nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) {
|
||||
rm, err := manager.NewResourceManager(cfg.Pods, cfg.Secrets, cfg.ConfigMaps, cfg.Services)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "could not create resource manager")
|
||||
}
|
||||
initConfig := provider.InitConfig{
|
||||
ConfigPath: c.ProviderConfigPath,
|
||||
NodeName: c.NodeName,
|
||||
OperatingSystem: c.OperatingSystem,
|
||||
ResourceManager: rm,
|
||||
DaemonPort: c.ListenPort,
|
||||
InternalIP: os.Getenv("VKUBELET_POD_IP"),
|
||||
KubeClusterDomain: c.KubeClusterDomain,
|
||||
}
|
||||
pInit := s.Get(c.Provider)
|
||||
if pInit == nil {
|
||||
return nil, nil, errors.Errorf("provider %q not found", c.Provider)
|
||||
}
|
||||
|
||||
// Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node.
|
||||
podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
|
||||
client,
|
||||
c.InformerResyncPeriod,
|
||||
kubeinformers.WithNamespace(c.KubeNamespace),
|
||||
nodeutil.PodInformerFilter(c.NodeName),
|
||||
)
|
||||
podInformer := podInformerFactory.Core().V1().Pods()
|
||||
|
||||
// Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors).
|
||||
scmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, c.InformerResyncPeriod)
|
||||
// Create a secret informer and a config map informer so we can pass their listers to the resource manager.
|
||||
secretInformer := scmInformerFactory.Core().V1().Secrets()
|
||||
configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
|
||||
serviceInformer := scmInformerFactory.Core().V1().Services()
|
||||
|
||||
rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), configMapInformer.Lister(), serviceInformer.Lister())
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not create resource manager")
|
||||
p, err := pInit(initConfig)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrapf(err, "error initializing provider %s", c.Provider)
|
||||
}
|
||||
p.ConfigureNode(ctx, cfg.Node)
|
||||
cfg.Node.Status.NodeInfo.KubeletVersion = c.Version
|
||||
return p, nil, nil
|
||||
}
|
||||
|
||||
apiConfig, err := getAPIConfig(c)
|
||||
@@ -106,28 +107,39 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := setupTracing(ctx, c); err != nil {
|
||||
cm, err := nodeutil.NewNode(c.NodeName, newProvider, func(cfg *nodeutil.NodeConfig) error {
|
||||
cfg.KubeconfigPath = c.KubeConfigPath
|
||||
cfg.Handler = mux
|
||||
cfg.InformerResyncPeriod = c.InformerResyncPeriod
|
||||
|
||||
if taint != nil {
|
||||
cfg.NodeSpec.Spec.Taints = append(cfg.NodeSpec.Spec.Taints, *taint)
|
||||
}
|
||||
cfg.NodeSpec.Status.NodeInfo.Architecture = runtime.GOARCH
|
||||
cfg.NodeSpec.Status.NodeInfo.OperatingSystem = c.OperatingSystem
|
||||
|
||||
cfg.HTTPListenAddr = apiConfig.Addr
|
||||
cfg.StreamCreationTimeout = apiConfig.StreamCreationTimeout
|
||||
cfg.StreamIdleTimeout = apiConfig.StreamIdleTimeout
|
||||
cfg.DebugHTTP = true
|
||||
|
||||
cfg.NumWorkers = c.PodSyncWorkers
|
||||
|
||||
return nil
|
||||
},
|
||||
setAuth(c.NodeName, apiConfig),
|
||||
nodeutil.WithTLSConfig(
|
||||
nodeutil.WithKeyPairFromPath(apiConfig.CertPath, apiConfig.KeyPath),
|
||||
maybeCA(apiConfig.CACertPath),
|
||||
),
|
||||
nodeutil.AttachProviderRoutes(mux),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
initConfig := provider.InitConfig{
|
||||
ConfigPath: c.ProviderConfigPath,
|
||||
NodeName: c.NodeName,
|
||||
OperatingSystem: c.OperatingSystem,
|
||||
ResourceManager: rm,
|
||||
DaemonPort: c.ListenPort,
|
||||
InternalIP: os.Getenv("VKUBELET_POD_IP"),
|
||||
KubeClusterDomain: c.KubeClusterDomain,
|
||||
}
|
||||
|
||||
pInit := s.Get(c.Provider)
|
||||
if pInit == nil {
|
||||
return errors.Errorf("provider %q not found", c.Provider)
|
||||
}
|
||||
|
||||
p, err := pInit(initConfig)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error initializing provider %s", c.Provider)
|
||||
if err := setupTracing(ctx, c); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(log.Fields{
|
||||
@@ -137,70 +149,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
|
||||
"watchedNamespace": c.KubeNamespace,
|
||||
}))
|
||||
|
||||
pNode := NodeFromProvider(ctx, c.NodeName, taint, p, c.Version)
|
||||
np := node.NewNaiveNodeProvider()
|
||||
additionalOptions := []node.NodeControllerOpt{
|
||||
node.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error {
|
||||
if !k8serrors.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
log.G(ctx).Debug("node not found")
|
||||
newNode := pNode.DeepCopy()
|
||||
newNode.ResourceVersion = ""
|
||||
_, err = client.CoreV1().Nodes().Create(ctx, newNode, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.G(ctx).Debug("created new node")
|
||||
return nil
|
||||
}),
|
||||
}
|
||||
if c.EnableNodeLease {
|
||||
leaseClient := nodeutil.NodeLeaseV1Client(client)
|
||||
// 40 seconds is the default lease time in upstream kubelet
|
||||
additionalOptions = append(additionalOptions, node.WithNodeEnableLeaseV1(leaseClient, 40))
|
||||
}
|
||||
nodeRunner, err := node.NewNodeController(
|
||||
np,
|
||||
pNode,
|
||||
client.CoreV1().Nodes(),
|
||||
additionalOptions...,
|
||||
)
|
||||
if err != nil {
|
||||
log.G(ctx).Fatal(err)
|
||||
}
|
||||
|
||||
eb := record.NewBroadcaster()
|
||||
eb.StartLogging(log.G(ctx).Infof)
|
||||
eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(c.KubeNamespace)})
|
||||
|
||||
pc, err := node.NewPodController(node.PodControllerConfig{
|
||||
PodClient: client.CoreV1(),
|
||||
PodInformer: podInformer,
|
||||
EventRecorder: eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(pNode.Name, "pod-controller")}),
|
||||
Provider: p,
|
||||
SecretInformer: secretInformer,
|
||||
ConfigMapInformer: configMapInformer,
|
||||
ServiceInformer: serviceInformer,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error setting up pod controller")
|
||||
}
|
||||
|
||||
go podInformerFactory.Start(ctx.Done())
|
||||
go scmInformerFactory.Start(ctx.Done())
|
||||
|
||||
cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig, func(context.Context) ([]*corev1.Pod, error) {
|
||||
return rm.GetPods(), nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cancelHTTP()
|
||||
|
||||
cm := nodeutil.NewControllerManager(nodeRunner, pc)
|
||||
go cm.Run(ctx, c.PodSyncWorkers) // nolint:errcheck
|
||||
go cm.Run(ctx) //nolint:errcheck
|
||||
|
||||
defer func() {
|
||||
log.G(ctx).Debug("Waiting for controllers to be done")
|
||||
@@ -213,11 +162,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
|
||||
return err
|
||||
}
|
||||
|
||||
setNodeReady(pNode)
|
||||
if err := np.UpdateStatus(ctx, pNode); err != nil {
|
||||
return errors.Wrap(err, "error marking the node as ready")
|
||||
}
|
||||
log.G(ctx).Info("Initialized")
|
||||
log.G(ctx).Info("Ready")
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -227,18 +172,31 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func setNodeReady(n *corev1.Node) {
|
||||
for i, c := range n.Status.Conditions {
|
||||
if c.Type != "Ready" {
|
||||
continue
|
||||
func setAuth(node string, apiCfg *apiServerConfig) nodeutil.NodeOpt {
|
||||
if apiCfg.CACertPath == "" {
|
||||
return func(cfg *nodeutil.NodeConfig) error {
|
||||
cfg.Handler = api.InstrumentHandler(nodeutil.WithAuth(nodeutil.NoAuth(), cfg.Handler))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
c.Message = "Kubelet is ready"
|
||||
c.Reason = "KubeletReady"
|
||||
c.Status = corev1.ConditionTrue
|
||||
c.LastHeartbeatTime = metav1.Now()
|
||||
c.LastTransitionTime = metav1.Now()
|
||||
n.Status.Conditions[i] = c
|
||||
return
|
||||
return func(cfg *nodeutil.NodeConfig) error {
|
||||
auth, err := nodeutil.WebhookAuth(cfg.Client, node, func(cfg *nodeutil.WebhookAuthConfig) error {
|
||||
var err error
|
||||
cfg.AuthnConfig.ClientCertificateCAContentProvider, err = dynamiccertificates.NewDynamicCAContentFromFile("ca-cert-bundle", apiCfg.CACertPath)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg.Handler = api.InstrumentHandler(nodeutil.WithAuth(auth, cfg.Handler))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func maybeCA(p string) func(*tls.Config) error {
|
||||
if p == "" {
|
||||
return func(*tls.Config) error { return nil }
|
||||
}
|
||||
return nodeutil.WithCAFromPath(p)
|
||||
}
|
||||
|
||||
@@ -339,7 +339,7 @@ func (p *MockProvider) ConfigureNode(ctx context.Context, n *v1.Node) { // nolin
|
||||
n.Status.DaemonEndpoints = p.nodeDaemonEndpoints()
|
||||
os := p.operatingSystem
|
||||
if os == "" {
|
||||
os = "Linux"
|
||||
os = "linux"
|
||||
}
|
||||
n.Status.NodeInfo.OperatingSystem = os
|
||||
n.Status.NodeInfo.Architecture = "amd64"
|
||||
|
||||
@@ -2,35 +2,15 @@ package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node/api"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// Provider contains the methods required to implement a virtual-kubelet provider.
|
||||
//
|
||||
// Errors produced by these methods should implement an interface from
|
||||
// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the
|
||||
// core logic to be able to understand the type of failure.
|
||||
// Provider wraps the core provider type with an extra function needed to bootstrap the node
|
||||
type Provider interface {
|
||||
node.PodLifecycleHandler
|
||||
|
||||
// GetContainerLogs retrieves the logs of a container by name from the provider.
|
||||
GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error)
|
||||
|
||||
// RunInContainer executes a command in a container in the pod, copying data
|
||||
// between in/out/err and the container's stdin/stdout/stderr.
|
||||
RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error
|
||||
|
||||
nodeutil.Provider
|
||||
// ConfigureNode enables a provider to configure the node object that
|
||||
// will be used for Kubernetes.
|
||||
ConfigureNode(context.Context, *v1.Node)
|
||||
}
|
||||
|
||||
// PodMetricsProvider is an optional interface that providers can implement to expose pod stats
|
||||
type PodMetricsProvider interface {
|
||||
GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error)
|
||||
}
|
||||
|
||||
@@ -2,9 +2,9 @@ package provider
|
||||
|
||||
const (
|
||||
// OperatingSystemLinux is the configuration value for defining Linux.
|
||||
OperatingSystemLinux = "Linux"
|
||||
OperatingSystemLinux = "linux"
|
||||
// OperatingSystemWindows is the configuration value for defining Windows.
|
||||
OperatingSystemWindows = "Windows"
|
||||
OperatingSystemWindows = "windows"
|
||||
)
|
||||
|
||||
type OperatingSystems map[string]bool // nolint:golint
|
||||
|
||||
10
go.sum
10
go.sum
@@ -34,8 +34,10 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0
|
||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||
github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
|
||||
github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
|
||||
github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI=
|
||||
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
|
||||
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
|
||||
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
|
||||
@@ -55,6 +57,7 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
||||
github.com/blang/semver v3.5.0+incompatible h1:CGxCgetQ64DKk7rdZ++Vfnb1+ogGNnB17OJKJXD2Cfs=
|
||||
github.com/blang/semver v3.5.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
||||
github.com/bombsimon/logrusr v1.0.0 h1:CTCkURYAt5nhCCnKH9eLShYayj2/8Kn/4Qg3QfiU+Ro=
|
||||
github.com/bombsimon/logrusr v1.0.0/go.mod h1:Jq0nHtvxabKE5EMwAAdgTaz7dfWE8C4i11NOltxGQpc=
|
||||
@@ -140,11 +143,13 @@ github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+
|
||||
github.com/go-openapi/jsonpointer v0.17.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M=
|
||||
github.com/go-openapi/jsonpointer v0.18.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M=
|
||||
github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg=
|
||||
github.com/go-openapi/jsonpointer v0.19.3 h1:gihV7YNZK1iK6Tgwwsxo2rJbD1GTbdm72325Bq8FI3w=
|
||||
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
|
||||
github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg=
|
||||
github.com/go-openapi/jsonreference v0.17.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I=
|
||||
github.com/go-openapi/jsonreference v0.18.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I=
|
||||
github.com/go-openapi/jsonreference v0.19.2/go.mod h1:jMjeRr2HHw6nAVajTXJ4eiUwohSTlpa0o73RUL1owJc=
|
||||
github.com/go-openapi/jsonreference v0.19.3 h1:5cxNfTy0UVC3X8JL5ymxzyoUZmo8iZb+jeTWn7tUa8o=
|
||||
github.com/go-openapi/jsonreference v0.19.3/go.mod h1:rjx6GuL8TTa9VaixXglHmQmIL98+wF9xc8zWvFonSJ8=
|
||||
github.com/go-openapi/loads v0.17.0/go.mod h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU=
|
||||
github.com/go-openapi/loads v0.18.0/go.mod h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU=
|
||||
@@ -158,6 +163,7 @@ github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nA
|
||||
github.com/go-openapi/spec v0.17.0/go.mod h1:XkF/MOi14NmjsfZ8VtAKf8pIlbZzyoTvZsdfssdxcBI=
|
||||
github.com/go-openapi/spec v0.18.0/go.mod h1:XkF/MOi14NmjsfZ8VtAKf8pIlbZzyoTvZsdfssdxcBI=
|
||||
github.com/go-openapi/spec v0.19.2/go.mod h1:sCxk3jxKgioEJikev4fgkNmwS+3kuYdJtcsZsD5zxMY=
|
||||
github.com/go-openapi/spec v0.19.3 h1:0XRyw8kguri6Yw4SxhsQA/atC88yqrk0+G4YhI2wabc=
|
||||
github.com/go-openapi/spec v0.19.3/go.mod h1:FpwSN1ksY1eteniUU7X0N/BgJ7a4WvBFVA8Lj9mJglo=
|
||||
github.com/go-openapi/strfmt v0.17.0/go.mod h1:P82hnJI0CXkErkXi8IKjPbNBM6lV6+5pLP5l494TcyU=
|
||||
github.com/go-openapi/strfmt v0.18.0/go.mod h1:P82hnJI0CXkErkXi8IKjPbNBM6lV6+5pLP5l494TcyU=
|
||||
@@ -167,6 +173,7 @@ github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dp
|
||||
github.com/go-openapi/swag v0.17.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/kXLo40Tg=
|
||||
github.com/go-openapi/swag v0.18.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/kXLo40Tg=
|
||||
github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
|
||||
github.com/go-openapi/swag v0.19.5 h1:lTz6Ys4CmqqCQmZPBlbQENR1/GucA2bzYTE12Pw4tFY=
|
||||
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
|
||||
github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4=
|
||||
github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA=
|
||||
@@ -281,6 +288,7 @@ github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN
|
||||
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
|
||||
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
|
||||
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
|
||||
github.com/mailru/easyjson v0.7.0 h1:aizVhC/NAAcKWb+5QsU1iNOZb4Yws5UO2I+aIprQITM=
|
||||
github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
|
||||
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
|
||||
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
@@ -371,6 +379,7 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf
|
||||
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
|
||||
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
|
||||
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
|
||||
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
|
||||
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
|
||||
@@ -704,6 +713,7 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800 h1:9ZNvfPvVIEsp/T1ez4GQuzCcCTEQW
|
||||
k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
|
||||
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
|
||||
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.9/go.mod h1:dzAXnQbTRyDlZPJX2SUPEqvnB+j7AJjtlox7PEwigU0=
|
||||
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.15 h1:4uqm9Mv+w2MmBYD+F4qf/v6tDFUdPOk29C095RbU5mY=
|
||||
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.15/go.mod h1:LEScyzhFmoF5pso/YSeBstl57mOzx9xlU9n85RGrDQg=
|
||||
sigs.k8s.io/controller-runtime v0.7.1 h1:nqVwzVzdenfd9xIbB35pC7JJH2IXVL4hDo3MNzkyCh4=
|
||||
sigs.k8s.io/controller-runtime v0.7.1/go.mod h1:pJ3YBrJiAqMAZKi6UVGuE98ZrroV1p+pIhoHsMm9wdU=
|
||||
|
||||
@@ -56,6 +56,14 @@ rules:
|
||||
verbs:
|
||||
- create
|
||||
- patch
|
||||
- apiGroups:
|
||||
- coordination.k8s.io
|
||||
resources:
|
||||
- leases
|
||||
verbs:
|
||||
- get
|
||||
- create
|
||||
- update
|
||||
---
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: ClusterRoleBinding
|
||||
|
||||
@@ -4,6 +4,8 @@ metadata:
|
||||
name: vkubelet-mock-0
|
||||
spec:
|
||||
containers:
|
||||
- name: jaeger-tracing
|
||||
image: jaegertracing/all-in-one:1.22
|
||||
- name: vkubelet-mock-0
|
||||
image: virtual-kubelet
|
||||
# "IfNotPresent" is used to prevent Minikube from trying to pull from the registry (and failing) in the first place.
|
||||
@@ -23,18 +25,16 @@ spec:
|
||||
- --klog.logtostderr
|
||||
- --log-level
|
||||
- debug
|
||||
- --trace-exporter
|
||||
- jaeger
|
||||
- --trace-sample-rate=always
|
||||
env:
|
||||
- name: JAEGER_AGENT_ENDPOINT
|
||||
value: localhost:6831
|
||||
- name: KUBELET_PORT
|
||||
value: "10250"
|
||||
- name: VKUBELET_POD_IP
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: status.podIP
|
||||
ports:
|
||||
- name: metrics
|
||||
containerPort: 10255
|
||||
readinessProbe:
|
||||
httpGet:
|
||||
path: /stats/summary
|
||||
port: metrics
|
||||
serviceAccountName: virtual-kubelet
|
||||
|
||||
@@ -3,7 +3,6 @@ package framework
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
|
||||
stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
|
||||
"k8s.io/apimachinery/pkg/util/net"
|
||||
@@ -18,7 +17,7 @@ func (f *Framework) GetStatsSummary(ctx context.Context) (*stats.Summary, error)
|
||||
Namespace(f.Namespace).
|
||||
Resource("pods").
|
||||
SubResource("proxy").
|
||||
Name(net.JoinSchemeNamePort("http", f.NodeName, strconv.Itoa(10255))).
|
||||
Name(net.JoinSchemeNamePort("https", f.NodeName, "10250")).
|
||||
Suffix("/stats/summary").DoRaw(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
220
node/nodeutil/auth.go
Normal file
220
node/nodeutil/auth.go
Normal file
@@ -0,0 +1,220 @@
|
||||
package nodeutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/trace"
|
||||
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
|
||||
"k8s.io/apiserver/pkg/authentication/request/anonymous"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/authorization/authorizer"
|
||||
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
// Auth is the interface used to implement authn/authz for http requests
|
||||
type Auth interface {
|
||||
authenticator.Request
|
||||
authorizer.RequestAttributesGetter
|
||||
authorizer.Authorizer
|
||||
}
|
||||
|
||||
type authWrapper struct {
|
||||
authenticator.Request
|
||||
authorizer.RequestAttributesGetter
|
||||
authorizer.Authorizer
|
||||
}
|
||||
|
||||
// InstrumentAuth wraps the provided Auth in a new instrumented Auth
|
||||
//
|
||||
// Note: You would only need this if you rolled your own auth.
|
||||
// The Auth implementations defined in this package are already instrumented.
|
||||
func InstrumentAuth(auth Auth) Auth {
|
||||
if _, ok := auth.(*authWrapper); ok {
|
||||
// This is already instrumented
|
||||
return auth
|
||||
}
|
||||
return &authWrapper{
|
||||
Request: auth,
|
||||
RequestAttributesGetter: auth,
|
||||
Authorizer: auth,
|
||||
}
|
||||
}
|
||||
|
||||
// NoAuth creates an Auth which allows anonymous access to all resouorces
|
||||
func NoAuth() Auth {
|
||||
return &authWrapper{
|
||||
Request: anonymous.NewAuthenticator(),
|
||||
RequestAttributesGetter: &NodeRequestAttr{},
|
||||
Authorizer: authorizerfactory.NewAlwaysAllowAuthorizer(),
|
||||
}
|
||||
}
|
||||
|
||||
// WithAuth makes a new http handler which wraps the provided handler with authn/authz.
|
||||
func WithAuth(auth Auth, h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
handleAuth(auth, w, r, h)
|
||||
})
|
||||
}
|
||||
|
||||
func handleAuth(auth Auth, w http.ResponseWriter, r *http.Request, next http.Handler) {
|
||||
ctx := r.Context()
|
||||
ctx, span := trace.StartSpan(ctx, "vk.handleAuth")
|
||||
defer span.End()
|
||||
r = r.WithContext(ctx)
|
||||
|
||||
info, ok, err := auth.AuthenticateRequest(r)
|
||||
if err != nil || !ok {
|
||||
log.G(r.Context()).WithError(err).Error("Authorization error")
|
||||
http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
logger := log.G(ctx).WithFields(log.Fields{
|
||||
"user-name": info.User.GetName(),
|
||||
"user-id": info.User.GetUID(),
|
||||
})
|
||||
|
||||
ctx = log.WithLogger(ctx, logger)
|
||||
r = r.WithContext(ctx)
|
||||
|
||||
attrs := auth.GetRequestAttributes(info.User, r)
|
||||
|
||||
decision, _, err := auth.Authorize(ctx, attrs)
|
||||
if err != nil {
|
||||
log.G(r.Context()).WithError(err).Error("Authorization error")
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if decision != authorizer.DecisionAllow {
|
||||
http.Error(w, "Forbidden", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
// WebhookAuthOption is used as a functional argument to configure webhook auth.
|
||||
type WebhookAuthOption func(*WebhookAuthConfig) error
|
||||
|
||||
// WebhookAuthConfig stores the configurations for authn/authz and is used by WebhookAuthOption to expose to callers.
|
||||
type WebhookAuthConfig struct {
|
||||
AuthnConfig authenticatorfactory.DelegatingAuthenticatorConfig
|
||||
AuthzConfig authorizerfactory.DelegatingAuthorizerConfig
|
||||
}
|
||||
|
||||
// WebhookAuth creates an Auth suitable to use with kubelet webhook auth.
|
||||
// You must provide a CA provider to the authentication config, otherwise mTLS is disabled.
|
||||
func WebhookAuth(client kubernetes.Interface, nodeName string, opts ...WebhookAuthOption) (Auth, error) {
|
||||
cfg := WebhookAuthConfig{
|
||||
AuthnConfig: authenticatorfactory.DelegatingAuthenticatorConfig{
|
||||
CacheTTL: 2 * time.Minute, // default taken from k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1
|
||||
// TODO: After upgrading k8s libs, we need to add the retry backoff option
|
||||
},
|
||||
AuthzConfig: authorizerfactory.DelegatingAuthorizerConfig{
|
||||
AllowCacheTTL: 5 * time.Minute, // default taken from k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1
|
||||
DenyCacheTTL: 30 * time.Second, // default taken from k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1
|
||||
// TODO: After upgrading k8s libs, we need to add the retry backoff option
|
||||
},
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
if err := o(&cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
cfg.AuthnConfig.TokenAccessReviewClient = client.AuthenticationV1().TokenReviews()
|
||||
cfg.AuthzConfig.SubjectAccessReviewClient = client.AuthorizationV1().SubjectAccessReviews()
|
||||
|
||||
authn, _, err := cfg.AuthnConfig.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
authz, err := cfg.AuthzConfig.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &authWrapper{
|
||||
Request: authn,
|
||||
RequestAttributesGetter: NodeRequestAttr{nodeName},
|
||||
Authorizer: authz,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (w *authWrapper) AuthenticateRequest(r *http.Request) (*authenticator.Response, bool, error) {
|
||||
ctx, span := trace.StartSpan(r.Context(), "AuthenticateRequest")
|
||||
defer span.End()
|
||||
return w.Request.AuthenticateRequest(r.WithContext(ctx))
|
||||
}
|
||||
|
||||
func (w *authWrapper) Authorize(ctx context.Context, a authorizer.Attributes) (authorizer.Decision, string, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "Authorize")
|
||||
defer span.End()
|
||||
return w.Authorizer.Authorize(ctx, a)
|
||||
}
|
||||
|
||||
// NodeRequestAttr is a authorizor.RequeestAttributesGetter which can be used in the Auth interface.
|
||||
type NodeRequestAttr struct {
|
||||
NodeName string
|
||||
}
|
||||
|
||||
// GetRequestAttributes satisfies the authorizer.RequestAttributesGetter interface for use with an `Auth`.
|
||||
func (a NodeRequestAttr) GetRequestAttributes(u user.Info, r *http.Request) authorizer.Attributes {
|
||||
return authorizer.AttributesRecord{
|
||||
User: u,
|
||||
Verb: getAPIVerb(r),
|
||||
Namespace: "",
|
||||
APIGroup: "",
|
||||
APIVersion: "v1",
|
||||
Resource: "nodes",
|
||||
Name: a.NodeName,
|
||||
ResourceRequest: true,
|
||||
Path: r.URL.Path,
|
||||
Subresource: getSubresource(r),
|
||||
}
|
||||
}
|
||||
|
||||
func getAPIVerb(r *http.Request) string {
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
return "create"
|
||||
case http.MethodGet:
|
||||
return "get"
|
||||
case http.MethodPut:
|
||||
return "update"
|
||||
case http.MethodPatch:
|
||||
return "patch"
|
||||
case http.MethodDelete:
|
||||
return "delete"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func isSubpath(subpath, path string) bool {
|
||||
// Taken from k8s.io/kubernetes/pkg/kubelet/server/auth.go
|
||||
return subpath == path || (strings.HasPrefix(subpath, path) && subpath[len(path)] == '/')
|
||||
}
|
||||
|
||||
func getSubresource(r *http.Request) string {
|
||||
if isSubpath(r.URL.Path, "/stats") {
|
||||
return "stats"
|
||||
}
|
||||
if isSubpath(r.URL.Path, "/metrics") {
|
||||
return "metrics"
|
||||
}
|
||||
if isSubpath(r.URL.Path, "/logs") {
|
||||
// yes, "log", not "logs"
|
||||
// per kubelet code: "log" to match other log subresources (pods/log, etc)
|
||||
return "log"
|
||||
}
|
||||
|
||||
return "proxy"
|
||||
}
|
||||
@@ -2,101 +2,173 @@ package nodeutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
corev1listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
)
|
||||
|
||||
// ControllerManager helps manage the startup/shutdown procedure for other controllers.
|
||||
// Node helps manage the startup/shutdown procedure for other controllers.
|
||||
// It is intended as a convenience to reduce boiler plate code for starting up controllers.
|
||||
//
|
||||
// Must be created with constructor `NewControllerManager`.
|
||||
type ControllerManager struct {
|
||||
// Must be created with constructor `NewNode`.
|
||||
type Node struct {
|
||||
nc *node.NodeController
|
||||
pc *node.PodController
|
||||
|
||||
readyCb func(context.Context) error
|
||||
|
||||
ready chan struct{}
|
||||
done chan struct{}
|
||||
err error
|
||||
}
|
||||
|
||||
// NewControllerManager creates a new ControllerManager.
|
||||
func NewControllerManager(nc *node.NodeController, pc *node.PodController) *ControllerManager {
|
||||
return &ControllerManager{
|
||||
nc: nc,
|
||||
pc: pc,
|
||||
ready: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
podInformerFactory informers.SharedInformerFactory
|
||||
scmInformerFactory informers.SharedInformerFactory
|
||||
client kubernetes.Interface
|
||||
|
||||
listenAddr string
|
||||
h http.Handler
|
||||
tlsConfig *tls.Config
|
||||
|
||||
workers int
|
||||
|
||||
eb record.EventBroadcaster
|
||||
}
|
||||
|
||||
// NodeController returns the configured node controller.
|
||||
func (c *ControllerManager) NodeController() *node.NodeController {
|
||||
return c.nc
|
||||
func (n *Node) NodeController() *node.NodeController {
|
||||
return n.nc
|
||||
}
|
||||
|
||||
// PodController returns the configured pod controller.
|
||||
func (c *ControllerManager) PodController() *node.PodController {
|
||||
return c.pc
|
||||
func (n *Node) PodController() *node.PodController {
|
||||
return n.pc
|
||||
}
|
||||
|
||||
func (n *Node) runHTTP(ctx context.Context) (func(), error) {
|
||||
if n.tlsConfig == nil {
|
||||
log.G(ctx).Warn("TLS config not provided, not starting up http service")
|
||||
return func() {}, nil
|
||||
}
|
||||
if n.h == nil {
|
||||
log.G(ctx).Debug("No http handler, not starting up http service")
|
||||
return func() {}, nil
|
||||
}
|
||||
|
||||
l, err := tls.Listen("tcp", n.listenAddr, n.tlsConfig)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error starting http listener")
|
||||
}
|
||||
|
||||
log.G(ctx).Debug("Started TLS listener")
|
||||
|
||||
srv := &http.Server{Handler: n.h, TLSConfig: n.tlsConfig}
|
||||
go srv.Serve(l) //nolint:errcheck
|
||||
log.G(ctx).Debug("HTTP server running")
|
||||
|
||||
return func() {
|
||||
srv.Close()
|
||||
l.Close()
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run starts all the underlying controllers
|
||||
func (c *ControllerManager) Run(ctx context.Context, workers int) (retErr error) {
|
||||
func (n *Node) Run(ctx context.Context) (retErr error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
go c.pc.Run(ctx, workers) // nolint:errcheck
|
||||
|
||||
defer func() {
|
||||
cancel()
|
||||
|
||||
<-c.pc.Done()
|
||||
n.err = retErr
|
||||
close(n.done)
|
||||
}()
|
||||
|
||||
c.err = retErr
|
||||
close(c.done)
|
||||
if n.eb != nil {
|
||||
n.eb.StartLogging(log.G(ctx).Infof)
|
||||
n.eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: n.client.CoreV1().Events(v1.NamespaceAll)})
|
||||
defer n.eb.Shutdown()
|
||||
log.G(ctx).Debug("Started event broadcaster")
|
||||
}
|
||||
|
||||
cancelHTTP, err := n.runHTTP(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cancelHTTP()
|
||||
|
||||
go n.podInformerFactory.Start(ctx.Done())
|
||||
go n.scmInformerFactory.Start(ctx.Done())
|
||||
go n.pc.Run(ctx, n.workers) //nolint:errcheck
|
||||
|
||||
defer func() {
|
||||
cancel()
|
||||
<-n.pc.Done()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return c.err
|
||||
case <-c.pc.Ready():
|
||||
case <-c.pc.Done():
|
||||
return c.pc.Err()
|
||||
return n.err
|
||||
case <-n.pc.Ready():
|
||||
case <-n.pc.Done():
|
||||
return n.pc.Err()
|
||||
}
|
||||
|
||||
go c.nc.Run(ctx) // nolint:errcheck
|
||||
log.G(ctx).Debug("pod controller ready")
|
||||
|
||||
go n.nc.Run(ctx) //nolint:errcheck
|
||||
|
||||
defer func() {
|
||||
cancel()
|
||||
<-c.nc.Done()
|
||||
<-n.nc.Done()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
c.err = ctx.Err()
|
||||
return c.err
|
||||
case <-c.nc.Ready():
|
||||
case <-c.nc.Done():
|
||||
return c.nc.Err()
|
||||
n.err = ctx.Err()
|
||||
return n.err
|
||||
case <-n.nc.Ready():
|
||||
case <-n.nc.Done():
|
||||
return n.nc.Err()
|
||||
}
|
||||
|
||||
close(c.ready)
|
||||
log.G(ctx).Debug("node controller ready")
|
||||
|
||||
if n.readyCb != nil {
|
||||
if err := n.readyCb(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
close(n.ready)
|
||||
|
||||
select {
|
||||
case <-c.nc.Done():
|
||||
case <-n.nc.Done():
|
||||
cancel()
|
||||
return c.nc.Err()
|
||||
case <-c.pc.Done():
|
||||
return n.nc.Err()
|
||||
case <-n.pc.Done():
|
||||
cancel()
|
||||
return c.pc.Err()
|
||||
return n.pc.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// WaitReady waits for the specified timeout for the controller to be ready.
|
||||
//
|
||||
// The timeout is for convenience so the caller doesn't have to juggle an extra context.
|
||||
func (c *ControllerManager) WaitReady(ctx context.Context, timeout time.Duration) error {
|
||||
func (n *Node) WaitReady(ctx context.Context, timeout time.Duration) error {
|
||||
if timeout > 0 {
|
||||
var cancel func()
|
||||
ctx, cancel = context.WithTimeout(ctx, timeout)
|
||||
@@ -104,33 +176,255 @@ func (c *ControllerManager) WaitReady(ctx context.Context, timeout time.Duration
|
||||
}
|
||||
|
||||
select {
|
||||
case <-c.ready:
|
||||
case <-n.ready:
|
||||
return nil
|
||||
case <-c.done:
|
||||
return fmt.Errorf("controller exited before ready: %w", c.err)
|
||||
case <-n.done:
|
||||
return fmt.Errorf("controller exited before ready: %w", n.err)
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Ready returns a channel that will be closed after the controller is ready.
|
||||
func (c *ControllerManager) Ready() <-chan struct{} {
|
||||
return c.ready
|
||||
func (n *Node) Ready() <-chan struct{} {
|
||||
return n.ready
|
||||
}
|
||||
|
||||
// Done returns a channel that will be closed when the controller has exited.
|
||||
func (c *ControllerManager) Done() <-chan struct{} {
|
||||
return c.done
|
||||
func (n *Node) Done() <-chan struct{} {
|
||||
return n.done
|
||||
}
|
||||
|
||||
// Err returns any error that occurred with the controller.
|
||||
//
|
||||
// This always return nil before `<-Done()`.
|
||||
func (c *ControllerManager) Err() error {
|
||||
func (n *Node) Err() error {
|
||||
select {
|
||||
case <-c.Done():
|
||||
return c.err
|
||||
case <-n.Done():
|
||||
return n.err
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NodeOpt is used as functional options when configuring a new node in NewNodeFromClient
|
||||
type NodeOpt func(c *NodeConfig) error
|
||||
|
||||
// NodeConfig is used to hold configuration items for a Node.
|
||||
// It gets used in conjection with NodeOpt in NewNodeFromClient
|
||||
type NodeConfig struct {
|
||||
// Set the client to use, otherwise a client will be created from ClientsetFromEnv
|
||||
Client kubernetes.Interface
|
||||
|
||||
// Set the node spec to register with Kubernetes
|
||||
NodeSpec v1.Node
|
||||
// Set the path to read a kubeconfig from for creating a client.
|
||||
// This is ignored when a client is provided to NewNodeFromClient
|
||||
KubeconfigPath string
|
||||
// Set the period for a full resync for generated client-go informers
|
||||
InformerResyncPeriod time.Duration
|
||||
|
||||
// Set the address to listen on for the http API
|
||||
HTTPListenAddr string
|
||||
// Set a custom API handler to use.
|
||||
// You can use this to setup, for example, authentication middleware.
|
||||
// If one is not provided a default one will be created.
|
||||
//
|
||||
// Note: If you provide your own handler, you'll need to handle all auth, routes, etc.
|
||||
Handler http.Handler
|
||||
// Set the timeout for idle http streams
|
||||
StreamIdleTimeout time.Duration
|
||||
// Set the timeout for creating http streams
|
||||
StreamCreationTimeout time.Duration
|
||||
// Enable http debugging routes
|
||||
DebugHTTP bool
|
||||
// Set the tls config to use for the http server
|
||||
TLSConfig *tls.Config
|
||||
|
||||
// Specify the event recorder to use
|
||||
// If this is not provided, a default one will be used.
|
||||
EventRecorder record.EventRecorder
|
||||
|
||||
// Set the number of workers to reconcile pods
|
||||
// The default value is derived from the number of cores available.
|
||||
NumWorkers int
|
||||
|
||||
routeAttacher func(Provider, NodeConfig, corev1listers.PodLister)
|
||||
}
|
||||
|
||||
// WithNodeConfig returns a NodeOpt which replaces the NodeConfig with the passed in value.
|
||||
func WithNodeConfig(c NodeConfig) NodeOpt {
|
||||
return func(orig *NodeConfig) error {
|
||||
*orig = c
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithClient return a NodeOpt that sets the client that will be used to create/manage the node.
|
||||
func WithClient(c kubernetes.Interface) NodeOpt {
|
||||
return func(cfg *NodeConfig) error {
|
||||
cfg.Client = c
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewNode creates a new node using the provided client and name.
|
||||
// This is intended for high-level/low boiler-plate usage.
|
||||
// Use the constructors in the `node` package for lower level configuration.
|
||||
//
|
||||
// Some basic values are set for node status, you'll almost certainly want to modify it.
|
||||
//
|
||||
// If client is nil, this will construct a client using ClientsetFromEnv
|
||||
//
|
||||
// It is up to the caller to configure auth on the HTTP handler.
|
||||
func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node, error) {
|
||||
cfg := NodeConfig{
|
||||
NumWorkers: runtime.NumCPU(),
|
||||
InformerResyncPeriod: time.Minute,
|
||||
KubeconfigPath: os.Getenv("KUBECONFIG"),
|
||||
HTTPListenAddr: ":10250",
|
||||
NodeSpec: v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Labels: map[string]string{
|
||||
"type": "virtual-kubelet",
|
||||
"kubernetes.io/role": "agent",
|
||||
"kubernetes.io/hostname": name,
|
||||
},
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
Phase: v1.NodePending,
|
||||
Conditions: []v1.NodeCondition{
|
||||
{Type: v1.NodeReady},
|
||||
{Type: v1.NodeDiskPressure},
|
||||
{Type: v1.NodeMemoryPressure},
|
||||
{Type: v1.NodePIDPressure},
|
||||
{Type: v1.NodeNetworkUnavailable},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
if err := o(&cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if _, _, err := net.SplitHostPort(cfg.HTTPListenAddr); err != nil {
|
||||
return nil, errors.Wrap(err, "error parsing http listen address")
|
||||
}
|
||||
|
||||
if cfg.Client == nil {
|
||||
var err error
|
||||
cfg.Client, err = ClientsetFromEnv(cfg.KubeconfigPath)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating clientset from env")
|
||||
}
|
||||
}
|
||||
|
||||
podInformerFactory := informers.NewSharedInformerFactoryWithOptions(
|
||||
cfg.Client,
|
||||
cfg.InformerResyncPeriod,
|
||||
PodInformerFilter(name),
|
||||
)
|
||||
|
||||
scmInformerFactory := informers.NewSharedInformerFactoryWithOptions(
|
||||
cfg.Client,
|
||||
cfg.InformerResyncPeriod,
|
||||
)
|
||||
|
||||
podInformer := podInformerFactory.Core().V1().Pods()
|
||||
secretInformer := scmInformerFactory.Core().V1().Secrets()
|
||||
configMapInformer := scmInformerFactory.Core().V1().ConfigMaps()
|
||||
serviceInformer := scmInformerFactory.Core().V1().Services()
|
||||
|
||||
p, np, err := newProvider(ProviderConfig{
|
||||
Pods: podInformer.Lister(),
|
||||
ConfigMaps: configMapInformer.Lister(),
|
||||
Secrets: secretInformer.Lister(),
|
||||
Services: serviceInformer.Lister(),
|
||||
Node: &cfg.NodeSpec,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating provider")
|
||||
}
|
||||
|
||||
if cfg.routeAttacher != nil {
|
||||
cfg.routeAttacher(p, cfg, podInformer.Lister())
|
||||
}
|
||||
|
||||
var readyCb func(context.Context) error
|
||||
if np == nil {
|
||||
nnp := node.NewNaiveNodeProvider()
|
||||
np = nnp
|
||||
|
||||
readyCb = func(ctx context.Context) error {
|
||||
setNodeReady(&cfg.NodeSpec)
|
||||
err := nnp.UpdateStatus(ctx, &cfg.NodeSpec)
|
||||
return errors.Wrap(err, "error marking node as ready")
|
||||
}
|
||||
}
|
||||
|
||||
nc, err := node.NewNodeController(
|
||||
np,
|
||||
&cfg.NodeSpec,
|
||||
cfg.Client.CoreV1().Nodes(),
|
||||
node.WithNodeEnableLeaseV1(NodeLeaseV1Client(cfg.Client), node.DefaultLeaseDuration),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating node controller")
|
||||
}
|
||||
|
||||
var eb record.EventBroadcaster
|
||||
if cfg.EventRecorder == nil {
|
||||
eb = record.NewBroadcaster()
|
||||
cfg.EventRecorder = eb.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(name, "pod-controller")})
|
||||
}
|
||||
|
||||
pc, err := node.NewPodController(node.PodControllerConfig{
|
||||
PodClient: cfg.Client.CoreV1(),
|
||||
EventRecorder: cfg.EventRecorder,
|
||||
Provider: p,
|
||||
PodInformer: podInformer,
|
||||
SecretInformer: secretInformer,
|
||||
ConfigMapInformer: configMapInformer,
|
||||
ServiceInformer: serviceInformer,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating pod controller")
|
||||
}
|
||||
|
||||
return &Node{
|
||||
nc: nc,
|
||||
pc: pc,
|
||||
readyCb: readyCb,
|
||||
ready: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
eb: eb,
|
||||
podInformerFactory: podInformerFactory,
|
||||
scmInformerFactory: scmInformerFactory,
|
||||
client: cfg.Client,
|
||||
tlsConfig: cfg.TLSConfig,
|
||||
h: cfg.Handler,
|
||||
listenAddr: cfg.HTTPListenAddr,
|
||||
workers: cfg.NumWorkers,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func setNodeReady(n *v1.Node) {
|
||||
n.Status.Phase = v1.NodeRunning
|
||||
for i, c := range n.Status.Conditions {
|
||||
if c.Type != "Ready" {
|
||||
continue
|
||||
}
|
||||
|
||||
c.Message = "Kubelet is ready"
|
||||
c.Reason = "KubeletReady"
|
||||
c.Status = v1.ConditionTrue
|
||||
c.LastHeartbeatTime = metav1.Now()
|
||||
c.LastTransitionTime = metav1.Now()
|
||||
n.Status.Conditions[i] = c
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
70
node/nodeutil/provider.go
Normal file
70
node/nodeutil/provider.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package nodeutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node/api"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
corev1listers "k8s.io/client-go/listers/core/v1"
|
||||
)
|
||||
|
||||
// Provider contains the methods required to implement a virtual-kubelet provider.
|
||||
//
|
||||
// Errors produced by these methods should implement an interface from
|
||||
// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the
|
||||
// core logic to be able to understand the type of failure
|
||||
type Provider interface {
|
||||
node.PodLifecycleHandler
|
||||
|
||||
// GetContainerLogs retrieves the logs of a container by name from the provider.
|
||||
GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error)
|
||||
|
||||
// RunInContainer executes a command in a container in the pod, copying data
|
||||
// between in/out/err and the container's stdin/stdout/stderr.
|
||||
RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error
|
||||
|
||||
// GetStatsSummary gets the stats for the node, including running pods
|
||||
GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error)
|
||||
}
|
||||
|
||||
// ProviderConfig holds objects created by NewNodeFromClient that a provider may need to bootstrap itself.
|
||||
type ProviderConfig struct {
|
||||
Pods corev1listers.PodLister
|
||||
ConfigMaps corev1listers.ConfigMapLister
|
||||
Secrets corev1listers.SecretLister
|
||||
Services corev1listers.ServiceLister
|
||||
// Hack to allow the provider to set things on the node
|
||||
// Since the provider is bootstrapped after the node object is configured
|
||||
// Primarily this is due to carry-over from the pre-1.0 interfaces that expect the provider instead of the direct *caller* to configure the node.
|
||||
Node *v1.Node
|
||||
}
|
||||
|
||||
// NewProviderFunc is used from NewNodeFromClient to bootstrap a provider using the client/listers/etc created there.
|
||||
// If a nil node provider is returned a default one will be used.
|
||||
type NewProviderFunc func(ProviderConfig) (Provider, node.NodeProvider, error)
|
||||
|
||||
// AttachProviderRoutes returns a NodeOpt which uses api.PodHandler to attach the routes to the provider functions.
|
||||
//
|
||||
// Note this only attaches routes, you'll need to ensure to set the handler in the node config.
|
||||
func AttachProviderRoutes(mux api.ServeMux) NodeOpt {
|
||||
return func(cfg *NodeConfig) error {
|
||||
cfg.routeAttacher = func(p Provider, cfg NodeConfig, pods corev1listers.PodLister) {
|
||||
mux.Handle("/", api.PodHandler(api.PodHandlerConfig{
|
||||
RunInContainer: p.RunInContainer,
|
||||
GetContainerLogs: p.GetContainerLogs,
|
||||
GetPods: p.GetPods,
|
||||
GetPodsFromKubernetes: func(context.Context) ([]*v1.Pod, error) {
|
||||
return pods.List(labels.Everything())
|
||||
},
|
||||
GetStatsSummary: p.GetStatsSummary,
|
||||
StreamIdleTimeout: cfg.StreamIdleTimeout,
|
||||
StreamCreationTimeout: cfg.StreamCreationTimeout,
|
||||
}, true))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
83
node/nodeutil/tls.go
Normal file
83
node/nodeutil/tls.go
Normal file
@@ -0,0 +1,83 @@
|
||||
package nodeutil
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
// WithTLSConfig returns a NodeOpt which creates a base TLSConfig with the default cipher suites and tls min verions.
|
||||
// The tls config can be modified through functional options.
|
||||
func WithTLSConfig(opts ...func(*tls.Config) error) NodeOpt {
|
||||
return func(cfg *NodeConfig) error {
|
||||
tlsCfg := &tls.Config{
|
||||
MinVersion: tls.VersionTLS12,
|
||||
PreferServerCipherSuites: true,
|
||||
CipherSuites: DefaultServerCiphers(),
|
||||
ClientAuth: tls.RequestClientCert,
|
||||
}
|
||||
for _, o := range opts {
|
||||
if err := o(tlsCfg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cfg.TLSConfig = tlsCfg
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithCAFromPath makes a TLS config option to set up client auth using the path to a PEM encoded CA cert.
|
||||
func WithCAFromPath(p string) func(*tls.Config) error {
|
||||
return func(cfg *tls.Config) error {
|
||||
pem, err := ioutil.ReadFile(p)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error reading ca cert pem: %w", err)
|
||||
}
|
||||
cfg.ClientAuth = tls.RequireAndVerifyClientCert
|
||||
return WithCACert(pem)(cfg)
|
||||
}
|
||||
}
|
||||
|
||||
// WithKeyPairFromPath make sa TLS config option which loads the key pair paths from disk and appends them to the tls config.
|
||||
func WithKeyPairFromPath(cert, key string) func(*tls.Config) error {
|
||||
return func(cfg *tls.Config) error {
|
||||
cert, err := tls.LoadX509KeyPair(cert, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg.Certificates = append(cfg.Certificates, cert)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithCACert makes a TLS config opotion which appends the provided PEM encoded bytes the tls config's cert pool.
|
||||
// If a cert pool is not defined on the tls config an empty one will be created.
|
||||
func WithCACert(pem []byte) func(*tls.Config) error {
|
||||
return func(cfg *tls.Config) error {
|
||||
if cfg.ClientCAs == nil {
|
||||
cfg.ClientCAs = x509.NewCertPool()
|
||||
}
|
||||
if !cfg.ClientCAs.AppendCertsFromPEM(pem) {
|
||||
return fmt.Errorf("could not parse ca cert pem")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// DefaultServerCiphers is the list of accepted TLS ciphers, with known weak ciphers elided
|
||||
// Note this list should be a moving target.
|
||||
func DefaultServerCiphers() []uint16 {
|
||||
return []uint16{
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
|
||||
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
|
||||
}
|
||||
}
|
||||
@@ -1,61 +0,0 @@
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gotest.tools/assert"
|
||||
is "gotest.tools/assert/cmp"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
watchapi "k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
// TestNodeCreateAfterDelete makes sure that a node is automatically recreated
|
||||
// if it is deleted while VK is running.
|
||||
func (ts *EndToEndTestSuite) TestNodeCreateAfterDelete(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
podList, err := f.KubeClient.CoreV1().Pods(f.Namespace).List(ctx, metav1.ListOptions{
|
||||
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", f.NodeName).String(),
|
||||
})
|
||||
|
||||
assert.NilError(t, err)
|
||||
assert.Assert(t, is.Len(podList.Items, 0), "Kubernetes does not allow node deletion with dependent objects (pods) in existence: %v")
|
||||
|
||||
chErr := make(chan error, 1)
|
||||
|
||||
originalNode, err := f.GetNode(ctx)
|
||||
assert.NilError(t, err)
|
||||
|
||||
ctx, cancel = context.WithTimeout(ctx, time.Minute)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
wait := func(e watchapi.Event) (bool, error) {
|
||||
err = ctx.Err()
|
||||
// Our timeout has expired
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
if e.Type == watchapi.Deleted || e.Type == watchapi.Error {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return originalNode.ObjectMeta.UID != e.Object.(*v1.Node).ObjectMeta.UID, nil
|
||||
}
|
||||
chErr <- f.WaitUntilNodeCondition(wait)
|
||||
}()
|
||||
|
||||
assert.NilError(t, f.DeleteNode(ctx))
|
||||
|
||||
select {
|
||||
case result := <-chErr:
|
||||
assert.NilError(t, result, "Did not observe new node object created after deletion")
|
||||
case <-ctx.Done():
|
||||
t.Fatal("Test timed out while waiting for node object to be deleted / recreated")
|
||||
}
|
||||
}
|
||||
@@ -38,7 +38,7 @@ flags:
|
||||
default: virtual-kubelet
|
||||
- name: --os
|
||||
arg: string
|
||||
description: The operating system (must be `Linux` or `Windows`)
|
||||
description: The operating system (must be `linux` or `windows`)
|
||||
default: Linux
|
||||
- name: --pod-sync-workers
|
||||
arg: int
|
||||
|
||||
Reference in New Issue
Block a user