package azure

import (
	"context"
	"strings"
	"time"

	"github.com/cpuguy83/strongerrors/status/ocstatus"
	"github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
	"go.opencensus.io/trace"
	"golang.org/x/sync/errgroup"
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

// GetStatsSummary returns the stats summary for pods running on ACI.
func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summary, err error) {
	ctx, span := trace.StartSpan(ctx, "GetStatsSummary")
	defer span.End()
	addAzureAttributes(span, p)

	p.metricsSync.Lock()
	defer p.metricsSync.Unlock()
	span.Annotate(nil, "acquired metrics mutex")
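
	// Serve the cached summary when the last collection is less than a minute
	// old, sparing the Azure metrics API from a call on every scrape.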
	if time.Since(p.metricsSyncTime) < time.Minute {
		span.AddAttributes(trace.BoolAttribute("preCachedResult", true), trace.StringAttribute("cachedResultSampleTime", p.metricsSyncTime.String()))
		return p.lastMetric, nil
	}
	span.AddAttributes(trace.BoolAttribute("preCachedResult", false), trace.StringAttribute("cachedResultSampleTime", p.metricsSyncTime.String()))

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}
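
	// The named return values let this deferred function cache the summary
	// only when collection succeeded.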
	defer func() {
		if err != nil {
			return
		}
		p.lastMetric = summary
		p.metricsSyncTime = time.Now()
	}()

	pods := p.resourceManager.GetPods()

	var errGroup errgroup.Group
	chResult := make(chan stats.PodStats, len(pods))

	end := time.Now()
	start := end.Add(-1 * time.Minute)
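
	// A buffered channel serves as a counting semaphore, capping concurrent
	// metrics requests to Azure at 10.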
	sema := make(chan struct{}, 10)
	for _, pod := range pods {
		if pod.Status.Phase != v1.PodRunning {
			continue
		}
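		// Capture the loop variable so each goroutine closes over its own pod
		// (required on Go versions before 1.22).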
		pod := pod
		errGroup.Go(func() error {
			ctx, span := trace.StartSpan(ctx, "getPodMetrics")
			defer span.End()
			span.AddAttributes(
				trace.StringAttribute("UID", string(pod.UID)),
				trace.StringAttribute("Name", pod.Name),
				trace.StringAttribute("Namespace", pod.Namespace),
			)

			select {
			case <-ctx.Done():
				return ctx.Err()
			case sema <- struct{}{}:
			}
			defer func() {
				<-sema
			}()

			span.Annotate(nil, "Acquired semaphore")

			cgName := containerGroupName(pod)
			// CPU/memory and network stats are requested separately because the
			// network metrics do not support container-level detail.
			systemStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{
				Dimension:    "containerName eq '*'",
				Start:        start,
				End:          end,
				Aggregations: []aci.AggregationType{aci.AggregationTypeAverage},
				Types:        []aci.MetricType{aci.MetricTypeCPUUsage, aci.MetricTypeMemoryUsage},
			})
			if err != nil {
				span.SetStatus(ocstatus.FromError(err))
				return errors.Wrapf(err, "error fetching cpu/mem stats for container group %s", cgName)
			}
			span.Annotate(nil, "Got system stats")

			netStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{
				Start:        start,
				End:          end,
				Aggregations: []aci.AggregationType{aci.AggregationTypeAverage},
				Types:        []aci.MetricType{aci.MetricTyperNetworkBytesRecievedPerSecond, aci.MetricTyperNetworkBytesTransmittedPerSecond},
			})
			if err != nil {
				span.SetStatus(ocstatus.FromError(err))
				return errors.Wrapf(err, "error fetching network stats for container group %s", cgName)
			}
			span.Annotate(nil, "Got network stats")

			chResult <- collectMetrics(pod, systemStats, netStats)
			return nil
		})
	}

	if err := errGroup.Wait(); err != nil {
		return nil, errors.Wrap(err, "error in request to fetch container group metrics")
	}
	close(chResult)
	span.Annotate([]trace.Attribute{trace.Int64Attribute("nPods", int64(len(pods)))}, "Collected stats from Azure")
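
	// All workers have returned and chResult is closed, so draining it below
	// cannot block.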
	var s stats.Summary
	s.Node = stats.NodeStats{
		NodeName: p.nodeName,
	}
	s.Pods = make([]stats.PodStats, 0, len(chResult))

	for stat := range chResult {
		s.Pods = append(s.Pods, stat)
	}

	return &s, nil
}
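
// collectMetrics converts raw ACI metrics for one container group into a
// kubelet stats.PodStats, attributing CPU/memory series to individual
// containers and network series to the pod as a whole.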
func collectMetrics(pod *v1.Pod, system, net *aci.ContainerGroupMetricsResult) stats.PodStats {
	var stat stats.PodStats
	containerStats := make(map[string]*stats.ContainerStats, len(pod.Status.ContainerStatuses))
	stat.StartTime = pod.CreationTimestamp

	for _, m := range system.Value {
		// CPU/memory stats are per container: each entry in the time series
		// belongs to a single container, not the whole container group.
		for _, entry := range m.Timeseries {
			if len(entry.Data) == 0 {
				continue
			}
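
			// Resolve which container this series belongs to from the
			// "containerName" metadata dimension; series without one are skipped.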
			var cs *stats.ContainerStats
			for _, v := range entry.MetadataValues {
				if strings.ToLower(v.Name.Value) != "containername" {
					continue
				}
				if cs = containerStats[v.Value]; cs == nil {
					cs = &stats.ContainerStats{Name: v.Value, StartTime: stat.StartTime}
					containerStats[v.Value] = cs
				}
			}
			if cs == nil {
				continue
			}

			if stat.Containers == nil {
				stat.Containers = make([]stats.ContainerStats, 0, len(containerStats))
			}

			data := entry.Data[len(entry.Data)-1] // get only the last entry
			switch m.Desc.Value {
			case aci.MetricTypeCPUUsage:
				if cs.CPU == nil {
					cs.CPU = &stats.CPUStats{}
				}

				// data.Average is the mean CPU usage in millicores over the
				// one-minute window we queried: millicores * 1e6 = nanocores,
				// and nanocores * 60s approximates cumulative core-nanoseconds
				// for the window.
				nanoCores := uint64(data.Average * 1000000)
				usageNanoSeconds := nanoCores * 60
				cs.CPU.Time = metav1.NewTime(data.Timestamp)
				cs.CPU.UsageCoreNanoSeconds = &usageNanoSeconds
				cs.CPU.UsageNanoCores = &nanoCores
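
				// Pod-level CPU usage is the running sum across containers.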
				if stat.CPU == nil {
					var zero uint64
					stat.CPU = &stats.CPUStats{UsageNanoCores: &zero, UsageCoreNanoSeconds: &zero, Time: metav1.NewTime(data.Timestamp)}
				}
				podCPUSec := *stat.CPU.UsageCoreNanoSeconds
				podCPUSec += usageNanoSeconds
				stat.CPU.UsageCoreNanoSeconds = &podCPUSec

				podCPUCore := *stat.CPU.UsageNanoCores
				podCPUCore += nanoCores
				stat.CPU.UsageNanoCores = &podCPUCore
			case aci.MetricTypeMemoryUsage:
				if cs.Memory == nil {
					cs.Memory = &stats.MemoryStats{}
				}
				cs.Memory.Time = metav1.NewTime(data.Timestamp)
				bytes := uint64(data.Average)
				cs.Memory.UsageBytes = &bytes
				cs.Memory.WorkingSetBytes = &bytes
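
				// Pod-level memory is likewise summed across containers; ACI
				// reports a single usage figure, so it backs both UsageBytes
				// and WorkingSetBytes.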
				if stat.Memory == nil {
					var zero uint64
					stat.Memory = &stats.MemoryStats{UsageBytes: &zero, WorkingSetBytes: &zero, Time: metav1.NewTime(data.Timestamp)}
				}
				podMem := *stat.Memory.UsageBytes
				podMem += bytes
				stat.Memory.UsageBytes = &podMem
				stat.Memory.WorkingSetBytes = &podMem
			}
		}
	}

	for _, m := range net.Value {
		if stat.Network == nil {
			stat.Network = &stats.NetworkStats{}
		}
		// Network stats cover the whole container group, so only the first
		// time series entry is expected.
		if len(m.Timeseries) == 0 {
			continue
		}
		entry := m.Timeseries[0]
		if len(entry.Data) == 0 {
			continue
		}
		data := entry.Data[len(entry.Data)-1] // get only the last entry

		bytes := uint64(data.Average)
		switch m.Desc.Value {
		case aci.MetricTyperNetworkBytesRecievedPerSecond:
			stat.Network.RxBytes = &bytes
		case aci.MetricTyperNetworkBytesTransmittedPerSecond:
			stat.Network.TxBytes = &bytes
		}
		stat.Network.Time = metav1.NewTime(data.Timestamp)
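		// The metrics API does not surface an interface name, so a
		// conventional "eth0" is reported.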
		stat.Network.InterfaceStats.Name = "eth0"
	}

	for _, cs := range containerStats {
		stat.Containers = append(stat.Containers, *cs)
	}

	stat.PodRef = stats.PodReference{
		Name:      pod.Name,
		Namespace: pod.Namespace,
		UID:       string(pod.UID),
	}

	return stat
}