Add supports for stats in ACI provider

This adds a new, optional, interface for providers that want to provide
stats.
This commit is contained in:
Brian Goff
2018-08-02 17:06:50 -07:00
parent 6284757aa1
commit e8abca0ac9
11 changed files with 643 additions and 5 deletions

1
Gopkg.lock generated
View File

@@ -1390,6 +1390,7 @@
"github.com/kr/pretty", "github.com/kr/pretty",
"github.com/lawrencegripper/pod2docker", "github.com/lawrencegripper/pod2docker",
"github.com/mitchellh/go-homedir", "github.com/mitchellh/go-homedir",
"github.com/pkg/errors",
"github.com/spf13/cobra", "github.com/spf13/cobra",
"github.com/spf13/viper", "github.com/spf13/viper",
"github.com/stretchr/testify/assert", "github.com/stretchr/testify/assert",

View File

@@ -40,6 +40,7 @@ var providerConfig string
var taintKey string var taintKey string
var disableTaint bool var disableTaint bool
var logLevel string var logLevel string
var metricsAddr string
// RootCmd represents the base command when called without any subcommands // RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{ var RootCmd = &cobra.Command{
@@ -49,7 +50,7 @@ var RootCmd = &cobra.Command{
backend implementation allowing users to create kubernetes nodes without running the kubelet. backend implementation allowing users to create kubernetes nodes without running the kubelet.
This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`, This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint) f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint, metricsAddr)
if err != nil { if err != nil {
log.L.WithError(err).Fatal("Error initializing vritual kubelet") log.L.WithError(err).Fatal("Error initializing vritual kubelet")
} }
@@ -87,6 +88,8 @@ func init() {
RootCmd.PersistentFlags().StringVar(&provider, "provider", "", "cloud provider") RootCmd.PersistentFlags().StringVar(&provider, "provider", "", "cloud provider")
RootCmd.PersistentFlags().BoolVar(&disableTaint, "disable-taint", false, "disable the virtual-kubelet node taint") RootCmd.PersistentFlags().BoolVar(&disableTaint, "disable-taint", false, "disable the virtual-kubelet node taint")
RootCmd.PersistentFlags().StringVar(&providerConfig, "provider-config", "", "cloud provider configuration file") RootCmd.PersistentFlags().StringVar(&providerConfig, "provider-config", "", "cloud provider configuration file")
RootCmd.PersistentFlags().StringVar(&metricsAddr, "metrics-addr", ":10255", "address to listen for metrics/stats requests")
RootCmd.PersistentFlags().StringVar(&taintKey, "taint", "", "Set node taint key") RootCmd.PersistentFlags().StringVar(&taintKey, "taint", "", "Set node taint key")
RootCmd.PersistentFlags().MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable") RootCmd.PersistentFlags().MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable")
RootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", `set the log level, e.g. "trace", debug", "info", "warn", "error"`) RootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", `set the log level, e.g. "trace", debug", "info", "warn", "error"`)

View File

@@ -12,6 +12,7 @@ import (
"os" "os"
"reflect" "reflect"
"strings" "strings"
"sync"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
@@ -26,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/tools/remotecommand"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
) )
// The service account secret mount path. // The service account secret mount path.
@@ -47,6 +49,10 @@ type ACIProvider struct {
internalIP string internalIP string
daemonEndpointPort int32 daemonEndpointPort int32
diagnostics *aci.ContainerGroupDiagnostics diagnostics *aci.ContainerGroupDiagnostics
metricsSync sync.Mutex
metricsSyncTime time.Time
lastMetric *stats.Summary
} }
// AuthConfig is the secret returned from an ImageRegistryCredential // AuthConfig is the secret returned from an ImageRegistryCredential
@@ -296,13 +302,17 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error {
// TODO(BJK) containergrouprestartpolicy?? // TODO(BJK) containergrouprestartpolicy??
_, err = p.aciClient.CreateContainerGroup( _, err = p.aciClient.CreateContainerGroup(
p.resourceGroup, p.resourceGroup,
fmt.Sprintf("%s-%s", pod.Namespace, pod.Name), containerGroupName(pod),
containerGroup, containerGroup,
) )
return err return err
} }
func containerGroupName(pod *v1.Pod) string {
return fmt.Sprintf("%s-%s", pod.Namespace, pod.Name)
}
// UpdatePod is a noop, ACI currently does not support live updates of a pod. // UpdatePod is a noop, ACI currently does not support live updates of a pod.
func (p *ACIProvider) UpdatePod(pod *v1.Pod) error { func (p *ACIProvider) UpdatePod(pod *v1.Pod) error {
return nil return nil

View File

@@ -18,6 +18,7 @@ const (
containerGroupListByResourceGroupURLPath = "subscriptions/{{.subscriptionId}}/resourceGroups/{{.resourceGroup}}/providers/Microsoft.ContainerInstance/containerGroups" containerGroupListByResourceGroupURLPath = "subscriptions/{{.subscriptionId}}/resourceGroups/{{.resourceGroup}}/providers/Microsoft.ContainerInstance/containerGroups"
containerLogsURLPath = containerGroupURLPath + "/containers/{{.containerName}}/logs" containerLogsURLPath = containerGroupURLPath + "/containers/{{.containerName}}/logs"
containerExecURLPath = containerGroupURLPath + "/containers/{{.containerName}}/exec" containerExecURLPath = containerGroupURLPath + "/containers/{{.containerName}}/exec"
containerGroupMetricsURLPath = containerGroupURLPath + "/providers/microsoft.Insights/metrics"
) )
// Client is a client for interacting with Azure Container Instances. // Client is a client for interacting with Azure Container Instances.

View File

@@ -0,0 +1,97 @@
package aci
import (
"context"
"encoding/json"
"net/http"
"net/url"
"path"
"time"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/api"
)
// GetContainerGroupMetrics gets metrics for the provided container group
func (c *Client) GetContainerGroupMetrics(ctx context.Context, resourceGroup, containerGroup string, options MetricsRequest) (*ContainerGroupMetricsResult, error) {
if len(options.Types) == 0 {
return nil, errors.New("must provide metrics types to fetch")
}
if options.Start.After(options.End) || options.Start.Equal(options.End) && !options.Start.IsZero() {
return nil, errors.Errorf("end parameter must be after start: start=%s, end=%s", options.Start, options.End)
}
var metricNames string
for _, t := range options.Types {
if len(metricNames) > 0 {
metricNames += ","
}
metricNames += string(t)
}
var ag string
for _, a := range options.Aggregations {
if len(ag) > 0 {
ag += ","
}
ag += string(a)
}
urlParams := url.Values{
"api-version": []string{"2018-01-01"},
"aggregation": []string{ag},
"metricnames": []string{metricNames},
"interval": []string{"PT1M"}, // TODO: make configurable?
}
if options.Dimension != "" {
urlParams.Add("$filter", options.Dimension)
}
if !options.Start.IsZero() || !options.End.IsZero() {
urlParams.Add("timespan", path.Join(options.Start.Format(time.RFC3339), options.End.Format(time.RFC3339)))
}
// Create the url.
uri := api.ResolveRelative(c.auth.ResourceManagerEndpoint, containerGroupMetricsURLPath)
uri += "?" + url.Values(urlParams).Encode()
// Create the request.
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, errors.Wrap(err, "creating get container group metrics uri request failed")
}
req = req.WithContext(ctx)
// Add the parameters to the url.
if err := api.ExpandURL(req.URL, map[string]string{
"subscriptionId": c.auth.SubscriptionID,
"resourceGroup": resourceGroup,
"containerGroupName": containerGroup,
}); err != nil {
return nil, errors.Wrap(err, "expanding URL with parameters failed")
}
// SEnd the request.
resp, err := c.hc.Do(req)
if err != nil {
return nil, errors.Wrap(err, "sending get container group metrics request failed")
}
defer resp.Body.Close()
// 200 (OK) is a success response.
if err := api.CheckResponse(resp); err != nil {
return nil, err
}
// Decode the body from the response.
if resp.Body == nil {
return nil, errors.New("container group metrics returned an empty body in the response")
}
var metrics ContainerGroupMetricsResult
if err := json.NewDecoder(resp.Body).Decode(&metrics); err != nil {
return nil, errors.Wrap(err, "decoding get container group metrics response body failed")
}
return &metrics, nil
}

View File

@@ -1,6 +1,8 @@
package aci package aci
import ( import (
"time"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/api" "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/api"
) )
@@ -332,3 +334,85 @@ type LogAnalyticsWorkspace struct {
WorkspaceID string `json:"workspaceID,omitempty"` WorkspaceID string `json:"workspaceID,omitempty"`
WorkspaceKey string `json:"workspaceKey,omitempty"` WorkspaceKey string `json:"workspaceKey,omitempty"`
} }
// ContainerGroupMetricsResult stores all the results for a container group metrics request.
type ContainerGroupMetricsResult struct {
Value []MetricValue `json:"value"`
}
// MetricValue stores metrics results
type MetricValue struct {
ID string `json:"id"`
Desc MetricDescriptor `json:"name"`
Timeseries []MetricTimeSeries `json:"timeseries"`
Type string `json:"type"`
Unit string `json:"unit"`
}
// MetricDescriptor stores the name for a given metric and the localized version of that name.
type MetricDescriptor struct {
Value MetricType `json:"value"`
LocalizedValue string `json:"localizedValue"`
}
// MetricTimeSeries is the time series for a given metric
// It contains all the metrics values and other details for the dimension the metrics are aggregated on.
type MetricTimeSeries struct {
Data []TimeSeriesEntry `json:"data"`
MetadataValues []MetricMetadataValue `json:"metadatavalues,omitempty"`
}
// MetricMetadataValue stores extra metadata about a metric
// In particular it is used to provide details about the breakdown of a metric dimension.
type MetricMetadataValue struct {
Name ValueDescriptor `json:"name"`
Value string `json:"value"`
}
// ValueDescriptor describes a generic value.
// It is used to describe metadata fields.
type ValueDescriptor struct {
Value string `json:"value"`
LocalizedValue string `json:"localizedValue"`
}
// TimeSeriesEntry is the metric data for a given timestamp/metric type
type TimeSeriesEntry struct {
Timestamp time.Time `json:"timestamp"`
Average float64 `json:"average"`
Total float64 `json:"total"`
Count float64 `json:"count"`
}
// MetricsRequest is an options struct used when getting container group metrics
type MetricsRequest struct {
Start time.Time
End time.Time
Types []MetricType
Aggregations []AggregationType
// Note that a dimension may not be available for certain metrics.
// In such cases, you will need to make separate requests.
Dimension string
}
// MetricType is an enum type for defining supported metric types.
type MetricType string
// Supported metric types
const (
MetricTypeCPUUsage MetricType = "CpuUsage"
MetricTypeMemoryUsage MetricType = "MemoryUsage"
MetricTyperNetworkBytesRecievedPerSecond MetricType = "NetworkBytesReceivedPerSecond"
MetricTyperNetworkBytesTransmittedPerSecond MetricType = "NetworkBytesTransmittedPerSecond"
)
// AggregationType is an enum type for defining supported aggregation types
type AggregationType string
// Supported metric aggregation types
const (
AggregationTypeCount AggregationType = "count"
AggregationTypeAverage AggregationType = "average"
AggregationTypeTotal AggregationType = "total"
)

219
providers/azure/metrics.go Normal file
View File

@@ -0,0 +1,219 @@
package azure
import (
"context"
"strings"
"time"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
"golang.org/x/sync/errgroup"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
// GetStatsSummary returns the stats summary for pods running on ACI
func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summary, err error) {
p.metricsSync.Lock()
defer p.metricsSync.Unlock()
if time.Now().Sub(p.metricsSyncTime) < time.Minute {
return p.lastMetric, nil
}
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
defer func() {
if err != nil {
return
}
p.lastMetric = summary
p.metricsSyncTime = time.Now()
}()
pods := p.resourceManager.GetPods()
var errGroup errgroup.Group
chResult := make(chan stats.PodStats, len(pods))
end := time.Now()
start := end.Add(-1 * time.Minute)
sema := make(chan struct{}, 10)
for _, pod := range pods {
if pod.Status.Phase != v1.PodRunning {
continue
}
errGroup.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
case sema <- struct{}{}:
}
defer func() {
<-sema
}()
cgName := containerGroupName(pod)
// cpu/mem and net stats are split because net stats do not support container level detail
systemStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{
Dimension: "containerName eq '*'",
Start: start,
End: end,
Aggregations: []aci.AggregationType{aci.AggregationTypeAverage},
Types: []aci.MetricType{aci.MetricTypeCPUUsage, aci.MetricTypeMemoryUsage},
})
if err != nil {
return errors.Wrapf(err, "error fetching cpu/mem stats for container group %s", cgName)
}
netStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{
Start: start,
End: end,
Aggregations: []aci.AggregationType{aci.AggregationTypeAverage},
Types: []aci.MetricType{aci.MetricTyperNetworkBytesRecievedPerSecond, aci.MetricTyperNetworkBytesTransmittedPerSecond},
})
if err != nil {
return errors.Wrapf(err, "error fetching network stats for container group %s", cgName)
}
chResult <- collectMetrics(pod, systemStats, netStats)
return nil
})
}
if err := errGroup.Wait(); err != nil {
return nil, errors.Wrap(err, "error in request to fetch container group metrics")
}
close(chResult)
var s stats.Summary
s.Node = stats.NodeStats{
NodeName: p.nodeName,
}
s.Pods = make([]stats.PodStats, 0, len(chResult))
for stat := range chResult {
s.Pods = append(s.Pods, stat)
}
return &s, nil
}
func collectMetrics(pod *v1.Pod, system, net *aci.ContainerGroupMetricsResult) stats.PodStats {
var stat stats.PodStats
containerStats := make(map[string]*stats.ContainerStats, len(pod.Status.ContainerStatuses))
stat.StartTime = pod.CreationTimestamp
for _, m := range system.Value {
// cpu/mem stats are per container, so each entry in the time series is for a container, not the container group.
for _, entry := range m.Timeseries {
if len(entry.Data) == 0 {
continue
}
var cs *stats.ContainerStats
for _, v := range entry.MetadataValues {
if strings.ToLower(v.Name.Value) != "containername" {
continue
}
if cs = containerStats[v.Value]; cs == nil {
cs = &stats.ContainerStats{Name: v.Value, StartTime: stat.StartTime}
containerStats[v.Value] = cs
}
}
if cs == nil {
continue
}
if stat.Containers == nil {
stat.Containers = make([]stats.ContainerStats, 0, len(containerStats))
}
data := entry.Data[len(entry.Data)-1] // get only the last entry
switch m.Desc.Value {
case aci.MetricTypeCPUUsage:
if cs.CPU == nil {
cs.CPU = &stats.CPUStats{}
}
// average is the average number of millicores over a 1 minute interval (which is the interval we are pulling the stats for)
nanoCores := uint64(data.Average * 1000000)
usageNanoSeconds := nanoCores * 60
cs.CPU.Time = metav1.NewTime(data.Timestamp)
cs.CPU.UsageCoreNanoSeconds = &usageNanoSeconds
cs.CPU.UsageNanoCores = &nanoCores
if stat.CPU == nil {
var zero uint64
stat.CPU = &stats.CPUStats{UsageNanoCores: &zero, UsageCoreNanoSeconds: &zero, Time: metav1.NewTime(data.Timestamp)}
}
podCPUSec := *stat.CPU.UsageCoreNanoSeconds
podCPUSec += usageNanoSeconds
stat.CPU.UsageCoreNanoSeconds = &podCPUSec
podCPUCore := *stat.CPU.UsageNanoCores
podCPUCore += nanoCores
stat.CPU.UsageNanoCores = &podCPUCore
case aci.MetricTypeMemoryUsage:
if cs.Memory == nil {
cs.Memory = &stats.MemoryStats{}
}
cs.Memory.Time = metav1.NewTime(data.Timestamp)
bytes := uint64(data.Average)
cs.Memory.UsageBytes = &bytes
cs.Memory.WorkingSetBytes = &bytes
if stat.Memory == nil {
var zero uint64
stat.Memory = &stats.MemoryStats{UsageBytes: &zero, WorkingSetBytes: &zero, Time: metav1.NewTime(data.Timestamp)}
}
podMem := *stat.Memory.UsageBytes
podMem += bytes
stat.Memory.UsageBytes = &podMem
stat.Memory.WorkingSetBytes = &podMem
}
}
}
for _, m := range net.Value {
if stat.Network == nil {
stat.Network = &stats.NetworkStats{}
}
// network stats are for the whole container group, so there should only be one entry here.
if len(m.Timeseries) == 0 {
continue
}
entry := m.Timeseries[0]
if len(entry.Data) == 0 {
continue
}
data := entry.Data[len(entry.Data)-1] // get only the last entry
bytes := uint64(data.Average)
switch m.Desc.Value {
case aci.MetricTyperNetworkBytesRecievedPerSecond:
stat.Network.RxBytes = &bytes
case aci.MetricTyperNetworkBytesTransmittedPerSecond:
stat.Network.TxBytes = &bytes
}
stat.Network.Time = metav1.NewTime(data.Timestamp)
stat.Network.InterfaceStats.Name = "eth0"
}
for _, cs := range containerStats {
stat.Containers = append(stat.Containers, *cs)
}
stat.PodRef = stats.PodReference{
Name: pod.Name,
Namespace: pod.Namespace,
UID: string(pod.UID),
}
return stat
}

View File

@@ -0,0 +1,163 @@
package azure
import (
"path"
"reflect"
"strconv"
"testing"
"time"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
func TestCollectMetrics(t *testing.T) {
cases := []metricTestCase{
{desc: "no containers"}, // this is just for sort of fuzzing things, make sure there's no panics
{desc: "zeroed stats", stats: [][2]float64{{0, 0}}, rx: 0, tx: 0, collected: time.Now()},
{desc: "normal", stats: [][2]float64{{400.0, 1000.0}}, rx: 100.0, tx: 5000.0, collected: time.Now()},
{desc: "multiple containers", stats: [][2]float64{{100.0, 250.0}, {400.0, 1000.0}}, rx: 100.0, tx: 439833.0, collected: time.Now()},
}
for _, test := range cases {
t.Run(test.desc, func(t *testing.T) {
pod := fakePod(t, len(test.stats), time.Now())
expected := podStatFromTestCase(t, pod, test)
system, net := fakeACIMetrics(pod, test)
actual := collectMetrics(pod, system, net)
if !reflect.DeepEqual(expected, actual) {
t.Fatalf("got unexpected results\nexpected:\n%+v\nactual:\n%+v", expected, actual)
}
})
}
}
type metricTestCase struct {
desc string
stats [][2]float64
rx, tx float64
collected time.Time
}
func fakeACIMetrics(pod *v1.Pod, testCase metricTestCase) (*aci.ContainerGroupMetricsResult, *aci.ContainerGroupMetricsResult) {
newMetricValue := func(mt aci.MetricType) aci.MetricValue {
return aci.MetricValue{
Desc: aci.MetricDescriptor{
Value: mt,
},
}
}
newNetMetric := func(collected time.Time, value float64) aci.MetricTimeSeries {
return aci.MetricTimeSeries{
Data: []aci.TimeSeriesEntry{
{Timestamp: collected, Average: value},
},
}
}
newSystemMetric := func(c v1.ContainerStatus, collected time.Time, value float64) aci.MetricTimeSeries {
return aci.MetricTimeSeries{
Data: []aci.TimeSeriesEntry{
{Timestamp: collected, Average: value},
},
MetadataValues: []aci.MetricMetadataValue{
{Name: aci.ValueDescriptor{Value: "containerName"}, Value: c.Name},
},
}
}
// create fake aci metrics for the container group and test data
cpuV := newMetricValue(aci.MetricTypeCPUUsage)
memV := newMetricValue(aci.MetricTypeMemoryUsage)
for i, c := range pod.Status.ContainerStatuses {
cpuV.Timeseries = append(cpuV.Timeseries, newSystemMetric(c, testCase.collected, testCase.stats[i][0]))
memV.Timeseries = append(memV.Timeseries, newSystemMetric(c, testCase.collected, testCase.stats[i][1]))
}
system := &aci.ContainerGroupMetricsResult{
Value: []aci.MetricValue{cpuV, memV},
}
rxV := newMetricValue(aci.MetricTyperNetworkBytesRecievedPerSecond)
txV := newMetricValue(aci.MetricTyperNetworkBytesTransmittedPerSecond)
rxV.Timeseries = append(rxV.Timeseries, newNetMetric(testCase.collected, testCase.rx))
txV.Timeseries = append(txV.Timeseries, newNetMetric(testCase.collected, testCase.tx))
net := &aci.ContainerGroupMetricsResult{
Value: []aci.MetricValue{rxV, txV},
}
return system, net
}
func fakePod(t *testing.T, size int, created time.Time) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: path.Base(t.Name()),
Namespace: path.Dir(t.Name()),
UID: types.UID(t.Name()),
CreationTimestamp: metav1.NewTime(created),
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: make([]v1.ContainerStatus, 0, size),
},
}
for i := 0; i < size; i++ {
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
Name: "c" + strconv.Itoa(i),
})
}
return pod
}
func podStatFromTestCase(t *testing.T, pod *v1.Pod, test metricTestCase) stats.PodStats {
rx := uint64(test.rx)
tx := uint64(test.tx)
expected := stats.PodStats{
StartTime: pod.CreationTimestamp,
PodRef: stats.PodReference{
Name: pod.Name,
Namespace: pod.Namespace,
UID: string(pod.UID),
},
Network: &stats.NetworkStats{
Time: metav1.NewTime(test.collected),
InterfaceStats: stats.InterfaceStats{
Name: "eth0",
RxBytes: &rx,
TxBytes: &tx,
},
},
}
var (
nodeCPU uint64
nodeMem uint64
)
for i := range test.stats {
cpu := uint64(test.stats[i][0] * 1000000)
cpuNanoSeconds := cpu * 60
mem := uint64(test.stats[i][1])
expected.Containers = append(expected.Containers, stats.ContainerStats{
StartTime: pod.CreationTimestamp,
Name: pod.Status.ContainerStatuses[i].Name,
CPU: &stats.CPUStats{Time: metav1.NewTime(test.collected), UsageNanoCores: &cpu, UsageCoreNanoSeconds: &cpuNanoSeconds},
Memory: &stats.MemoryStats{Time: metav1.NewTime(test.collected), UsageBytes: &mem, WorkingSetBytes: &mem},
})
nodeCPU += cpu
nodeMem += mem
}
if len(expected.Containers) > 0 {
nanoCPUSeconds := nodeCPU * 60
expected.CPU = &stats.CPUStats{UsageNanoCores: &nodeCPU, UsageCoreNanoSeconds: &nanoCPUSeconds, Time: metav1.NewTime(test.collected)}
expected.Memory = &stats.MemoryStats{UsageBytes: &nodeMem, WorkingSetBytes: &nodeMem, Time: metav1.NewTime(test.collected)}
}
return expected
}

View File

@@ -2,6 +2,7 @@ package vkubelet
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@@ -12,6 +13,7 @@ import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/log"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
) )
@@ -34,7 +36,7 @@ func NotFound(w http.ResponseWriter, r *http.Request) {
http.Error(w, "404 request not found", http.StatusNotFound) http.Error(w, "404 request not found", http.StatusNotFound)
} }
func ApiserverStart(provider Provider) { func ApiserverStart(provider Provider, metricsAddr string) {
p = provider p = provider
certFilePath := os.Getenv("APISERVER_CERT_LOCATION") certFilePath := os.Getenv("APISERVER_CERT_LOCATION")
keyFilePath := os.Getenv("APISERVER_KEY_LOCATION") keyFilePath := os.Getenv("APISERVER_KEY_LOCATION")
@@ -46,11 +48,62 @@ func ApiserverStart(provider Provider) {
r.HandleFunc("/exec/{namespace}/{pod}/{container}", ApiServerHandlerExec).Methods("POST") r.HandleFunc("/exec/{namespace}/{pod}/{container}", ApiServerHandlerExec).Methods("POST")
r.NotFoundHandler = http.HandlerFunc(NotFound) r.NotFoundHandler = http.HandlerFunc(NotFound)
if metricsAddr != "" {
go MetricsServerStart(metricsAddr)
} else {
log.G(context.TODO()).Info("Skipping metrics server startup since no address was provided")
}
if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil { if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil {
log.G(context.TODO()).WithError(err).Error("error setting up http server") log.G(context.TODO()).WithError(err).Error("error setting up http server")
} }
} }
// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelset summary stats API.
// TLS is never enabled on this endpoint.
func MetricsServerStart(addr string) {
r := mux.NewRouter()
r.HandleFunc("/stats/summary", MetricsSummaryHandler).Methods("GET")
r.HandleFunc("/stats/summary/", MetricsSummaryHandler).Methods("GET")
r.NotFoundHandler = http.HandlerFunc(NotFound)
if err := http.ListenAndServe(addr, r); err != nil {
log.G(context.TODO()).WithError(err).Error("Error starting http server")
}
}
// MetricsSummaryHandler is an HTTP handler for implementing the kubelet summary stats endpoint
func MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) {
ctx := loggingContext(req)
mp, ok := p.(MetricsProvider)
if !ok {
log.G(ctx).Debug("stats not implemented for provider")
http.Error(w, "not implememnted", http.StatusNotImplemented)
return
}
stats, err := mp.GetStatsSummary(req.Context())
if err != nil {
if errors.Cause(err) == context.Canceled {
return
}
log.G(ctx).Error("Error getting stats from provider:", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
b, err := json.Marshal(stats)
if err != nil {
log.G(ctx).WithError(err).Error("Could not marshal stats")
http.Error(w, "could not marshal stats: "+err.Error(), http.StatusInternalServerError)
return
}
if _, err := w.Write(b); err != nil {
log.G(ctx).WithError(err).Debug("Could not write to client")
}
}
func ApiServerHandler(w http.ResponseWriter, req *http.Request) { func ApiServerHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req) vars := mux.Vars(req)
if len(vars) != 3 { if len(vars) != 3 {

View File

@@ -1,12 +1,14 @@
package vkubelet package vkubelet
import ( import (
"context"
"io" "io"
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/tools/remotecommand"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
) )
// Provider contains the methods required to implement a virtual-kubelet provider. // Provider contains the methods required to implement a virtual-kubelet provider.
@@ -54,3 +56,8 @@ type Provider interface {
// OperatingSystem returns the operating system the provider is for. // OperatingSystem returns the operating system the provider is for.
OperatingSystem() string OperatingSystem() string
} }
// MetricsProvider is an optional interface that providers can implement to expose pod stats
type MetricsProvider interface {
GetStatsSummary(context.Context) (*stats.Summary, error)
}

View File

@@ -48,7 +48,7 @@ func getEnv(key, defaultValue string) string {
} }
// New creates a new virtual-kubelet server. // New creates a new virtual-kubelet server.
func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerConfig, taintKey string, disableTaint bool) (*Server, error) { func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerConfig, taintKey string, disableTaint bool, metricsAddr string) (*Server, error) {
var config *rest.Config var config *rest.Config
// Check if the kubeConfig file exists. // Check if the kubeConfig file exists.
@@ -137,7 +137,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
return s, err return s, err
} }
go ApiserverStart(p) go ApiserverStart(p, metricsAddr)
tick := time.Tick(5 * time.Second) tick := time.Tick(5 * time.Second)