From e8abca0ac94ec93dfc1d297632abf3c7b05a2414 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Thu, 2 Aug 2018 17:06:50 -0700 Subject: [PATCH] Add support for stats in ACI provider This adds a new, optional, interface for providers that want to provide stats. --- Gopkg.lock | 1 + cmd/root.go | 5 +- providers/azure/aci.go | 12 +- providers/azure/client/aci/client.go | 1 + providers/azure/client/aci/metrics.go | 97 ++++++++++++ providers/azure/client/aci/types.go | 84 ++++++++++ providers/azure/metrics.go | 219 ++++++++++++++++++++++++++ providers/azure/metrics_test.go | 163 +++++++++++++++++++ vkubelet/apiserver.go | 55 ++++++- vkubelet/provider.go | 7 + vkubelet/vkubelet.go | 4 +- 11 files changed, 643 insertions(+), 5 deletions(-) create mode 100644 providers/azure/client/aci/metrics.go create mode 100644 providers/azure/metrics.go create mode 100644 providers/azure/metrics_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 384939d1d..ea31e05d3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1390,6 +1390,7 @@ "github.com/kr/pretty", "github.com/lawrencegripper/pod2docker", "github.com/mitchellh/go-homedir", + "github.com/pkg/errors", "github.com/spf13/cobra", "github.com/spf13/viper", "github.com/stretchr/testify/assert", diff --git a/cmd/root.go b/cmd/root.go index 827948545..45393d6d1 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -40,6 +40,7 @@ var providerConfig string var taintKey string var disableTaint bool var logLevel string +var metricsAddr string // RootCmd represents the base command when called without any subcommands var RootCmd = &cobra.Command{ @@ -49,7 +50,7 @@ var RootCmd = &cobra.Command{ backend implementation allowing users to create kubernetes nodes without running the kubelet.
This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`, Run: func(cmd *cobra.Command, args []string) { - f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint) + f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint, metricsAddr) if err != nil { log.L.WithError(err).Fatal("Error initializing vritual kubelet") } @@ -87,6 +88,8 @@ func init() { RootCmd.PersistentFlags().StringVar(&provider, "provider", "", "cloud provider") RootCmd.PersistentFlags().BoolVar(&disableTaint, "disable-taint", false, "disable the virtual-kubelet node taint") RootCmd.PersistentFlags().StringVar(&providerConfig, "provider-config", "", "cloud provider configuration file") + RootCmd.PersistentFlags().StringVar(&metricsAddr, "metrics-addr", ":10255", "address to listen for metrics/stats requests") + RootCmd.PersistentFlags().StringVar(&taintKey, "taint", "", "Set node taint key") RootCmd.PersistentFlags().MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable") RootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", `set the log level, e.g. "trace", debug", "info", "warn", "error"`) diff --git a/providers/azure/aci.go b/providers/azure/aci.go index 5b812557e..5efb9e632 100644 --- a/providers/azure/aci.go +++ b/providers/azure/aci.go @@ -12,6 +12,7 @@ import ( "os" "reflect" "strings" + "sync" "time" "github.com/Sirupsen/logrus" @@ -26,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/remotecommand" + stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" ) // The service account secret mount path. 
@@ -47,6 +49,10 @@ type ACIProvider struct { internalIP string daemonEndpointPort int32 diagnostics *aci.ContainerGroupDiagnostics + + metricsSync sync.Mutex + metricsSyncTime time.Time + lastMetric *stats.Summary } // AuthConfig is the secret returned from an ImageRegistryCredential @@ -296,13 +302,17 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error { // TODO(BJK) containergrouprestartpolicy?? _, err = p.aciClient.CreateContainerGroup( p.resourceGroup, - fmt.Sprintf("%s-%s", pod.Namespace, pod.Name), + containerGroupName(pod), containerGroup, ) return err } +func containerGroupName(pod *v1.Pod) string { + return fmt.Sprintf("%s-%s", pod.Namespace, pod.Name) +} + // UpdatePod is a noop, ACI currently does not support live updates of a pod. func (p *ACIProvider) UpdatePod(pod *v1.Pod) error { return nil diff --git a/providers/azure/client/aci/client.go b/providers/azure/client/aci/client.go index 309d11adb..66aac3795 100644 --- a/providers/azure/client/aci/client.go +++ b/providers/azure/client/aci/client.go @@ -18,6 +18,7 @@ const ( containerGroupListByResourceGroupURLPath = "subscriptions/{{.subscriptionId}}/resourceGroups/{{.resourceGroup}}/providers/Microsoft.ContainerInstance/containerGroups" containerLogsURLPath = containerGroupURLPath + "/containers/{{.containerName}}/logs" containerExecURLPath = containerGroupURLPath + "/containers/{{.containerName}}/exec" + containerGroupMetricsURLPath = containerGroupURLPath + "/providers/microsoft.Insights/metrics" ) // Client is a client for interacting with Azure Container Instances. 
diff --git a/providers/azure/client/aci/metrics.go b/providers/azure/client/aci/metrics.go new file mode 100644 index 000000000..b7b792056 --- /dev/null +++ b/providers/azure/client/aci/metrics.go @@ -0,0 +1,97 @@ +package aci + +import ( + "context" + "encoding/json" + "net/http" + "net/url" + "path" + "time" + + "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/api" +) + +// GetContainerGroupMetrics gets metrics for the provided container group +func (c *Client) GetContainerGroupMetrics(ctx context.Context, resourceGroup, containerGroup string, options MetricsRequest) (*ContainerGroupMetricsResult, error) { + if len(options.Types) == 0 { + return nil, errors.New("must provide metrics types to fetch") + } + if options.Start.After(options.End) || options.Start.Equal(options.End) && !options.Start.IsZero() { + return nil, errors.Errorf("end parameter must be after start: start=%s, end=%s", options.Start, options.End) + } + + var metricNames string + for _, t := range options.Types { + if len(metricNames) > 0 { + metricNames += "," + } + metricNames += string(t) + } + + var ag string + for _, a := range options.Aggregations { + if len(ag) > 0 { + ag += "," + } + ag += string(a) + } + + urlParams := url.Values{ + "api-version": []string{"2018-01-01"}, + "aggregation": []string{ag}, + "metricnames": []string{metricNames}, + "interval": []string{"PT1M"}, // TODO: make configurable? + } + + if options.Dimension != "" { + urlParams.Add("$filter", options.Dimension) + } + + if !options.Start.IsZero() || !options.End.IsZero() { + urlParams.Add("timespan", path.Join(options.Start.Format(time.RFC3339), options.End.Format(time.RFC3339))) + } + + // Create the url. + uri := api.ResolveRelative(c.auth.ResourceManagerEndpoint, containerGroupMetricsURLPath) + uri += "?" + url.Values(urlParams).Encode() + + // Create the request. 
+ req, err := http.NewRequest("GET", uri, nil) + if err != nil { + return nil, errors.Wrap(err, "creating get container group metrics uri request failed") + } + req = req.WithContext(ctx) + + // Add the parameters to the url. + if err := api.ExpandURL(req.URL, map[string]string{ + "subscriptionId": c.auth.SubscriptionID, + "resourceGroup": resourceGroup, + "containerGroupName": containerGroup, + }); err != nil { + return nil, errors.Wrap(err, "expanding URL with parameters failed") + } + + // Send the request. + resp, err := c.hc.Do(req) + if err != nil { + return nil, errors.Wrap(err, "sending get container group metrics request failed") + } + defer resp.Body.Close() + + // 200 (OK) is a success response. + if err := api.CheckResponse(resp); err != nil { + return nil, err + } + + // Decode the body from the response. + if resp.Body == nil { + return nil, errors.New("container group metrics returned an empty body in the response") + } + var metrics ContainerGroupMetricsResult + if err := json.NewDecoder(resp.Body).Decode(&metrics); err != nil { + return nil, errors.Wrap(err, "decoding get container group metrics response body failed") + } + + return &metrics, nil +} diff --git a/providers/azure/client/aci/types.go b/providers/azure/client/aci/types.go index 1407c57e7..0abca698c 100644 --- a/providers/azure/client/aci/types.go +++ b/providers/azure/client/aci/types.go @@ -1,6 +1,8 @@ package aci import ( + "time" + "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/api" ) @@ -332,3 +334,85 @@ type LogAnalyticsWorkspace struct { WorkspaceID string `json:"workspaceID,omitempty"` WorkspaceKey string `json:"workspaceKey,omitempty"` } + +// ContainerGroupMetricsResult stores all the results for a container group metrics request.
+type ContainerGroupMetricsResult struct { + Value []MetricValue `json:"value"` +} + +// MetricValue stores metrics results +type MetricValue struct { + ID string `json:"id"` + Desc MetricDescriptor `json:"name"` + Timeseries []MetricTimeSeries `json:"timeseries"` + Type string `json:"type"` + Unit string `json:"unit"` +} + +// MetricDescriptor stores the name for a given metric and the localized version of that name. +type MetricDescriptor struct { + Value MetricType `json:"value"` + LocalizedValue string `json:"localizedValue"` +} + +// MetricTimeSeries is the time series for a given metric +// It contains all the metrics values and other details for the dimension the metrics are aggregated on. +type MetricTimeSeries struct { + Data []TimeSeriesEntry `json:"data"` + MetadataValues []MetricMetadataValue `json:"metadatavalues,omitempty"` +} + +// MetricMetadataValue stores extra metadata about a metric +// In particular it is used to provide details about the breakdown of a metric dimension. +type MetricMetadataValue struct { + Name ValueDescriptor `json:"name"` + Value string `json:"value"` +} + +// ValueDescriptor describes a generic value. +// It is used to describe metadata fields. +type ValueDescriptor struct { + Value string `json:"value"` + LocalizedValue string `json:"localizedValue"` +} + +// TimeSeriesEntry is the metric data for a given timestamp/metric type +type TimeSeriesEntry struct { + Timestamp time.Time `json:"timestamp"` + Average float64 `json:"average"` + Total float64 `json:"total"` + Count float64 `json:"count"` +} + +// MetricsRequest is an options struct used when getting container group metrics +type MetricsRequest struct { + Start time.Time + End time.Time + Types []MetricType + Aggregations []AggregationType + + // Note that a dimension may not be available for certain metrics. + // In such cases, you will need to make separate requests. + Dimension string +} + +// MetricType is an enum type for defining supported metric types. 
+type MetricType string + +// Supported metric types +const ( + MetricTypeCPUUsage MetricType = "CpuUsage" + MetricTypeMemoryUsage MetricType = "MemoryUsage" + MetricTyperNetworkBytesRecievedPerSecond MetricType = "NetworkBytesReceivedPerSecond" + MetricTyperNetworkBytesTransmittedPerSecond MetricType = "NetworkBytesTransmittedPerSecond" +) + +// AggregationType is an enum type for defining supported aggregation types +type AggregationType string + +// Supported metric aggregation types +const ( + AggregationTypeCount AggregationType = "count" + AggregationTypeAverage AggregationType = "average" + AggregationTypeTotal AggregationType = "total" +) diff --git a/providers/azure/metrics.go b/providers/azure/metrics.go new file mode 100644 index 000000000..879a3063b --- /dev/null +++ b/providers/azure/metrics.go @@ -0,0 +1,219 @@ +package azure + +import ( + "context" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci" + "golang.org/x/sync/errgroup" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" +) + +// GetStatsSummary returns the stats summary for pods running on ACI +func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summary, err error) { + p.metricsSync.Lock() + defer p.metricsSync.Unlock() + + if time.Now().Sub(p.metricsSyncTime) < time.Minute { + return p.lastMetric, nil + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + defer func() { + if err != nil { + return + } + p.lastMetric = summary + p.metricsSyncTime = time.Now() + }() + + pods := p.resourceManager.GetPods() + var errGroup errgroup.Group + chResult := make(chan stats.PodStats, len(pods)) + + end := time.Now() + start := end.Add(-1 * time.Minute) + + sema := make(chan struct{}, 10) + for _, pod := range pods { + if pod.Status.Phase != v1.PodRunning { + continue + } + errGroup.Go(func() error { + 
select { + case <-ctx.Done(): + return ctx.Err() + case sema <- struct{}{}: + } + defer func() { + <-sema + }() + + cgName := containerGroupName(pod) + // cpu/mem and net stats are split because net stats do not support container level detail + systemStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{ + Dimension: "containerName eq '*'", + Start: start, + End: end, + Aggregations: []aci.AggregationType{aci.AggregationTypeAverage}, + Types: []aci.MetricType{aci.MetricTypeCPUUsage, aci.MetricTypeMemoryUsage}, + }) + if err != nil { + return errors.Wrapf(err, "error fetching cpu/mem stats for container group %s", cgName) + } + + netStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{ + Start: start, + End: end, + Aggregations: []aci.AggregationType{aci.AggregationTypeAverage}, + Types: []aci.MetricType{aci.MetricTyperNetworkBytesRecievedPerSecond, aci.MetricTyperNetworkBytesTransmittedPerSecond}, + }) + if err != nil { + return errors.Wrapf(err, "error fetching network stats for container group %s", cgName) + } + + chResult <- collectMetrics(pod, systemStats, netStats) + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, errors.Wrap(err, "error in request to fetch container group metrics") + } + close(chResult) + + var s stats.Summary + s.Node = stats.NodeStats{ + NodeName: p.nodeName, + } + s.Pods = make([]stats.PodStats, 0, len(chResult)) + + for stat := range chResult { + s.Pods = append(s.Pods, stat) + } + + return &s, nil +} + +func collectMetrics(pod *v1.Pod, system, net *aci.ContainerGroupMetricsResult) stats.PodStats { + var stat stats.PodStats + containerStats := make(map[string]*stats.ContainerStats, len(pod.Status.ContainerStatuses)) + stat.StartTime = pod.CreationTimestamp + + for _, m := range system.Value { + // cpu/mem stats are per container, so each entry in the time series is for a container, not the container group. 
+ for _, entry := range m.Timeseries { + if len(entry.Data) == 0 { + continue + } + + var cs *stats.ContainerStats + for _, v := range entry.MetadataValues { + if strings.ToLower(v.Name.Value) != "containername" { + continue + } + if cs = containerStats[v.Value]; cs == nil { + cs = &stats.ContainerStats{Name: v.Value, StartTime: stat.StartTime} + containerStats[v.Value] = cs + } + } + if cs == nil { + continue + } + + if stat.Containers == nil { + stat.Containers = make([]stats.ContainerStats, 0, len(containerStats)) + } + + data := entry.Data[len(entry.Data)-1] // get only the last entry + switch m.Desc.Value { + case aci.MetricTypeCPUUsage: + if cs.CPU == nil { + cs.CPU = &stats.CPUStats{} + } + + // average is the average number of millicores over a 1 minute interval (which is the interval we are pulling the stats for) + nanoCores := uint64(data.Average * 1000000) + usageNanoSeconds := nanoCores * 60 + cs.CPU.Time = metav1.NewTime(data.Timestamp) + cs.CPU.UsageCoreNanoSeconds = &usageNanoSeconds + cs.CPU.UsageNanoCores = &nanoCores + + if stat.CPU == nil { + var zero uint64 + stat.CPU = &stats.CPUStats{UsageNanoCores: &zero, UsageCoreNanoSeconds: &zero, Time: metav1.NewTime(data.Timestamp)} + } + podCPUSec := *stat.CPU.UsageCoreNanoSeconds + podCPUSec += usageNanoSeconds + stat.CPU.UsageCoreNanoSeconds = &podCPUSec + + podCPUCore := *stat.CPU.UsageNanoCores + podCPUCore += nanoCores + stat.CPU.UsageNanoCores = &podCPUCore + case aci.MetricTypeMemoryUsage: + if cs.Memory == nil { + cs.Memory = &stats.MemoryStats{} + } + cs.Memory.Time = metav1.NewTime(data.Timestamp) + bytes := uint64(data.Average) + cs.Memory.UsageBytes = &bytes + cs.Memory.WorkingSetBytes = &bytes + + if stat.Memory == nil { + var zero uint64 + stat.Memory = &stats.MemoryStats{UsageBytes: &zero, WorkingSetBytes: &zero, Time: metav1.NewTime(data.Timestamp)} + } + podMem := *stat.Memory.UsageBytes + podMem += bytes + stat.Memory.UsageBytes = &podMem + stat.Memory.WorkingSetBytes = &podMem + } + } 
+ } + + for _, m := range net.Value { + if stat.Network == nil { + stat.Network = &stats.NetworkStats{} + } + // network stats are for the whole container group, so there should only be one entry here. + if len(m.Timeseries) == 0 { + continue + } + entry := m.Timeseries[0] + if len(entry.Data) == 0 { + continue + } + data := entry.Data[len(entry.Data)-1] // get only the last entry + + bytes := uint64(data.Average) + switch m.Desc.Value { + case aci.MetricTyperNetworkBytesRecievedPerSecond: + stat.Network.RxBytes = &bytes + case aci.MetricTyperNetworkBytesTransmittedPerSecond: + stat.Network.TxBytes = &bytes + } + stat.Network.Time = metav1.NewTime(data.Timestamp) + stat.Network.InterfaceStats.Name = "eth0" + } + + for _, cs := range containerStats { + stat.Containers = append(stat.Containers, *cs) + } + + stat.PodRef = stats.PodReference{ + Name: pod.Name, + Namespace: pod.Namespace, + UID: string(pod.UID), + } + + return stat +} diff --git a/providers/azure/metrics_test.go b/providers/azure/metrics_test.go new file mode 100644 index 000000000..f64ff48ab --- /dev/null +++ b/providers/azure/metrics_test.go @@ -0,0 +1,163 @@ +package azure + +import ( + "path" + "reflect" + "strconv" + "testing" + "time" + + "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" +) + +func TestCollectMetrics(t *testing.T) { + cases := []metricTestCase{ + {desc: "no containers"}, // this is just for sort of fuzzing things, make sure there's no panics + {desc: "zeroed stats", stats: [][2]float64{{0, 0}}, rx: 0, tx: 0, collected: time.Now()}, + {desc: "normal", stats: [][2]float64{{400.0, 1000.0}}, rx: 100.0, tx: 5000.0, collected: time.Now()}, + {desc: "multiple containers", stats: [][2]float64{{100.0, 250.0}, {400.0, 1000.0}}, rx: 100.0, tx: 439833.0, collected: time.Now()}, + } + + for _, test := range 
cases { + t.Run(test.desc, func(t *testing.T) { + pod := fakePod(t, len(test.stats), time.Now()) + expected := podStatFromTestCase(t, pod, test) + + system, net := fakeACIMetrics(pod, test) + actual := collectMetrics(pod, system, net) + + if !reflect.DeepEqual(expected, actual) { + t.Fatalf("got unexpected results\nexpected:\n%+v\nactual:\n%+v", expected, actual) + } + }) + } +} + +type metricTestCase struct { + desc string + stats [][2]float64 + rx, tx float64 + collected time.Time +} + +func fakeACIMetrics(pod *v1.Pod, testCase metricTestCase) (*aci.ContainerGroupMetricsResult, *aci.ContainerGroupMetricsResult) { + newMetricValue := func(mt aci.MetricType) aci.MetricValue { + return aci.MetricValue{ + Desc: aci.MetricDescriptor{ + Value: mt, + }, + } + } + + newNetMetric := func(collected time.Time, value float64) aci.MetricTimeSeries { + return aci.MetricTimeSeries{ + Data: []aci.TimeSeriesEntry{ + {Timestamp: collected, Average: value}, + }, + } + } + + newSystemMetric := func(c v1.ContainerStatus, collected time.Time, value float64) aci.MetricTimeSeries { + return aci.MetricTimeSeries{ + Data: []aci.TimeSeriesEntry{ + {Timestamp: collected, Average: value}, + }, + MetadataValues: []aci.MetricMetadataValue{ + {Name: aci.ValueDescriptor{Value: "containerName"}, Value: c.Name}, + }, + } + } + + // create fake aci metrics for the container group and test data + cpuV := newMetricValue(aci.MetricTypeCPUUsage) + memV := newMetricValue(aci.MetricTypeMemoryUsage) + + for i, c := range pod.Status.ContainerStatuses { + cpuV.Timeseries = append(cpuV.Timeseries, newSystemMetric(c, testCase.collected, testCase.stats[i][0])) + memV.Timeseries = append(memV.Timeseries, newSystemMetric(c, testCase.collected, testCase.stats[i][1])) + } + system := &aci.ContainerGroupMetricsResult{ + Value: []aci.MetricValue{cpuV, memV}, + } + + rxV := newMetricValue(aci.MetricTyperNetworkBytesRecievedPerSecond) + txV := newMetricValue(aci.MetricTyperNetworkBytesTransmittedPerSecond) + 
rxV.Timeseries = append(rxV.Timeseries, newNetMetric(testCase.collected, testCase.rx)) + txV.Timeseries = append(txV.Timeseries, newNetMetric(testCase.collected, testCase.tx)) + net := &aci.ContainerGroupMetricsResult{ + Value: []aci.MetricValue{rxV, txV}, + } + return system, net +} + +func fakePod(t *testing.T, size int, created time.Time) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: path.Base(t.Name()), + Namespace: path.Dir(t.Name()), + UID: types.UID(t.Name()), + CreationTimestamp: metav1.NewTime(created), + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + ContainerStatuses: make([]v1.ContainerStatus, 0, size), + }, + } + + for i := 0; i < size; i++ { + pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{ + Name: "c" + strconv.Itoa(i), + }) + } + return pod +} + +func podStatFromTestCase(t *testing.T, pod *v1.Pod, test metricTestCase) stats.PodStats { + rx := uint64(test.rx) + tx := uint64(test.tx) + expected := stats.PodStats{ + StartTime: pod.CreationTimestamp, + PodRef: stats.PodReference{ + Name: pod.Name, + Namespace: pod.Namespace, + UID: string(pod.UID), + }, + Network: &stats.NetworkStats{ + Time: metav1.NewTime(test.collected), + InterfaceStats: stats.InterfaceStats{ + Name: "eth0", + RxBytes: &rx, + TxBytes: &tx, + }, + }, + } + + var ( + nodeCPU uint64 + nodeMem uint64 + ) + for i := range test.stats { + cpu := uint64(test.stats[i][0] * 1000000) + cpuNanoSeconds := cpu * 60 + mem := uint64(test.stats[i][1]) + + expected.Containers = append(expected.Containers, stats.ContainerStats{ + StartTime: pod.CreationTimestamp, + Name: pod.Status.ContainerStatuses[i].Name, + CPU: &stats.CPUStats{Time: metav1.NewTime(test.collected), UsageNanoCores: &cpu, UsageCoreNanoSeconds: &cpuNanoSeconds}, + Memory: &stats.MemoryStats{Time: metav1.NewTime(test.collected), UsageBytes: &mem, WorkingSetBytes: &mem}, + }) + nodeCPU += cpu + nodeMem += mem + } + if len(expected.Containers) > 0 { + nanoCPUSeconds := 
nodeCPU * 60 + expected.CPU = &stats.CPUStats{UsageNanoCores: &nodeCPU, UsageCoreNanoSeconds: &nanoCPUSeconds, Time: metav1.NewTime(test.collected)} + expected.Memory = &stats.MemoryStats{UsageBytes: &nodeMem, WorkingSetBytes: &nodeMem, Time: metav1.NewTime(test.collected)} + } + return expected +} diff --git a/vkubelet/apiserver.go b/vkubelet/apiserver.go index 9612207e4..e65c18a35 100644 --- a/vkubelet/apiserver.go +++ b/vkubelet/apiserver.go @@ -2,6 +2,7 @@ package vkubelet import ( "context" + "encoding/json" "fmt" "io" "net/http" @@ -12,6 +13,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/gorilla/mux" + "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" ) @@ -34,7 +36,7 @@ func NotFound(w http.ResponseWriter, r *http.Request) { http.Error(w, "404 request not found", http.StatusNotFound) } -func ApiserverStart(provider Provider) { +func ApiserverStart(provider Provider, metricsAddr string) { p = provider certFilePath := os.Getenv("APISERVER_CERT_LOCATION") keyFilePath := os.Getenv("APISERVER_KEY_LOCATION") @@ -46,11 +48,62 @@ func ApiserverStart(provider Provider) { r.HandleFunc("/exec/{namespace}/{pod}/{container}", ApiServerHandlerExec).Methods("POST") r.NotFoundHandler = http.HandlerFunc(NotFound) + if metricsAddr != "" { + go MetricsServerStart(metricsAddr) + } else { + log.G(context.TODO()).Info("Skipping metrics server startup since no address was provided") + } + if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil { log.G(context.TODO()).WithError(err).Error("error setting up http server") } } +// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelet summary stats API. +// TLS is never enabled on this endpoint.
+func MetricsServerStart(addr string) { + r := mux.NewRouter() + r.HandleFunc("/stats/summary", MetricsSummaryHandler).Methods("GET") + r.HandleFunc("/stats/summary/", MetricsSummaryHandler).Methods("GET") + r.NotFoundHandler = http.HandlerFunc(NotFound) + if err := http.ListenAndServe(addr, r); err != nil { + log.G(context.TODO()).WithError(err).Error("Error starting http server") + } +} + +// MetricsSummaryHandler is an HTTP handler for implementing the kubelet summary stats endpoint +func MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) { + ctx := loggingContext(req) + + mp, ok := p.(MetricsProvider) + if !ok { + log.G(ctx).Debug("stats not implemented for provider") + http.Error(w, "not implememnted", http.StatusNotImplemented) + return + } + + stats, err := mp.GetStatsSummary(req.Context()) + if err != nil { + if errors.Cause(err) == context.Canceled { + return + } + log.G(ctx).Error("Error getting stats from provider:", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + b, err := json.Marshal(stats) + if err != nil { + log.G(ctx).WithError(err).Error("Could not marshal stats") + http.Error(w, "could not marshal stats: "+err.Error(), http.StatusInternalServerError) + return + } + + if _, err := w.Write(b); err != nil { + log.G(ctx).WithError(err).Debug("Could not write to client") + } +} + func ApiServerHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) if len(vars) != 3 { diff --git a/vkubelet/provider.go b/vkubelet/provider.go index 6fab1d7a2..5f932f891 100644 --- a/vkubelet/provider.go +++ b/vkubelet/provider.go @@ -1,12 +1,14 @@ package vkubelet import ( + "context" "io" "time" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/remotecommand" + stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" ) // Provider contains the methods required to implement a virtual-kubelet provider. 
@@ -54,3 +56,8 @@ type Provider interface { // OperatingSystem returns the operating system the provider is for. OperatingSystem() string } + +// MetricsProvider is an optional interface that providers can implement to expose pod stats +type MetricsProvider interface { + GetStatsSummary(context.Context) (*stats.Summary, error) +} diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index e4b3cd159..543d7594c 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -48,7 +48,7 @@ func getEnv(key, defaultValue string) string { } // New creates a new virtual-kubelet server. -func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerConfig, taintKey string, disableTaint bool) (*Server, error) { +func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerConfig, taintKey string, disableTaint bool, metricsAddr string) (*Server, error) { var config *rest.Config // Check if the kubeConfig file exists. @@ -137,7 +137,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon return s, err } - go ApiserverStart(p) + go ApiserverStart(p, metricsAddr) tick := time.Tick(5 * time.Second)