Compare commits
10 Commits
| Author | SHA1 | Date |
|---|---|---|
|  | 55bc0c3f75 |  |
|  | b019ec5549 |  |
|  | 7442189e77 |  |
|  | bb8ba567e3 |  |
|  | 8de6693460 |  |
|  | 99e0cfad5c |  |
|  | e8abca0ac9 |  |
|  | 6284757aa1 |  |
|  | 17cc3033ba |  |
|  | 1e774a32b3 |  |
**Gopkg.lock** (14 changes, generated)
```diff
@@ -1026,6 +1026,14 @@
   pruneopts = "NUT"
   revision = "f73e4c9ed3b7ebdd5f699a16a880c2b1994e50dd"
 
+[[projects]]
+  branch = "master"
+  digest = "1:39ebcc2b11457b703ae9ee2e8cca0f68df21969c6102cb3b705f76cca0ea0239"
+  name = "golang.org/x/sync"
+  packages = ["errgroup"]
+  pruneopts = "NUT"
+  revision = "1d60e4601c6fd243af51cc01ddf169918a5407ca"
+
 [[projects]]
   branch = "master"
   digest = "1:ee40f9506736b3b55162b22d82911b797301904848b1f1ae5db49f561ad48a79"
@@ -1317,11 +1325,12 @@
   version = "v7.0.0"
 
 [[projects]]
-  digest = "1:3f23907ae824a94920ce9bc22310631cb2be91787bd7de16de84bbb9af92cad4"
+  digest = "1:58c71c78f19e62f5f3ba1f37d21670481de1f46c4a2d4d4ee31fe70b03b57ab3"
   name = "k8s.io/kubernetes"
   packages = [
     "pkg/apis/core",
     "pkg/kubelet/apis/cri/runtime/v1alpha2",
+    "pkg/kubelet/apis/stats/v1alpha1",
     "pkg/kubelet/server/remotecommand",
   ]
   pruneopts = "NUT"
@@ -1381,6 +1390,7 @@
     "github.com/kr/pretty",
     "github.com/lawrencegripper/pod2docker",
     "github.com/mitchellh/go-homedir",
+    "github.com/pkg/errors",
     "github.com/spf13/cobra",
     "github.com/spf13/viper",
     "github.com/stretchr/testify/assert",
@@ -1405,6 +1415,7 @@
     "github.com/vmware/vic/pkg/trace",
     "github.com/vmware/vic/pkg/vsphere/sys",
     "golang.org/x/net/context",
+    "golang.org/x/sync/errgroup",
     "google.golang.org/grpc",
     "gopkg.in/yaml.v2",
     "k8s.io/api/core/v1",
@@ -1422,6 +1433,7 @@
     "k8s.io/client-go/tools/clientcmd",
     "k8s.io/client-go/tools/remotecommand",
     "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2",
+    "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1",
     "k8s.io/kubernetes/pkg/kubelet/server/remotecommand",
   ]
   solver-name = "gps-cdcl"
```
```diff
@@ -85,6 +85,9 @@ spec:
         "--provider", "{{ required "provider is required" .Values.provider }}",
         "--namespace", "{{ .Values.monitoredNamespace }}",
         "--nodename", "{{ required "nodeName is required" .Values.nodeName }}",
+        {{- if .Values.logLevel }}
+        "--log-level", "{{.Values.logLevel}}",
+        {{- end }}
         "--os", "{{ .Values.nodeOsType }}"
       ]
       volumes:
@@ -10,6 +10,7 @@ nodeOsType: "Linux"
 monitoredNamespace: ""
 apiserverCert:
 apiserverKey:
+logLevel:
 
 taint:
   enabled: true
```
**cmd/root.go** (47 changes)
```diff
@@ -15,15 +15,16 @@
 package cmd
 
 import (
-	"fmt"
-	"log"
+	"context"
 	"os"
 	"path/filepath"
 	"strings"
 
+	"github.com/Sirupsen/logrus"
 	homedir "github.com/mitchellh/go-homedir"
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
+	"github.com/virtual-kubelet/virtual-kubelet/log"
 	"github.com/virtual-kubelet/virtual-kubelet/providers"
 	vkubelet "github.com/virtual-kubelet/virtual-kubelet/vkubelet"
 	corev1 "k8s.io/api/core/v1"
@@ -38,6 +39,8 @@ var provider string
 var providerConfig string
 var taintKey string
 var disableTaint bool
+var logLevel string
+var metricsAddr string
 
 // RootCmd represents the base command when called without any subcommands
 var RootCmd = &cobra.Command{
@@ -47,12 +50,13 @@ var RootCmd = &cobra.Command{
 backend implementation allowing users to create kubernetes nodes without running the kubelet.
 This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`,
 	Run: func(cmd *cobra.Command, args []string) {
-		fmt.Println(kubeConfig)
-		f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint)
+		f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint, metricsAddr)
 		if err != nil {
-			log.Fatal(err)
+			log.L.WithError(err).Fatal("Error initializing virtual kubelet")
 		}
 
-		f.Run()
+		if err := f.Run(context.Background()); err != nil {
+			log.L.Fatal(err)
+		}
 	},
 }
@@ -60,8 +64,7 @@ This allows users to schedule kubernetes workloads on nodes that aren't running
 // This is called by main.main(). It only needs to happen once to the rootCmd.
 func Execute() {
 	if err := RootCmd.Execute(); err != nil {
-		fmt.Println(err)
-		os.Exit(1)
+		log.GetLogger(context.TODO()).WithError(err).Fatal("Error executing root command")
 	}
 }
@@ -85,8 +88,11 @@ func init() {
 	RootCmd.PersistentFlags().StringVar(&provider, "provider", "", "cloud provider")
 	RootCmd.PersistentFlags().BoolVar(&disableTaint, "disable-taint", false, "disable the virtual-kubelet node taint")
 	RootCmd.PersistentFlags().StringVar(&providerConfig, "provider-config", "", "cloud provider configuration file")
+	RootCmd.PersistentFlags().StringVar(&metricsAddr, "metrics-addr", ":10255", "address to listen for metrics/stats requests")
+
 	RootCmd.PersistentFlags().StringVar(&taintKey, "taint", "", "Set node taint key")
 	RootCmd.PersistentFlags().MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable")
+	RootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", `set the log level, e.g. "trace", debug", "info", "warn", "error"`)
 
 	// Cobra also supports local flags, which will only run
 	// when this action is called directly.
@@ -96,15 +102,13 @@ func init() {
 // initConfig reads in config file and ENV variables if set.
 func initConfig() {
 	if provider == "" {
-		fmt.Println("You must supply a cloud provider option: use --provider")
-		os.Exit(1)
+		log.G(context.TODO()).Fatal("You must supply a cloud provider option: use --provider")
 	}
 
 	// Find home directory.
 	home, err := homedir.Dir()
 	if err != nil {
-		fmt.Println(err)
-		os.Exit(1)
+		log.G(context.TODO()).WithError(err).Fatal("Error reading homedir")
 	}
 
 	if kubeletConfig != "" {
@@ -120,7 +124,7 @@ func initConfig() {
 
 	// If a config file is found, read it in.
 	if err := viper.ReadInConfig(); err == nil {
-		fmt.Println("Using config file:", viper.ConfigFileUsed())
+		log.G(context.TODO()).Debugf("Using config file %s", viper.ConfigFileUsed())
 	}
 
 	if kubeConfig == "" {
@@ -135,7 +139,20 @@ func initConfig() {
 	// Validate operating system.
 	ok, _ := providers.ValidOperatingSystems[operatingSystem]
 	if !ok {
-		fmt.Printf("Operating system '%s' not supported. Valid options are %s\n", operatingSystem, strings.Join(providers.ValidOperatingSystems.Names(), " | "))
-		os.Exit(1)
+		log.G(context.TODO()).WithField("OperatingSystem", operatingSystem).Fatalf("Operating system not supported. Valid options are: %s", strings.Join(providers.ValidOperatingSystems.Names(), " | "))
 	}
+
+	level, err := log.ParseLevel(logLevel)
+	if err != nil {
+		log.G(context.TODO()).WithField("logLevel", logLevel).Fatal("log level is not supported")
+	}
+
+	logger := log.L.WithFields(logrus.Fields{
+		"provider":        provider,
+		"operatingSystem": operatingSystem,
+		"node":            nodeName,
+		"namespace":       kubeNamespace,
+	})
+	logger.Level = level
+	log.L = logger
 }
```
**log/log.go** (new file, 90 lines)
```go
/*
Copyright The containerd Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package log

import (
	"context"
	"sync/atomic"

	"github.com/Sirupsen/logrus"
)

var (
	// G is an alias for GetLogger.
	//
	// We may want to define this locally to a package to get package tagged log
	// messages.
	G = GetLogger

	// L is an alias for the standard logger.
	L = logrus.NewEntry(logrus.StandardLogger())
)

type (
	loggerKey struct{}
)

// TraceLevel is the log level for tracing. Trace level is lower than debug level,
// and is usually used to trace detailed behavior of the program.
const TraceLevel = logrus.Level(uint32(logrus.DebugLevel + 1))

// RFC3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to
// ensure the formatted time is always the same number of characters.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

// ParseLevel takes a string level and returns the Logrus log level constant.
// It supports trace level.
func ParseLevel(lvl string) (logrus.Level, error) {
	if lvl == "trace" {
		return TraceLevel, nil
	}
	return logrus.ParseLevel(lvl)
}

// WithLogger returns a new context with the provided logger. Use in
// combination with logger.WithField(s) for great effect.
func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context {
	return context.WithValue(ctx, loggerKey{}, logger)
}

// GetLogger retrieves the current logger from the context. If no logger is
// available, the default logger is returned.
func GetLogger(ctx context.Context) *logrus.Entry {
	logger := ctx.Value(loggerKey{})

	if logger == nil {
		return L
	}

	return logger.(*logrus.Entry)
}

// Trace logs a message at level Trace with the log entry passed-in.
func Trace(e *logrus.Entry, args ...interface{}) {
	level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level)))
	if level >= TraceLevel {
		e.Debug(args...)
	}
}

// Tracef logs a message at level Trace with the log entry passed-in.
func Tracef(e *logrus.Entry, format string, args ...interface{}) {
	level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level)))
	if level >= TraceLevel {
		e.Debugf(format, args...)
	}
}
```
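A minimal sketch of how this context-scoped logger pattern is consumed (the `component` field and messages are illustrative, not from the diff):

```go
package main

import (
	"context"

	"github.com/virtual-kubelet/virtual-kubelet/log"
)

func main() {
	// Attach a tagged logger to a context once...
	ctx := log.WithLogger(context.Background(), log.L.WithField("component", "example"))

	// ...and pull it back out anywhere downstream.
	log.G(ctx).Info("hello from a context-scoped logger")

	// Trace output only fires once the backing logger is raised to TraceLevel,
	// which is what the new --log-level=trace flag does via ParseLevel.
	log.L.Logger.Level = log.TraceLevel
	log.Trace(log.G(ctx), "verbose tracing output")
}
```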
```diff
@@ -1,10 +1,10 @@
 package manager
 
 import (
-	"log"
 	"sync"
 	"time"
 
+	"github.com/pkg/errors"
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/watch"
@@ -26,7 +26,7 @@ type ResourceManager struct {
 }
 
 // NewResourceManager returns a ResourceManager with the internal maps initialized.
-func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager {
+func NewResourceManager(k8sClient kubernetes.Interface) (*ResourceManager, error) {
 	rm := ResourceManager{
 		pods:         make(map[string]*v1.Pod, 0),
 		deletingPods: make(map[string]*v1.Pod, 0),
@@ -37,8 +37,18 @@ func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager {
 		k8sClient:    k8sClient,
 	}
 
-	go rm.watchConfigMaps()
-	go rm.watchSecrets()
+	configW, err := rm.k8sClient.CoreV1().ConfigMaps(v1.NamespaceAll).Watch(metav1.ListOptions{})
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting config watch")
+	}
+
+	secretsW, err := rm.k8sClient.CoreV1().Secrets(v1.NamespaceAll).Watch(metav1.ListOptions{})
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting secrets watch")
+	}
+
+	go rm.watchConfigMaps(configW)
+	go rm.watchSecrets(secretsW)
 
 	tick := time.Tick(5 * time.Minute)
 	go func() {
@@ -68,7 +78,7 @@ func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager {
 		}
 	}()
 
-	return &rm
+	return &rm, nil
 }
 
 // SetPods clears the internal cache and populates it with the supplied pods.
@@ -213,12 +223,7 @@ func (rm *ResourceManager) GetSecret(name, namespace string) (*v1.Secret, error)
 
 // watchConfigMaps monitors the kubernetes API for modifications and deletions of configmaps
 // it evicts them from the internal cache
-func (rm *ResourceManager) watchConfigMaps() {
-	var opts metav1.ListOptions
-	w, err := rm.k8sClient.CoreV1().ConfigMaps(v1.NamespaceAll).Watch(opts)
-	if err != nil {
-		log.Fatal(err)
-	}
+func (rm *ResourceManager) watchConfigMaps(w watch.Interface) {
 
 	for {
 		select {
@@ -242,12 +247,7 @@ func (rm *ResourceManager) watchConfigMaps() {
 
 // watchSecrets monitors the kubernetes API for modifications and deletions of secrets
 // it evicts them from the internal cache
-func (rm *ResourceManager) watchSecrets() {
-	var opts metav1.ListOptions
-	w, err := rm.k8sClient.CoreV1().Secrets(v1.NamespaceAll).Watch(opts)
-	if err != nil {
-		log.Fatal(err)
-	}
+func (rm *ResourceManager) watchSecrets(w watch.Interface) {
 
 	for {
 		select {
```
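Moving the `Watch` calls out of the goroutines surfaces API failures at construction time instead of a `log.Fatal` inside a background loop, and injecting the `watch.Interface` makes the eviction loops testable without a live API server. A rough sketch of the testing angle, assuming it lives in a `_test.go` file inside the `manager` package (the test name and objects are hypothetical):

```go
package manager

import (
	"testing"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes/fake"
)

// Sketch: drive the eviction loop with synthetic events.
func TestWatchConfigMapsSketch(t *testing.T) {
	rm, err := NewResourceManager(fake.NewSimpleClientset())
	if err != nil {
		t.Fatal(err)
	}

	fakeWatch := watch.NewFake()
	go rm.watchConfigMaps(fakeWatch) // the loop now consumes any watch.Interface

	// Emitting a Modified event should evict the cached copy.
	fakeWatch.Modify(&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "cm", Namespace: "ns"}})
	fakeWatch.Stop()
}
```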
```diff
@@ -19,7 +19,10 @@ func init() {
 }
 
 func TestResourceManager(t *testing.T) {
-	pm := NewResourceManager(fakeClient)
+	pm, err := NewResourceManager(fakeClient)
+	if err != nil {
+		t.Fatal(err)
+	}
 	pod1Name := "Pod1"
 	pod1Namespace := "Pod1Namespace"
 	pod1 := makePod(pod1Namespace, pod1Name)
@@ -36,7 +39,10 @@ func TestResourceManager(t *testing.T) {
 }
 
 func TestResourceManagerDeletePod(t *testing.T) {
-	pm := NewResourceManager(fakeClient)
+	pm, err := NewResourceManager(fakeClient)
+	if err != nil {
+		t.Fatal(err)
+	}
 	pod1Name := "Pod1"
 	pod1Namespace := "Pod1Namespace"
 	pod1 := makePod(pod1Namespace, pod1Name)
@@ -61,7 +67,10 @@ func makePod(namespace, name string) *v1.Pod {
 }
 
 func TestResourceManagerUpdatePod(t *testing.T) {
-	pm := NewResourceManager(fakeClient)
+	pm, err := NewResourceManager(fakeClient)
+	if err != nil {
+		t.Fatal(err)
+	}
 	pod1Name := "Pod1"
 	pod1Namespace := "Pod1Namespace"
 	pod1 := makePod(pod1Namespace, pod1Name)
```
````diff
@@ -318,10 +318,10 @@ spec:
  - containerPort: 443
    name: https
  dnsPolicy: ClusterFirst
  nodeName: virtual-kubelet-myconnector-linux
  tolerations:
  - key: virtual-kubelet.io/provider
    value: azure
    operator: Exists
  - key: azure.com/aci
    effect: NoSchedule
```
@@ -330,7 +330,8 @@ Notice that Virtual-Kubelet nodes are tainted by default to avoid unexpected pod
```
tolerations:
- key: virtual-kubelet.io/provider
  value: azure
  operator: Exists
- key: azure.com/aci
  effect: NoSchedule
```
@@ -395,7 +396,11 @@ spec:
  - containerPort: 443
    name: https
  dnsPolicy: ClusterFirst
  nodeName: virtual-kubelet
  tolerations:
  - key: virtual-kubelet.io/provider
    operator: Exists
  - key: azure.com/aci
    effect: NoSchedule
```
````

To confirm the Azure Container Instance received and bound the DNS Name specified, use the [az container show][az-container-show] Azure CLI command. Virtual Kubelet's naming
```diff
@@ -2,19 +2,24 @@ package azure
 
 import (
 	"bytes"
+	"context"
+	"crypto/sha256"
 	"encoding/base64"
+	"encoding/hex"
 	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
-	"log"
 	"net/http"
 	"os"
 	"reflect"
 	"strings"
+	"sync"
 	"time"
 
+	"github.com/Sirupsen/logrus"
 	"github.com/gorilla/websocket"
+	"github.com/virtual-kubelet/virtual-kubelet/log"
 	"github.com/virtual-kubelet/virtual-kubelet/manager"
 	client "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client"
 	"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
@@ -24,6 +29,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/remotecommand"
+	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
 )
 
 // The service account secret mount path.
@@ -45,6 +51,10 @@ type ACIProvider struct {
 	internalIP         string
 	daemonEndpointPort int32
 	diagnostics        *aci.ContainerGroupDiagnostics
+
+	metricsSync     sync.Mutex
+	metricsSyncTime time.Time
+	lastMetric      *stats.Summary
 }
 
 // AuthConfig is the secret returned from an ImageRegistryCredential
@@ -294,13 +304,17 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error {
 	// TODO(BJK) containergrouprestartpolicy??
 	_, err = p.aciClient.CreateContainerGroup(
 		p.resourceGroup,
-		fmt.Sprintf("%s-%s", pod.Namespace, pod.Name),
+		containerGroupName(pod),
 		containerGroup,
 	)
 
 	return err
 }
 
+func containerGroupName(pod *v1.Pod) string {
+	return fmt.Sprintf("%s-%s", pod.Namespace, pod.Name)
+}
+
 // UpdatePod is a noop, ACI currently does not support live updates of a pod.
 func (p *ACIProvider) UpdatePod(pod *v1.Pod) error {
 	return nil
@@ -345,7 +359,7 @@ func (p *ACIProvider) GetContainerLogs(namespace, podName, containerName string,
 	for i := 0; i < retry; i++ {
 		cLogs, err := p.aciClient.GetContainerLogs(p.resourceGroup, cg.Name, containerName, tail)
 		if err != nil {
-			log.Println(err)
+			log.G(context.TODO()).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying")
 			time.Sleep(5000 * time.Millisecond)
 		} else {
 			logContent = cLogs.Content
@@ -469,7 +483,11 @@ func (p *ACIProvider) GetPods() ([]*v1.Pod, error) {
 
 		p, err := containerGroupToPod(&c)
 		if err != nil {
-			log.Println(err)
+			log.G(context.TODO()).WithFields(logrus.Fields{
+				"name": c.Name,
+				"id":   c.ID,
+			}).WithError(err).Error("error converting container group to pod")
+
 			continue
 		}
 		pods = append(pods, p)
@@ -958,7 +976,7 @@ func containerGroupToPod(cg *aci.ContainerGroup) (*v1.Pod, error) {
 			RestartCount: c.InstanceView.RestartCount,
 			Image:        c.Image,
 			ImageID:      "",
-			ContainerID:  "",
+			ContainerID:  getContainerID(cg.ID, c.Name),
 		}
 
 		// Add to containerStatuses
@@ -1002,6 +1020,19 @@ func containerGroupToPod(cg *aci.ContainerGroup) (*v1.Pod, error) {
 	return &p, nil
 }
 
+func getContainerID(cgID, containerName string) string {
+	if cgID == "" {
+		return ""
+	}
+
+	containerResourceID := fmt.Sprintf("%s/containers/%s", cgID, containerName)
+
+	h := sha256.New()
+	h.Write([]byte(strings.ToUpper(containerResourceID)))
+	hashBytes := h.Sum(nil)
+	return fmt.Sprintf("aci://%s", hex.EncodeToString(hashBytes))
+}
+
 func aciStateToPodPhase(state string) v1.PodPhase {
 	switch state {
 	case "Running":
@@ -1105,7 +1136,8 @@ func filterServiceAccountSecretVolume(osType string, containerGroup *aci.Contain
 		return
 	}
 
-	log.Printf("Ignoring service account secret volumes '%v' for Windows", reflect.ValueOf(serviceAccountSecretVolumeName).MapKeys())
+	l := log.G(context.TODO()).WithField("containerGroup", containerGroup.Name)
+	l.Infof("Ignoring service account secret volumes '%v' for Windows", reflect.ValueOf(serviceAccountSecretVolumeName).MapKeys())
 
 	volumes := make([]aci.Volume, 0, len(containerGroup.ContainerGroupProperties.Volumes))
 	for _, volume := range containerGroup.ContainerGroupProperties.Volumes {
```
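The derived container ID is just a SHA-256 of the upper-cased ARM resource ID under an `aci://` scheme, so it can be reproduced independently. A standalone sketch, with a made-up subscription and resource-group path for illustration:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

func main() {
	// Hypothetical ARM IDs; only the hashing scheme matches the diff above.
	cgID := "/subscriptions/0000/resourceGroups/rg/providers/Microsoft.ContainerInstance/containerGroups/ns-pod"
	containerResourceID := fmt.Sprintf("%s/containers/%s", cgID, "nginx")

	h := sha256.New()
	h.Write([]byte(strings.ToUpper(containerResourceID)))
	fmt.Printf("aci://%s\n", hex.EncodeToString(h.Sum(nil)))
}
```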
```diff
@@ -7,6 +7,7 @@ package azure
 import (
 	"bytes"
 	"encoding/json"
+	"fmt"
 	"io/ioutil"
 	"net/http"
 	"os"
@@ -364,6 +365,69 @@ func TestGetPodWithoutResourceRequestsLimits(t *testing.T) {
 		"Containers[0].Resources.Requests.Memory doesn't match")
 }
 
+func TestGetPodWithContainerID(t *testing.T) {
+	_, aciServerMocker, provider, err := prepareMocks()
+
+	if err != nil {
+		t.Fatal("Unable to prepare the mocks", err)
+	}
+
+	podName := "pod-" + uuid.New().String()
+	podNamespace := "ns-" + uuid.New().String()
+	containerName := "c-" + uuid.New().String()
+	containerImage := "ci-" + uuid.New().String()
+
+	cgID := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.ContainerInstance/containerGroups/%s-%s", fakeSubscription, fakeResourceGroup, podNamespace, podName)
+
+	aciServerMocker.OnGetContainerGroup = func(subscription, resourceGroup, containerGroup string) (int, interface{}) {
+		assert.Equal(t, fakeSubscription, subscription, "Subscription doesn't match")
+		assert.Equal(t, fakeResourceGroup, resourceGroup, "Resource group doesn't match")
+		assert.Equal(t, podNamespace+"-"+podName, containerGroup, "Container group name is not expected")
+
+		return http.StatusOK, aci.ContainerGroup{
+			ID: cgID,
+			Tags: map[string]string{
+				"NodeName": fakeNodeName,
+			},
+			ContainerGroupProperties: aci.ContainerGroupProperties{
+				ProvisioningState: "Creating",
+				Containers: []aci.Container{
+					aci.Container{
+						Name: containerName,
+						ContainerProperties: aci.ContainerProperties{
+							Image:   containerImage,
+							Command: []string{"nginx", "-g", "daemon off;"},
+							Ports: []aci.ContainerPort{
+								{
+									Protocol: aci.ContainerNetworkProtocolTCP,
+									Port:     80,
+								},
+							},
+							Resources: aci.ResourceRequirements{
+								Requests: &aci.ResourceRequests{
+									CPU:        0.99,
+									MemoryInGB: 1.5,
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+	}
+
+	pod, err := provider.GetPod(podNamespace, podName)
+	if err != nil {
+		t.Fatal("Failed to get pod", err)
+	}
+
+	assert.NotNil(t, pod, "Response pod should not be nil")
+	assert.Equal(t, 1, len(pod.Status.ContainerStatuses), "1 container status is expected")
+	assert.Equal(t, containerName, pod.Status.ContainerStatuses[0].Name, "Container name in the container status doesn't match")
+	assert.Equal(t, containerImage, pod.Status.ContainerStatuses[0].Image, "Container image in the container status doesn't match")
+	assert.Equal(t, getContainerID(cgID, containerName), pod.Status.ContainerStatuses[0].ContainerID, "Container ID in the container status is not expected")
+}
+
 func TestPodToACISecretEnvVar(t *testing.T) {
 
 	testKey := "testVar"
@@ -448,7 +512,10 @@ func prepareMocks() (*AADMock, *ACIMock, *ACIProvider, error) {
 	os.Setenv("ACI_RESOURCE_GROUP", fakeResourceGroup)
 
 	clientset := fake.NewSimpleClientset()
-	rm := manager.NewResourceManager(clientset)
+	rm, err := manager.NewResourceManager(clientset)
+	if err != nil {
+		return nil, nil, nil, err
+	}
 
 	provider, err := NewACIProvider("example.toml", rm, fakeNodeName, "Linux", "0.0.0.0", 10250)
 	if err != nil {
@@ -482,45 +549,45 @@ func TestCreatePodWithLivenessProbe(t *testing.T) {
 		assert.Equal(t, 1, len(cg.ContainerGroupProperties.Containers), "1 Container is expected")
 		assert.Equal(t, "nginx", cg.ContainerGroupProperties.Containers[0].Name, "Container nginx is expected")
 		assert.NotNil(t, cg.Containers[0].LivenessProbe, "Liveness probe expected")
-		assert.Equal(t, cg.Containers[0].LivenessProbe.InitialDelaySeconds, 10, "Initial Probe Delay doesn't match")
-		assert.Equal(t, cg.Containers[0].LivenessProbe.Period, 5, "Probe Period doesn't match")
-		assert.Equal(t, cg.Containers[0].LivenessProbe.TimeoutSeconds, 60, "Probe Timeout doesn't match")
-		assert.Equal(t, cg.Containers[0].LivenessProbe.SuccessThreshold, 3, "Probe Success Threshold doesn't match")
-		assert.Equal(t, cg.Containers[0].LivenessProbe.FailureThreshold, 5, "Probe Failure Threshold doesn't match")
+		assert.Equal(t, int32(10), cg.Containers[0].LivenessProbe.InitialDelaySeconds, "Initial Probe Delay doesn't match")
+		assert.Equal(t, int32(5), cg.Containers[0].LivenessProbe.Period, "Probe Period doesn't match")
+		assert.Equal(t, int32(60), cg.Containers[0].LivenessProbe.TimeoutSeconds, "Probe Timeout doesn't match")
+		assert.Equal(t, int32(3), cg.Containers[0].LivenessProbe.SuccessThreshold, "Probe Success Threshold doesn't match")
+		assert.Equal(t, int32(5), cg.Containers[0].LivenessProbe.FailureThreshold, "Probe Failure Threshold doesn't match")
 		assert.NotNil(t, cg.Containers[0].LivenessProbe.HTTPGet, "Expected an HTTP Get Probe")
 
-		pod := &v1.Pod{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      podName,
-				Namespace: podNamespace,
-			},
-			Spec: v1.PodSpec{
-				Containers: []v1.Container{
-					v1.Container{
-						Name: "nginx",
-						LivenessProbe: &v1.Probe{
-							Handler: v1.Handler{
-								HTTPGet: &v1.HTTPGetAction{
-									Port: intstr.FromString("8080"),
-									Path: "/",
-								},
-							},
-							InitialDelaySeconds: 10,
-							PeriodSeconds:       5,
-							TimeoutSeconds:      60,
-							SuccessThreshold:    3,
-							FailureThreshold:    5,
-						},
-					},
-				},
-			},
-		}
-
-		if err := provider.CreatePod(pod); err != nil {
-			t.Fatal("Failed to create pod", err)
-		}
-
 		return http.StatusOK, cg
 	}
+
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      podName,
+			Namespace: podNamespace,
+		},
+		Spec: v1.PodSpec{
+			Containers: []v1.Container{
+				v1.Container{
+					Name: "nginx",
+					LivenessProbe: &v1.Probe{
+						Handler: v1.Handler{
+							HTTPGet: &v1.HTTPGetAction{
+								Port: intstr.FromString("8080"),
+								Path: "/",
+							},
+						},
+						InitialDelaySeconds: 10,
+						PeriodSeconds:       5,
+						TimeoutSeconds:      60,
+						SuccessThreshold:    3,
+						FailureThreshold:    5,
+					},
+				},
+			},
+		},
+	}
+
+	if err := provider.CreatePod(pod); err != nil {
+		t.Fatal("Failed to create pod", err)
+	}
 }
@@ -544,44 +611,44 @@ func TestCreatePodWithReadinessProbe(t *testing.T) {
 		assert.Equal(t, 1, len(cg.ContainerGroupProperties.Containers), "1 Container is expected")
 		assert.Equal(t, "nginx", cg.ContainerGroupProperties.Containers[0].Name, "Container nginx is expected")
 		assert.NotNil(t, cg.Containers[0].ReadinessProbe, "Readiness probe expected")
-		assert.Equal(t, cg.Containers[0].ReadinessProbe.InitialDelaySeconds, 10, "Initial Probe Delay doesn't match")
-		assert.Equal(t, cg.Containers[0].ReadinessProbe.Period, 5, "Probe Period doesn't match")
-		assert.Equal(t, cg.Containers[0].ReadinessProbe.TimeoutSeconds, 60, "Probe Timeout doesn't match")
-		assert.Equal(t, cg.Containers[0].ReadinessProbe.SuccessThreshold, 3, "Probe Success Threshold doesn't match")
-		assert.Equal(t, cg.Containers[0].ReadinessProbe.FailureThreshold, 5, "Probe Failure Threshold doesn't match")
+		assert.Equal(t, int32(10), cg.Containers[0].ReadinessProbe.InitialDelaySeconds, "Initial Probe Delay doesn't match")
+		assert.Equal(t, int32(5), cg.Containers[0].ReadinessProbe.Period, "Probe Period doesn't match")
+		assert.Equal(t, int32(60), cg.Containers[0].ReadinessProbe.TimeoutSeconds, "Probe Timeout doesn't match")
+		assert.Equal(t, int32(3), cg.Containers[0].ReadinessProbe.SuccessThreshold, "Probe Success Threshold doesn't match")
+		assert.Equal(t, int32(5), cg.Containers[0].ReadinessProbe.FailureThreshold, "Probe Failure Threshold doesn't match")
 		assert.NotNil(t, cg.Containers[0].ReadinessProbe.HTTPGet, "Expected an HTTP Get Probe")
 
-		pod := &v1.Pod{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      podName,
-				Namespace: podNamespace,
-			},
-			Spec: v1.PodSpec{
-				Containers: []v1.Container{
-					v1.Container{
-						Name: "nginx",
-						ReadinessProbe: &v1.Probe{
-							Handler: v1.Handler{
-								HTTPGet: &v1.HTTPGetAction{
-									Port: intstr.FromString("8080"),
-									Path: "/",
-								},
-							},
-							InitialDelaySeconds: 10,
-							PeriodSeconds:       5,
-							TimeoutSeconds:      60,
-							SuccessThreshold:    3,
-							FailureThreshold:    5,
-						},
-					},
-				},
-			},
-		}
-
-		if err := provider.CreatePod(pod); err != nil {
-			t.Fatal("Failed to create pod", err)
-		}
-
 		return http.StatusOK, cg
 	}
+
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      podName,
+			Namespace: podNamespace,
+		},
+		Spec: v1.PodSpec{
+			Containers: []v1.Container{
+				v1.Container{
+					Name: "nginx",
+					ReadinessProbe: &v1.Probe{
+						Handler: v1.Handler{
+							HTTPGet: &v1.HTTPGetAction{
+								Port: intstr.FromString("8080"),
+								Path: "/",
+							},
+						},
+						InitialDelaySeconds: 10,
+						PeriodSeconds:       5,
+						TimeoutSeconds:      60,
+						SuccessThreshold:    3,
+						FailureThreshold:    5,
+					},
+				},
+			},
+		},
+	}
+
+	if err := provider.CreatePod(pod); err != nil {
+		t.Fatal("Failed to create pod", err)
+	}
 }
```
```diff
@@ -1,10 +1,12 @@
 package azure
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
-	"log"
+
+	"github.com/virtual-kubelet/virtual-kubelet/log"
 )
 
 // AcsCredential represents the credential file for ACS
@@ -19,12 +21,13 @@ type AcsCredential struct {
 }
 
 // NewAcsCredential returns an AcsCredential struct from file path
-func NewAcsCredential(filepath string) (*AcsCredential, error) {
-	log.Printf("Reading ACS credential file %q", filepath)
+func NewAcsCredential(p string) (*AcsCredential, error) {
+	logger := log.G(context.TODO()).WithField("method", "NewAcsCredential").WithField("file", p)
+	log.Trace(logger, "Reading ACS credential file")
 
-	b, err := ioutil.ReadFile(filepath)
+	b, err := ioutil.ReadFile(p)
 	if err != nil {
-		return nil, fmt.Errorf("Reading ACS credential file %q failed: %v", filepath, err)
+		return nil, fmt.Errorf("Reading ACS credential file %q failed: %v", p, err)
 	}
 
 	// Unmarshal the authentication file.
@@ -33,6 +36,6 @@ func NewAcsCredential(p string) (*AcsCredential, error) {
 		return nil, err
 	}
 
-	log.Printf("Load ACS credential file %q successfully", filepath)
+	log.Trace(logger, "Load ACS credential file successfully")
 	return &cred, nil
 }
```
```diff
@@ -18,6 +18,7 @@ const (
 	containerGroupListByResourceGroupURLPath = "subscriptions/{{.subscriptionId}}/resourceGroups/{{.resourceGroup}}/providers/Microsoft.ContainerInstance/containerGroups"
 	containerLogsURLPath                     = containerGroupURLPath + "/containers/{{.containerName}}/logs"
 	containerExecURLPath                     = containerGroupURLPath + "/containers/{{.containerName}}/exec"
+	containerGroupMetricsURLPath             = containerGroupURLPath + "/providers/microsoft.Insights/metrics"
 )
 
 // Client is a client for interacting with Azure Container Instances.
```
**providers/azure/client/aci/metrics.go** (new file, 97 lines)
```go
package aci

import (
	"context"
	"encoding/json"
	"net/http"
	"net/url"
	"path"
	"time"

	"github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/api"
)

// GetContainerGroupMetrics gets metrics for the provided container group
func (c *Client) GetContainerGroupMetrics(ctx context.Context, resourceGroup, containerGroup string, options MetricsRequest) (*ContainerGroupMetricsResult, error) {
	if len(options.Types) == 0 {
		return nil, errors.New("must provide metrics types to fetch")
	}
	if options.Start.After(options.End) || options.Start.Equal(options.End) && !options.Start.IsZero() {
		return nil, errors.Errorf("end parameter must be after start: start=%s, end=%s", options.Start, options.End)
	}

	var metricNames string
	for _, t := range options.Types {
		if len(metricNames) > 0 {
			metricNames += ","
		}
		metricNames += string(t)
	}

	var ag string
	for _, a := range options.Aggregations {
		if len(ag) > 0 {
			ag += ","
		}
		ag += string(a)
	}

	urlParams := url.Values{
		"api-version": []string{"2018-01-01"},
		"aggregation": []string{ag},
		"metricnames": []string{metricNames},
		"interval":    []string{"PT1M"}, // TODO: make configurable?
	}

	if options.Dimension != "" {
		urlParams.Add("$filter", options.Dimension)
	}

	if !options.Start.IsZero() || !options.End.IsZero() {
		urlParams.Add("timespan", path.Join(options.Start.Format(time.RFC3339), options.End.Format(time.RFC3339)))
	}

	// Create the url.
	uri := api.ResolveRelative(c.auth.ResourceManagerEndpoint, containerGroupMetricsURLPath)
	uri += "?" + url.Values(urlParams).Encode()

	// Create the request.
	req, err := http.NewRequest("GET", uri, nil)
	if err != nil {
		return nil, errors.Wrap(err, "creating get container group metrics uri request failed")
	}
	req = req.WithContext(ctx)

	// Add the parameters to the url.
	if err := api.ExpandURL(req.URL, map[string]string{
		"subscriptionId":     c.auth.SubscriptionID,
		"resourceGroup":      resourceGroup,
		"containerGroupName": containerGroup,
	}); err != nil {
		return nil, errors.Wrap(err, "expanding URL with parameters failed")
	}

	// Send the request.
	resp, err := c.hc.Do(req)
	if err != nil {
		return nil, errors.Wrap(err, "sending get container group metrics request failed")
	}
	defer resp.Body.Close()

	// 200 (OK) is a success response.
	if err := api.CheckResponse(resp); err != nil {
		return nil, err
	}

	// Decode the body from the response.
	if resp.Body == nil {
		return nil, errors.New("container group metrics returned an empty body in the response")
	}
	var metrics ContainerGroupMetricsResult
	if err := json.NewDecoder(resp.Body).Decode(&metrics); err != nil {
		return nil, errors.Wrap(err, "decoding get container group metrics response body failed")
	}

	return &metrics, nil
}
```
```diff
@@ -1,6 +1,8 @@
 package aci
 
 import (
+	"time"
+
 	"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/api"
 )
 
@@ -332,3 +334,85 @@ type LogAnalyticsWorkspace struct {
 	WorkspaceID  string `json:"workspaceID,omitempty"`
 	WorkspaceKey string `json:"workspaceKey,omitempty"`
 }
+
+// ContainerGroupMetricsResult stores all the results for a container group metrics request.
+type ContainerGroupMetricsResult struct {
+	Value []MetricValue `json:"value"`
+}
+
+// MetricValue stores metrics results
+type MetricValue struct {
+	ID         string             `json:"id"`
+	Desc       MetricDescriptor   `json:"name"`
+	Timeseries []MetricTimeSeries `json:"timeseries"`
+	Type       string             `json:"type"`
+	Unit       string             `json:"unit"`
+}
+
+// MetricDescriptor stores the name for a given metric and the localized version of that name.
+type MetricDescriptor struct {
+	Value          MetricType `json:"value"`
+	LocalizedValue string     `json:"localizedValue"`
+}
+
+// MetricTimeSeries is the time series for a given metric
+// It contains all the metrics values and other details for the dimension the metrics are aggregated on.
+type MetricTimeSeries struct {
+	Data           []TimeSeriesEntry     `json:"data"`
+	MetadataValues []MetricMetadataValue `json:"metadatavalues,omitempty"`
+}
+
+// MetricMetadataValue stores extra metadata about a metric
+// In particular it is used to provide details about the breakdown of a metric dimension.
+type MetricMetadataValue struct {
+	Name  ValueDescriptor `json:"name"`
+	Value string          `json:"value"`
+}
+
+// ValueDescriptor describes a generic value.
+// It is used to describe metadata fields.
+type ValueDescriptor struct {
+	Value          string `json:"value"`
+	LocalizedValue string `json:"localizedValue"`
+}
+
+// TimeSeriesEntry is the metric data for a given timestamp/metric type
+type TimeSeriesEntry struct {
+	Timestamp time.Time `json:"timestamp"`
+	Average   float64   `json:"average"`
+	Total     float64   `json:"total"`
+	Count     float64   `json:"count"`
+}
+
+// MetricsRequest is an options struct used when getting container group metrics
+type MetricsRequest struct {
+	Start        time.Time
+	End          time.Time
+	Types        []MetricType
+	Aggregations []AggregationType
+
+	// Note that a dimension may not be available for certain metrics.
+	// In such cases, you will need to make separate requests.
+	Dimension string
+}
+
+// MetricType is an enum type for defining supported metric types.
+type MetricType string
+
+// Supported metric types
+const (
+	MetricTypeCPUUsage                          MetricType = "CpuUsage"
+	MetricTypeMemoryUsage                       MetricType = "MemoryUsage"
+	MetricTyperNetworkBytesRecievedPerSecond    MetricType = "NetworkBytesReceivedPerSecond"
+	MetricTyperNetworkBytesTransmittedPerSecond MetricType = "NetworkBytesTransmittedPerSecond"
+)
+
+// AggregationType is an enum type for defining supported aggregation types
+type AggregationType string
+
+// Supported metric aggregation types
+const (
+	AggregationTypeCount   AggregationType = "count"
+	AggregationTypeAverage AggregationType = "average"
+	AggregationTypeTotal   AggregationType = "total"
+)
```
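Putting the new client method and request types together, a hedged sketch of a one-minute CPU/memory query; the authenticated `aci.Client`, resource group, and container group names here are assumptions for illustration, not values from this diff:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
)

// queryMetrics fetches the last minute of per-container CPU/memory averages,
// mirroring how GetStatsSummary drives the client.
func queryMetrics(ctx context.Context, c *aci.Client, resourceGroup, containerGroup string) error {
	end := time.Now()
	start := end.Add(-1 * time.Minute)

	res, err := c.GetContainerGroupMetrics(ctx, resourceGroup, containerGroup, aci.MetricsRequest{
		Start:        start,
		End:          end,
		Types:        []aci.MetricType{aci.MetricTypeCPUUsage, aci.MetricTypeMemoryUsage},
		Aggregations: []aci.AggregationType{aci.AggregationTypeAverage},
		Dimension:    "containerName eq '*'", // per-container breakdown
	})
	if err != nil {
		return err
	}

	for _, v := range res.Value {
		fmt.Printf("%s (%s): %d series\n", v.Desc.Value, v.Unit, len(v.Timeseries))
	}
	return nil
}

func main() {} // client construction/auth omitted; see NewACIProvider
```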
**providers/azure/metrics.go** (new file, 219 lines)
```go
package azure

import (
	"context"
	"strings"
	"time"

	"github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
	"golang.org/x/sync/errgroup"
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

// GetStatsSummary returns the stats summary for pods running on ACI
func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summary, err error) {
	p.metricsSync.Lock()
	defer p.metricsSync.Unlock()

	if time.Now().Sub(p.metricsSyncTime) < time.Minute {
		return p.lastMetric, nil
	}

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

	defer func() {
		if err != nil {
			return
		}
		p.lastMetric = summary
		p.metricsSyncTime = time.Now()
	}()

	pods := p.resourceManager.GetPods()
	var errGroup errgroup.Group
	chResult := make(chan stats.PodStats, len(pods))

	end := time.Now()
	start := end.Add(-1 * time.Minute)

	sema := make(chan struct{}, 10)
	for _, pod := range pods {
		if pod.Status.Phase != v1.PodRunning {
			continue
		}
		pod := pod // pin the loop variable so each goroutine sees its own pod
		errGroup.Go(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case sema <- struct{}{}:
			}
			defer func() {
				<-sema
			}()

			cgName := containerGroupName(pod)
			// cpu/mem and net stats are split because net stats do not support container level detail
			systemStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{
				Dimension:    "containerName eq '*'",
				Start:        start,
				End:          end,
				Aggregations: []aci.AggregationType{aci.AggregationTypeAverage},
				Types:        []aci.MetricType{aci.MetricTypeCPUUsage, aci.MetricTypeMemoryUsage},
			})
			if err != nil {
				return errors.Wrapf(err, "error fetching cpu/mem stats for container group %s", cgName)
			}

			netStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{
				Start:        start,
				End:          end,
				Aggregations: []aci.AggregationType{aci.AggregationTypeAverage},
				Types:        []aci.MetricType{aci.MetricTyperNetworkBytesRecievedPerSecond, aci.MetricTyperNetworkBytesTransmittedPerSecond},
			})
			if err != nil {
				return errors.Wrapf(err, "error fetching network stats for container group %s", cgName)
			}

			chResult <- collectMetrics(pod, systemStats, netStats)
			return nil
		})
	}

	if err := errGroup.Wait(); err != nil {
		return nil, errors.Wrap(err, "error in request to fetch container group metrics")
	}
	close(chResult)

	var s stats.Summary
	s.Node = stats.NodeStats{
		NodeName: p.nodeName,
	}
	s.Pods = make([]stats.PodStats, 0, len(chResult))

	for stat := range chResult {
		s.Pods = append(s.Pods, stat)
	}

	return &s, nil
}

func collectMetrics(pod *v1.Pod, system, net *aci.ContainerGroupMetricsResult) stats.PodStats {
	var stat stats.PodStats
	containerStats := make(map[string]*stats.ContainerStats, len(pod.Status.ContainerStatuses))
	stat.StartTime = pod.CreationTimestamp

	for _, m := range system.Value {
		// cpu/mem stats are per container, so each entry in the time series is for a container, not the container group.
		for _, entry := range m.Timeseries {
			if len(entry.Data) == 0 {
				continue
			}

			var cs *stats.ContainerStats
			for _, v := range entry.MetadataValues {
				if strings.ToLower(v.Name.Value) != "containername" {
					continue
				}
				if cs = containerStats[v.Value]; cs == nil {
					cs = &stats.ContainerStats{Name: v.Value, StartTime: stat.StartTime}
					containerStats[v.Value] = cs
				}
			}
			if cs == nil {
				continue
			}

			if stat.Containers == nil {
				stat.Containers = make([]stats.ContainerStats, 0, len(containerStats))
			}

			data := entry.Data[len(entry.Data)-1] // get only the last entry
			switch m.Desc.Value {
			case aci.MetricTypeCPUUsage:
				if cs.CPU == nil {
					cs.CPU = &stats.CPUStats{}
				}

				// average is the average number of millicores over a 1 minute interval (which is the interval we are pulling the stats for)
				nanoCores := uint64(data.Average * 1000000)
				usageNanoSeconds := nanoCores * 60
				cs.CPU.Time = metav1.NewTime(data.Timestamp)
				cs.CPU.UsageCoreNanoSeconds = &usageNanoSeconds
				cs.CPU.UsageNanoCores = &nanoCores

				if stat.CPU == nil {
					var zero uint64
					stat.CPU = &stats.CPUStats{UsageNanoCores: &zero, UsageCoreNanoSeconds: &zero, Time: metav1.NewTime(data.Timestamp)}
				}
				podCPUSec := *stat.CPU.UsageCoreNanoSeconds
				podCPUSec += usageNanoSeconds
				stat.CPU.UsageCoreNanoSeconds = &podCPUSec

				podCPUCore := *stat.CPU.UsageNanoCores
				podCPUCore += nanoCores
				stat.CPU.UsageNanoCores = &podCPUCore
			case aci.MetricTypeMemoryUsage:
				if cs.Memory == nil {
					cs.Memory = &stats.MemoryStats{}
				}
				cs.Memory.Time = metav1.NewTime(data.Timestamp)
				bytes := uint64(data.Average)
				cs.Memory.UsageBytes = &bytes
				cs.Memory.WorkingSetBytes = &bytes

				if stat.Memory == nil {
					var zero uint64
					stat.Memory = &stats.MemoryStats{UsageBytes: &zero, WorkingSetBytes: &zero, Time: metav1.NewTime(data.Timestamp)}
				}
				podMem := *stat.Memory.UsageBytes
				podMem += bytes
				stat.Memory.UsageBytes = &podMem
				stat.Memory.WorkingSetBytes = &podMem
			}
		}
	}

	for _, m := range net.Value {
		if stat.Network == nil {
			stat.Network = &stats.NetworkStats{}
		}
		// network stats are for the whole container group, so there should only be one entry here.
		if len(m.Timeseries) == 0 {
			continue
		}
		entry := m.Timeseries[0]
		if len(entry.Data) == 0 {
			continue
		}
		data := entry.Data[len(entry.Data)-1] // get only the last entry

		bytes := uint64(data.Average)
		switch m.Desc.Value {
		case aci.MetricTyperNetworkBytesRecievedPerSecond:
			stat.Network.RxBytes = &bytes
		case aci.MetricTyperNetworkBytesTransmittedPerSecond:
			stat.Network.TxBytes = &bytes
		}
		stat.Network.Time = metav1.NewTime(data.Timestamp)
		stat.Network.InterfaceStats.Name = "eth0"
	}

	for _, cs := range containerStats {
		stat.Containers = append(stat.Containers, *cs)
	}

	stat.PodRef = stats.PodReference{
		Name:      pod.Name,
		Namespace: pod.Namespace,
		UID:       string(pod.UID),
	}

	return stat
}
```
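To make the CPU conversion in `collectMetrics` concrete, an illustrative walk-through of the arithmetic (the numbers are made up, not measured):

```go
func exampleCPUConversion() (nanoCores, usageCoreNanoSeconds uint64) {
	avgMillicores := 400.0                       // ACI's one-minute average
	nanoCores = uint64(avgMillicores * 1000000)  // 400,000,000 nanocores = 0.4 cores
	usageCoreNanoSeconds = nanoCores * 60        // 24,000,000,000 ns of core time over the 1m window
	return nanoCores, usageCoreNanoSeconds
}
```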
**providers/azure/metrics_test.go** (new file, 163 lines)
```go
package azure

import (
	"path"
	"reflect"
	"strconv"
	"testing"
	"time"

	"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

func TestCollectMetrics(t *testing.T) {
	cases := []metricTestCase{
		{desc: "no containers"}, // this is just for sort of fuzzing things, make sure there's no panics
		{desc: "zeroed stats", stats: [][2]float64{{0, 0}}, rx: 0, tx: 0, collected: time.Now()},
		{desc: "normal", stats: [][2]float64{{400.0, 1000.0}}, rx: 100.0, tx: 5000.0, collected: time.Now()},
		{desc: "multiple containers", stats: [][2]float64{{100.0, 250.0}, {400.0, 1000.0}}, rx: 100.0, tx: 439833.0, collected: time.Now()},
	}

	for _, test := range cases {
		t.Run(test.desc, func(t *testing.T) {
			pod := fakePod(t, len(test.stats), time.Now())
			expected := podStatFromTestCase(t, pod, test)

			system, net := fakeACIMetrics(pod, test)
			actual := collectMetrics(pod, system, net)

			if !reflect.DeepEqual(expected, actual) {
				t.Fatalf("got unexpected results\nexpected:\n%+v\nactual:\n%+v", expected, actual)
			}
		})
	}
}

type metricTestCase struct {
	desc      string
	stats     [][2]float64
	rx, tx    float64
	collected time.Time
}

func fakeACIMetrics(pod *v1.Pod, testCase metricTestCase) (*aci.ContainerGroupMetricsResult, *aci.ContainerGroupMetricsResult) {
	newMetricValue := func(mt aci.MetricType) aci.MetricValue {
		return aci.MetricValue{
			Desc: aci.MetricDescriptor{
				Value: mt,
			},
		}
	}

	newNetMetric := func(collected time.Time, value float64) aci.MetricTimeSeries {
		return aci.MetricTimeSeries{
			Data: []aci.TimeSeriesEntry{
				{Timestamp: collected, Average: value},
			},
		}
	}

	newSystemMetric := func(c v1.ContainerStatus, collected time.Time, value float64) aci.MetricTimeSeries {
		return aci.MetricTimeSeries{
			Data: []aci.TimeSeriesEntry{
				{Timestamp: collected, Average: value},
			},
			MetadataValues: []aci.MetricMetadataValue{
				{Name: aci.ValueDescriptor{Value: "containerName"}, Value: c.Name},
			},
		}
	}

	// create fake aci metrics for the container group and test data
	cpuV := newMetricValue(aci.MetricTypeCPUUsage)
	memV := newMetricValue(aci.MetricTypeMemoryUsage)

	for i, c := range pod.Status.ContainerStatuses {
		cpuV.Timeseries = append(cpuV.Timeseries, newSystemMetric(c, testCase.collected, testCase.stats[i][0]))
		memV.Timeseries = append(memV.Timeseries, newSystemMetric(c, testCase.collected, testCase.stats[i][1]))
	}
	system := &aci.ContainerGroupMetricsResult{
		Value: []aci.MetricValue{cpuV, memV},
	}

	rxV := newMetricValue(aci.MetricTyperNetworkBytesRecievedPerSecond)
	txV := newMetricValue(aci.MetricTyperNetworkBytesTransmittedPerSecond)
	rxV.Timeseries = append(rxV.Timeseries, newNetMetric(testCase.collected, testCase.rx))
	txV.Timeseries = append(txV.Timeseries, newNetMetric(testCase.collected, testCase.tx))
	net := &aci.ContainerGroupMetricsResult{
		Value: []aci.MetricValue{rxV, txV},
	}
	return system, net
}

func fakePod(t *testing.T, size int, created time.Time) *v1.Pod {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:              path.Base(t.Name()),
			Namespace:         path.Dir(t.Name()),
			UID:               types.UID(t.Name()),
			CreationTimestamp: metav1.NewTime(created),
		},
		Status: v1.PodStatus{
			Phase:             v1.PodRunning,
			ContainerStatuses: make([]v1.ContainerStatus, 0, size),
		},
	}

	for i := 0; i < size; i++ {
		pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
			Name: "c" + strconv.Itoa(i),
		})
	}
	return pod
}

func podStatFromTestCase(t *testing.T, pod *v1.Pod, test metricTestCase) stats.PodStats {
	rx := uint64(test.rx)
	tx := uint64(test.tx)
	expected := stats.PodStats{
		StartTime: pod.CreationTimestamp,
		PodRef: stats.PodReference{
			Name:      pod.Name,
			Namespace: pod.Namespace,
			UID:       string(pod.UID),
		},
		Network: &stats.NetworkStats{
			Time: metav1.NewTime(test.collected),
			InterfaceStats: stats.InterfaceStats{
				Name:    "eth0",
				RxBytes: &rx,
				TxBytes: &tx,
			},
		},
	}

	var (
		nodeCPU uint64
		nodeMem uint64
	)
	for i := range test.stats {
		cpu := uint64(test.stats[i][0] * 1000000)
		cpuNanoSeconds := cpu * 60
		mem := uint64(test.stats[i][1])

		expected.Containers = append(expected.Containers, stats.ContainerStats{
			StartTime: pod.CreationTimestamp,
			Name:      pod.Status.ContainerStatuses[i].Name,
			CPU:       &stats.CPUStats{Time: metav1.NewTime(test.collected), UsageNanoCores: &cpu, UsageCoreNanoSeconds: &cpuNanoSeconds},
			Memory:    &stats.MemoryStats{Time: metav1.NewTime(test.collected), UsageBytes: &mem, WorkingSetBytes: &mem},
		})
		nodeCPU += cpu
		nodeMem += mem
	}
	if len(expected.Containers) > 0 {
		nanoCPUSeconds := nodeCPU * 60
		expected.CPU = &stats.CPUStats{UsageNanoCores: &nodeCPU, UsageCoreNanoSeconds: &nanoCPUSeconds, Time: metav1.NewTime(test.collected)}
		expected.Memory = &stats.MemoryStats{UsageBytes: &nodeMem, WorkingSetBytes: &nodeMem, Time: metav1.NewTime(test.collected)}
	}
	return expected
}
```
```diff
@@ -443,7 +443,7 @@ func (p *HyperProvider) NodeAddresses() []v1.NodeAddress {
 // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
 // within Kubernetes.
 func (p *HyperProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
-	return nil
+	return &v1.NodeDaemonEndpoints{}
 }
 
 // OperatingSystem returns the operating system for this provider.
```
**vendor/golang.org/x/sync/AUTHORS** (new file, 3 lines, generated, vendored)

```
# This source code refers to The Go Authors for copyright purposes.
# The master list of authors is in the main Go distribution,
# visible at http://tip.golang.org/AUTHORS.
```
**vendor/golang.org/x/sync/CONTRIBUTORS** (new file, 3 lines, generated, vendored)

```
# This source code was written by the Go contributors.
# The master list of contributors is in the main Go distribution,
# visible at http://tip.golang.org/CONTRIBUTORS.
```
**vendor/golang.org/x/sync/LICENSE** (new file, 27 lines, generated, vendored)

```
Copyright (c) 2009 The Go Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

   * Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
   * Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
   * Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
```
**vendor/golang.org/x/sync/PATENTS** (new file, 22 lines, generated, vendored)

```
Additional IP Rights Grant (Patents)

"This implementation" means the copyrightable works distributed by
Google as part of the Go project.

Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.
```
67
vendor/golang.org/x/sync/errgroup/errgroup.go
generated
vendored
Normal file
@@ -0,0 +1,67 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
package errgroup

import (
	"sync"

	"golang.org/x/net/context"
)

// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
type Group struct {
	cancel func()

	wg sync.WaitGroup

	errOnce sync.Once
	err     error
}

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	return &Group{cancel: cancel}, ctx
}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
	g.wg.Add(1)

	go func() {
		defer g.wg.Done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}
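For readers unfamiliar with the newly vendored errgroup package, here is a minimal usage sketch (not part of this change) showing how a Group ties goroutine lifetimes and the first error together; the subtask names are invented:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	// ctx is canceled as soon as any subtask returns a non-nil error.
	g, ctx := errgroup.WithContext(context.Background())

	for _, name := range []string{"a", "b", "c"} {
		name := name // capture the loop variable for the closure
		g.Go(func() error {
			select {
			case <-ctx.Done():
				// Another subtask already failed; stop early.
				return ctx.Err()
			default:
				fmt.Println("processing", name)
				return nil // a non-nil error here would cancel ctx
			}
		})
	}

	// Wait blocks for all subtasks and returns the first non-nil error.
	if err := g.Wait(); err != nil {
		fmt.Println("failed:", err)
	}
}
```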
335
vendor/k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1/types.go
generated
vendored
Normal file
@@ -0,0 +1,335 @@
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Summary is a top-level container for holding NodeStats and PodStats.
type Summary struct {
	// Overall node stats.
	Node NodeStats `json:"node"`
	// Per-pod stats.
	Pods []PodStats `json:"pods"`
}

// NodeStats holds node-level unprocessed sample stats.
type NodeStats struct {
	// Reference to the measured Node.
	NodeName string `json:"nodeName"`
	// Stats of system daemons tracked as raw containers.
	// The system containers are named according to the SystemContainer* constants.
	// +optional
	// +patchMergeKey=name
	// +patchStrategy=merge
	SystemContainers []ContainerStats `json:"systemContainers,omitempty" patchStrategy:"merge" patchMergeKey:"name"`
	// The time at which data collection for the node-scoped (i.e. aggregate) stats was (re)started.
	StartTime metav1.Time `json:"startTime"`
	// Stats pertaining to CPU resources.
	// +optional
	CPU *CPUStats `json:"cpu,omitempty"`
	// Stats pertaining to memory (RAM) resources.
	// +optional
	Memory *MemoryStats `json:"memory,omitempty"`
	// Stats pertaining to network resources.
	// +optional
	Network *NetworkStats `json:"network,omitempty"`
	// Stats pertaining to total usage of filesystem resources on the rootfs used by node k8s components.
	// NodeFs.Used is the total bytes used on the filesystem.
	// +optional
	Fs *FsStats `json:"fs,omitempty"`
	// Stats about the underlying container runtime.
	// +optional
	Runtime *RuntimeStats `json:"runtime,omitempty"`
	// Stats about the rlimit of system.
	// +optional
	Rlimit *RlimitStats `json:"rlimit,omitempty"`
}

// RlimitStats are stats rlimit of OS.
type RlimitStats struct {
	Time metav1.Time `json:"time"`

	// The max PID of OS.
	MaxPID *int64 `json:"maxpid,omitempty"`
	// The number of running process in the OS.
	NumOfRunningProcesses *int64 `json:"curproc,omitempty"`
}

// RuntimeStats are stats pertaining to the underlying container runtime.
type RuntimeStats struct {
	// Stats about the underlying filesystem where container images are stored.
	// This filesystem could be the same as the primary (root) filesystem.
	// Usage here refers to the total number of bytes occupied by images on the filesystem.
	// +optional
	ImageFs *FsStats `json:"imageFs,omitempty"`
}

const (
	// SystemContainerKubelet is the container name for the system container tracking Kubelet usage.
	SystemContainerKubelet = "kubelet"
	// SystemContainerRuntime is the container name for the system container tracking the runtime (e.g. docker or rkt) usage.
	SystemContainerRuntime = "runtime"
	// SystemContainerMisc is the container name for the system container tracking non-kubernetes processes.
	SystemContainerMisc = "misc"
	// SystemContainerPods is the container name for the system container tracking user pods.
	SystemContainerPods = "pods"
)

// PodStats holds pod-level unprocessed sample stats.
type PodStats struct {
	// Reference to the measured Pod.
	PodRef PodReference `json:"podRef"`
	// The time at which data collection for the pod-scoped (e.g. network) stats was (re)started.
	StartTime metav1.Time `json:"startTime"`
	// Stats of containers in the measured pod.
	// +patchMergeKey=name
	// +patchStrategy=merge
	Containers []ContainerStats `json:"containers" patchStrategy:"merge" patchMergeKey:"name"`
	// Stats pertaining to CPU resources consumed by pod cgroup (which includes all containers' resource usage and pod overhead).
	// +optional
	CPU *CPUStats `json:"cpu,omitempty"`
	// Stats pertaining to memory (RAM) resources consumed by pod cgroup (which includes all containers' resource usage and pod overhead).
	// +optional
	Memory *MemoryStats `json:"memory,omitempty"`
	// Stats pertaining to network resources.
	// +optional
	Network *NetworkStats `json:"network,omitempty"`
	// Stats pertaining to volume usage of filesystem resources.
	// VolumeStats.UsedBytes is the number of bytes used by the Volume
	// +optional
	// +patchMergeKey=name
	// +patchStrategy=merge
	VolumeStats []VolumeStats `json:"volume,omitempty" patchStrategy:"merge" patchMergeKey:"name"`
	// EphemeralStorage reports the total filesystem usage for the containers and emptyDir-backed volumes in the measured Pod.
	// +optional
	EphemeralStorage *FsStats `json:"ephemeral-storage,omitempty"`
}

// ContainerStats holds container-level unprocessed sample stats.
type ContainerStats struct {
	// Reference to the measured container.
	Name string `json:"name"`
	// The time at which data collection for this container was (re)started.
	StartTime metav1.Time `json:"startTime"`
	// Stats pertaining to CPU resources.
	// +optional
	CPU *CPUStats `json:"cpu,omitempty"`
	// Stats pertaining to memory (RAM) resources.
	// +optional
	Memory *MemoryStats `json:"memory,omitempty"`
	// Metrics for Accelerators. Each Accelerator corresponds to one element in the array.
	Accelerators []AcceleratorStats `json:"accelerators,omitempty"`
	// Stats pertaining to container rootfs usage of filesystem resources.
	// Rootfs.UsedBytes is the number of bytes used for the container write layer.
	// +optional
	Rootfs *FsStats `json:"rootfs,omitempty"`
	// Stats pertaining to container logs usage of filesystem resources.
	// Logs.UsedBytes is the number of bytes used for the container logs.
	// +optional
	Logs *FsStats `json:"logs,omitempty"`
	// User defined metrics that are exposed by containers in the pod. Typically, we expect only one container in the pod to be exposing user defined metrics. In the event of multiple containers exposing metrics, they will be combined here.
	// +patchMergeKey=name
	// +patchStrategy=merge
	UserDefinedMetrics []UserDefinedMetric `json:"userDefinedMetrics,omitmepty" patchStrategy:"merge" patchMergeKey:"name"`
}

// PodReference contains enough information to locate the referenced pod.
type PodReference struct {
	Name      string `json:"name"`
	Namespace string `json:"namespace"`
	UID       string `json:"uid"`
}

// InterfaceStats contains resource value data about interface.
type InterfaceStats struct {
	// The name of the interface
	Name string `json:"name"`
	// Cumulative count of bytes received.
	// +optional
	RxBytes *uint64 `json:"rxBytes,omitempty"`
	// Cumulative count of receive errors encountered.
	// +optional
	RxErrors *uint64 `json:"rxErrors,omitempty"`
	// Cumulative count of bytes transmitted.
	// +optional
	TxBytes *uint64 `json:"txBytes,omitempty"`
	// Cumulative count of transmit errors encountered.
	// +optional
	TxErrors *uint64 `json:"txErrors,omitempty"`
}

// NetworkStats contains data about network resources.
type NetworkStats struct {
	// The time at which these stats were updated.
	Time metav1.Time `json:"time"`

	// Stats for the default interface, if found
	InterfaceStats `json:",inline"`

	Interfaces []InterfaceStats `json:"interfaces,omitempty"`
}

// CPUStats contains data about CPU usage.
type CPUStats struct {
	// The time at which these stats were updated.
	Time metav1.Time `json:"time"`
	// Total CPU usage (sum of all cores) averaged over the sample window.
	// The "core" unit can be interpreted as CPU core-nanoseconds per second.
	// +optional
	UsageNanoCores *uint64 `json:"usageNanoCores,omitempty"`
	// Cumulative CPU usage (sum of all cores) since object creation.
	// +optional
	UsageCoreNanoSeconds *uint64 `json:"usageCoreNanoSeconds,omitempty"`
}

// MemoryStats contains data about memory usage.
type MemoryStats struct {
	// The time at which these stats were updated.
	Time metav1.Time `json:"time"`
	// Available memory for use. This is defined as the memory limit - workingSetBytes.
	// If memory limit is undefined, the available bytes is omitted.
	// +optional
	AvailableBytes *uint64 `json:"availableBytes,omitempty"`
	// Total memory in use. This includes all memory regardless of when it was accessed.
	// +optional
	UsageBytes *uint64 `json:"usageBytes,omitempty"`
	// The amount of working set memory. This includes recently accessed memory,
	// dirty memory, and kernel memory. WorkingSetBytes is <= UsageBytes
	// +optional
	WorkingSetBytes *uint64 `json:"workingSetBytes,omitempty"`
	// The amount of anonymous and swap cache memory (includes transparent
	// hugepages).
	// +optional
	RSSBytes *uint64 `json:"rssBytes,omitempty"`
	// Cumulative number of minor page faults.
	// +optional
	PageFaults *uint64 `json:"pageFaults,omitempty"`
	// Cumulative number of major page faults.
	// +optional
	MajorPageFaults *uint64 `json:"majorPageFaults,omitempty"`
}

// AcceleratorStats contains stats for accelerators attached to the container.
type AcceleratorStats struct {
	// Make of the accelerator (nvidia, amd, google etc.)
	Make string `json:"make"`

	// Model of the accelerator (tesla-p100, tesla-k80 etc.)
	Model string `json:"model"`

	// ID of the accelerator.
	ID string `json:"id"`

	// Total accelerator memory.
	// unit: bytes
	MemoryTotal uint64 `json:"memoryTotal"`

	// Total accelerator memory allocated.
	// unit: bytes
	MemoryUsed uint64 `json:"memoryUsed"`

	// Percent of time over the past sample period (10s) during which
	// the accelerator was actively processing.
	DutyCycle uint64 `json:"dutyCycle"`
}

// VolumeStats contains data about Volume filesystem usage.
type VolumeStats struct {
	// Embedded FsStats
	FsStats
	// Name is the name given to the Volume
	// +optional
	Name string `json:"name,omitempty"`
	// Reference to the PVC, if one exists
	// +optional
	PVCRef *PVCReference `json:"pvcRef,omitempty"`
}

// PVCReference contains enough information to describe the referenced PVC.
type PVCReference struct {
	Name      string `json:"name"`
	Namespace string `json:"namespace"`
}

// FsStats contains data about filesystem usage.
type FsStats struct {
	// The time at which these stats were updated.
	Time metav1.Time `json:"time"`
	// AvailableBytes represents the storage space available (bytes) for the filesystem.
	// +optional
	AvailableBytes *uint64 `json:"availableBytes,omitempty"`
	// CapacityBytes represents the total capacity (bytes) of the filesystems underlying storage.
	// +optional
	CapacityBytes *uint64 `json:"capacityBytes,omitempty"`
	// UsedBytes represents the bytes used for a specific task on the filesystem.
	// This may differ from the total bytes used on the filesystem and may not equal CapacityBytes - AvailableBytes.
	// e.g. For ContainerStats.Rootfs this is the bytes used by the container rootfs on the filesystem.
	// +optional
	UsedBytes *uint64 `json:"usedBytes,omitempty"`
	// InodesFree represents the free inodes in the filesystem.
	// +optional
	InodesFree *uint64 `json:"inodesFree,omitempty"`
	// Inodes represents the total inodes in the filesystem.
	// +optional
	Inodes *uint64 `json:"inodes,omitempty"`
	// InodesUsed represents the inodes used by the filesystem
	// This may not equal Inodes - InodesFree because this filesystem may share inodes with other "filesystems"
	// e.g. For ContainerStats.Rootfs, this is the inodes used only by that container, and does not count inodes used by other containers.
	InodesUsed *uint64 `json:"inodesUsed,omitempty"`
}

// UserDefinedMetricType defines how the metric should be interpreted by the user.
type UserDefinedMetricType string

const (
	// MetricGauge is an instantaneous value. May increase or decrease.
	MetricGauge UserDefinedMetricType = "gauge"

	// MetricCumulative is a counter-like value that is only expected to increase.
	MetricCumulative UserDefinedMetricType = "cumulative"

	// MetricDelta is a rate over a time period.
	MetricDelta UserDefinedMetricType = "delta"
)

// UserDefinedMetricDescriptor contains metadata that describes a user defined metric.
type UserDefinedMetricDescriptor struct {
	// The name of the metric.
	Name string `json:"name"`

	// Type of the metric.
	Type UserDefinedMetricType `json:"type"`

	// Display Units for the stats.
	Units string `json:"units"`

	// Metadata labels associated with this metric.
	// +optional
	Labels map[string]string `json:"labels,omitempty"`
}

// UserDefinedMetric represents a metric defined and generate by users.
type UserDefinedMetric struct {
	UserDefinedMetricDescriptor `json:",inline"`
	// The time at which these stats were updated.
	Time metav1.Time `json:"time"`
	// Value of the metric. Float64s have 53 bit precision.
	// We do not foresee any metrics exceeding that value.
	Value float64 `json:"value"`
}
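To make the shape of this vendored API concrete, here is a minimal sketch (not part of this change) of building a Summary for a single pod; the node, pod, and container names and the usage value are invented:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

func main() {
	now := metav1.NewTime(time.Now())
	usage := uint64(64 * 1024 * 1024) // 64 MiB, an invented sample value

	summary := stats.Summary{
		Node: stats.NodeStats{NodeName: "virtual-kubelet", StartTime: now},
		Pods: []stats.PodStats{{
			PodRef:    stats.PodReference{Name: "example-pod", Namespace: "default", UID: "uid-1234"},
			StartTime: now,
			Containers: []stats.ContainerStats{{
				Name:      "example-container",
				StartTime: now,
				Memory:    &stats.MemoryStats{Time: now, UsageBytes: &usage},
			}},
		}},
	}

	// The metrics endpoint below serves exactly this structure as JSON.
	b, _ := json.MarshalIndent(summary, "", "  ")
	fmt.Println(string(b))
}
```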
@@ -1,75 +1,152 @@
package vkubelet

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/gorilla/mux"
	"github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/log"
	"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
)

var p Provider
var r mux.Router

func loggingContext(r *http.Request) context.Context {
	ctx := r.Context()
	logger := log.G(ctx).WithFields(logrus.Fields{
		"uri":  r.RequestURI,
		"vars": mux.Vars(r),
	})
	return log.WithLogger(ctx, logger)
}

// NotFound provides a handler for cases where the requested endpoint doesn't exist
func NotFound(w http.ResponseWriter, r *http.Request) {
	log.Printf("404 request not found. \n %v", mux.Vars(r))
	logger := log.G(loggingContext(r))
	log.Trace(logger, "404 request not found")
	http.Error(w, "404 request not found", http.StatusNotFound)
}

func ApiserverStart(provider Provider) {
	p = provider
// KubeletServer implements HTTP endpoints for serving kubelet APIs
type KubeletServer struct {
	p Provider
}

// KubeletServerStart starts the virtual kubelet HTTP server.
func KubeletServerStart(p Provider) {
	certFilePath := os.Getenv("APISERVER_CERT_LOCATION")
	keyFilePath := os.Getenv("APISERVER_KEY_LOCATION")
	port := os.Getenv("KUBELET_PORT")
	addr := fmt.Sprintf(":%s", port)

	r := mux.NewRouter()
	r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", ApiServerHandler).Methods("GET")
	r.HandleFunc("/exec/{namespace}/{pod}/{container}", ApiServerHandlerExec).Methods("POST")
	s := &KubeletServer{p: p}
	r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", s.ApiServerHandler).Methods("GET")
	r.HandleFunc("/exec/{namespace}/{pod}/{container}", s.ApiServerHandlerExec).Methods("POST")
	r.NotFoundHandler = http.HandlerFunc(NotFound)

	if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil {
		log.Println(err)
		log.G(context.TODO()).WithError(err).Error("error setting up http server")
	}
}

func ApiServerHandler(w http.ResponseWriter, req *http.Request) {
// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelet summary stats API.
// TLS is never enabled on this endpoint.
func MetricsServerStart(p Provider, addr string) {
	r := mux.NewRouter()
	s := &MetricsServer{p: p}
	r.HandleFunc("/stats/summary", s.MetricsSummaryHandler).Methods("GET")
	r.HandleFunc("/stats/summary/", s.MetricsSummaryHandler).Methods("GET")
	r.NotFoundHandler = http.HandlerFunc(NotFound)
	if err := http.ListenAndServe(addr, r); err != nil {
		log.G(context.TODO()).WithError(err).Error("Error starting http server")
	}
}

// MetricsServer provides an HTTP endpoint for accessing pod metrics
type MetricsServer struct {
	p Provider
}

// MetricsSummaryHandler is an HTTP handler for implementing the kubelet summary stats endpoint
func (s *MetricsServer) MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) {
	ctx := loggingContext(req)

	mp, ok := s.p.(MetricsProvider)
	if !ok {
		log.G(ctx).Debug("stats not implemented for provider")
		http.Error(w, "not implemented", http.StatusNotImplemented)
		return
	}

	stats, err := mp.GetStatsSummary(req.Context())
	if err != nil {
		if errors.Cause(err) == context.Canceled {
			return
		}
		log.G(ctx).Error("Error getting stats from provider:", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	b, err := json.Marshal(stats)
	if err != nil {
		log.G(ctx).WithError(err).Error("Could not marshal stats")
		http.Error(w, "could not marshal stats: "+err.Error(), http.StatusInternalServerError)
		return
	}

	if _, err := w.Write(b); err != nil {
		log.G(ctx).WithError(err).Debug("Could not write to client")
	}
}
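For a quick manual check of the new endpoint, a hedged sketch of querying it from Go; it assumes MetricsServerStart was started on ":10255", which is an example address, not something this change mandates:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Assumes MetricsServerStart(p, ":10255") is running locally.
	resp, err := http.Get("http://localhost:10255/stats/summary")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// Prints the JSON-encoded stats.Summary produced by the provider,
	// or a 501 body if the provider doesn't implement MetricsProvider.
	fmt.Println(resp.Status)
	fmt.Println(string(body))
}
```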
func (s *KubeletServer) ApiServerHandler(w http.ResponseWriter, req *http.Request) {
	vars := mux.Vars(req)
	if len(vars) == 3 {
		namespace := vars["namespace"]
		pod := vars["pod"]
		container := vars["container"]
		tail := 10
		q := req.URL.Query()
		queryTail := q.Get("tailLines")
		if queryTail != "" {
			t, err := strconv.Atoi(queryTail)
			if err != nil {
				log.Println(err)
				io.WriteString(w, err.Error())
			} else {
				tail = t
			}
		}
		podsLogs, err := p.GetContainerLogs(namespace, pod, container, tail)
		if err != nil {
			log.Println(err)
			io.WriteString(w, err.Error())
		} else {
			io.WriteString(w, podsLogs)
		}
	} else {
	if len(vars) != 3 {
		NotFound(w, req)
		return
	}

	ctx := loggingContext(req)

	namespace := vars["namespace"]
	pod := vars["pod"]
	container := vars["container"]
	tail := 10
	q := req.URL.Query()

	if queryTail := q.Get("tailLines"); queryTail != "" {
		t, err := strconv.Atoi(queryTail)
		if err != nil {
			logger := log.G(context.TODO()).WithError(err)
			log.Trace(logger, "could not parse tailLines")
			http.Error(w, fmt.Sprintf("could not parse \"tailLines\": %v", err), http.StatusBadRequest)
			return
		}
		tail = t
	}

	podsLogs, err := s.p.GetContainerLogs(namespace, pod, container, tail)
	if err != nil {
		log.G(ctx).WithError(err).Error("error getting container logs")
		http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError)
		return
	}

	if _, err := io.WriteString(w, podsLogs); err != nil {
		log.G(ctx).WithError(err).Warn("error writing response to client")
	}
}

func ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
func (s *KubeletServer) ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
	vars := mux.Vars(req)

	namespace := vars["namespace"]
@@ -99,5 +176,5 @@ func ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
	idleTimeout := time.Second * 30
	streamCreationTimeout := time.Second * 30

	remotecommand.ServeExec(w, req, p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
	remotecommand.ServeExec(w, req, s.p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
}
@@ -3,8 +3,7 @@
package vkubelet

import (
	"fmt"

	"github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/manager"
	"github.com/virtual-kubelet/virtual-kubelet/providers/aws"
	"github.com/virtual-kubelet/virtual-kubelet/providers/azure"
@@ -56,8 +55,6 @@ func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager
	case "vic":
		return vic.NewVicProvider(providerConfig, rm, nodeName, operatingSystem)
	default:
		fmt.Printf("Provider '%s' is not supported\n", provider)
		return nil, errors.New("provider not supported")
	}
	var p Provider
	return p, nil
}
@@ -1,8 +1,7 @@
package vkubelet

import (
	"fmt"

	"github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/manager"
	"github.com/virtual-kubelet/virtual-kubelet/providers/aws"
	"github.com/virtual-kubelet/virtual-kubelet/providers/azure"
@@ -43,8 +42,6 @@ func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager
	case "sfmesh":
		return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
	default:
		fmt.Printf("Provider '%s' is not supported\n", provider)
		return nil, errors.New("provider is not supported")
	}
	var p Provider
	return p, nil
}
@@ -1,8 +1,7 @@
package vkubelet

import (
	"fmt"

	"github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/manager"
	"github.com/virtual-kubelet/virtual-kubelet/providers/aws"
	"github.com/virtual-kubelet/virtual-kubelet/providers/azure"
@@ -43,8 +42,6 @@ func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager
	case "sfmesh":
		return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
	default:
		fmt.Printf("Provider '%s' is not supported\n", provider)
		return nil, errors.New("provider not supported")
	}
	var p Provider
	return p, nil
}
@@ -1,12 +1,14 @@
package vkubelet

import (
	"context"
	"io"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/tools/remotecommand"
	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

// Provider contains the methods required to implement a virtual-kubelet provider.
@@ -54,3 +56,8 @@ type Provider interface {
	// OperatingSystem returns the operating system the provider is for.
	OperatingSystem() string
}

// MetricsProvider is an optional interface that providers can implement to expose pod stats
type MetricsProvider interface {
	GetStatsSummary(context.Context) (*stats.Summary, error)
}
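Because MetricsProvider is optional and discovered via a type assertion in MetricsSummaryHandler, a provider opts in simply by adding the method. A minimal sketch, with an invented mockProvider type standing in for a real provider:

```go
package vkubelet

import (
	"context"

	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

// mockProvider is a hypothetical provider used only for illustration;
// a real one would also implement the full Provider interface.
type mockProvider struct{}

// GetStatsSummary satisfies MetricsProvider. A real provider would
// gather per-pod usage from its backing service here.
func (m *mockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
	return &stats.Summary{
		Node: stats.NodeStats{NodeName: "virtual-kubelet"},
		Pods: []stats.PodStats{},
	}, nil
}
```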
@@ -1,8 +1,8 @@
package vkubelet

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
@@ -10,6 +10,8 @@ import (
	"syscall"
	"time"

	pkgerrors "github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/log"
	"github.com/virtual-kubelet/virtual-kubelet/manager"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
@@ -46,7 +48,7 @@ func getEnv(key, defaultValue string) string {
}

// New creates a new virtual-kubelet server.
func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerConfig, taintKey string, disableTaint bool) (*Server, error) {
func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerConfig, taintKey string, disableTaint bool, metricsAddr string) (*Server, error) {
	var config *rest.Config

	// Check if the kubeConfig file exists.
@@ -69,7 +71,10 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
		return nil, err
	}

	rm := manager.NewResourceManager(clientset)
	rm, err := manager.NewResourceManager(clientset)
	if err != nil {
		return nil, pkgerrors.Wrap(err, "error creating resource manager")
	}

	daemonEndpointPortEnv := os.Getenv("KUBELET_PORT")
	if daemonEndpointPortEnv == "" {
@@ -101,7 +106,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
	case "PreferNoSchedule":
		vkTaintEffect = corev1.TaintEffectPreferNoSchedule
	default:
		fmt.Printf("Taint effect '%s' is not supported\n", vkTaintEffectEnv)
		return nil, pkgerrors.Errorf("taint effect %q is not supported", vkTaintEffectEnv)
	}

	taint := corev1.Taint{
@@ -110,7 +115,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
		Effect: vkTaintEffect,
	}

	p, err = lookupProvider(provider, providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
	p, err := lookupProvider(provider, providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
	if err != nil {
		return nil, err
	}
@@ -125,17 +130,27 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
		provider: p,
	}

	if err = s.registerNode(); err != nil {
	ctx := context.TODO()
	ctx = log.WithLogger(ctx, log.G(ctx))

	if err = s.registerNode(ctx); err != nil {
		return s, err
	}

	go ApiserverStart(p)
	go KubeletServerStart(p)

	if metricsAddr != "" {
		go MetricsServerStart(p, metricsAddr)
	} else {
		log.G(ctx).Info("Skipping metrics server startup since no address was provided")
	}

	tick := time.Tick(5 * time.Second)

	go func() {
		for range tick {
			s.updateNode()
			s.updatePodStatuses()
			s.updateNode(ctx)
			s.updatePodStatuses(ctx)
		}
	}()
@@ -143,7 +158,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
}

// registerNode registers this virtual node with the Kubernetes API.
func (s *Server) registerNode() error {
func (s *Server) registerNode(ctx context.Context) error {
	taints := make([]corev1.Taint, 0)

	if !s.disableTaint {
@@ -182,14 +197,14 @@ func (s *Server) registerNode() error {
		return err
	}

	log.Printf("Node '%s' with OS type '%s' registered\n", node.Name, node.Status.NodeInfo.OperatingSystem)
	log.G(ctx).Info("Registered node")

	return nil
}

// Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster.
// Run will block until Stop is called or a SIGINT or SIGTERM signal is received.
func (s *Server) Run() error {
func (s *Server) Run(ctx context.Context) error {
	shouldStop := false

	sig := make(chan os.Signal, 1)
@@ -207,15 +222,15 @@ func (s *Server) Run() error {

	pods, err := s.k8sClient.CoreV1().Pods(s.namespace).List(opts)
	if err != nil {
		log.Fatal("Failed to list pods", err)
		return pkgerrors.Wrap(err, "error getting pod list")
	}
	s.resourceManager.SetPods(pods)
	s.reconcile()
	s.reconcile(ctx)

	opts.ResourceVersion = pods.ResourceVersion
	s.podWatcher, err = s.k8sClient.CoreV1().Pods(s.namespace).Watch(opts)
	if err != nil {
		log.Fatal("Failed to watch pods", err)
		return pkgerrors.Wrap(err, "failed to watch pods")
	}

loop:
@@ -224,15 +239,15 @@ func (s *Server) Run() error {
		case ev, ok := <-s.podWatcher.ResultChan():
			if !ok {
				if shouldStop {
					log.Println("Pod watcher is stopped.")
					log.G(ctx).Info("Pod watcher is stopped")
					return nil
				}

				log.Println("Pod watcher connection is closed unexpectedly.")
				log.G(ctx).Error("Pod watcher connection is closed unexpectedly")
				break loop
			}

			log.Println("Pod watcher event is received:", ev.Type)
			log.G(ctx).WithField("type", ev.Type).Debug("Pod watcher event is received")
			reconcile := false
			switch ev.Type {
			case watch.Added:
@@ -244,7 +259,7 @@ func (s *Server) Run() error {
			}

			if reconcile {
				s.reconcile()
				s.reconcile(ctx)
			}
		}
	}
@@ -262,19 +277,19 @@ func (s *Server) Stop() {
}

// updateNode updates the node status within Kubernetes with updated NodeConditions.
func (s *Server) updateNode() {
func (s *Server) updateNode(ctx context.Context) {
	opts := metav1.GetOptions{}
	n, err := s.k8sClient.CoreV1().Nodes().Get(s.nodeName, opts)
	if err != nil && !errors.IsNotFound(err) {
		log.Println("Failed to retrieve node:", err)
		log.G(ctx).WithError(err).Error("Failed to retrieve node")
		return
	}

	if errors.IsNotFound(err) {
		if err = s.registerNode(); err != nil {
			log.Println("Failed to register node:", err)
			return
		if err = s.registerNode(ctx); err != nil {
			log.G(ctx).WithError(err).Error("Failed to register node")
		}
		return
	}

	n.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
@@ -288,27 +303,31 @@ func (s *Server) updateNode() {

	n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n)
	if err != nil {
		log.Println("Failed to update node:", err)
		log.G(ctx).WithError(err).Error("Failed to update node")
		return
	}
}

// reconcile is the main reconciliation loop that compares differences between Kubernetes and
// the active provider and reconciles the differences.
func (s *Server) reconcile() {
	log.Println("Start reconcile.")
func (s *Server) reconcile(ctx context.Context) {
	logger := log.G(ctx)
	logger.Debug("Start reconcile")
	defer logger.Debug("End reconcile")

	providerPods, err := s.provider.GetPods()
	if err != nil {
		log.Println(err)
		logger.WithError(err).Error("Error getting pod list from provider")
		return
	}

	for _, pod := range providerPods {
		// Delete pods that don't exist in Kubernetes
		if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil || p.DeletionTimestamp != nil {
			log.Printf("Deleting pod '%s'\n", pod.Name)
			if err := s.deletePod(pod); err != nil {
				log.Printf("Error deleting pod '%s': %s\n", pod.Name, err)
			logger := logger.WithField("pod", pod.Name)
			logger.Debug("Deleting pod")
			if err := s.deletePod(ctx, pod); err != nil {
				logger.WithError(err).Error("Error deleting pod")
				continue
			}
		}
@@ -317,6 +336,7 @@ func (s *Server) reconcile() {
	// Create any pods for k8s pods that don't exist in the provider
	pods := s.resourceManager.GetPods()
	for _, pod := range pods {
		logger := logger.WithField("pod", pod.Name)
		var providerPod *corev1.Pod
		for _, p := range providerPods {
			if p.Namespace == pod.Namespace && p.Name == pod.Name {
@@ -326,30 +346,33 @@ func (s *Server) reconcile() {
		}

		if pod.DeletionTimestamp == nil && pod.Status.Phase != corev1.PodFailed && providerPod == nil {
			log.Printf("Creating pod '%s'\n", pod.Name)
			if err := s.createPod(pod); err != nil {
				log.Printf("Error creating pod '%s': %s\n", pod.Name, err)
			logger.Debug("Creating pod")
			if err := s.createPod(ctx, pod); err != nil {
				logger.WithError(err).Error("Error creating pod")
				continue
			}
		}

		// Delete pod if DeletionTimestamp is set
		if pod.DeletionTimestamp != nil {
			log.Printf("Pod '%s' is pending deletion.\n", pod.Name)
			log.Trace(logger, "Pod pending deletion")
			var err error
			if err = s.deletePod(pod); err != nil {
				log.Printf("Error deleting pod '%s': %s\n", pod.Name, err)
			if err = s.deletePod(ctx, pod); err != nil {
				logger.WithError(err).Error("Error deleting pod")
				continue
			}
			log.Trace(logger, "Pod deletion complete")
		}
	}
}
func (s *Server) createPod(pod *corev1.Pod) error {
func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
	if err := s.populateSecretsAndConfigMapsInEnv(pod); err != nil {
		return err
	}

	logger := log.G(ctx).WithField("pod", pod.Name)

	if origErr := s.provider.CreatePod(pod); origErr != nil {
		pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
		pod.Status.Phase = corev1.PodFailed
@@ -358,29 +381,29 @@ func (s *Server) createPod(pod *corev1.Pod) error {

		_, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
		if err != nil {
			log.Println("Failed to update pod status:", err)
			return origErr
			logger.WithError(err).Warn("Failed to update pod status")
		}

		return origErr
	}

	log.Printf("Pod '%s' created.\n", pod.Name)
	logger.Info("Pod created")

	return nil
}

func (s *Server) deletePod(pod *corev1.Pod) error {
func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error {
	var delErr error
	if delErr = s.provider.DeletePod(pod); delErr != nil && errors.IsNotFound(delErr) {
		return delErr
	}

	logger := log.G(ctx).WithField("pod", pod.Name)
	if !errors.IsNotFound(delErr) {
		var grace int64
		if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) {
			if errors.IsNotFound(err) {
				log.Printf("Pod '%s' doesn't exist.\n", pod.Name)
				logger.Error("Pod doesn't exist")
				return nil
			}

@@ -388,15 +411,14 @@ func (s *Server) deletePod(pod *corev1.Pod) error {
		}

		s.resourceManager.DeletePod(pod)

		log.Printf("Pod '%s' deleted.\n", pod.Name)
		logger.Info("Pod deleted")
	}

	return nil
}

// updatePodStatuses syncs the provider's pod status with the kubernetes pod status.
func (s *Server) updatePodStatuses() {
func (s *Server) updatePodStatuses(ctx context.Context) {
	// Update all the pods with the provider status.
	pods := s.resourceManager.GetPods()
	for _, pod := range pods {
@@ -406,7 +428,7 @@ func (s *Server) updatePodStatuses() {

		status, err := s.provider.GetPodStatus(pod.Namespace, pod.Name)
		if err != nil {
			log.Printf("Error retrieving pod '%s' in namespace '%s' status from provider: %s\n", pod.Name, pod.Namespace, err)
			log.G(ctx).WithField("pod", pod.Name).WithError(err).Error("Error retrieving pod status")
			return
		}
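Taken together, a caller now threads the metrics address through New and a context through Run. A minimal hedged sketch of the wiring (the flag name, address, and argument values are illustrative; the real CLI wiring lives in cmd/root.go):

```go
package main

import (
	"context"
	"flag"
	"log"

	vkubelet "github.com/virtual-kubelet/virtual-kubelet/vkubelet"
)

func main() {
	// --metrics-addr is an invented flag; an empty value disables the
	// metrics server, matching the check inside New.
	metricsAddr := flag.String("metrics-addr", ":10255", "address for the summary stats endpoint (empty disables it)")
	flag.Parse()

	// Provider and taint values here are placeholders for illustration.
	s, err := vkubelet.New("my-node", "Linux", "default", "", "azure", "", "virtual-kubelet.io/provider", false, *metricsAddr)
	if err != nil {
		log.Fatal(err)
	}

	// Run blocks until Stop is called or SIGINT/SIGTERM is received.
	if err := s.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}
```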