Compare commits

..

8 Commits

Author SHA1 Message Date
fnuarnav
2c155accb7 Prometheus metrics are encoded as text, not JSON (#1101)
Co-authored-by: Sanchit Mehta <sanchit.mehta602@gmail.com>
2023-04-06 08:03:43 +01:00
Salvatore Cirone
9c32bfb0ae Add support for Attach API functionality (#1090)
Co-authored-by: Pablo Borrelli <pablo.borrelli0@gmail.com>
2023-03-31 08:51:50 -07:00
Jackie Lan
b7030b9dc5 Support advanced capacity and providerID settings in mock provider 2023-03-28 13:22:44 +01:00
fnuarnav
a457d445a3 feat: Implement new metrics endpoint for k8s 1.24+ (#1082) 2023-03-28 13:01:37 +01:00
fnuarnav
b70ee9b6dd New metrics API proposal (#1075) 2023-03-28 12:39:42 +01:00
dependabot[bot]
8bf7691f59 Bump github.com/prometheus/client_golang from 1.13.0 to 1.14.0 (#1095)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-03-27 11:10:35 -07:00
dependabot[bot]
d87cc6ee1a Bump k8s.io/klog/v2 from 2.80.1 to 2.90.1 (#1091)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Heba Elayoty <31887807+helayoty@users.noreply.github.com>
2023-03-20 23:45:06 +00:00
dependabot[bot]
2b6bd337cc Bump actions/setup-go from 3 to 4 (#1093)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-03-20 16:40:03 -07:00
13 changed files with 1049 additions and 26 deletions

View File

@@ -20,7 +20,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v3
- uses: actions/setup-go@v3
- uses: actions/setup-go@v4
with:
go-version: ${{ env.GO_VERSION }}
- uses: actions/checkout@v3
@@ -36,7 +36,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v3
- uses: actions/setup-go@v3
- uses: actions/setup-go@v4
with:
go-version: ${{ env.GO_VERSION }}
- uses: actions/checkout@v3
@@ -50,7 +50,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v3
- uses: actions/setup-go@v3
- uses: actions/setup-go@v4
with:
go-version: ${{ env.GO_VERSION }}
- uses: actions/checkout@v3
@@ -72,7 +72,7 @@ jobs:
GO111MODULE: "on"
steps:
- uses: actions/setup-go@v3
- uses: actions/setup-go@v4
with:
go-version: ${{ env.GO_VERSION }}
- name: Checkout repository

View File

@@ -10,6 +10,7 @@ import (
"strings"
"time"
dto "github.com/prometheus/client_model/go"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
@@ -55,9 +56,11 @@ type MockProvider struct { //nolint:golint
// MockConfig contains a mock virtual-kubelet's configurable parameters.
type MockConfig struct { //nolint:golint
CPU string `json:"cpu,omitempty"`
Memory string `json:"memory,omitempty"`
Pods string `json:"pods,omitempty"`
CPU string `json:"cpu,omitempty"`
Memory string `json:"memory,omitempty"`
Pods string `json:"pods,omitempty"`
Others map[string]string `json:"others,omitempty"`
ProviderID string `json:"providerID,omitempty"`
}
// NewMockProviderMockConfig creates a new MockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface
@@ -128,6 +131,11 @@ func loadConfig(providerConfig, nodeName string) (config MockConfig, err error)
if _, err = resource.ParseQuantity(config.Pods); err != nil {
return config, fmt.Errorf("Invalid pods value %v", config.Pods)
}
for _, v := range config.Others {
if _, err = resource.ParseQuantity(v); err != nil {
return config, fmt.Errorf("Invalid other value %v", v)
}
}
return config, nil
}
@@ -293,6 +301,13 @@ func (p *MockProvider) RunInContainer(ctx context.Context, namespace, name, cont
return nil
}
// AttachToContainer attaches to the executing process of a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *MockProvider) AttachToContainer(ctx context.Context, namespace, name, container string, attach api.AttachIO) error {
log.G(ctx).Infof("receive AttachToContainer %q", container)
return nil
}
// GetPodStatus returns the status of a pod by name that is "running".
// returns nil if a pod by that name is not found.
func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
@@ -332,6 +347,9 @@ func (p *MockProvider) ConfigureNode(ctx context.Context, n *v1.Node) { //nolint
ctx, span := trace.StartSpan(ctx, "mock.ConfigureNode") //nolint:staticcheck,ineffassign
defer span.End()
if p.config.ProviderID != "" {
n.Spec.ProviderID = p.config.ProviderID
}
n.Status.Capacity = p.capacity()
n.Status.Allocatable = p.capacity()
n.Status.Conditions = p.nodeConditions()
@@ -349,11 +367,15 @@ func (p *MockProvider) ConfigureNode(ctx context.Context, n *v1.Node) { //nolint
// Capacity returns a resource list containing the capacity limits.
func (p *MockProvider) capacity() v1.ResourceList {
return v1.ResourceList{
rl := v1.ResourceList{
"cpu": resource.MustParse(p.config.CPU),
"memory": resource.MustParse(p.config.Memory),
"pods": resource.MustParse(p.config.Pods),
}
for k, v := range p.config.Others {
rl[v1.ResourceName(k)] = resource.MustParse(v)
}
return rl
}
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
@@ -508,6 +530,129 @@ func (p *MockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, err
return res, nil
}
func (p *MockProvider) generateMockMetrics(metricsMap map[string][]*dto.Metric, resourceType string, label []*dto.LabelPair) map[string][]*dto.Metric {
var (
cpuMetricSuffix = "_cpu_usage_seconds_total"
memoryMetricSuffix = "_memory_working_set_bytes"
dummyValue = float64(100)
)
if metricsMap == nil {
metricsMap = map[string][]*dto.Metric{}
}
finalCpuMetricName := resourceType + cpuMetricSuffix
finalMemoryMetricName := resourceType + memoryMetricSuffix
newCPUMetric := dto.Metric{
Label: label,
Counter: &dto.Counter{
Value: &dummyValue,
},
}
newMemoryMetric := dto.Metric{
Label: label,
Gauge: &dto.Gauge{
Value: &dummyValue,
},
}
// if metric family exists add to metric array
if cpuMetrics, ok := metricsMap[finalCpuMetricName]; ok {
metricsMap[finalCpuMetricName] = append(cpuMetrics, &newCPUMetric)
} else {
metricsMap[finalCpuMetricName] = []*dto.Metric{&newCPUMetric}
}
if memoryMetrics, ok := metricsMap[finalMemoryMetricName]; ok {
metricsMap[finalMemoryMetricName] = append(memoryMetrics, &newMemoryMetric)
} else {
metricsMap[finalMemoryMetricName] = []*dto.Metric{&newMemoryMetric}
}
return metricsMap
}
func (p *MockProvider) getMetricType(metricName string) *dto.MetricType {
var (
dtoCounterMetricType = dto.MetricType_COUNTER
dtoGaugeMetricType = dto.MetricType_GAUGE
cpuMetricSuffix = "_cpu_usage_seconds_total"
memoryMetricSuffix = "_memory_working_set_bytes"
)
if strings.HasSuffix(metricName, cpuMetricSuffix) {
return &dtoCounterMetricType
}
if strings.HasSuffix(metricName, memoryMetricSuffix) {
return &dtoGaugeMetricType
}
return nil
}
func (p *MockProvider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily, error) {
var span trace.Span
ctx, span = trace.StartSpan(ctx, "GetMetricsResource") //nolint: ineffassign,staticcheck
defer span.End()
var (
nodeNameStr = "NodeName"
podNameStr = "PodName"
containerNameStr = "containerName"
)
nodeLabels := []*dto.LabelPair{
{
Name: &nodeNameStr,
Value: &p.nodeName,
},
}
metricsMap := p.generateMockMetrics(nil, "node", nodeLabels)
for _, pod := range p.pods {
podLabels := []*dto.LabelPair{
{
Name: &nodeNameStr,
Value: &p.nodeName,
},
{
Name: &podNameStr,
Value: &pod.Name,
},
}
metricsMap = p.generateMockMetrics(metricsMap, "pod", podLabels)
for _, container := range pod.Spec.Containers {
containerLabels := []*dto.LabelPair{
{
Name: &nodeNameStr,
Value: &p.nodeName,
},
{
Name: &podNameStr,
Value: &pod.Name,
},
{
Name: &containerNameStr,
Value: &container.Name,
},
}
metricsMap = p.generateMockMetrics(metricsMap, "container", containerLabels)
}
}
res := []*dto.MetricFamily{}
for metricName := range metricsMap {
tempName := metricName
tempMetrics := metricsMap[tempName]
metricFamily := dto.MetricFamily{
Name: &tempName,
Type: p.getMetricType(tempName),
Metric: tempMetrics,
}
res = append(res, &metricFamily)
}
return res, nil
}
// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
// within the provider.
func (p *MockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {

View File

@@ -0,0 +1,296 @@
# Virtual Kubelet Metrics Update
<!-- toc -->
- [Summary](#summary)
- [Motivation](#motivation)
- [Goals](#goals)
- [Non-Goals](#non-goals)
- [Proposal](#proposal)
- [Design Details](#design-details)
- [API](#api)
- [Data](#data)
- [Changes to the Provider](#changes-to-the-provider)
- [Test Plan](#test-plan)
<!-- /toc -->
## Summary
Add the new /metrics/resource endpoint in the virtual-kubelet to support the metrics server update for new Kubernetes versions `>=1.24`
## Motivation
The Kubernetes metrics server now tries to get metrics from the kubelet using the new metrics endpoint [/metrics/resource](https://github.com/kubernetes-sigs/metrics-server/commit/a2d732e5cdbfd93a6ebce221e8df0e8b463eecc6#diff-6e5b914d1403a14af1cc43582a2c9af727113037a3c6a77d8729aaefba084fb5R88),
while Virtual Kubelet is still exposing the earlier metrics endpoint [/stats/summary](https://github.com/virtual-kubelet/virtual-kubelet/blob/master/node/api/server.go#L90).
This causes metrics to break when using virtual kubelet with newer Kubernetes versions (>=1.24).
To support the new metrics server, this document proposes adding a new handler to handle the updated metrics endpoint.
This will be an additive update, and the old
[/stats/summary](https://github.com/virtual-kubelet/virtual-kubelet/blob/master/node/api/server.go#L90) endpoint will still be available to maintain backward compatibility with
the older metrics server version.
### Goals
- Support metrics for kubernetes version `>=1.24` through adding /metrics/resource endpoint handler.
### Non-Goals
- Ensure pod autoscaling works as expected with the newer kubernetes versions `>=1.24` as expected
## Proposal
Add a new handler for `/metrics/resource` endpoint that calls a new `GetMetricsResource` method in the provider,
which in-turn returns metrics using the prometheus `model.Samples` data structure as expected by the new metrics server.
The provider will need to implement the `GetMetricsResource` method in order to add support for the new `/metrics/resource` endpoint with Kubernetes version >=1.24
## Design Details
Currently the virtual kubelet code uses the `PodStatsSummaryHandler` method to set up a http handler for serving pod metrics via the `/stats/summary` endpoint.
To support the updated metrics server, we need to add another handler `PodMetricsResourceHandler` which can serve metrics via the `/metrics/resource` endpoint.
The `PodMetricsResourceHandler` calls the new `GetMetricsResource` method of the provider to get the metrics from the specific provider.
### API
Add `GetMetricsResource` to `PodHandlerConfig`
```go
type PodHandlerConfig struct { //nolint:golint
RunInContainer ContainerExecHandlerFunc
GetContainerLogs ContainerLogsHandlerFunc
// GetPods is meant to enumerate the pods that the provider knows about
GetPods PodListerFunc
// GetPodsFromKubernetes is meant to enumerate the pods that the node is meant to be running
GetPodsFromKubernetes PodListerFunc
GetStatsSummary PodStatsSummaryHandlerFunc
GetMetricsResource PodMetricsResourceHandlerFunc
StreamIdleTimeout time.Duration
StreamCreationTimeout time.Duration
}
```
Add endpoint to `PodHandler` method
```go
const MetricsResourceRouteSuffix = "/metrics/resource"
func PodHandler(p PodHandlerConfig, debug bool) http.Handler {
r := mux.NewRouter()
// This matches the behaviour in the reference kubelet
r.StrictSlash(true)
if debug {
r.HandleFunc("/runningpods/", HandleRunningPods(p.GetPods)).Methods("GET")
}
r.HandleFunc("/pods", HandleRunningPods(p.GetPodsFromKubernetes)).Methods("GET")
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", HandleContainerLogs(p.GetContainerLogs)).Methods("GET")
r.HandleFunc(
"/exec/{namespace}/{pod}/{container}",
HandleContainerExec(
p.RunInContainer,
WithExecStreamCreationTimeout(p.StreamCreationTimeout),
WithExecStreamIdleTimeout(p.StreamIdleTimeout),
),
).Methods("POST", "GET")
if p.GetStatsSummary != nil {
f := HandlePodStatsSummary(p.GetStatsSummary)
r.HandleFunc("/stats/summary", f).Methods("GET")
r.HandleFunc("/stats/summary/", f).Methods("GET")
}
if p.GetMetricsResource != nil {
f := HandlePodMetricsResource(p.GetMetricsResource)
r.HandleFunc(MetricsResourceRouteSuffix, f).Methods("GET")
r.HandleFunc(MetricsResourceRouteSuffix+"/", f).Methods("GET")
}
r.NotFoundHandler = http.HandlerFunc(NotFound)
return r
}
```
New `PodMetricsResourceHandler` method, that uses the new `PodMetricsResourceHandlerFunc` definition.
```go
// PodMetricsResourceHandler creates an http handler for serving pod metrics.
//
// If the passed in handler func is nil this will create handlers which only
// serves http.StatusNotImplemented
func PodMetricsResourceHandler(f PodMetricsResourceHandlerFunc) http.Handler {
if f == nil {
return http.HandlerFunc(NotImplemented)
}
r := mux.NewRouter()
h := HandlePodMetricsResource(f)
r.Handle(MetricsResourceRouteSuffix, ochttp.WithRouteTag(h, "PodMetricsResourceHandler")).Methods("GET")
r.Handle(MetricsResourceRouteSuffix+"/", ochttp.WithRouteTag(h, "PodMetricsResourceHandler")).Methods("GET")
r.NotFoundHandler = http.HandlerFunc(NotFound)
return r
}
```
`HandlePodMetricsResource` method returns a HandlerFunc which serves the metrics encoded in prometheus' text format encoding as expected by the metrics-server
```go
// HandlePodMetricsResource makes an HTTP handler for implementing the kubelet /metrics/resource endpoint
func HandlePodMetricsResource(h PodMetricsResourceHandlerFunc) http.HandlerFunc {
if h == nil {
return NotImplemented
}
return handleError(func(w http.ResponseWriter, req *http.Request) error {
metrics, err := h(req.Context())
if err != nil {
if isCancelled(err) {
return err
}
return errors.Wrap(err, "error getting status from provider")
}
b, err := json.Marshal(metrics)
if err != nil {
return errors.Wrap(err, "error marshalling metrics")
}
if _, err := w.Write(b); err != nil {
return errors.Wrap(err, "could not write to client")
}
return nil
})
}
```
The `PodMetricsResourceHandlerFunc` returns the metrics data using Prometheus' `MetricFamily` data structure. More details are provided in the Data subsection
```go
// PodMetricsResourceHandlerFunc defines the handler for getting pod metrics
type PodMetricsResourceHandlerFunc func(context.Context) ([]*dto.MetricFamily, error)
```
### Data
The updated metrics server does not add any new fields to the metrics data but uses the Prometheus textparse series parser to parse and reconstruct the [MetricsBatch](https://github.com/kubernetes-sigs/metrics-server/blob/83b2e01f9825849ae5f562e47aa1a4178b5d06e5/pkg/storage/types.go#L31) data structure.
Currently virtual-kubelet is sending data to the server using the [summary](https://github.com/virtual-kubelet/virtual-kubelet/blob/be0a062aec9a5eeea3ad6fbe5aec557a235558f6/node/api/statsv1alpha1/types.go#L24) data structure. The Prometheus text parser expects a series of bytes as in the Prometheus [model.Samples](https://github.com/kubernetes/kubernetes/blob/a93eda9db305611cacd8b6ee930ab3149a08f9b0/vendor/github.com/prometheus/common/model/value.go#L184) data structure, similar to the test [here](https://github.com/prometheus/prometheus/blob/c70d85baed260f6013afd18d6cd0ffcac4339861/model/textparse/promparse_test.go#L31).
Examples of how the new metrics are defined may be seen in the Kubernetes e2e test that calls the /metrics/resource endpoint [here](https://github.com/kubernetes/kubernetes/blob/a93eda9db305611cacd8b6ee930ab3149a08f9b0/test/e2e_node/resource_metrics_test.go#L76), and the kubelet metrics defined in the Kubernetes/kubelet code [here](https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/metrics/collectors/resource_metrics.go) .
```go
var (
nodeCPUUsageDesc = metrics.NewDesc("node_cpu_usage_seconds_total",
"Cumulative cpu time consumed by the node in core-seconds",
nil,
nil,
metrics.ALPHA,
"")
nodeMemoryUsageDesc = metrics.NewDesc("node_memory_working_set_bytes",
"Current working set of the node in bytes",
nil,
nil,
metrics.ALPHA,
"")
containerCPUUsageDesc = metrics.NewDesc("container_cpu_usage_seconds_total",
"Cumulative cpu time consumed by the container in core-seconds",
[]string{"container", "pod", "namespace"},
nil,
metrics.ALPHA,
"")
containerMemoryUsageDesc = metrics.NewDesc("container_memory_working_set_bytes",
"Current working set of the container in bytes",
[]string{"container", "pod", "namespace"},
nil,
metrics.ALPHA,
"")
podCPUUsageDesc = metrics.NewDesc("pod_cpu_usage_seconds_total",
"Cumulative cpu time consumed by the pod in core-seconds",
[]string{"pod", "namespace"},
nil,
metrics.ALPHA,
"")
podMemoryUsageDesc = metrics.NewDesc("pod_memory_working_set_bytes",
"Current working set of the pod in bytes",
[]string{"pod", "namespace"},
nil,
metrics.ALPHA,
"")
resourceScrapeResultDesc = metrics.NewDesc("scrape_error",
"1 if there was an error while getting container metrics, 0 otherwise",
nil,
nil,
metrics.ALPHA,
"")
containerStartTimeDesc = metrics.NewDesc("container_start_time_seconds",
"Start time of the container since unix epoch in seconds",
[]string{"container", "pod", "namespace"},
nil,
metrics.ALPHA,
"")
)
```
The kubernetes/kubelet code implements Prometheus' [collector](https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/metrics/collectors/resource_metrics.go#L88) interface which is used along with the k8s.io/component-base implementation of the [registry](https://github.com/kubernetes/component-base/blob/40d14bdbd62f9e2ea697f97d81d4abc72839901e/metrics/registry.go#L114) interface in order to collect and return the metrics data using the Prometheus' [MetricFamily](https://github.com/prometheus/client_model/blob/master/go/metrics.pb.go#L773) data structure.
The Gather method in the registry calls the kubelet collector's Collect method, and returns the data using the MetricFamily data structure. The metrics server expects metrics to be encoded in prometheus'
text format, and the kubelet uses the http handler from prometheus' promhttp module which returns the metrics data encoded in prometheus' text format encoding.
```go
type KubeRegistry interface {
// Deprecated
RawMustRegister(...prometheus.Collector)
// CustomRegister is our internal variant of Prometheus registry.Register
CustomRegister(c StableCollector) error
// CustomMustRegister is our internal variant of Prometheus registry.MustRegister
CustomMustRegister(cs ...StableCollector)
// Register conforms to Prometheus registry.Register
Register(Registerable) error
// MustRegister conforms to Prometheus registry.MustRegister
MustRegister(...Registerable)
// Unregister conforms to Prometheus registry.Unregister
Unregister(collector Collector) bool
// Gather conforms to Prometheus gatherer.Gather
Gather() ([]*dto.MetricFamily, error)
// Reset invokes the Reset() function on all items in the registry
// which are added as resettables.
Reset()
}
```
Prometheus MetricsFamily data structure:
```go
type MetricFamily struct {
Name *string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
Help *string `protobuf:"bytes,2,opt,name=help" json:"help,omitempty"`
Type *MetricType `protobuf:"varint,3,opt,name=type,enum=io.prometheus.client.MetricType" json:"type,omitempty"`
Metric []*Metric `protobuf:"bytes,4,rep,name=metric" json:"metric,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
```
Therefore the provider's GetMetricsResource method should use the same return type as the Gather method in the registry interface.
### Changes to the Provider.
In order to support the new metrics endpoint the Provider must implement the GetMetricsResource method with definition
```golang
import (
dto "github.com/prometheus/client_model/go"
"context"
)
func GetMetricsResource(context.Context) ([]*dto.MetricsFamily, error) {
...
}
```
### Test Plan
- Write a provider implementation for GetMetricsResource method in ACI Provider and deploy pods get metrics using kubectl
- Run end-to-end tests with the provider implementation

16
go.mod
View File

@@ -10,10 +10,13 @@ require (
github.com/gorilla/mux v1.8.0
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.13.0
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.42.0
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.5.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.0
go.opencensus.io v0.23.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
@@ -21,12 +24,13 @@ require (
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sys v0.5.0
golang.org/x/time v0.3.0
google.golang.org/protobuf v1.28.1
gotest.tools v2.2.0+incompatible
k8s.io/api v0.26.2
k8s.io/apimachinery v0.26.2
k8s.io/apiserver v0.25.0
k8s.io/client-go v0.25.0
k8s.io/klog/v2 v2.80.1
k8s.io/klog/v2 v2.90.1
k8s.io/utils v0.0.0-20221107191617-1a15be271d1d
sigs.k8s.io/controller-runtime v0.13.0
)
@@ -64,13 +68,12 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/moby/spdystream v0.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/uber/jaeger-client-go v2.25.0+incompatible // indirect
go.etcd.io/etcd/api/v3 v3.5.4 // indirect
@@ -89,14 +92,13 @@ require (
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/api v0.57.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect
google.golang.org/grpc v1.47.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect

20
go.sum
View File

@@ -285,6 +285,8 @@ github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8=
@@ -318,19 +320,22 @@ github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
github.com/prometheus/client_golang v1.13.0 h1:b71QUfeo5M8gq2+evJdTPfZhYMAU0uKPkyPJ7TPsloU=
github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ=
github.com/prometheus/client_golang v1.14.0 h1:nJdhIvne2eSX/XRAFV9PcvFFRbrjbcTUj0VP62TMhnw=
github.com/prometheus/client_golang v1.14.0/go.mod h1:8vpkKitgIVNcqrRBWh1C4TIUQgYNtG/XQE4E/Zae36Y=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4=
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.37.0 h1:ccBbHCgIiT9uSoFY0vX8H3zsNR5eLt17/RQLUvn8pXE=
github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM=
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
@@ -355,13 +360,16 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4dN7GR16kFc5fp3d1RIYzJW5onx8Ybykw2YQFA=
github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U=
github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
@@ -528,6 +536,8 @@ golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b h1:clP8eMhB30EHdc0bd2Twtq6kgU7yl5ub2cQLSdrv1Dg=
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.5.0 h1:HuArIo48skDwlrvM3sEdHXElYslAMsf3KwRkkW4MC4s=
golang.org/x/oauth2 v0.5.0/go.mod h1:9/XBHVqLaWO3/BRHs5jbpYCnOZVjj5V0ndyaAM7KB4I=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -869,8 +879,8 @@ k8s.io/client-go v0.25.0 h1:CVWIaCETLMBNiTUta3d5nzRbXvY5Hy9Dpl+VvREpu5E=
k8s.io/client-go v0.25.0/go.mod h1:lxykvypVfKilxhTklov0wz1FoaUZ8X4EwbhS6rpRfN8=
k8s.io/component-base v0.25.0 h1:haVKlLkPCFZhkcqB6WCvpVxftrg6+FK5x1ZuaIDaQ5Y=
k8s.io/component-base v0.25.0/go.mod h1:F2Sumv9CnbBlqrpdf7rKZTmmd2meJq0HizeyY/yAFxk=
k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=
k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=
k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+OGxg8HsuBr/5f6tVAjDu6E=
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4=
k8s.io/utils v0.0.0-20221107191617-1a15be271d1d h1:0Smp/HP1OH4Rvhe+4B8nWGERtlqAGSftbSbbmm45oFs=

View File

@@ -0,0 +1,79 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"fmt"
"io"
"net/http"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/remotecommand"
utilexec "k8s.io/utils/exec"
)
// Attacher knows how to attach to a container in a pod.
type Attacher interface {
// AttachToContainer attaches to a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
AttachToContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error
}
// ServeAttach handles requests to attach to a container. After
// creating/receiving the required streams, it delegates the actual attachment
// to the attacher.
func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
if !ok {
// error is handled by createStreams
return
}
defer ctx.conn.Close()
err := attacher.AttachToContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
if err != nil {
if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
rc := exitErr.ExitStatus()
ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Reason: remotecommandconsts.NonZeroExitCodeReason,
Details: &metav1.StatusDetails{
Causes: []metav1.StatusCause{
{
Type: remotecommandconsts.ExitCodeCauseType,
Message: fmt.Sprintf("%d", rc),
},
},
},
Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr),
}})
return
}
err = fmt.Errorf("error attaching to container: %v", err)
runtime.HandleError(err)
ctx.writeStatus(apierrors.NewInternalError(err))
return
}
ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusSuccess,
}})
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
api "github.com/virtual-kubelet/virtual-kubelet/node/api"
stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
"k8s.io/apimachinery/pkg/util/net"
)
@@ -29,3 +30,21 @@ func (f *Framework) GetStatsSummary(ctx context.Context) (*stats.Summary, error)
}
return res, nil
}
// GetStatsSummary queries the /metrics/resource endpoint of the virtual-kubelet and returns the Summary object obtained as a response.
func (f *Framework) GetMetricsResource(ctx context.Context) ([]byte, error) {
// Query the /stats/summary endpoint.
b, err := f.KubeClient.CoreV1().
RESTClient().
Get().
Namespace(f.Namespace).
Resource("pods").
SubResource("proxy").
Name(net.JoinSchemeNamePort("https", f.NodeName, "10250")).
Suffix(api.MetricsResourceRouteSuffix).DoRaw(ctx)
if err != nil {
return nil, err
}
return b, nil
}

139
node/api/attach.go Normal file
View File

@@ -0,0 +1,139 @@
// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"context"
"io"
"net/http"
"strings"
"time"
"github.com/gorilla/mux"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/internal/kubernetes/remotecommand"
"k8s.io/apimachinery/pkg/types"
remoteutils "k8s.io/client-go/tools/remotecommand"
)
// ContainerAttachHandlerFunc defines the handler function used for "execing" into a
// container in a pod.
type ContainerAttachHandlerFunc func(ctx context.Context, namespace, podName, containerName string, attach AttachIO) error
// HandleContainerAttach makes an http handler func from a Provider which execs a command in a pod's container
// Note that this handler currently depends on gorrilla/mux to get url parts as variables.
// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function
func HandleContainerAttach(h ContainerAttachHandlerFunc, opts ...ContainerExecHandlerOption) http.HandlerFunc {
if h == nil {
return NotImplemented
}
var cfg ContainerExecHandlerConfig
for _, o := range opts {
o(&cfg)
}
if cfg.StreamIdleTimeout == 0 {
cfg.StreamIdleTimeout = 30 * time.Second
}
if cfg.StreamCreationTimeout == 0 {
cfg.StreamCreationTimeout = 30 * time.Second
}
return handleError(func(w http.ResponseWriter, req *http.Request) error {
vars := mux.Vars(req)
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
supportedStreamProtocols := strings.Split(req.Header.Get("X-Stream-Protocol-Version"), ",")
streamOpts, err := getExecOptions(req)
if err != nil {
return errdefs.AsInvalidInput(err)
}
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
attach := &containerAttachContext{ctx: ctx, h: h, pod: pod, namespace: namespace, container: container}
remotecommand.ServeAttach(
w,
req,
attach,
"",
"",
container,
streamOpts,
cfg.StreamIdleTimeout,
cfg.StreamCreationTimeout,
supportedStreamProtocols,
)
return nil
})
}
type containerAttachContext struct {
h ContainerAttachHandlerFunc
namespace, pod, container string
ctx context.Context
}
// AttachToContainer Implements remotecommand.Attacher
// This is called by remotecommand.ServeAttach
func (c *containerAttachContext) AttachToContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remoteutils.TerminalSize, timeout time.Duration) error {
eio := &execIO{
tty: tty,
stdin: in,
stdout: out,
stderr: err,
}
if tty {
eio.chResize = make(chan TermSize)
}
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
if tty {
go func() {
send := func(s remoteutils.TerminalSize) bool {
select {
case eio.chResize <- TermSize{Width: s.Width, Height: s.Height}:
return false
case <-ctx.Done():
return true
}
}
for {
select {
case s := <-resize:
if send(s) {
return
}
case <-ctx.Done():
return
}
}
}()
}
return c.h(c.ctx, c.namespace, c.pod, c.container, eio)
}

67
node/api/metrics.go Normal file
View File

@@ -0,0 +1,67 @@
// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"bytes"
"context"
"net/http"
"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
const (
PrometheusTextFormatContentType = "text/plain; version=0.0.4"
)
// PodMetricsResourceHandlerFunc defines the handler for getting pod metrics
type PodMetricsResourceHandlerFunc func(context.Context) ([]*dto.MetricFamily, error)
// HandlePodMetricsResource makes an HTTP handler for implementing the kubelet /metrics/resource endpoint
func HandlePodMetricsResource(h PodMetricsResourceHandlerFunc) http.HandlerFunc {
if h == nil {
return NotImplemented
}
return handleError(func(w http.ResponseWriter, req *http.Request) error {
metrics, err := h(req.Context())
if err != nil {
if isCancelled(err) {
return err
}
return errors.Wrap(err, "error getting status from provider")
}
// Convert metrics to Prometheus text format.
var buffer bytes.Buffer
enc := expfmt.NewEncoder(&buffer, expfmt.FmtText)
for _, mf := range metrics {
if err := enc.Encode(mf); err != nil {
return errors.Wrap(err, "could not convert metrics to prometheus text format")
}
}
// Set the response content type to "text/plain; version=0.0.4".
w.Header().Set("Content-Type", PrometheusTextFormatContentType)
// Write the metrics in Prometheus text format to the response writer.
if _, err := w.Write(buffer.Bytes()); err != nil {
return errors.Wrap(err, "could not write to client")
}
return nil
})
}

149
node/api/metrics_test.go Normal file
View File

@@ -0,0 +1,149 @@
// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api_test
import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
"google.golang.org/protobuf/proto"
)
const (
prometheusContentType = "text/plain; version=0.0.4"
)
func TestHandlePodMetricsResource(t *testing.T) {
testCases := []struct {
name string
handler api.PodMetricsResourceHandlerFunc
expectedStatusCode int
expectedError error
}{
{
name: "Valid PodMetricsResourceHandlerFunc",
handler: func(_ context.Context) ([]*dto.MetricFamily, error) {
// Create the expected metrics.
cpuUsageMetric := &dto.MetricFamily{
Name: proto.String("container_cpu_usage_seconds_total"),
Help: proto.String("[ALPHA] Cumulative cpu time consumed by the container in core-seconds"),
Type: dto.MetricType_GAUGE.Enum(),
Metric: []*dto.Metric{},
}
memoryUsageMetric := &dto.MetricFamily{
Name: proto.String("container_memory_working_set_bytes"),
Help: proto.String("[ALPHA] Current working set of the container in bytes"),
Type: dto.MetricType_GAUGE.Enum(),
Metric: []*dto.Metric{},
}
// Add the sample metrics to the metric families.
cpuUsageMetric.Metric = append(cpuUsageMetric.Metric, createSampleMetric(
map[string]string{
"container": "simple-hello-world-container",
"namespace": "k8se-apps",
"pod": "test-pod--zruwatj-86454fdc54-2wwpw",
},
0.1104636, 1680536423102,
))
cpuUsageMetric.Metric = append(cpuUsageMetric.Metric, createSampleMetric(
map[string]string{
"container": "simple-hello-world-container",
"namespace": "k8se-apps",
"pod": "test-pod--zruwatj-86454fdc54-4mzd4",
},
0.11322, 1680536423103,
))
memoryUsageMetric.Metric = append(memoryUsageMetric.Metric, createSampleMetric(
map[string]string{
"container": "simple-hello-world-container",
"namespace": "k8se-apps",
"pod": "test-pod--zruwatj-86454fdc54-2wwpw",
},
2.3277568e+07, 1680536423102,
))
memoryUsageMetric.Metric = append(memoryUsageMetric.Metric, createSampleMetric(
map[string]string{
"container": "simple-hello-world-container",
"namespace": "k8se-apps",
"pod": "test-pod--zruwatj-86454fdc54-4mzd4",
},
2.2450176e+07, 1680536423104,
))
return []*dto.MetricFamily{cpuUsageMetric, memoryUsageMetric}, nil
},
expectedStatusCode: http.StatusOK,
},
{
name: "Nil PodMetricsResourceHandlerFunc",
handler: nil,
expectedStatusCode: http.StatusNotImplemented,
},
{
name: "Error in PodMetricsResourceHandlerFunc",
handler: func(_ context.Context) ([]*dto.MetricFamily, error) {
return nil, errors.New("test error")
},
expectedStatusCode: http.StatusInternalServerError,
expectedError: errors.New("error getting status from provider: test error"),
},
// Add more test cases as needed
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
h := api.HandlePodMetricsResource(tc.handler)
require.NotNil(t, h)
rr := httptest.NewRecorder()
req, err := http.NewRequest("GET", "/metrics/resource", nil)
require.NoError(t, err)
h.ServeHTTP(rr, req)
assert.Equal(t, tc.expectedStatusCode, rr.Code)
if tc.expectedError != nil {
bodyBytes, err := ioutil.ReadAll(rr.Body)
require.NoError(t, err)
assert.Contains(t, string(bodyBytes), tc.expectedError.Error())
} else if tc.expectedStatusCode == http.StatusOK {
contentType := rr.Header().Get("Content-Type")
assert.Equal(t, prometheusContentType, contentType)
}
})
}
}
func createSampleMetric(labels map[string]string, value float64, timestamp int64) *dto.Metric {
labelPairs := []*dto.LabelPair{}
for k, v := range labels {
labelPairs = append(labelPairs, &dto.LabelPair{
Name: proto.String(k),
Value: proto.String(v),
})
}
return &dto.Metric{Label: labelPairs, Gauge: &dto.Gauge{Value: &value}, TimestampMs: &timestamp}
}

View File

@@ -34,17 +34,21 @@ type ServeMux interface {
}
type PodHandlerConfig struct { //nolint:golint
RunInContainer ContainerExecHandlerFunc
GetContainerLogs ContainerLogsHandlerFunc
RunInContainer ContainerExecHandlerFunc
AttachToContainer ContainerAttachHandlerFunc
GetContainerLogs ContainerLogsHandlerFunc
// GetPods is meant to enumerate the pods that the provider knows about
GetPods PodListerFunc
// GetPodsFromKubernetes is meant to enumerate the pods that the node is meant to be running
GetPodsFromKubernetes PodListerFunc
GetStatsSummary PodStatsSummaryHandlerFunc
GetMetricsResource PodMetricsResourceHandlerFunc
StreamIdleTimeout time.Duration
StreamCreationTimeout time.Duration
}
const MetricsResourceRouteSuffix = "/metrics/resource"
// PodHandler creates an http handler for interacting with pods/containers.
func PodHandler(p PodHandlerConfig, debug bool) http.Handler {
r := mux.NewRouter()
@@ -65,6 +69,14 @@ func PodHandler(p PodHandlerConfig, debug bool) http.Handler {
WithExecStreamIdleTimeout(p.StreamIdleTimeout),
),
).Methods("POST", "GET")
r.HandleFunc(
"/attach/{namespace}/{pod}/{container}",
HandleContainerAttach(
p.AttachToContainer,
WithExecStreamCreationTimeout(p.StreamCreationTimeout),
WithExecStreamIdleTimeout(p.StreamIdleTimeout),
),
).Methods("POST", "GET")
if p.GetStatsSummary != nil {
f := HandlePodStatsSummary(p.GetStatsSummary)
@@ -72,6 +84,11 @@ func PodHandler(p PodHandlerConfig, debug bool) http.Handler {
r.HandleFunc("/stats/summary/", f).Methods("GET")
}
if p.GetMetricsResource != nil {
f := HandlePodMetricsResource(p.GetMetricsResource)
r.HandleFunc(MetricsResourceRouteSuffix, f).Methods("GET")
r.HandleFunc(MetricsResourceRouteSuffix+"/", f).Methods("GET")
}
r.NotFoundHandler = http.HandlerFunc(NotFound)
return r
}
@@ -97,6 +114,26 @@ func PodStatsSummaryHandler(f PodStatsSummaryHandlerFunc) http.Handler {
return r
}
// PodMetricsResourceHandler creates an http handler for serving pod metrics.
//
// If the passed in handler func is nil this will create handlers which only
// serves http.StatusNotImplemented
func PodMetricsResourceHandler(f PodMetricsResourceHandlerFunc) http.Handler {
if f == nil {
return http.HandlerFunc(NotImplemented)
}
r := mux.NewRouter()
h := HandlePodMetricsResource(f)
r.Handle(MetricsResourceRouteSuffix, ochttp.WithRouteTag(h, "PodMetricsResourceHandler")).Methods("GET")
r.Handle(MetricsResourceRouteSuffix+"/", ochttp.WithRouteTag(h, "PodMetricsResourceHandler")).Methods("GET")
r.NotFoundHandler = http.HandlerFunc(NotFound)
return r
}
// AttachPodRoutes adds the http routes for pod stuff to the passed in serve mux.
//
// Callers should take care to namespace the serve mux as they see fit, however
@@ -111,7 +148,8 @@ func AttachPodRoutes(p PodHandlerConfig, mux ServeMux, debug bool) {
// The main reason for this struct is in case of expansion we do not need to break
// the package level API.
type PodMetricsConfig struct {
GetStatsSummary PodStatsSummaryHandlerFunc
GetStatsSummary PodStatsSummaryHandlerFunc
GetMetricsResource PodMetricsResourceHandlerFunc
}
// AttachPodMetricsRoutes adds the http routes for pod/node metrics to the passed in serve mux.
@@ -120,6 +158,7 @@ type PodMetricsConfig struct {
// these routes get called by the Kubernetes API server.
func AttachPodMetricsRoutes(p PodMetricsConfig, mux ServeMux) {
mux.Handle("/", InstrumentHandler(HandlePodStatsSummary(p.GetStatsSummary)))
mux.Handle("/", InstrumentHandler(HandlePodMetricsResource(p.GetMetricsResource)))
}
func instrumentRequest(r *http.Request) *http.Request {

View File

@@ -4,6 +4,7 @@ import (
"context"
"io"
dto "github.com/prometheus/client_model/go"
"github.com/virtual-kubelet/virtual-kubelet/node"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
@@ -27,8 +28,15 @@ type Provider interface {
// between in/out/err and the container's stdin/stdout/stderr.
RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error
// AttachToContainer attaches to the executing process of a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
AttachToContainer(ctx context.Context, namespace, podName, containerName string, attach api.AttachIO) error
// GetStatsSummary gets the stats for the node, including running pods
GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error)
// GetMetricsResource gets the metrics for the node, including running pods
GetMetricsResource(context.Context) ([]*dto.MetricFamily, error)
}
// ProviderConfig holds objects created by NewNodeFromClient that a provider may need to bootstrap itself.
@@ -54,13 +62,15 @@ func AttachProviderRoutes(mux api.ServeMux) NodeOpt {
return func(cfg *NodeConfig) error {
cfg.routeAttacher = func(p Provider, cfg NodeConfig, pods corev1listers.PodLister) {
mux.Handle("/", api.PodHandler(api.PodHandlerConfig{
RunInContainer: p.RunInContainer,
GetContainerLogs: p.GetContainerLogs,
GetPods: p.GetPods,
RunInContainer: p.RunInContainer,
AttachToContainer: p.AttachToContainer,
GetContainerLogs: p.GetContainerLogs,
GetPods: p.GetPods,
GetPodsFromKubernetes: func(context.Context) ([]*v1.Pod, error) {
return pods.List(labels.Everything())
},
GetStatsSummary: p.GetStatsSummary,
GetMetricsResource: p.GetMetricsResource,
StreamIdleTimeout: cfg.StreamIdleTimeout,
StreamCreationTimeout: cfg.StreamCreationTimeout,
}, true))

View File

@@ -1,11 +1,13 @@
package e2e
import (
"bytes"
"context"
"fmt"
"testing"
"time"
"github.com/prometheus/common/expfmt"
"github.com/virtual-kubelet/virtual-kubelet/internal/podutils"
stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
"gotest.tools/assert"
@@ -111,6 +113,72 @@ func (ts *EndToEndTestSuite) TestGetStatsSummary(t *testing.T) {
}
}
// TestGetMetricsResource creates a pod having two containers and queries the /metrics/resource endpoint of the virtual-kubelet.
// It expects this endpoint to return stats for the current node, as well as for the aforementioned pod and each of its two containers.
func (ts *EndToEndTestSuite) TestGetMetricsResource(t *testing.T) {
ctx := context.Background()
// Create a pod with prefix "nginx-" having three containers.
pod, err := f.CreatePod(ctx, f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo", "bar", "baz"))
if err != nil {
t.Fatal(err)
}
// Delete the "nginx-0-X" pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
// Wait for the "nginx-" pod to be reported as running and ready.
if _, err := f.WaitUntilPodReady(pod.Namespace, pod.Name); err != nil {
t.Fatal(err)
}
// Grab the stats from the provider.
metricsResourceResponse, err := f.GetMetricsResource(ctx)
if err != nil {
t.Fatal(err)
}
// decode metrics response bytes to metric family
reader := bytes.NewReader(metricsResourceResponse)
parser := expfmt.TextParser{}
metricsFamilyMap, err := parser.TextToMetricFamilies(reader)
if err != nil {
t.Fatal(err)
}
// Make sure the "nginx-" pod exists in the metrics returned.
currentContainerStatsCount := 0
found := false
for metricName, metricFamily := range metricsFamilyMap {
if metricName == "pod_cpu_usage_seconds_total" {
for _, metric := range metricFamily.Metric {
if *metric.Label[1].Value == pod.Name {
found = true
}
}
}
if metricName == "container_cpu_usage_seconds_total" {
for _, metric := range metricFamily.Metric {
if *metric.Label[1].Value == pod.Name {
currentContainerStatsCount += 1
}
}
}
}
if !found {
t.Fatalf("Pod %s not found in metrics", pod.Name)
}
// Make sure that we've got stats for all the containers in the "nginx-" pod.
desiredContainerStatsCount := len(pod.Spec.Containers)
if currentContainerStatsCount != desiredContainerStatsCount {
t.Fatalf("expected stats for %d containers, got stats for %d containers", desiredContainerStatsCount, currentContainerStatsCount)
}
}
// TestPodLifecycleGracefulDelete creates a pod and verifies that the provider has been asked to create it.
// Then, it deletes the pods and verifies that the provider has been asked to delete it.
// These verifications are made using the /stats/summary endpoint of the virtual-kubelet, by checking for the presence or absence of the pods.