Merge pull request #306 from cpuguy83/metrics

Add support for kubelet stats summary
Brian Goff authored on 2018-08-20 11:37:50 -07:00, committed by GitHub
17 changed files with 1112 additions and 6 deletions
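
For orientation: this change wires a plain-HTTP, kubelet-style stats endpoint into virtual-kubelet, served on the address given by the new --metrics-addr flag (":10255" by default). The sketch below is not part of the commit; it only illustrates how the new /stats/summary route could be queried, assuming the node is reachable on localhost at the default address.

package main

import (
    "encoding/json"
    "fmt"
    "net/http"

    stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

func main() {
    // The metrics server added in this PR listens without TLS on --metrics-addr
    // and serves the kubelet summary stats API at /stats/summary.
    resp, err := http.Get("http://localhost:10255/stats/summary")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var summary stats.Summary
    if err := json.NewDecoder(resp.Body).Decode(&summary); err != nil {
        panic(err)
    }
    fmt.Printf("node %s reported stats for %d pods\n", summary.Node.NodeName, len(summary.Pods))
}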

Gopkg.lock (generated)

@@ -1026,6 +1026,14 @@
pruneopts = "NUT"
revision = "f73e4c9ed3b7ebdd5f699a16a880c2b1994e50dd"
[[projects]]
branch = "master"
digest = "1:39ebcc2b11457b703ae9ee2e8cca0f68df21969c6102cb3b705f76cca0ea0239"
name = "golang.org/x/sync"
packages = ["errgroup"]
pruneopts = "NUT"
revision = "1d60e4601c6fd243af51cc01ddf169918a5407ca"
[[projects]]
branch = "master"
digest = "1:ee40f9506736b3b55162b22d82911b797301904848b1f1ae5db49f561ad48a79"
@@ -1317,11 +1325,12 @@
version = "v7.0.0"
[[projects]]
digest = "1:3f23907ae824a94920ce9bc22310631cb2be91787bd7de16de84bbb9af92cad4"
digest = "1:58c71c78f19e62f5f3ba1f37d21670481de1f46c4a2d4d4ee31fe70b03b57ab3"
name = "k8s.io/kubernetes"
packages = [
"pkg/apis/core",
"pkg/kubelet/apis/cri/runtime/v1alpha2",
"pkg/kubelet/apis/stats/v1alpha1",
"pkg/kubelet/server/remotecommand",
]
pruneopts = "NUT"
@@ -1381,6 +1390,7 @@
"github.com/kr/pretty",
"github.com/lawrencegripper/pod2docker",
"github.com/mitchellh/go-homedir",
"github.com/pkg/errors",
"github.com/spf13/cobra",
"github.com/spf13/viper",
"github.com/stretchr/testify/assert",
@@ -1405,6 +1415,7 @@
"github.com/vmware/vic/pkg/trace",
"github.com/vmware/vic/pkg/vsphere/sys",
"golang.org/x/net/context",
"golang.org/x/sync/errgroup",
"google.golang.org/grpc",
"gopkg.in/yaml.v2",
"k8s.io/api/core/v1",
@@ -1422,6 +1433,7 @@
"k8s.io/client-go/tools/clientcmd",
"k8s.io/client-go/tools/remotecommand",
"k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2",
"k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1",
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand",
]
solver-name = "gps-cdcl"


@@ -40,6 +40,7 @@ var providerConfig string
var taintKey string
var disableTaint bool
var logLevel string
var metricsAddr string
// RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{
@@ -49,7 +50,7 @@ var RootCmd = &cobra.Command{
backend implementation allowing users to create kubernetes nodes without running the kubelet.
This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`,
Run: func(cmd *cobra.Command, args []string) {
f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint)
f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint, metricsAddr)
if err != nil {
log.L.WithError(err).Fatal("Error initializing virtual kubelet")
}
@@ -87,6 +88,8 @@ func init() {
RootCmd.PersistentFlags().StringVar(&provider, "provider", "", "cloud provider")
RootCmd.PersistentFlags().BoolVar(&disableTaint, "disable-taint", false, "disable the virtual-kubelet node taint")
RootCmd.PersistentFlags().StringVar(&providerConfig, "provider-config", "", "cloud provider configuration file")
RootCmd.PersistentFlags().StringVar(&metricsAddr, "metrics-addr", ":10255", "address to listen for metrics/stats requests")
RootCmd.PersistentFlags().StringVar(&taintKey, "taint", "", "Set node taint key")
RootCmd.PersistentFlags().MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable")
RootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", `set the log level, e.g. "trace", "debug", "info", "warn", "error"`)


@@ -12,6 +12,7 @@ import (
"os"
"reflect"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
@@ -26,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
// The service account secret mount path.
@@ -47,6 +49,10 @@ type ACIProvider struct {
internalIP string
daemonEndpointPort int32
diagnostics *aci.ContainerGroupDiagnostics
metricsSync sync.Mutex
metricsSyncTime time.Time
lastMetric *stats.Summary
}
// AuthConfig is the secret returned from an ImageRegistryCredential
@@ -296,13 +302,17 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error {
// TODO(BJK) containergrouprestartpolicy??
_, err = p.aciClient.CreateContainerGroup(
p.resourceGroup,
fmt.Sprintf("%s-%s", pod.Namespace, pod.Name),
containerGroupName(pod),
containerGroup,
)
return err
}
func containerGroupName(pod *v1.Pod) string {
return fmt.Sprintf("%s-%s", pod.Namespace, pod.Name)
}
// UpdatePod is a noop, ACI currently does not support live updates of a pod.
func (p *ACIProvider) UpdatePod(pod *v1.Pod) error {
return nil


@@ -18,6 +18,7 @@ const (
containerGroupListByResourceGroupURLPath = "subscriptions/{{.subscriptionId}}/resourceGroups/{{.resourceGroup}}/providers/Microsoft.ContainerInstance/containerGroups"
containerLogsURLPath = containerGroupURLPath + "/containers/{{.containerName}}/logs"
containerExecURLPath = containerGroupURLPath + "/containers/{{.containerName}}/exec"
containerGroupMetricsURLPath = containerGroupURLPath + "/providers/microsoft.Insights/metrics"
)
// Client is a client for interacting with Azure Container Instances.


@@ -0,0 +1,97 @@
package aci
import (
"context"
"encoding/json"
"net/http"
"net/url"
"path"
"time"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/api"
)
// GetContainerGroupMetrics gets metrics for the provided container group
func (c *Client) GetContainerGroupMetrics(ctx context.Context, resourceGroup, containerGroup string, options MetricsRequest) (*ContainerGroupMetricsResult, error) {
if len(options.Types) == 0 {
return nil, errors.New("must provide metrics types to fetch")
}
if options.Start.After(options.End) || options.Start.Equal(options.End) && !options.Start.IsZero() {
return nil, errors.Errorf("end parameter must be after start: start=%s, end=%s", options.Start, options.End)
}
var metricNames string
for _, t := range options.Types {
if len(metricNames) > 0 {
metricNames += ","
}
metricNames += string(t)
}
var ag string
for _, a := range options.Aggregations {
if len(ag) > 0 {
ag += ","
}
ag += string(a)
}
urlParams := url.Values{
"api-version": []string{"2018-01-01"},
"aggregation": []string{ag},
"metricnames": []string{metricNames},
"interval": []string{"PT1M"}, // TODO: make configurable?
}
if options.Dimension != "" {
urlParams.Add("$filter", options.Dimension)
}
if !options.Start.IsZero() || !options.End.IsZero() {
urlParams.Add("timespan", path.Join(options.Start.Format(time.RFC3339), options.End.Format(time.RFC3339)))
}
// Create the url.
uri := api.ResolveRelative(c.auth.ResourceManagerEndpoint, containerGroupMetricsURLPath)
uri += "?" + url.Values(urlParams).Encode()
// Create the request.
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, errors.Wrap(err, "creating get container group metrics uri request failed")
}
req = req.WithContext(ctx)
// Add the parameters to the url.
if err := api.ExpandURL(req.URL, map[string]string{
"subscriptionId": c.auth.SubscriptionID,
"resourceGroup": resourceGroup,
"containerGroupName": containerGroup,
}); err != nil {
return nil, errors.Wrap(err, "expanding URL with parameters failed")
}
// Send the request.
resp, err := c.hc.Do(req)
if err != nil {
return nil, errors.Wrap(err, "sending get container group metrics request failed")
}
defer resp.Body.Close()
// 200 (OK) is a success response.
if err := api.CheckResponse(resp); err != nil {
return nil, err
}
// Decode the body from the response.
if resp.Body == nil {
return nil, errors.New("container group metrics returned an empty body in the response")
}
var metrics ContainerGroupMetricsResult
if err := json.NewDecoder(resp.Body).Decode(&metrics); err != nil {
return nil, errors.Wrap(err, "decoding get container group metrics response body failed")
}
return &metrics, nil
}
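
A rough usage sketch for the new client method (not part of the commit): the resource group and container group names below are hypothetical, and client is assumed to be an already-authenticated *aci.Client.

package example

import (
    "context"
    "time"

    "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
)

// fetchCPUAndMemory asks Azure Monitor for per-container CPU and memory
// averages over the last minute, mirroring how the provider calls this API.
func fetchCPUAndMemory(ctx context.Context, client *aci.Client) (*aci.ContainerGroupMetricsResult, error) {
    end := time.Now()
    return client.GetContainerGroupMetrics(ctx, "my-resource-group", "default-mypod", aci.MetricsRequest{
        Start:        end.Add(-1 * time.Minute),
        End:          end,
        Types:        []aci.MetricType{aci.MetricTypeCPUUsage, aci.MetricTypeMemoryUsage},
        Aggregations: []aci.AggregationType{aci.AggregationTypeAverage},
        Dimension:    "containerName eq '*'", // optional per-container breakdown
    })
}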


@@ -1,6 +1,8 @@
package aci
import (
"time"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/api"
)
@@ -332,3 +334,85 @@ type LogAnalyticsWorkspace struct {
WorkspaceID string `json:"workspaceID,omitempty"`
WorkspaceKey string `json:"workspaceKey,omitempty"`
}
// ContainerGroupMetricsResult stores all the results for a container group metrics request.
type ContainerGroupMetricsResult struct {
Value []MetricValue `json:"value"`
}
// MetricValue stores metrics results
type MetricValue struct {
ID string `json:"id"`
Desc MetricDescriptor `json:"name"`
Timeseries []MetricTimeSeries `json:"timeseries"`
Type string `json:"type"`
Unit string `json:"unit"`
}
// MetricDescriptor stores the name for a given metric and the localized version of that name.
type MetricDescriptor struct {
Value MetricType `json:"value"`
LocalizedValue string `json:"localizedValue"`
}
// MetricTimeSeries is the time series for a given metric
// It contains all the metrics values and other details for the dimension the metrics are aggregated on.
type MetricTimeSeries struct {
Data []TimeSeriesEntry `json:"data"`
MetadataValues []MetricMetadataValue `json:"metadatavalues,omitempty"`
}
// MetricMetadataValue stores extra metadata about a metric
// In particular it is used to provide details about the breakdown of a metric dimension.
type MetricMetadataValue struct {
Name ValueDescriptor `json:"name"`
Value string `json:"value"`
}
// ValueDescriptor describes a generic value.
// It is used to describe metadata fields.
type ValueDescriptor struct {
Value string `json:"value"`
LocalizedValue string `json:"localizedValue"`
}
// TimeSeriesEntry is the metric data for a given timestamp/metric type
type TimeSeriesEntry struct {
Timestamp time.Time `json:"timestamp"`
Average float64 `json:"average"`
Total float64 `json:"total"`
Count float64 `json:"count"`
}
// MetricsRequest is an options struct used when getting container group metrics
type MetricsRequest struct {
Start time.Time
End time.Time
Types []MetricType
Aggregations []AggregationType
// Note that a dimension may not be available for certain metrics.
// In such cases, you will need to make separate requests.
Dimension string
}
// MetricType is an enum type for defining supported metric types.
type MetricType string
// Supported metric types
const (
MetricTypeCPUUsage MetricType = "CpuUsage"
MetricTypeMemoryUsage MetricType = "MemoryUsage"
MetricTyperNetworkBytesRecievedPerSecond MetricType = "NetworkBytesReceivedPerSecond"
MetricTyperNetworkBytesTransmittedPerSecond MetricType = "NetworkBytesTransmittedPerSecond"
)
// AggregationType is an enum type for defining supported aggregation types
type AggregationType string
// Supported metric aggregation types
const (
AggregationTypeCount AggregationType = "count"
AggregationTypeAverage AggregationType = "average"
AggregationTypeTotal AggregationType = "total"
)

providers/azure/metrics.go (new file)

@@ -0,0 +1,219 @@
package azure
import (
"context"
"strings"
"time"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
"golang.org/x/sync/errgroup"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
// GetStatsSummary returns the stats summary for pods running on ACI
func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summary, err error) {
p.metricsSync.Lock()
defer p.metricsSync.Unlock()
if time.Now().Sub(p.metricsSyncTime) < time.Minute {
return p.lastMetric, nil
}
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
defer func() {
if err != nil {
return
}
p.lastMetric = summary
p.metricsSyncTime = time.Now()
}()
pods := p.resourceManager.GetPods()
var errGroup errgroup.Group
chResult := make(chan stats.PodStats, len(pods))
end := time.Now()
start := end.Add(-1 * time.Minute)
sema := make(chan struct{}, 10)
for _, pod := range pods {
if pod.Status.Phase != v1.PodRunning {
continue
}
pod := pod // capture the loop variable so each goroutine operates on its own pod
errGroup.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
case sema <- struct{}{}:
}
defer func() {
<-sema
}()
cgName := containerGroupName(pod)
// cpu/mem and net stats are split because net stats do not support container level detail
systemStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{
Dimension: "containerName eq '*'",
Start: start,
End: end,
Aggregations: []aci.AggregationType{aci.AggregationTypeAverage},
Types: []aci.MetricType{aci.MetricTypeCPUUsage, aci.MetricTypeMemoryUsage},
})
if err != nil {
return errors.Wrapf(err, "error fetching cpu/mem stats for container group %s", cgName)
}
netStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{
Start: start,
End: end,
Aggregations: []aci.AggregationType{aci.AggregationTypeAverage},
Types: []aci.MetricType{aci.MetricTyperNetworkBytesRecievedPerSecond, aci.MetricTyperNetworkBytesTransmittedPerSecond},
})
if err != nil {
return errors.Wrapf(err, "error fetching network stats for container group %s", cgName)
}
chResult <- collectMetrics(pod, systemStats, netStats)
return nil
})
}
if err := errGroup.Wait(); err != nil {
return nil, errors.Wrap(err, "error in request to fetch container group metrics")
}
close(chResult)
var s stats.Summary
s.Node = stats.NodeStats{
NodeName: p.nodeName,
}
s.Pods = make([]stats.PodStats, 0, len(chResult))
for stat := range chResult {
s.Pods = append(s.Pods, stat)
}
return &s, nil
}
func collectMetrics(pod *v1.Pod, system, net *aci.ContainerGroupMetricsResult) stats.PodStats {
var stat stats.PodStats
containerStats := make(map[string]*stats.ContainerStats, len(pod.Status.ContainerStatuses))
stat.StartTime = pod.CreationTimestamp
for _, m := range system.Value {
// cpu/mem stats are per container, so each entry in the time series is for a container, not the container group.
for _, entry := range m.Timeseries {
if len(entry.Data) == 0 {
continue
}
var cs *stats.ContainerStats
for _, v := range entry.MetadataValues {
if strings.ToLower(v.Name.Value) != "containername" {
continue
}
if cs = containerStats[v.Value]; cs == nil {
cs = &stats.ContainerStats{Name: v.Value, StartTime: stat.StartTime}
containerStats[v.Value] = cs
}
}
if cs == nil {
continue
}
if stat.Containers == nil {
stat.Containers = make([]stats.ContainerStats, 0, len(containerStats))
}
data := entry.Data[len(entry.Data)-1] // get only the last entry
switch m.Desc.Value {
case aci.MetricTypeCPUUsage:
if cs.CPU == nil {
cs.CPU = &stats.CPUStats{}
}
// average is the average number of millicores over a 1 minute interval (which is the interval we are pulling the stats for)
nanoCores := uint64(data.Average * 1000000)
usageNanoSeconds := nanoCores * 60
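// Worked example (illustrative numbers, not from the source): an average of
// 500 millicores becomes 500 * 1,000,000 = 500,000,000 nanocores, and over the
// 60-second sample window that amounts to 500,000,000 * 60 = 30,000,000,000
// core-nanoseconds reported as UsageCoreNanoSeconds for this interval.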
cs.CPU.Time = metav1.NewTime(data.Timestamp)
cs.CPU.UsageCoreNanoSeconds = &usageNanoSeconds
cs.CPU.UsageNanoCores = &nanoCores
if stat.CPU == nil {
var zero uint64
stat.CPU = &stats.CPUStats{UsageNanoCores: &zero, UsageCoreNanoSeconds: &zero, Time: metav1.NewTime(data.Timestamp)}
}
podCPUSec := *stat.CPU.UsageCoreNanoSeconds
podCPUSec += usageNanoSeconds
stat.CPU.UsageCoreNanoSeconds = &podCPUSec
podCPUCore := *stat.CPU.UsageNanoCores
podCPUCore += nanoCores
stat.CPU.UsageNanoCores = &podCPUCore
case aci.MetricTypeMemoryUsage:
if cs.Memory == nil {
cs.Memory = &stats.MemoryStats{}
}
cs.Memory.Time = metav1.NewTime(data.Timestamp)
bytes := uint64(data.Average)
cs.Memory.UsageBytes = &bytes
cs.Memory.WorkingSetBytes = &bytes
if stat.Memory == nil {
var zero uint64
stat.Memory = &stats.MemoryStats{UsageBytes: &zero, WorkingSetBytes: &zero, Time: metav1.NewTime(data.Timestamp)}
}
podMem := *stat.Memory.UsageBytes
podMem += bytes
stat.Memory.UsageBytes = &podMem
stat.Memory.WorkingSetBytes = &podMem
}
}
}
for _, m := range net.Value {
if stat.Network == nil {
stat.Network = &stats.NetworkStats{}
}
// network stats are for the whole container group, so there should only be one entry here.
if len(m.Timeseries) == 0 {
continue
}
entry := m.Timeseries[0]
if len(entry.Data) == 0 {
continue
}
data := entry.Data[len(entry.Data)-1] // get only the last entry
bytes := uint64(data.Average)
switch m.Desc.Value {
case aci.MetricTyperNetworkBytesRecievedPerSecond:
stat.Network.RxBytes = &bytes
case aci.MetricTyperNetworkBytesTransmittedPerSecond:
stat.Network.TxBytes = &bytes
}
stat.Network.Time = metav1.NewTime(data.Timestamp)
stat.Network.InterfaceStats.Name = "eth0"
}
for _, cs := range containerStats {
stat.Containers = append(stat.Containers, *cs)
}
stat.PodRef = stats.PodReference{
Name: pod.Name,
Namespace: pod.Namespace,
UID: string(pod.UID),
}
return stat
}


@@ -0,0 +1,163 @@
package azure
import (
"path"
"reflect"
"strconv"
"testing"
"time"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
func TestCollectMetrics(t *testing.T) {
cases := []metricTestCase{
{desc: "no containers"}, // this is just for sort of fuzzing things, make sure there's no panics
{desc: "zeroed stats", stats: [][2]float64{{0, 0}}, rx: 0, tx: 0, collected: time.Now()},
{desc: "normal", stats: [][2]float64{{400.0, 1000.0}}, rx: 100.0, tx: 5000.0, collected: time.Now()},
{desc: "multiple containers", stats: [][2]float64{{100.0, 250.0}, {400.0, 1000.0}}, rx: 100.0, tx: 439833.0, collected: time.Now()},
}
for _, test := range cases {
t.Run(test.desc, func(t *testing.T) {
pod := fakePod(t, len(test.stats), time.Now())
expected := podStatFromTestCase(t, pod, test)
system, net := fakeACIMetrics(pod, test)
actual := collectMetrics(pod, system, net)
if !reflect.DeepEqual(expected, actual) {
t.Fatalf("got unexpected results\nexpected:\n%+v\nactual:\n%+v", expected, actual)
}
})
}
}
type metricTestCase struct {
desc string
stats [][2]float64
rx, tx float64
collected time.Time
}
func fakeACIMetrics(pod *v1.Pod, testCase metricTestCase) (*aci.ContainerGroupMetricsResult, *aci.ContainerGroupMetricsResult) {
newMetricValue := func(mt aci.MetricType) aci.MetricValue {
return aci.MetricValue{
Desc: aci.MetricDescriptor{
Value: mt,
},
}
}
newNetMetric := func(collected time.Time, value float64) aci.MetricTimeSeries {
return aci.MetricTimeSeries{
Data: []aci.TimeSeriesEntry{
{Timestamp: collected, Average: value},
},
}
}
newSystemMetric := func(c v1.ContainerStatus, collected time.Time, value float64) aci.MetricTimeSeries {
return aci.MetricTimeSeries{
Data: []aci.TimeSeriesEntry{
{Timestamp: collected, Average: value},
},
MetadataValues: []aci.MetricMetadataValue{
{Name: aci.ValueDescriptor{Value: "containerName"}, Value: c.Name},
},
}
}
// create fake aci metrics for the container group and test data
cpuV := newMetricValue(aci.MetricTypeCPUUsage)
memV := newMetricValue(aci.MetricTypeMemoryUsage)
for i, c := range pod.Status.ContainerStatuses {
cpuV.Timeseries = append(cpuV.Timeseries, newSystemMetric(c, testCase.collected, testCase.stats[i][0]))
memV.Timeseries = append(memV.Timeseries, newSystemMetric(c, testCase.collected, testCase.stats[i][1]))
}
system := &aci.ContainerGroupMetricsResult{
Value: []aci.MetricValue{cpuV, memV},
}
rxV := newMetricValue(aci.MetricTyperNetworkBytesRecievedPerSecond)
txV := newMetricValue(aci.MetricTyperNetworkBytesTransmittedPerSecond)
rxV.Timeseries = append(rxV.Timeseries, newNetMetric(testCase.collected, testCase.rx))
txV.Timeseries = append(txV.Timeseries, newNetMetric(testCase.collected, testCase.tx))
net := &aci.ContainerGroupMetricsResult{
Value: []aci.MetricValue{rxV, txV},
}
return system, net
}
func fakePod(t *testing.T, size int, created time.Time) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: path.Base(t.Name()),
Namespace: path.Dir(t.Name()),
UID: types.UID(t.Name()),
CreationTimestamp: metav1.NewTime(created),
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: make([]v1.ContainerStatus, 0, size),
},
}
for i := 0; i < size; i++ {
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
Name: "c" + strconv.Itoa(i),
})
}
return pod
}
func podStatFromTestCase(t *testing.T, pod *v1.Pod, test metricTestCase) stats.PodStats {
rx := uint64(test.rx)
tx := uint64(test.tx)
expected := stats.PodStats{
StartTime: pod.CreationTimestamp,
PodRef: stats.PodReference{
Name: pod.Name,
Namespace: pod.Namespace,
UID: string(pod.UID),
},
Network: &stats.NetworkStats{
Time: metav1.NewTime(test.collected),
InterfaceStats: stats.InterfaceStats{
Name: "eth0",
RxBytes: &rx,
TxBytes: &tx,
},
},
}
var (
nodeCPU uint64
nodeMem uint64
)
for i := range test.stats {
cpu := uint64(test.stats[i][0] * 1000000)
cpuNanoSeconds := cpu * 60
mem := uint64(test.stats[i][1])
expected.Containers = append(expected.Containers, stats.ContainerStats{
StartTime: pod.CreationTimestamp,
Name: pod.Status.ContainerStatuses[i].Name,
CPU: &stats.CPUStats{Time: metav1.NewTime(test.collected), UsageNanoCores: &cpu, UsageCoreNanoSeconds: &cpuNanoSeconds},
Memory: &stats.MemoryStats{Time: metav1.NewTime(test.collected), UsageBytes: &mem, WorkingSetBytes: &mem},
})
nodeCPU += cpu
nodeMem += mem
}
if len(expected.Containers) > 0 {
nanoCPUSeconds := nodeCPU * 60
expected.CPU = &stats.CPUStats{UsageNanoCores: &nodeCPU, UsageCoreNanoSeconds: &nanoCPUSeconds, Time: metav1.NewTime(test.collected)}
expected.Memory = &stats.MemoryStats{UsageBytes: &nodeMem, WorkingSetBytes: &nodeMem, Time: metav1.NewTime(test.collected)}
}
return expected
}

vendor/golang.org/x/sync/AUTHORS (generated, vendored)

@@ -0,0 +1,3 @@
# This source code refers to The Go Authors for copyright purposes.
# The master list of authors is in the main Go distribution,
# visible at http://tip.golang.org/AUTHORS.

vendor/golang.org/x/sync/CONTRIBUTORS (generated, vendored)

@@ -0,0 +1,3 @@
# This source code was written by the Go contributors.
# The master list of contributors is in the main Go distribution,
# visible at http://tip.golang.org/CONTRIBUTORS.

vendor/golang.org/x/sync/LICENSE (generated, vendored)

@@ -0,0 +1,27 @@
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

vendor/golang.org/x/sync/PATENTS (generated, vendored)

@@ -0,0 +1,22 @@
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the Go project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.

vendor/golang.org/x/sync/errgroup/errgroup.go (generated, vendored)

@@ -0,0 +1,67 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
package errgroup
import (
"sync"
"golang.org/x/net/context"
)
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
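
For reference only (not part of the vendored file): the new GetStatsSummary code above combines this package with a buffered-channel semaphore to bound concurrency. A minimal standalone sketch of that pattern, using hypothetical work items:

package example

import (
    "fmt"

    "golang.org/x/sync/errgroup"
)

// processAll runs one goroutine per item, allows at most 10 to run at a time,
// and returns the first error encountered, mirroring the fan-out in metrics.go.
func processAll(items []string) error {
    var g errgroup.Group
    sema := make(chan struct{}, 10)          // bounds concurrency
    results := make(chan string, len(items)) // buffered so sends never block

    for _, it := range items {
        it := it // capture the loop variable for the closure
        g.Go(func() error {
            sema <- struct{}{}
            defer func() { <-sema }()
            results <- fmt.Sprintf("processed %s", it)
            return nil
        })
    }
    if err := g.Wait(); err != nil {
        return err
    }
    close(results)
    for r := range results {
        fmt.Println(r)
    }
    return nil
}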


@@ -0,0 +1,335 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Summary is a top-level container for holding NodeStats and PodStats.
type Summary struct {
// Overall node stats.
Node NodeStats `json:"node"`
// Per-pod stats.
Pods []PodStats `json:"pods"`
}
// NodeStats holds node-level unprocessed sample stats.
type NodeStats struct {
// Reference to the measured Node.
NodeName string `json:"nodeName"`
// Stats of system daemons tracked as raw containers.
// The system containers are named according to the SystemContainer* constants.
// +optional
// +patchMergeKey=name
// +patchStrategy=merge
SystemContainers []ContainerStats `json:"systemContainers,omitempty" patchStrategy:"merge" patchMergeKey:"name"`
// The time at which data collection for the node-scoped (i.e. aggregate) stats was (re)started.
StartTime metav1.Time `json:"startTime"`
// Stats pertaining to CPU resources.
// +optional
CPU *CPUStats `json:"cpu,omitempty"`
// Stats pertaining to memory (RAM) resources.
// +optional
Memory *MemoryStats `json:"memory,omitempty"`
// Stats pertaining to network resources.
// +optional
Network *NetworkStats `json:"network,omitempty"`
// Stats pertaining to total usage of filesystem resources on the rootfs used by node k8s components.
// NodeFs.Used is the total bytes used on the filesystem.
// +optional
Fs *FsStats `json:"fs,omitempty"`
// Stats about the underlying container runtime.
// +optional
Runtime *RuntimeStats `json:"runtime,omitempty"`
// Stats about the rlimit of system.
// +optional
Rlimit *RlimitStats `json:"rlimit,omitempty"`
}
// RlimitStats are stats rlimit of OS.
type RlimitStats struct {
Time metav1.Time `json:"time"`
// The max PID of OS.
MaxPID *int64 `json:"maxpid,omitempty"`
// The number of running process in the OS.
NumOfRunningProcesses *int64 `json:"curproc,omitempty"`
}
// RuntimeStats are stats pertaining to the underlying container runtime.
type RuntimeStats struct {
// Stats about the underlying filesystem where container images are stored.
// This filesystem could be the same as the primary (root) filesystem.
// Usage here refers to the total number of bytes occupied by images on the filesystem.
// +optional
ImageFs *FsStats `json:"imageFs,omitempty"`
}
const (
// SystemContainerKubelet is the container name for the system container tracking Kubelet usage.
SystemContainerKubelet = "kubelet"
// SystemContainerRuntime is the container name for the system container tracking the runtime (e.g. docker or rkt) usage.
SystemContainerRuntime = "runtime"
// SystemContainerMisc is the container name for the system container tracking non-kubernetes processes.
SystemContainerMisc = "misc"
// SystemContainerPods is the container name for the system container tracking user pods.
SystemContainerPods = "pods"
)
// PodStats holds pod-level unprocessed sample stats.
type PodStats struct {
// Reference to the measured Pod.
PodRef PodReference `json:"podRef"`
// The time at which data collection for the pod-scoped (e.g. network) stats was (re)started.
StartTime metav1.Time `json:"startTime"`
// Stats of containers in the measured pod.
// +patchMergeKey=name
// +patchStrategy=merge
Containers []ContainerStats `json:"containers" patchStrategy:"merge" patchMergeKey:"name"`
// Stats pertaining to CPU resources consumed by pod cgroup (which includes all containers' resource usage and pod overhead).
// +optional
CPU *CPUStats `json:"cpu,omitempty"`
// Stats pertaining to memory (RAM) resources consumed by pod cgroup (which includes all containers' resource usage and pod overhead).
// +optional
Memory *MemoryStats `json:"memory,omitempty"`
// Stats pertaining to network resources.
// +optional
Network *NetworkStats `json:"network,omitempty"`
// Stats pertaining to volume usage of filesystem resources.
// VolumeStats.UsedBytes is the number of bytes used by the Volume
// +optional
// +patchMergeKey=name
// +patchStrategy=merge
VolumeStats []VolumeStats `json:"volume,omitempty" patchStrategy:"merge" patchMergeKey:"name"`
// EphemeralStorage reports the total filesystem usage for the containers and emptyDir-backed volumes in the measured Pod.
// +optional
EphemeralStorage *FsStats `json:"ephemeral-storage,omitempty"`
}
// ContainerStats holds container-level unprocessed sample stats.
type ContainerStats struct {
// Reference to the measured container.
Name string `json:"name"`
// The time at which data collection for this container was (re)started.
StartTime metav1.Time `json:"startTime"`
// Stats pertaining to CPU resources.
// +optional
CPU *CPUStats `json:"cpu,omitempty"`
// Stats pertaining to memory (RAM) resources.
// +optional
Memory *MemoryStats `json:"memory,omitempty"`
// Metrics for Accelerators. Each Accelerator corresponds to one element in the array.
Accelerators []AcceleratorStats `json:"accelerators,omitempty"`
// Stats pertaining to container rootfs usage of filesystem resources.
// Rootfs.UsedBytes is the number of bytes used for the container write layer.
// +optional
Rootfs *FsStats `json:"rootfs,omitempty"`
// Stats pertaining to container logs usage of filesystem resources.
// Logs.UsedBytes is the number of bytes used for the container logs.
// +optional
Logs *FsStats `json:"logs,omitempty"`
// User defined metrics that are exposed by containers in the pod. Typically, we expect only one container in the pod to be exposing user defined metrics. In the event of multiple containers exposing metrics, they will be combined here.
// +patchMergeKey=name
// +patchStrategy=merge
UserDefinedMetrics []UserDefinedMetric `json:"userDefinedMetrics,omitmepty" patchStrategy:"merge" patchMergeKey:"name"`
}
// PodReference contains enough information to locate the referenced pod.
type PodReference struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
UID string `json:"uid"`
}
// InterfaceStats contains resource value data about interface.
type InterfaceStats struct {
// The name of the interface
Name string `json:"name"`
// Cumulative count of bytes received.
// +optional
RxBytes *uint64 `json:"rxBytes,omitempty"`
// Cumulative count of receive errors encountered.
// +optional
RxErrors *uint64 `json:"rxErrors,omitempty"`
// Cumulative count of bytes transmitted.
// +optional
TxBytes *uint64 `json:"txBytes,omitempty"`
// Cumulative count of transmit errors encountered.
// +optional
TxErrors *uint64 `json:"txErrors,omitempty"`
}
// NetworkStats contains data about network resources.
type NetworkStats struct {
// The time at which these stats were updated.
Time metav1.Time `json:"time"`
// Stats for the default interface, if found
InterfaceStats `json:",inline"`
Interfaces []InterfaceStats `json:"interfaces,omitempty"`
}
// CPUStats contains data about CPU usage.
type CPUStats struct {
// The time at which these stats were updated.
Time metav1.Time `json:"time"`
// Total CPU usage (sum of all cores) averaged over the sample window.
// The "core" unit can be interpreted as CPU core-nanoseconds per second.
// +optional
UsageNanoCores *uint64 `json:"usageNanoCores,omitempty"`
// Cumulative CPU usage (sum of all cores) since object creation.
// +optional
UsageCoreNanoSeconds *uint64 `json:"usageCoreNanoSeconds,omitempty"`
}
// MemoryStats contains data about memory usage.
type MemoryStats struct {
// The time at which these stats were updated.
Time metav1.Time `json:"time"`
// Available memory for use. This is defined as the memory limit - workingSetBytes.
// If memory limit is undefined, the available bytes is omitted.
// +optional
AvailableBytes *uint64 `json:"availableBytes,omitempty"`
// Total memory in use. This includes all memory regardless of when it was accessed.
// +optional
UsageBytes *uint64 `json:"usageBytes,omitempty"`
// The amount of working set memory. This includes recently accessed memory,
// dirty memory, and kernel memory. WorkingSetBytes is <= UsageBytes
// +optional
WorkingSetBytes *uint64 `json:"workingSetBytes,omitempty"`
// The amount of anonymous and swap cache memory (includes transparent
// hugepages).
// +optional
RSSBytes *uint64 `json:"rssBytes,omitempty"`
// Cumulative number of minor page faults.
// +optional
PageFaults *uint64 `json:"pageFaults,omitempty"`
// Cumulative number of major page faults.
// +optional
MajorPageFaults *uint64 `json:"majorPageFaults,omitempty"`
}
// AcceleratorStats contains stats for accelerators attached to the container.
type AcceleratorStats struct {
// Make of the accelerator (nvidia, amd, google etc.)
Make string `json:"make"`
// Model of the accelerator (tesla-p100, tesla-k80 etc.)
Model string `json:"model"`
// ID of the accelerator.
ID string `json:"id"`
// Total accelerator memory.
// unit: bytes
MemoryTotal uint64 `json:"memoryTotal"`
// Total accelerator memory allocated.
// unit: bytes
MemoryUsed uint64 `json:"memoryUsed"`
// Percent of time over the past sample period (10s) during which
// the accelerator was actively processing.
DutyCycle uint64 `json:"dutyCycle"`
}
// VolumeStats contains data about Volume filesystem usage.
type VolumeStats struct {
// Embedded FsStats
FsStats
// Name is the name given to the Volume
// +optional
Name string `json:"name,omitempty"`
// Reference to the PVC, if one exists
// +optional
PVCRef *PVCReference `json:"pvcRef,omitempty"`
}
// PVCReference contains enough information to describe the referenced PVC.
type PVCReference struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
}
// FsStats contains data about filesystem usage.
type FsStats struct {
// The time at which these stats were updated.
Time metav1.Time `json:"time"`
// AvailableBytes represents the storage space available (bytes) for the filesystem.
// +optional
AvailableBytes *uint64 `json:"availableBytes,omitempty"`
// CapacityBytes represents the total capacity (bytes) of the filesystems underlying storage.
// +optional
CapacityBytes *uint64 `json:"capacityBytes,omitempty"`
// UsedBytes represents the bytes used for a specific task on the filesystem.
// This may differ from the total bytes used on the filesystem and may not equal CapacityBytes - AvailableBytes.
// e.g. For ContainerStats.Rootfs this is the bytes used by the container rootfs on the filesystem.
// +optional
UsedBytes *uint64 `json:"usedBytes,omitempty"`
// InodesFree represents the free inodes in the filesystem.
// +optional
InodesFree *uint64 `json:"inodesFree,omitempty"`
// Inodes represents the total inodes in the filesystem.
// +optional
Inodes *uint64 `json:"inodes,omitempty"`
// InodesUsed represents the inodes used by the filesystem
// This may not equal Inodes - InodesFree because this filesystem may share inodes with other "filesystems"
// e.g. For ContainerStats.Rootfs, this is the inodes used only by that container, and does not count inodes used by other containers.
InodesUsed *uint64 `json:"inodesUsed,omitempty"`
}
// UserDefinedMetricType defines how the metric should be interpreted by the user.
type UserDefinedMetricType string
const (
// MetricGauge is an instantaneous value. May increase or decrease.
MetricGauge UserDefinedMetricType = "gauge"
// MetricCumulative is a counter-like value that is only expected to increase.
MetricCumulative UserDefinedMetricType = "cumulative"
// MetricDelta is a rate over a time period.
MetricDelta UserDefinedMetricType = "delta"
)
// UserDefinedMetricDescriptor contains metadata that describes a user defined metric.
type UserDefinedMetricDescriptor struct {
// The name of the metric.
Name string `json:"name"`
// Type of the metric.
Type UserDefinedMetricType `json:"type"`
// Display Units for the stats.
Units string `json:"units"`
// Metadata labels associated with this metric.
// +optional
Labels map[string]string `json:"labels,omitempty"`
}
// UserDefinedMetric represents a metric defined and generate by users.
type UserDefinedMetric struct {
UserDefinedMetricDescriptor `json:",inline"`
// The time at which these stats were updated.
Time metav1.Time `json:"time"`
// Value of the metric. Float64s have 53 bit precision.
// We do not foresee any metrics exceeding that value.
Value float64 `json:"value"`
}


@@ -2,6 +2,7 @@ package vkubelet
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
@@ -12,6 +13,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
)
@@ -34,7 +36,7 @@ func NotFound(w http.ResponseWriter, r *http.Request) {
http.Error(w, "404 request not found", http.StatusNotFound)
}
func ApiserverStart(provider Provider) {
func ApiserverStart(provider Provider, metricsAddr string) {
p = provider
certFilePath := os.Getenv("APISERVER_CERT_LOCATION")
keyFilePath := os.Getenv("APISERVER_KEY_LOCATION")
@@ -46,11 +48,62 @@ func ApiserverStart(provider Provider) {
r.HandleFunc("/exec/{namespace}/{pod}/{container}", ApiServerHandlerExec).Methods("POST")
r.NotFoundHandler = http.HandlerFunc(NotFound)
if metricsAddr != "" {
go MetricsServerStart(metricsAddr)
} else {
log.G(context.TODO()).Info("Skipping metrics server startup since no address was provided")
}
if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil {
log.G(context.TODO()).WithError(err).Error("error setting up http server")
}
}
// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelet summary stats API.
// TLS is never enabled on this endpoint.
func MetricsServerStart(addr string) {
r := mux.NewRouter()
r.HandleFunc("/stats/summary", MetricsSummaryHandler).Methods("GET")
r.HandleFunc("/stats/summary/", MetricsSummaryHandler).Methods("GET")
r.NotFoundHandler = http.HandlerFunc(NotFound)
if err := http.ListenAndServe(addr, r); err != nil {
log.G(context.TODO()).WithError(err).Error("Error starting http server")
}
}
// MetricsSummaryHandler is an HTTP handler for implementing the kubelet summary stats endpoint
func MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) {
ctx := loggingContext(req)
mp, ok := p.(MetricsProvider)
if !ok {
log.G(ctx).Debug("stats not implemented for provider")
http.Error(w, "not implememnted", http.StatusNotImplemented)
return
}
stats, err := mp.GetStatsSummary(req.Context())
if err != nil {
if errors.Cause(err) == context.Canceled {
return
}
log.G(ctx).Error("Error getting stats from provider:", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
b, err := json.Marshal(stats)
if err != nil {
log.G(ctx).WithError(err).Error("Could not marshal stats")
http.Error(w, "could not marshal stats: "+err.Error(), http.StatusInternalServerError)
return
}
if _, err := w.Write(b); err != nil {
log.G(ctx).WithError(err).Debug("Could not write to client")
}
}
func ApiServerHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
if len(vars) != 3 {


@@ -1,12 +1,14 @@
package vkubelet
import (
"context"
"io"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
// Provider contains the methods required to implement a virtual-kubelet provider.
@@ -54,3 +56,8 @@ type Provider interface {
// OperatingSystem returns the operating system the provider is for.
OperatingSystem() string
}
// MetricsProvider is an optional interface that providers can implement to expose pod stats
type MetricsProvider interface {
GetStatsSummary(context.Context) (*stats.Summary, error)
}
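
A minimal sketch (not from the source) of what a provider has to supply: because the HTTP handler simply type-asserts the configured Provider against MetricsProvider, implementing this one method is enough for /stats/summary to start returning data. The provider type below is hypothetical; a real provider would also implement the rest of the Provider interface.

package example

import (
    "context"

    stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

// statsOnlyProvider is a hypothetical provider stub used only to show the
// MetricsProvider method signature.
type statsOnlyProvider struct{}

func (p *statsOnlyProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
    return &stats.Summary{
        Node: stats.NodeStats{NodeName: "my-virtual-node"},
        Pods: []stats.PodStats{},
    }, nil
}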


@@ -48,7 +48,7 @@ func getEnv(key, defaultValue string) string {
}
// New creates a new virtual-kubelet server.
func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerConfig, taintKey string, disableTaint bool) (*Server, error) {
func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerConfig, taintKey string, disableTaint bool, metricsAddr string) (*Server, error) {
var config *rest.Config
// Check if the kubeConfig file exists.
@@ -137,7 +137,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
return s, err
}
go ApiserverStart(p)
go ApiserverStart(p, metricsAddr)
tick := time.Tick(5 * time.Second)