package azure import ( "context" "strings" "time" "github.com/cpuguy83/strongerrors/status/ocstatus" "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci" "go.opencensus.io/trace" "golang.org/x/sync/errgroup" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" ) // GetStatsSummary returns the stats summary for pods running on ACI func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summary, err error) { ctx, span := trace.StartSpan(ctx, "GetSummaryStats") defer span.End() addAzureAttributes(span, p) p.metricsSync.Lock() defer p.metricsSync.Unlock() span.Annotate(nil, "acquired metrics mutex") if time.Now().Sub(p.metricsSyncTime) < time.Minute { span.AddAttributes(trace.BoolAttribute("preCachedResult", true), trace.StringAttribute("cachedResultSampleTime", p.metricsSyncTime.String())) return p.lastMetric, nil } span.AddAttributes(trace.BoolAttribute("preCachedResult", false), trace.StringAttribute("cachedResultSampleTime", p.metricsSyncTime.String())) select { case <-ctx.Done(): return nil, ctx.Err() default: } defer func() { if err != nil { return } p.lastMetric = summary p.metricsSyncTime = time.Now() }() pods := p.resourceManager.GetPods() var errGroup errgroup.Group chResult := make(chan stats.PodStats, len(pods)) end := time.Now() start := end.Add(-1 * time.Minute) sema := make(chan struct{}, 10) for _, pod := range pods { if pod.Status.Phase != v1.PodRunning { continue } pod := pod errGroup.Go(func() error { ctx, span := trace.StartSpan(ctx, "getPodMetrics") defer span.End() span.AddAttributes( trace.StringAttribute("UID", string(pod.UID)), trace.StringAttribute("Name", pod.Name), trace.StringAttribute("Namespace", pod.Namespace), ) select { case <-ctx.Done(): return ctx.Err() case sema <- struct{}{}: } defer func() { <-sema }() span.Annotate(nil, "Acquired semaphore") cgName := containerGroupName(pod) // cpu/mem and net stats are split because net stats do not support container level detail systemStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{ Dimension: "containerName eq '*'", Start: start, End: end, Aggregations: []aci.AggregationType{aci.AggregationTypeAverage}, Types: []aci.MetricType{aci.MetricTypeCPUUsage, aci.MetricTypeMemoryUsage}, }) if err != nil { span.SetStatus(ocstatus.FromError(err)) return errors.Wrapf(err, "error fetching cpu/mem stats for container group %s", cgName) } span.Annotate(nil, "Got system stats") netStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{ Start: start, End: end, Aggregations: []aci.AggregationType{aci.AggregationTypeAverage}, Types: []aci.MetricType{aci.MetricTyperNetworkBytesRecievedPerSecond, aci.MetricTyperNetworkBytesTransmittedPerSecond}, }) if err != nil { span.SetStatus(ocstatus.FromError(err)) return errors.Wrapf(err, "error fetching network stats for container group %s", cgName) } span.Annotate(nil, "Got network stats") chResult <- collectMetrics(pod, systemStats, netStats) return nil }) } if err := errGroup.Wait(); err != nil { return nil, errors.Wrap(err, "error in request to fetch container group metrics") } close(chResult) span.Annotate([]trace.Attribute{trace.Int64Attribute("nPods", int64(len(pods)))}, "Collected stats from Azure") var s stats.Summary s.Node = stats.NodeStats{ NodeName: p.nodeName, } s.Pods = make([]stats.PodStats, 0, len(chResult)) for stat := range chResult { s.Pods = append(s.Pods, stat) } return &s, nil } func collectMetrics(pod *v1.Pod, system, net *aci.ContainerGroupMetricsResult) stats.PodStats { var stat stats.PodStats containerStats := make(map[string]*stats.ContainerStats, len(pod.Status.ContainerStatuses)) stat.StartTime = pod.CreationTimestamp for _, m := range system.Value { // cpu/mem stats are per container, so each entry in the time series is for a container, not the container group. for _, entry := range m.Timeseries { if len(entry.Data) == 0 { continue } var cs *stats.ContainerStats for _, v := range entry.MetadataValues { if strings.ToLower(v.Name.Value) != "containername" { continue } if cs = containerStats[v.Value]; cs == nil { cs = &stats.ContainerStats{Name: v.Value, StartTime: stat.StartTime} containerStats[v.Value] = cs } } if cs == nil { continue } if stat.Containers == nil { stat.Containers = make([]stats.ContainerStats, 0, len(containerStats)) } data := entry.Data[len(entry.Data)-1] // get only the last entry switch m.Desc.Value { case aci.MetricTypeCPUUsage: if cs.CPU == nil { cs.CPU = &stats.CPUStats{} } // average is the average number of millicores over a 1 minute interval (which is the interval we are pulling the stats for) nanoCores := uint64(data.Average * 1000000) usageNanoSeconds := nanoCores * 60 cs.CPU.Time = metav1.NewTime(data.Timestamp) cs.CPU.UsageCoreNanoSeconds = &usageNanoSeconds cs.CPU.UsageNanoCores = &nanoCores if stat.CPU == nil { var zero uint64 stat.CPU = &stats.CPUStats{UsageNanoCores: &zero, UsageCoreNanoSeconds: &zero, Time: metav1.NewTime(data.Timestamp)} } podCPUSec := *stat.CPU.UsageCoreNanoSeconds podCPUSec += usageNanoSeconds stat.CPU.UsageCoreNanoSeconds = &podCPUSec podCPUCore := *stat.CPU.UsageNanoCores podCPUCore += nanoCores stat.CPU.UsageNanoCores = &podCPUCore case aci.MetricTypeMemoryUsage: if cs.Memory == nil { cs.Memory = &stats.MemoryStats{} } cs.Memory.Time = metav1.NewTime(data.Timestamp) bytes := uint64(data.Average) cs.Memory.UsageBytes = &bytes cs.Memory.WorkingSetBytes = &bytes if stat.Memory == nil { var zero uint64 stat.Memory = &stats.MemoryStats{UsageBytes: &zero, WorkingSetBytes: &zero, Time: metav1.NewTime(data.Timestamp)} } podMem := *stat.Memory.UsageBytes podMem += bytes stat.Memory.UsageBytes = &podMem stat.Memory.WorkingSetBytes = &podMem } } } for _, m := range net.Value { if stat.Network == nil { stat.Network = &stats.NetworkStats{} } // network stats are for the whole container group, so there should only be one entry here. if len(m.Timeseries) == 0 { continue } entry := m.Timeseries[0] if len(entry.Data) == 0 { continue } data := entry.Data[len(entry.Data)-1] // get only the last entry bytes := uint64(data.Average) switch m.Desc.Value { case aci.MetricTyperNetworkBytesRecievedPerSecond: stat.Network.RxBytes = &bytes case aci.MetricTyperNetworkBytesTransmittedPerSecond: stat.Network.TxBytes = &bytes } stat.Network.Time = metav1.NewTime(data.Timestamp) stat.Network.InterfaceStats.Name = "eth0" } for _, cs := range containerStats { stat.Containers = append(stat.Containers, *cs) } stat.PodRef = stats.PodReference{ Name: pod.Name, Namespace: pod.Namespace, UID: string(pod.UID), } return stat }