From a457d445a301c9375ec416b6229e06de85866eb0 Mon Sep 17 00:00:00 2001 From: fnuarnav <102992687+fnuarnav@users.noreply.github.com> Date: Tue, 28 Mar 2023 05:01:37 -0700 Subject: [PATCH] feat: Implement new metrics endpoint for k8s 1.24+ (#1082) --- .../internal/provider/mock/mock.go | 124 ++++++++++++++++++ internal/test/e2e/framework/stats.go | 25 ++++ node/api/metrics.go | 53 ++++++++ node/api/server.go | 32 ++++- node/nodeutil/provider.go | 5 + test/e2e/basic.go | 58 ++++++++ 6 files changed, 296 insertions(+), 1 deletion(-) create mode 100644 node/api/metrics.go diff --git a/cmd/virtual-kubelet/internal/provider/mock/mock.go b/cmd/virtual-kubelet/internal/provider/mock/mock.go index 87834fa36..c7d292c26 100644 --- a/cmd/virtual-kubelet/internal/provider/mock/mock.go +++ b/cmd/virtual-kubelet/internal/provider/mock/mock.go @@ -10,6 +10,7 @@ import ( "strings" "time" + dto "github.com/prometheus/client_model/go" "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/node/api" @@ -508,6 +509,129 @@ func (p *MockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, err return res, nil } +func (p *MockProvider) generateMockMetrics(metricsMap map[string][]*dto.Metric, resourceType string, label []*dto.LabelPair) map[string][]*dto.Metric { + var ( + cpuMetricSuffix = "_cpu_usage_seconds_total" + memoryMetricSuffix = "_memory_working_set_bytes" + dummyValue = float64(100) + ) + + if metricsMap == nil { + metricsMap = map[string][]*dto.Metric{} + } + + finalCpuMetricName := resourceType + cpuMetricSuffix + finalMemoryMetricName := resourceType + memoryMetricSuffix + + newCPUMetric := dto.Metric{ + Label: label, + Counter: &dto.Counter{ + Value: &dummyValue, + }, + } + newMemoryMetric := dto.Metric{ + Label: label, + Gauge: &dto.Gauge{ + Value: &dummyValue, + }, + } + // if metric family exists add to metric array + if cpuMetrics, ok := metricsMap[finalCpuMetricName]; ok 
{ + metricsMap[finalCpuMetricName] = append(cpuMetrics, &newCPUMetric) + } else { + metricsMap[finalCpuMetricName] = []*dto.Metric{&newCPUMetric} + } + if memoryMetrics, ok := metricsMap[finalMemoryMetricName]; ok { + metricsMap[finalMemoryMetricName] = append(memoryMetrics, &newMemoryMetric) + } else { + metricsMap[finalMemoryMetricName] = []*dto.Metric{&newMemoryMetric} + } + + return metricsMap +} + +func (p *MockProvider) getMetricType(metricName string) *dto.MetricType { + var ( + dtoCounterMetricType = dto.MetricType_COUNTER + dtoGaugeMetricType = dto.MetricType_GAUGE + cpuMetricSuffix = "_cpu_usage_seconds_total" + memoryMetricSuffix = "_memory_working_set_bytes" + ) + if strings.HasSuffix(metricName, cpuMetricSuffix) { + return &dtoCounterMetricType + } + if strings.HasSuffix(metricName, memoryMetricSuffix) { + return &dtoGaugeMetricType + } + + return nil +} + +func (p *MockProvider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily, error) { + var span trace.Span + ctx, span = trace.StartSpan(ctx, "GetMetricsResource") //nolint: ineffassign,staticcheck + defer span.End() + + var ( + nodeNameStr = "NodeName" + podNameStr = "PodName" + containerNameStr = "containerName" + ) + nodeLabels := []*dto.LabelPair{ + { + Name: &nodeNameStr, + Value: &p.nodeName, + }, + } + + metricsMap := p.generateMockMetrics(nil, "node", nodeLabels) + for _, pod := range p.pods { + podLabels := []*dto.LabelPair{ + { + Name: &nodeNameStr, + Value: &p.nodeName, + }, + { + Name: &podNameStr, + Value: &pod.Name, + }, + } + metricsMap = p.generateMockMetrics(metricsMap, "pod", podLabels) + for _, container := range pod.Spec.Containers { + containerLabels := []*dto.LabelPair{ + { + Name: &nodeNameStr, + Value: &p.nodeName, + }, + { + Name: &podNameStr, + Value: &pod.Name, + }, + { + Name: &containerNameStr, + Value: &container.Name, + }, + } + metricsMap = p.generateMockMetrics(metricsMap, "container", containerLabels) + } + } + + res := []*dto.MetricFamily{} + for 
metricName := range metricsMap { + tempName := metricName + tempMetrics := metricsMap[tempName] + + metricFamily := dto.MetricFamily{ + Name: &tempName, + Type: p.getMetricType(tempName), + Metric: tempMetrics, + } + res = append(res, &metricFamily) + } + + return res, nil +} + // NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done // within the provider. func (p *MockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) { diff --git a/internal/test/e2e/framework/stats.go b/internal/test/e2e/framework/stats.go index 5e39d688a..7277a69a4 100644 --- a/internal/test/e2e/framework/stats.go +++ b/internal/test/e2e/framework/stats.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" + dto "github.com/prometheus/client_model/go" + api "github.com/virtual-kubelet/virtual-kubelet/node/api" stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" "k8s.io/apimachinery/pkg/util/net" ) @@ -29,3 +31,26 @@ func (f *Framework) GetStatsSummary(ctx context.Context) (*stats.Summary, error) } return res, nil } + +// GetMetricsResource queries the /metrics/resource endpoint of the virtual-kubelet and returns the metric families obtained as a response. +func (f *Framework) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily, error) { + // Query the /metrics/resource endpoint. + b, err := f.KubeClient.CoreV1(). + RESTClient(). + Get(). + Namespace(f.Namespace). + Resource("pods"). + SubResource("proxy"). + Name(net.JoinSchemeNamePort("https", f.NodeName, "10250")). + Suffix(api.MetricsResourceRouteSuffix).DoRaw(ctx) + if err != nil { + return nil, err + } + // Unmarshal the response as a slice of MetricFamily objects and return it. 
+ res := []*dto.MetricFamily{} + err = json.Unmarshal(b, &res) + if err != nil { + return nil, err + } + return res, nil +} diff --git a/node/api/metrics.go b/node/api/metrics.go new file mode 100644 index 000000000..15e44568a --- /dev/null +++ b/node/api/metrics.go @@ -0,0 +1,53 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "context" + "encoding/json" + "net/http" + + "github.com/pkg/errors" + dto "github.com/prometheus/client_model/go" +) + +// PodMetricsResourceHandlerFunc defines the handler for getting pod metrics +type PodMetricsResourceHandlerFunc func(context.Context) ([]*dto.MetricFamily, error) + +// HandlePodMetricsResource makes an HTTP handler for implementing the kubelet /metrics/resource endpoint +func HandlePodMetricsResource(h PodMetricsResourceHandlerFunc) http.HandlerFunc { + if h == nil { + return NotImplemented + } + return handleError(func(w http.ResponseWriter, req *http.Request) error { + metrics, err := h(req.Context()) + if err != nil { + if isCancelled(err) { + return err + } + return errors.Wrap(err, "error getting status from provider") + } + + b, err := json.Marshal(metrics) + if err != nil { + return errors.Wrap(err, "error marshalling metrics") + } + + if _, err := w.Write(b); err != nil { + return errors.Wrap(err, "could not write to client") + } + return nil + }) +} diff --git a/node/api/server.go b/node/api/server.go index 
cba81e7d9..f0e130f6e 100644 --- a/node/api/server.go +++ b/node/api/server.go @@ -41,10 +41,13 @@ type PodHandlerConfig struct { //nolint:golint // GetPodsFromKubernetes is meant to enumerate the pods that the node is meant to be running GetPodsFromKubernetes PodListerFunc GetStatsSummary PodStatsSummaryHandlerFunc + GetMetricsResource PodMetricsResourceHandlerFunc StreamIdleTimeout time.Duration StreamCreationTimeout time.Duration } +const MetricsResourceRouteSuffix = "/metrics/resource" + // PodHandler creates an http handler for interacting with pods/containers. func PodHandler(p PodHandlerConfig, debug bool) http.Handler { r := mux.NewRouter() @@ -72,6 +75,11 @@ func PodHandler(p PodHandlerConfig, debug bool) http.Handler { r.HandleFunc("/stats/summary/", f).Methods("GET") } + if p.GetMetricsResource != nil { + f := HandlePodMetricsResource(p.GetMetricsResource) + r.HandleFunc(MetricsResourceRouteSuffix, f).Methods("GET") + r.HandleFunc(MetricsResourceRouteSuffix+"/", f).Methods("GET") + } r.NotFoundHandler = http.HandlerFunc(NotFound) return r } @@ -97,6 +105,26 @@ func PodStatsSummaryHandler(f PodStatsSummaryHandlerFunc) http.Handler { return r } +// PodMetricsResourceHandler creates an http handler for serving pod metrics. +// +// If the passed in handler func is nil this will create handlers which only +// serves http.StatusNotImplemented +func PodMetricsResourceHandler(f PodMetricsResourceHandlerFunc) http.Handler { + if f == nil { + return http.HandlerFunc(NotImplemented) + } + + r := mux.NewRouter() + + h := HandlePodMetricsResource(f) + + r.Handle(MetricsResourceRouteSuffix, ochttp.WithRouteTag(h, "PodMetricsResourceHandler")).Methods("GET") + r.Handle(MetricsResourceRouteSuffix+"/", ochttp.WithRouteTag(h, "PodMetricsResourceHandler")).Methods("GET") + + r.NotFoundHandler = http.HandlerFunc(NotFound) + return r +} + // AttachPodRoutes adds the http routes for pod stuff to the passed in serve mux. 
// // Callers should take care to namespace the serve mux as they see fit, however @@ -111,7 +139,8 @@ func AttachPodRoutes(p PodHandlerConfig, mux ServeMux, debug bool) { // The main reason for this struct is in case of expansion we do not need to break // the package level API. type PodMetricsConfig struct { - GetStatsSummary PodStatsSummaryHandlerFunc + GetStatsSummary PodStatsSummaryHandlerFunc + GetMetricsResource PodMetricsResourceHandlerFunc } // AttachPodMetricsRoutes adds the http routes for pod/node metrics to the passed in serve mux. @@ -120,6 +149,7 @@ type PodMetricsConfig struct { // these routes get called by the Kubernetes API server. func AttachPodMetricsRoutes(p PodMetricsConfig, mux ServeMux) { mux.Handle("/", InstrumentHandler(HandlePodStatsSummary(p.GetStatsSummary))) + mux.Handle("/", InstrumentHandler(HandlePodMetricsResource(p.GetMetricsResource))) } func instrumentRequest(r *http.Request) *http.Request { diff --git a/node/nodeutil/provider.go b/node/nodeutil/provider.go index 0f6a72f03..6cd707b70 100644 --- a/node/nodeutil/provider.go +++ b/node/nodeutil/provider.go @@ -4,6 +4,7 @@ import ( "context" "io" + dto "github.com/prometheus/client_model/go" "github.com/virtual-kubelet/virtual-kubelet/node" "github.com/virtual-kubelet/virtual-kubelet/node/api" "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" @@ -29,6 +30,9 @@ type Provider interface { // GetStatsSummary gets the stats for the node, including running pods GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error) + + // GetMetricsResource gets the metrics for the node, including running pods + GetMetricsResource(context.Context) ([]*dto.MetricFamily, error) } // ProviderConfig holds objects created by NewNodeFromClient that a provider may need to bootstrap itself. 
@@ -61,6 +65,7 @@ func AttachProviderRoutes(mux api.ServeMux) NodeOpt { return pods.List(labels.Everything()) }, GetStatsSummary: p.GetStatsSummary, + GetMetricsResource: p.GetMetricsResource, StreamIdleTimeout: cfg.StreamIdleTimeout, StreamCreationTimeout: cfg.StreamCreationTimeout, }, true)) diff --git a/test/e2e/basic.go b/test/e2e/basic.go index 445fe3ec4..72d811b05 100644 --- a/test/e2e/basic.go +++ b/test/e2e/basic.go @@ -111,6 +111,64 @@ func (ts *EndToEndTestSuite) TestGetStatsSummary(t *testing.T) { } } +// TestGetMetricsResource creates a pod having three containers and queries the /metrics/resource endpoint of the virtual-kubelet. +// It expects this endpoint to return stats for the current node, as well as for the aforementioned pod and each of its three containers. +func (ts *EndToEndTestSuite) TestGetMetricsResource(t *testing.T) { + ctx := context.Background() + + // Create a pod with prefix "nginx-" having three containers. + pod, err := f.CreatePod(ctx, f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo", "bar", "baz")) + if err != nil { + t.Fatal(err) + } + // Delete the "nginx-0-X" pod after the test finishes. + defer func() { + if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) { + t.Error(err) + } + }() + + // Wait for the "nginx-" pod to be reported as running and ready. + if _, err := f.WaitUntilPodReady(pod.Namespace, pod.Name); err != nil { + t.Fatal(err) + } + + // Grab the resource metrics from the provider. + metricsResourceResponse, err := f.GetMetricsResource(ctx) + if err != nil { + t.Fatal(err) + } + + // Make sure the "nginx-" pod exists in the metrics returned. 
+ currentContainerStatsCount := 0 + found := false + for _, metricFamily := range metricsResourceResponse { + if *metricFamily.Name == "pod_cpu_usage_seconds_total" { + for _, metric := range metricFamily.Metric { + if *metric.Label[1].Value == pod.Name { + found = true + } + } + } + if *metricFamily.Name == "container_cpu_usage_seconds_total" { + for _, metric := range metricFamily.Metric { + if *metric.Label[1].Value == pod.Name { + currentContainerStatsCount += 1 + } + } + } + } + if !found { + t.Fatalf("Pod %s not found in metrics", pod.Name) + } + + // Make sure that we've got stats for all the containers in the "nginx-" pod. + desiredContainerStatsCount := len(pod.Spec.Containers) + if currentContainerStatsCount != desiredContainerStatsCount { + t.Fatalf("expected stats for %d containers, got stats for %d containers", desiredContainerStatsCount, currentContainerStatsCount) + } +} + // TestPodLifecycleGracefulDelete creates a pod and verifies that the provider has been asked to create it. // Then, it deletes the pods and verifies that the provider has been asked to delete it. // These verifications are made using the /stats/summary endpoint of the virtual-kubelet, by checking for the presence or absence of the pods.