Compare commits

..

3 Commits

Author SHA1 Message Date
Brian Goff
dfe657cfa5 Add trailing newline to stats response
This was just an annoyance when working with the API from a terminal.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2025-06-11 23:58:22 +01:00
Brian Goff
c01fff766d Update mock CLI to use bootstrapper
Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2025-06-11 23:30:57 +01:00
Brian Goff
e72a73af61 Add nodeutil opt to bootstrapping from rest.Config
This uses a rest.Config to bootstrap TLS for the http server, webhook
auth, and the client.

This can be expanded later to do other kinds of TLS bootstrapping. For
now this seems to get the job done in terms of what VK expects for
permissions on the cluster.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2025-06-11 23:29:59 +01:00
15 changed files with 459 additions and 184 deletions

View File

@@ -1,69 +0,0 @@
name: Virtual Kubelet Docker Build and Deploy
on:
push:
branches: [master]
pull_request:
jobs:
build-and-deploy:
runs-on: ubuntu-latest
steps:
- name: 输出触发信息
run: |
echo "🎉 该作业由 ${{ github.event_name }} 事件自动触发。"
echo "🐧 此作业当前在 GitHub 托管的 ${{ runner.os }} 服务器上运行!"
echo "🔎 您的发布版本是 ${{ github.ref_name }},仓库是 ${{ github.repository }}。"
- name: 检出仓库代码
uses: actions/checkout@v4
- name: 输出工作区信息
run: |
echo "💡 ${{ github.repository }} 仓库已克隆到运行器。"
echo "🖥️ 工作流现在已准备好在运行器上测试您的代码。"
ls -la ${{ github.workspace }}
echo "🍏 此作业的状态是 ${{ job.status }}。"
- name: 设置 Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker # 明确指定使用 docker 驱动
- name: 从标签名中提取版本号
id: extract_version
run: |
# 从Release标签名中提取版本号例如从 v0.1.0 中提取 v0.1.0
VERSION=$(echo "${{ github.ref_name }}" | sed 's|refs/tags/||')
echo "VERSION=$VERSION" >> $GITHUB_ENV
echo "提取的版本号:$VERSION"
- name: 登录 Docker 注册表
uses: docker/login-action@v3
with:
registry: registry-vpc.cn-beijing.aliyuncs.com
username: ${{ secrets.ALI_DOCKER_REGISTRY_USERNAME }}
password: ${{ secrets.ALI_DOCKER_REGISTRY_PASSWORD }}
- name: 构建并推送 Virtual Kubelet 镜像
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile # 确保Dockerfile路径正确
push: true
tags: |
registry-vpc.cn-beijing.aliyuncs.com/d8dcloud/virtual-kubelet:alpine
build-args: |
GOLANG_CI_LINT_VERSION=v1.68.0
BUILD_TAGS=
HTTP_PROXY=${{ vars.HTTP_PROXY }}
HTTPS_PROXY=${{ vars.HTTPS_PROXY }}
NO_PROXY=${{ vars.NO_PROXY }}
labels: |
org.opencontainers.image.title=Virtual Kubelet
org.opencontainers.image.description=Kubernetes Virtual Kubelet implementation
org.opencontainers.image.version=${{ env.VERSION }}
org.opencontainers.image.source=${{ github.repositoryUrl }}
org.opencontainers.image.revision=${{ github.sha }}

115
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,115 @@
name: CI
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
on:
push:
branches: [master]
pull_request:
permissions:
contents: read
env:
GO_VERSION: "1.23"
jobs:
lint:
name: Lint
runs-on: ubuntu-22.04
timeout-minutes: 20
steps:
- name: Checkout repository
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
cache: false
- uses: actions/checkout@v4
- uses: golangci/golangci-lint-action@v8
with:
version: v2.1
args: --timeout=15m --config=.golangci.yml
skip-cache: true
unit-tests:
name: Unit Tests
runs-on: ubuntu-22.04
timeout-minutes: 20
steps:
- name: Checkout repository
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
- uses: actions/checkout@v4
- name: Run Tests
run: make test
env-tests:
name: Envtest Tests
runs-on: ubuntu-22.04
timeout-minutes: 10
steps:
- name: Checkout repository
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
- uses: actions/checkout@v4
- name: Run Tests
run: make envtest
e2e:
name: E2E
runs-on: ubuntu-22.04
timeout-minutes: 20
env:
CHANGE_MINIKUBE_NONE_USER: true
KUBERNETES_VERSION: v1.31
MINIKUBE_HOME: /home/runner
MINIKUBE_VERSION: v1.34.0
MINIKUBE_WANTUPDATENOTIFICATION: false
MINIKUBE_WANTREPORTERRORPROMPT: false
SKAFFOLD_VERSION: v2.13.2
GO111MODULE: "on"
steps:
- uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
- name: Checkout repository
uses: actions/checkout@v4
- name: Install Skaffold
run: |
curl -sLo skaffold https://storage.googleapis.com/skaffold/releases/${SKAFFOLD_VERSION}/skaffold-linux-amd64
chmod +x skaffold
sudo mv skaffold /usr/local/bin/
echo /usr/local/bin >> $GITHUB_PATH
- name: Install Minikube dependencies
run: |
sudo apt-get update && sudo apt-get install -y apt-transport-https curl
echo "deb [signed-by=/etc/apt/keyrings/kubernetes-apt-keyring.gpg] https://pkgs.k8s.io/core:/stable:/${KUBERNETES_VERSION}/deb/ /" | sudo tee /etc/apt/sources.list.d/kubernetes.list
curl -fsSL https://pkgs.k8s.io/core:/stable:/${KUBERNETES_VERSION}/deb/Release.key | sudo gpg --no-tty --yes --dearmor -o /etc/apt/keyrings/kubernetes-apt-keyring.gpg
sudo apt-get update
sudo apt-get remove -y containerd.io containerd
sudo apt-get install -y kubectl docker.io
- name: Install Minikube
run: |
curl -sLo minikube https://storage.googleapis.com/minikube/releases/${MINIKUBE_VERSION}/minikube-linux-amd64
chmod +x minikube
sudo mv minikube /usr/local/bin/
- name: Start Minikube
run: |
sudo usermod -aG docker $USER && newgrp docker
minikube start --vm-driver=docker --cpus 2 --memory 2048 --kubernetes-version=${KUBERNETES_VERSION}
- name: Wait for Minikube
run: |
JSONPATH='{range .items[*]}{@.metadata.name}:{range @.status.conditions[*]}{@.type}={@.status};{end}{end}';
until kubectl get nodes -o jsonpath="$JSONPATH" 2>&1 | grep -q "Ready=True"; do
sleep 1;
done
- name: Run Tests
run: make e2e

59
.github/workflows/codeql-analysis.yml vendored Normal file
View File

@@ -0,0 +1,59 @@
name: "CodeQL"
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
on:
push:
branches: [master]
schedule:
- cron: "19 18 * * 3"
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
security-events: write
strategy:
fail-fast: false
matrix:
language: ["go"]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: Checkout repository
uses: actions/checkout@v4
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v3
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3

View File

@@ -25,16 +25,8 @@ RUN \
--mount=type=cache,target=/root/.cache/golangci-lint \ --mount=type=cache,target=/root/.cache/golangci-lint \
golangci-lint run -v --out-format="${OUT_FORMAT:-colored-line-number}" golangci-lint run -v --out-format="${OUT_FORMAT:-colored-line-number}"
# FROM scratch FROM scratch
# COPY --from=builder /usr/bin/virtual-kubelet /usr/bin/virtual-kubelet
# COPY --from=builder /etc/ssl/certs/ /etc/ssl/certs
FROM alpine:latest
COPY --from=builder /usr/bin/virtual-kubelet /usr/bin/virtual-kubelet COPY --from=builder /usr/bin/virtual-kubelet /usr/bin/virtual-kubelet
COPY --from=builder /etc/ssl/certs/ /etc/ssl/certs COPY --from=builder /etc/ssl/certs/ /etc/ssl/certs
# 创建必要的目录
RUN mkdir -p /root/.kube
ENTRYPOINT [ "/usr/bin/virtual-kubelet" ] ENTRYPOINT [ "/usr/bin/virtual-kubelet" ]
CMD [ "--help" ] CMD [ "--help" ]

View File

@@ -16,31 +16,21 @@ package root
import ( import (
"fmt" "fmt"
"os"
"time" "time"
) )
type apiServerConfig struct { type apiServerConfig struct {
CertPath string
KeyPath string
CACertPath string
Addr string Addr string
MetricsAddr string MetricsAddr string
StreamIdleTimeout time.Duration StreamIdleTimeout time.Duration
StreamCreationTimeout time.Duration StreamCreationTimeout time.Duration
} }
func getAPIConfig(c Opts) (*apiServerConfig, error) { func getAPIConfig(c Opts) apiServerConfig {
config := apiServerConfig{ return apiServerConfig{
CertPath: os.Getenv("APISERVER_CERT_LOCATION"), Addr: fmt.Sprintf(":%d", c.ListenPort),
KeyPath: os.Getenv("APISERVER_KEY_LOCATION"), MetricsAddr: c.MetricsAddr,
CACertPath: os.Getenv("APISERVER_CA_CERT_LOCATION"), StreamIdleTimeout: c.StreamIdleTimeout,
StreamCreationTimeout: c.StreamCreationTimeout,
} }
config.Addr = fmt.Sprintf(":%d", c.ListenPort)
config.MetricsAddr = c.MetricsAddr
config.StreamIdleTimeout = c.StreamIdleTimeout
config.StreamCreationTimeout = c.StreamCreationTimeout
return &config, nil
} }

View File

@@ -16,8 +16,6 @@ package root
import ( import (
"context" "context"
"crypto/tls"
"net/http"
"os" "os"
"runtime" "runtime"
@@ -28,10 +26,8 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/internal/manager" "github.com/virtual-kubelet/virtual-kubelet/internal/manager"
"github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/node" "github.com/virtual-kubelet/virtual-kubelet/node"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil" "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
) )
// NewCommand creates a new top-level command. // NewCommand creates a new top-level command.
@@ -73,14 +69,6 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
} }
} }
// Ensure API client.
clientSet, err := nodeutil.ClientsetFromEnv(c.KubeConfigPath)
if err != nil {
return err
}
// Set-up the node provider.
mux := http.NewServeMux()
newProvider := func(cfg nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) { newProvider := func(cfg nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) {
rm, err := manager.NewResourceManager(cfg.Pods, cfg.Secrets, cfg.ConfigMaps, cfg.Services) rm, err := manager.NewResourceManager(cfg.Pods, cfg.Secrets, cfg.ConfigMaps, cfg.Services)
if err != nil { if err != nil {
@@ -109,14 +97,9 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
return p, nil, nil return p, nil, nil
} }
apiConfig, err := getAPIConfig(c) apiConfig := getAPIConfig(c)
if err != nil {
return err
}
cm, err := nodeutil.NewNode(c.NodeName, newProvider, func(cfg *nodeutil.NodeConfig) error { cm, err := nodeutil.NewNode(c.NodeName, newProvider, func(cfg *nodeutil.NodeConfig) error {
cfg.KubeconfigPath = c.KubeConfigPath cfg.KubeconfigPath = c.KubeConfigPath
cfg.Handler = mux
cfg.InformerResyncPeriod = c.InformerResyncPeriod cfg.InformerResyncPeriod = c.InformerResyncPeriod
if taint != nil { if taint != nil {
@@ -134,13 +117,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
return nil return nil
}, },
nodeutil.WithClient(clientSet), nodeutil.WithBootstrapFromRestConfig(),
setAuth(c.NodeName, apiConfig),
nodeutil.WithTLSConfig(
nodeutil.WithKeyPairFromPath(apiConfig.CertPath, apiConfig.KeyPath),
maybeCA(apiConfig.CACertPath),
),
nodeutil.AttachProviderRoutes(mux),
) )
if err != nil { if err != nil {
return err return err
@@ -179,32 +156,3 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
} }
return nil return nil
} }
func setAuth(node string, apiCfg *apiServerConfig) nodeutil.NodeOpt {
if apiCfg.CACertPath == "" {
return func(cfg *nodeutil.NodeConfig) error {
cfg.Handler = api.InstrumentHandler(nodeutil.WithAuth(nodeutil.NoAuth(), cfg.Handler))
return nil
}
}
return func(cfg *nodeutil.NodeConfig) error {
auth, err := nodeutil.WebhookAuth(cfg.Client, node, func(cfg *nodeutil.WebhookAuthConfig) error {
var err error
cfg.AuthnConfig.ClientCertificateCAContentProvider, err = dynamiccertificates.NewDynamicCAContentFromFile("ca-cert-bundle", apiCfg.CACertPath)
return err
})
if err != nil {
return err
}
cfg.Handler = api.InstrumentHandler(nodeutil.WithAuth(auth, cfg.Handler))
return nil
}
}
func maybeCA(p string) func(*tls.Config) error {
if p == "" {
return func(*tls.Config) error { return nil }
}
return nodeutil.WithCAFromPath(p)
}

View File

@@ -102,7 +102,7 @@ func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalI
func loadConfig(providerConfig, nodeName string) (config MockConfig, err error) { func loadConfig(providerConfig, nodeName string) (config MockConfig, err error) {
data, err := os.ReadFile(providerConfig) data, err := os.ReadFile(providerConfig)
if err != nil { if err != nil {
return config, err return config, fmt.Errorf("error reaeding provider config: %w", err)
} }
configMap := map[string]MockConfig{} configMap := map[string]MockConfig{}
err = json.Unmarshal(data, &configMap) err = json.Unmarshal(data, &configMap)

33
errdefs/auth.go Normal file
View File

@@ -0,0 +1,33 @@
package errdefs
import (
"errors"
"fmt"
)
var (
// ErrForbidden is returned when the user is not authorized to perform the operation.
ErrForbidden = errors.New("forbidden")
// ErrUnauthorized is returned when the user is not authenticated.
ErrUnauthorized = errors.New("unauthorized")
)
// Unauthorized wraps ErrUnauthorized with a message.
func Unauthorized(msg string) error {
return fmt.Errorf("%w: %s", ErrUnauthorized, msg)
}
// Forbidden wraps ErrForbidden with a message.
func Forbidden(msg string) error {
return fmt.Errorf("%w: %s", ErrForbidden, msg)
}
// IsForbidden returns true if the error has ErrForbidden in the error chain.
func IsForbidden(err error) bool {
return errors.Is(err, ErrForbidden)
}
// IsUnauthorized returns true if the error has ErrUnauthorized in the error chain.
func IsUnauthorized(err error) bool {
return errors.Is(err, ErrUnauthorized)
}

View File

@@ -48,6 +48,7 @@ func HandlePodStatsSummary(h PodStatsSummaryHandlerFunc) http.HandlerFunc {
if _, err := w.Write(b); err != nil { if _, err := w.Write(b); err != nil {
return errors.Wrap(err, "could not write to client") return errors.Wrap(err, "could not write to client")
} }
_, _ = w.Write([]byte("\n"))
return nil return nil
}) })
} }

View File

@@ -6,6 +6,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace" "github.com/virtual-kubelet/virtual-kubelet/trace"
"k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticator"
@@ -69,14 +70,24 @@ func handleAuth(auth Auth, w http.ResponseWriter, r *http.Request, next http.Han
defer span.End() defer span.End()
r = r.WithContext(ctx) r = r.WithContext(ctx)
logger := log.G(r.Context())
info, ok, err := auth.AuthenticateRequest(r) info, ok, err := auth.AuthenticateRequest(r)
if err != nil || !ok { if err != nil {
log.G(r.Context()).WithError(err).Error("Authorization error") logger.WithError(err).Error("Error authenticating request")
http.Error(w, "Unauthorized", http.StatusUnauthorized) http.Error(w, "Unauthorized", http.StatusUnauthorized)
span.SetStatus(err)
return return
} }
logger := log.G(ctx).WithFields(log.Fields{ if !ok {
logger.Error("Request not authenticated")
log.G(r.Context()).Infof("Unauthorized: RequestURI: %s", r.RequestURI)
http.Error(w, "Unauthorized", http.StatusUnauthorized)
span.SetStatus(errdefs.ErrUnauthorized)
return
}
logger = logger.WithFields(log.Fields{
"user-name": info.User.GetName(), "user-name": info.User.GetName(),
"user-id": info.User.GetUID(), "user-id": info.User.GetUID(),
}) })
@@ -90,11 +101,13 @@ func handleAuth(auth Auth, w http.ResponseWriter, r *http.Request, next http.Han
if err != nil { if err != nil {
log.G(r.Context()).WithError(err).Error("Authorization error") log.G(r.Context()).WithError(err).Error("Authorization error")
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
span.SetStatus(err)
return return
} }
if decision != authorizer.DecisionAllow { if decision != authorizer.DecisionAllow {
http.Error(w, "Forbidden", http.StatusForbidden) http.Error(w, "Forbidden", http.StatusForbidden)
span.SetStatus(errdefs.ErrForbidden)
return return
} }

View File

@@ -24,7 +24,7 @@ func ClientsetFromEnv(kubeConfigPath string) (*kubernetes.Clientset, error) {
) )
if kubeConfigPath != "" { if kubeConfigPath != "" {
config, err = clientsetFromEnvKubeConfigPath(kubeConfigPath) config, err = RestConfigFromEnv(kubeConfigPath)
} else { } else {
config, err = rest.InClusterConfig() config, err = rest.InClusterConfig()
} }
@@ -36,7 +36,8 @@ func ClientsetFromEnv(kubeConfigPath string) (*kubernetes.Clientset, error) {
return kubernetes.NewForConfig(config) return kubernetes.NewForConfig(config)
} }
func clientsetFromEnvKubeConfigPath(kubeConfigPath string) (*rest.Config, error) { // RestConfigFromEnv is like ClientsetFromEnv except it returns a rest config instead of a full client.
func RestConfigFromEnv(kubeConfigPath string) (*rest.Config, error) {
_, err := os.Stat(kubeConfigPath) _, err := os.Stat(kubeConfigPath)
if os.IsNotExist(err) { if os.IsNotExist(err) {
return rest.InClusterConfig() return rest.InClusterConfig()

View File

@@ -48,7 +48,8 @@ type Node struct {
workers int workers int
eb record.EventBroadcaster eb record.EventBroadcaster
caController caController
} }
// NodeController returns the configured node controller. // NodeController returns the configured node controller.
@@ -107,6 +108,10 @@ func (n *Node) Run(ctx context.Context) (retErr error) {
log.G(ctx).Debug("Started event broadcaster") log.G(ctx).Debug("Started event broadcaster")
} }
if n.caController != nil {
go n.caController.Run(ctx, 1)
}
cancelHTTP, err := n.runHTTP(ctx) cancelHTTP, err := n.runHTTP(ctx)
if err != nil { if err != nil {
return err return err
@@ -212,6 +217,8 @@ func (n *Node) Err() error {
// NodeOpt is used as functional options when configuring a new node in NewNodeFromClient // NodeOpt is used as functional options when configuring a new node in NewNodeFromClient
type NodeOpt func(c *NodeConfig) error type NodeOpt func(c *NodeConfig) error
type caController interface{ Run(context.Context, int) }
// NodeConfig is used to hold configuration items for a Node. // NodeConfig is used to hold configuration items for a Node.
// It gets used in conjection with NodeOpt in NewNodeFromClient // It gets used in conjection with NodeOpt in NewNodeFromClient
type NodeConfig struct { type NodeConfig struct {
@@ -260,22 +267,8 @@ type NodeConfig struct {
SkipDownwardAPIResolution bool SkipDownwardAPIResolution bool
routeAttacher func(Provider, NodeConfig, corev1listers.PodLister) routeAttacher func(Provider, NodeConfig, corev1listers.PodLister)
}
// WithNodeConfig returns a NodeOpt which replaces the NodeConfig with the passed in value. caController caController
func WithNodeConfig(c NodeConfig) NodeOpt {
return func(orig *NodeConfig) error {
*orig = c
return nil
}
}
// WithClient return a NodeOpt that sets the client that will be used to create/manage the node.
func WithClient(c kubernetes.Interface) NodeOpt {
return func(cfg *NodeConfig) error {
cfg.Client = c
return nil
}
} }
// NewNode creates a new node using the provided client and name. // NewNode creates a new node using the provided client and name.
@@ -326,10 +319,6 @@ func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node,
return nil, errors.Wrap(err, "error parsing http listen address") return nil, errors.Wrap(err, "error parsing http listen address")
} }
if cfg.Client == nil {
return nil, errors.New("no client provided")
}
podInformerFactory := informers.NewSharedInformerFactoryWithOptions( podInformerFactory := informers.NewSharedInformerFactoryWithOptions(
cfg.Client, cfg.Client,
cfg.InformerResyncPeriod, cfg.InformerResyncPeriod,
@@ -425,6 +414,7 @@ func NewNode(name string, newProvider NewProviderFunc, opts ...NodeOpt) (*Node,
h: cfg.Handler, h: cfg.Handler,
listenAddr: cfg.HTTPListenAddr, listenAddr: cfg.HTTPListenAddr,
workers: cfg.NumWorkers, workers: cfg.NumWorkers,
caController: cfg.caController,
}, nil }, nil
} }

150
node/nodeutil/opts.go Normal file
View File

@@ -0,0 +1,150 @@
package nodeutil
import (
"fmt"
"net/http"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// WithNodeConfig returns a NodeOpt which replaces the NodeConfig with the passed in value.
func WithNodeConfig(c NodeConfig) NodeOpt {
return func(orig *NodeConfig) error {
*orig = c
return nil
}
}
// WithClient return a NodeOpt that sets the client that will be used to create/manage the node.
func WithClient(c kubernetes.Interface) NodeOpt {
return func(cfg *NodeConfig) error {
cfg.Client = c
return nil
}
}
// BootstrapConfig is the configuration for bootstrapping a node.
type BootstrapConfig struct {
// ClientCAConfigMapSpec is the config map information for getting the CA for authenticating clients
// If not set, the CA in the rest config will be used.
ClientCAConfigMapSpec *ConfigMapCASpec
// A set of options to pass when creating the webhook auth.
WebhookAuthOpts []WebhookAuthOption
// RestConfig is the rest config to use for the client
// If it is not provided a default one from the environment will be created.
RestConfig *rest.Config
}
// ConfigMapCASpec is the spec for a config map that contains a CA.
type ConfigMapCASpec struct {
Namespace string
Name string
Key string
}
// BootstrapOpt is a functional option used to configure node bootstrapping
type BootstrapOpt func(*BootstrapConfig) error
// WithBootstrapFromRestConfig takes a reset config (or a default one from the environment) and returns a NodeOpt that will:
// 1. Create a client from the rest config (if no client is set)
// 2. Create webhook authn/authz from the rest config
// 3. Configure the TLS config for the HTTP server from the certs in the rest config.
func WithBootstrapFromRestConfig(opts ...BootstrapOpt) NodeOpt {
return func(cfg *NodeConfig) error {
var bOpts BootstrapConfig
if rCfg, err := RestConfigFromEnv(cfg.KubeconfigPath); err == nil {
bOpts.RestConfig = rCfg
}
for _, o := range opts {
if err := o(&bOpts); err != nil {
return err
}
}
if bOpts.RestConfig == nil {
return fmt.Errorf("no rest config provided")
}
if cfg.Client == nil {
client, err := kubernetes.NewForConfig(bOpts.RestConfig)
if err != nil {
return err
}
cfg.Client = client
}
if err := WithTLSConfig(tlsFromRestConfig(bOpts.RestConfig))(cfg); err != nil {
return err
}
if err := configureWebhookCA(cfg, &bOpts); err != nil {
return fmt.Errorf("error configure webhook auth: %w", err)
}
var mux *http.ServeMux
if cfg.Handler == nil {
mux = http.NewServeMux()
cfg.Handler = mux
}
if cfg.routeAttacher == nil && mux != nil {
if err := AttachProviderRoutes(mux)(cfg); err != nil {
return err
}
}
auth, err := WebhookAuth(cfg.Client, cfg.NodeSpec.Name, bOpts.WebhookAuthOpts...)
if err != nil {
return err
}
cfg.Handler = api.InstrumentHandler(WithAuth(auth, cfg.Handler))
return nil
}
}
func configureWebhookCA(cfg *NodeConfig, bCfg *BootstrapConfig) error {
if bCfg.ClientCAConfigMapSpec != nil {
bCfg.WebhookAuthOpts = append(bCfg.WebhookAuthOpts, func(auth *WebhookAuthConfig) error {
cmCA, err := dynamiccertificates.NewDynamicCAFromConfigMapController("client-ca", bCfg.ClientCAConfigMapSpec.Namespace, bCfg.ClientCAConfigMapSpec.Name, bCfg.ClientCAConfigMapSpec.Key, cfg.Client)
if err != nil {
return fmt.Errorf("error loading dynamic CA from config map: %w", err)
}
auth.AuthnConfig.ClientCertificateCAContentProvider = cmCA
cfg.caController = cmCA
return nil
})
return nil
}
if bCfg.RestConfig.CAFile != "" {
bCfg.WebhookAuthOpts = append(bCfg.WebhookAuthOpts, func(auth *WebhookAuthConfig) error {
caFile, err := dynamiccertificates.NewDynamicCAContentFromFile("ca-file", bCfg.RestConfig.CAFile)
if err != nil {
return fmt.Errorf("error loading dynamic CA file from rest config: %w", err)
}
auth.AuthnConfig.ClientCertificateCAContentProvider = caFile
cfg.caController = caFile
return nil
})
return nil
}
if bCfg.RestConfig.CAData != nil {
bCfg.WebhookAuthOpts = append(bCfg.WebhookAuthOpts, func(auth *WebhookAuthConfig) error {
caData, err := dynamiccertificates.NewStaticCAContent("ca-data", bCfg.RestConfig.CAData)
if err != nil {
return fmt.Errorf("error loading static ca from rest config: %w", err)
}
auth.AuthnConfig.ClientCertificateCAContentProvider = caData
return nil
})
return nil
}
return errdefs.InvalidInput("no client CA found")
}

View File

@@ -5,25 +5,28 @@ import (
"crypto/x509" "crypto/x509"
"fmt" "fmt"
"os" "os"
"k8s.io/client-go/rest"
) )
// WithTLSConfig returns a NodeOpt which creates a base TLSConfig with the default cipher suites and tls min verions. // WithTLSConfig returns a NodeOpt which creates a base TLSConfig with the default cipher suites and tls min verions.
// The tls config can be modified through functional options. // The tls config can be modified through functional options.
func WithTLSConfig(opts ...func(*tls.Config) error) NodeOpt { func WithTLSConfig(opts ...func(*tls.Config) error) NodeOpt {
return func(cfg *NodeConfig) error { return func(cfg *NodeConfig) error {
tlsCfg := &tls.Config{ if cfg.TLSConfig == nil {
MinVersion: tls.VersionTLS12, cfg.TLSConfig = &tls.Config{
PreferServerCipherSuites: true, MinVersion: tls.VersionTLS12,
CipherSuites: DefaultServerCiphers(), PreferServerCipherSuites: true,
ClientAuth: tls.RequestClientCert, CipherSuites: DefaultServerCiphers(),
ClientAuth: tls.RequestClientCert,
}
} }
for _, o := range opts { for _, o := range opts {
if err := o(tlsCfg); err != nil { if err := o(cfg.TLSConfig); err != nil {
return err return err
} }
} }
cfg.TLSConfig = tlsCfg
return nil return nil
} }
} }
@@ -45,7 +48,7 @@ func WithKeyPairFromPath(cert, key string) func(*tls.Config) error {
return func(cfg *tls.Config) error { return func(cfg *tls.Config) error {
cert, err := tls.LoadX509KeyPair(cert, key) cert, err := tls.LoadX509KeyPair(cert, key)
if err != nil { if err != nil {
return err return fmt.Errorf("error loading x509 key pair: %w", err)
} }
cfg.Certificates = append(cfg.Certificates, cert) cfg.Certificates = append(cfg.Certificates, cert)
return nil return nil
@@ -56,7 +59,7 @@ func WithKeyPairFromPath(cert, key string) func(*tls.Config) error {
// If a cert pool is not defined on the tls config an empty one will be created. // If a cert pool is not defined on the tls config an empty one will be created.
func WithCACert(pem []byte) func(*tls.Config) error { func WithCACert(pem []byte) func(*tls.Config) error {
return func(cfg *tls.Config) error { return func(cfg *tls.Config) error {
if cfg.ClientCAs == nil { if cfg.RootCAs == nil {
cfg.ClientCAs = x509.NewCertPool() cfg.ClientCAs = x509.NewCertPool()
} }
if !cfg.ClientCAs.AppendCertsFromPEM(pem) { if !cfg.ClientCAs.AppendCertsFromPEM(pem) {
@@ -81,3 +84,48 @@ func DefaultServerCiphers() []uint16 {
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
} }
} }
func tlsFromRestConfig(r *rest.Config) func(*tls.Config) error {
return func(cfg *tls.Config) error {
var err error
cfg.ClientAuth = tls.RequestClientCert
certData := r.CertData
if certData == nil && r.CertFile != "" {
certData, err = os.ReadFile(r.CertFile)
if err != nil {
return fmt.Errorf("error reading cert file from clientset: %w", err)
}
}
keyData := r.KeyData
if keyData == nil && r.KeyFile != "" {
keyData, err = os.ReadFile(r.KeyFile)
if err != nil {
return fmt.Errorf("error reading key file from clientset: %w", err)
}
}
if keyData != nil && certData != nil {
pem, err := tls.X509KeyPair(certData, keyData)
if err != nil {
return fmt.Errorf("error creating key pair from clientset: %w", err)
}
cfg.Certificates = append(cfg.Certificates, pem)
cfg.ClientAuth = tls.RequestClientCert
}
caData := r.CAData
if certData == nil && r.CAFile != "" {
caData, err = os.ReadFile(r.CAFile)
if err != nil {
return fmt.Errorf("error reading ca file from clientset: %w", err)
}
}
if caData != nil {
return WithCACert(caData)(cfg)
}
return nil
}
}

View File

@@ -79,8 +79,12 @@ func (s *span) SetStatus(err error) {
status.Code = octrace.StatusCodeNotFound status.Code = octrace.StatusCodeNotFound
case errdefs.IsInvalidInput(err): case errdefs.IsInvalidInput(err):
status.Code = octrace.StatusCodeInvalidArgument status.Code = octrace.StatusCodeInvalidArgument
// TODO: other error types case errdefs.IsForbidden(err):
status.Code = octrace.StatusCodePermissionDenied
case errdefs.IsUnauthorized(err):
status.Code = octrace.StatusCodeUnauthenticated
default: default:
// TODO: other error types
status.Code = octrace.StatusCodeUnknown status.Code = octrace.StatusCodeUnknown
} }