Compare commits

...

10 Commits

Author SHA1 Message Date
Robbie Zhang
55bc0c3f75 Add ContainerID into the ContainerStatus (#337) 2018-08-27 15:38:14 -07:00
Robbie Zhang
b019ec5549 Bug Fixes (#329) 2018-08-27 11:53:59 -07:00
Ria Bhatia
7442189e77 adding the taint format (#334)
adding the new taint format
2018-08-23 14:09:06 -07:00
Brian Goff
bb8ba567e3 Merge pull request #325 from cpuguy83/refactor_api_init
Don't use globals for API server
2018-08-20 13:44:47 -07:00
Brian Goff
8de6693460 Don't use globals for API server
Refactors how HTTP servers are started and binds them to objects that
can store the provider rather than relying on a global.
2018-08-20 11:52:54 -07:00
Brian Goff
99e0cfad5c Merge pull request #306 from cpuguy83/metrics
Add support for kubelet stats summary
2018-08-20 11:37:50 -07:00
Brian Goff
e8abca0ac9 Add support for stats in ACI provider
This adds a new, optional, interface for providers that want to provide
stats.
2018-08-17 17:03:25 -07:00
Brian Goff
6284757aa1 Add errgroup dep which will be used for ACI stats 2018-08-17 16:55:18 -07:00
Brian Goff
17cc3033ba Update kubelet vendor to pull in stats API 2018-08-17 16:55:18 -07:00
Brian Goff
1e774a32b3 Use standard logging package (#323) 2018-08-17 16:50:24 -07:00
29 changed files with 1576 additions and 219 deletions

14
Gopkg.lock generated
View File

@@ -1026,6 +1026,14 @@
pruneopts = "NUT"
revision = "f73e4c9ed3b7ebdd5f699a16a880c2b1994e50dd"
[[projects]]
branch = "master"
digest = "1:39ebcc2b11457b703ae9ee2e8cca0f68df21969c6102cb3b705f76cca0ea0239"
name = "golang.org/x/sync"
packages = ["errgroup"]
pruneopts = "NUT"
revision = "1d60e4601c6fd243af51cc01ddf169918a5407ca"
[[projects]]
branch = "master"
digest = "1:ee40f9506736b3b55162b22d82911b797301904848b1f1ae5db49f561ad48a79"
@@ -1317,11 +1325,12 @@
version = "v7.0.0"
[[projects]]
digest = "1:3f23907ae824a94920ce9bc22310631cb2be91787bd7de16de84bbb9af92cad4"
digest = "1:58c71c78f19e62f5f3ba1f37d21670481de1f46c4a2d4d4ee31fe70b03b57ab3"
name = "k8s.io/kubernetes"
packages = [
"pkg/apis/core",
"pkg/kubelet/apis/cri/runtime/v1alpha2",
"pkg/kubelet/apis/stats/v1alpha1",
"pkg/kubelet/server/remotecommand",
]
pruneopts = "NUT"
@@ -1381,6 +1390,7 @@
"github.com/kr/pretty",
"github.com/lawrencegripper/pod2docker",
"github.com/mitchellh/go-homedir",
"github.com/pkg/errors",
"github.com/spf13/cobra",
"github.com/spf13/viper",
"github.com/stretchr/testify/assert",
@@ -1405,6 +1415,7 @@
"github.com/vmware/vic/pkg/trace",
"github.com/vmware/vic/pkg/vsphere/sys",
"golang.org/x/net/context",
"golang.org/x/sync/errgroup",
"google.golang.org/grpc",
"gopkg.in/yaml.v2",
"k8s.io/api/core/v1",
@@ -1422,6 +1433,7 @@
"k8s.io/client-go/tools/clientcmd",
"k8s.io/client-go/tools/remotecommand",
"k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2",
"k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1",
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand",
]
solver-name = "gps-cdcl"

View File

@@ -85,6 +85,9 @@ spec:
"--provider", "{{ required "provider is required" .Values.provider }}",
"--namespace", "{{ .Values.monitoredNamespace }}",
"--nodename", "{{ required "nodeName is required" .Values.nodeName }}",
{{- if .Values.logLevel }}
"--log-level", "{{.Values.logLevel}}",
{{- end }}
"--os", "{{ .Values.nodeOsType }}"
]
volumes:

View File

@@ -10,6 +10,7 @@ nodeOsType: "Linux"
monitoredNamespace: ""
apiserverCert:
apiserverKey:
logLevel:
taint:
enabled: true

View File

@@ -15,15 +15,16 @@
package cmd
import (
"fmt"
"log"
"context"
"os"
"path/filepath"
"strings"
"github.com/Sirupsen/logrus"
homedir "github.com/mitchellh/go-homedir"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/providers"
vkubelet "github.com/virtual-kubelet/virtual-kubelet/vkubelet"
corev1 "k8s.io/api/core/v1"
@@ -38,6 +39,8 @@ var provider string
var providerConfig string
var taintKey string
var disableTaint bool
var logLevel string
var metricsAddr string
// RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{
@@ -47,12 +50,13 @@ var RootCmd = &cobra.Command{
backend implementation allowing users to create kubernetes nodes without running the kubelet.
This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Println(kubeConfig)
f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint)
f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint, metricsAddr)
if err != nil {
log.Fatal(err)
log.L.WithError(err).Fatal("Error initializing virtual kubelet")
}
if err := f.Run(context.Background()); err != nil {
log.L.Fatal(err)
}
f.Run()
},
}
@@ -60,8 +64,7 @@ This allows users to schedule kubernetes workloads on nodes that aren't running
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
if err := RootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
log.GetLogger(context.TODO()).WithError(err).Fatal("Error executing root command")
}
}
@@ -85,8 +88,11 @@ func init() {
RootCmd.PersistentFlags().StringVar(&provider, "provider", "", "cloud provider")
RootCmd.PersistentFlags().BoolVar(&disableTaint, "disable-taint", false, "disable the virtual-kubelet node taint")
RootCmd.PersistentFlags().StringVar(&providerConfig, "provider-config", "", "cloud provider configuration file")
RootCmd.PersistentFlags().StringVar(&metricsAddr, "metrics-addr", ":10255", "address to listen for metrics/stats requests")
RootCmd.PersistentFlags().StringVar(&taintKey, "taint", "", "Set node taint key")
RootCmd.PersistentFlags().MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable")
RootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", `set the log level, e.g. "trace", "debug", "info", "warn", "error"`)
// Cobra also supports local flags, which will only run
// when this action is called directly.
@@ -96,15 +102,13 @@ func init() {
// initConfig reads in config file and ENV variables if set.
func initConfig() {
if provider == "" {
fmt.Println("You must supply a cloud provider option: use --provider")
os.Exit(1)
log.G(context.TODO()).Fatal("You must supply a cloud provider option: use --provider")
}
// Find home directory.
home, err := homedir.Dir()
if err != nil {
fmt.Println(err)
os.Exit(1)
log.G(context.TODO()).WithError(err).Fatal("Error reading homedir")
}
if kubeletConfig != "" {
@@ -120,7 +124,7 @@ func initConfig() {
// If a config file is found, read it in.
if err := viper.ReadInConfig(); err == nil {
fmt.Println("Using config file:", viper.ConfigFileUsed())
log.G(context.TODO()).Debugf("Using config file %s", viper.ConfigFileUsed())
}
if kubeConfig == "" {
@@ -135,7 +139,20 @@ func initConfig() {
// Validate operating system.
ok, _ := providers.ValidOperatingSystems[operatingSystem]
if !ok {
fmt.Printf("Operating system '%s' not supported. Valid options are %s\n", operatingSystem, strings.Join(providers.ValidOperatingSystems.Names(), " | "))
os.Exit(1)
log.G(context.TODO()).WithField("OperatingSystem", operatingSystem).Fatalf("Operating system not supported. Valid options are: %s", strings.Join(providers.ValidOperatingSystems.Names(), " | "))
}
level, err := log.ParseLevel(logLevel)
if err != nil {
log.G(context.TODO()).WithField("logLevel", logLevel).Fatal("log level is not supported")
}
logger := log.L.WithFields(logrus.Fields{
"provider": provider,
"operatingSystem": operatingSystem,
"node": nodeName,
"namespace": kubeNamespace,
})
logger.Level = level
log.L = logger
}

90
log/log.go Normal file
View File

@@ -0,0 +1,90 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package log
import (
"context"
"sync/atomic"
"github.com/Sirupsen/logrus"
)
var (
// G is an alias for GetLogger.
//
// We may want to define this locally to a package to get package tagged log
// messages.
G = GetLogger
// L is an alias for the standard logger.
L = logrus.NewEntry(logrus.StandardLogger())
)
type (
loggerKey struct{}
)
// TraceLevel is the log level for tracing. Trace level is lower than debug level,
// and is usually used to trace detailed behavior of the program.
const TraceLevel = logrus.Level(uint32(logrus.DebugLevel + 1))
// RFC3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to
// ensure the formatted time is always the same number of characters.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
// ParseLevel takes a string level and returns the Logrus log level constant.
// It supports trace level.
func ParseLevel(lvl string) (logrus.Level, error) {
if lvl == "trace" {
return TraceLevel, nil
}
return logrus.ParseLevel(lvl)
}
// WithLogger returns a new context with the provided logger. Use in
// combination with logger.WithField(s) for great effect.
func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context {
return context.WithValue(ctx, loggerKey{}, logger)
}
// GetLogger retrieves the current logger from the context. If no logger is
// available, the default logger is returned.
func GetLogger(ctx context.Context) *logrus.Entry {
logger := ctx.Value(loggerKey{})
if logger == nil {
return L
}
return logger.(*logrus.Entry)
}
// Trace logs a message at level Trace with the log entry passed-in.
func Trace(e *logrus.Entry, args ...interface{}) {
level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level)))
if level >= TraceLevel {
e.Debug(args...)
}
}
// Tracef logs a message at level Trace with the log entry passed-in.
func Tracef(e *logrus.Entry, format string, args ...interface{}) {
level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level)))
if level >= TraceLevel {
e.Debugf(format, args...)
}
}
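
A minimal usage sketch for the new package (the field values and byte count are invented; the calls are the ones defined above):

```go
package main

import (
	"context"

	"github.com/Sirupsen/logrus"
	"github.com/virtual-kubelet/virtual-kubelet/log"
)

func main() {
	// "trace" parses to the custom level defined above (one past debug).
	level, err := log.ParseLevel("trace")
	if err != nil {
		log.L.WithError(err).Fatal("invalid log level")
	}
	log.L.Logger.Level = level

	// Attach a scoped logger to a context...
	ctx := log.WithLogger(context.Background(), log.L.WithFields(logrus.Fields{
		"pod": "nginx-1",
	}))

	// ...and retrieve it anywhere downstream; G is the GetLogger alias.
	log.G(ctx).Info("starting reconcile")

	// Trace/Tracef only emit when the entry's logger is at TraceLevel or above.
	log.Tracef(log.G(ctx), "raw response: %d bytes", 512)
}
```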

View File

@@ -1,10 +1,10 @@
package manager
import (
"log"
"sync"
"time"
"github.com/pkg/errors"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
@@ -26,7 +26,7 @@ type ResourceManager struct {
}
// NewResourceManager returns a ResourceManager with the internal maps initialized.
func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager {
func NewResourceManager(k8sClient kubernetes.Interface) (*ResourceManager, error) {
rm := ResourceManager{
pods: make(map[string]*v1.Pod, 0),
deletingPods: make(map[string]*v1.Pod, 0),
@@ -37,8 +37,18 @@ func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager {
k8sClient: k8sClient,
}
go rm.watchConfigMaps()
go rm.watchSecrets()
configW, err := rm.k8sClient.CoreV1().ConfigMaps(v1.NamespaceAll).Watch(metav1.ListOptions{})
if err != nil {
return nil, errors.Wrap(err, "error getting config watch")
}
secretsW, err := rm.k8sClient.CoreV1().Secrets(v1.NamespaceAll).Watch(metav1.ListOptions{})
if err != nil {
return nil, errors.Wrap(err, "error getting secrets watch")
}
go rm.watchConfigMaps(configW)
go rm.watchSecrets(secretsW)
tick := time.Tick(5 * time.Minute)
go func() {
@@ -68,7 +78,7 @@ func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager {
}
}()
return &rm
return &rm, nil
}
// SetPods clears the internal cache and populates it with the supplied pods.
@@ -213,12 +223,7 @@ func (rm *ResourceManager) GetSecret(name, namespace string) (*v1.Secret, error)
// watchConfigMaps monitors the kubernetes API for modifications and deletions of configmaps
// it evicts them from the internal cache
func (rm *ResourceManager) watchConfigMaps() {
var opts metav1.ListOptions
w, err := rm.k8sClient.CoreV1().ConfigMaps(v1.NamespaceAll).Watch(opts)
if err != nil {
log.Fatal(err)
}
func (rm *ResourceManager) watchConfigMaps(w watch.Interface) {
for {
select {
@@ -242,12 +247,7 @@ func (rm *ResourceManager) watchConfigMaps() {
// watchSecrets monitors the kubernetes API for modifications and deletions of secrets
// it evicts them from the internal cache
func (rm *ResourceManager) watchSecrets() {
var opts metav1.ListOptions
w, err := rm.k8sClient.CoreV1().Secrets(v1.NamespaceAll).Watch(opts)
if err != nil {
log.Fatal(err)
}
func (rm *ResourceManager) watchSecrets(w watch.Interface) {
for {
select {

View File

@@ -19,7 +19,10 @@ func init() {
}
func TestResourceManager(t *testing.T) {
pm := NewResourceManager(fakeClient)
pm, err := NewResourceManager(fakeClient)
if err != nil {
t.Fatal(err)
}
pod1Name := "Pod1"
pod1Namespace := "Pod1Namespace"
pod1 := makePod(pod1Namespace, pod1Name)
@@ -36,7 +39,10 @@ func TestResourceManager(t *testing.T) {
}
func TestResourceManagerDeletePod(t *testing.T) {
pm := NewResourceManager(fakeClient)
pm, err := NewResourceManager(fakeClient)
if err != nil {
t.Fatal(err)
}
pod1Name := "Pod1"
pod1Namespace := "Pod1Namespace"
pod1 := makePod(pod1Namespace, pod1Name)
@@ -61,7 +67,10 @@ func makePod(namespace, name string) *v1.Pod {
}
func TestResourceManagerUpdatePod(t *testing.T) {
pm := NewResourceManager(fakeClient)
pm, err := NewResourceManager(fakeClient)
if err != nil {
t.Fatal(err)
}
pod1Name := "Pod1"
pod1Namespace := "Pod1Namespace"
pod1 := makePod(pod1Namespace, pod1Name)
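
Because NewResourceManager now creates the watches up front and hands the watch.Interface into the loops, the eviction paths can also be driven without a live API server. A hypothetical in-package test using watch.NewFake from apimachinery:

```go
package manager

import (
	"testing"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes/fake"
)

func TestWatchSecretsEviction(t *testing.T) {
	rm, err := NewResourceManager(fake.NewSimpleClientset())
	if err != nil {
		t.Fatal(err)
	}

	// Drive the watch loop with a fake watcher instead of a live API server.
	w := watch.NewFake()
	defer w.Stop()
	go rm.watchSecrets(w)

	// A Deleted event should evict the secret from the internal cache.
	w.Delete(&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "s1", Namespace: "ns1"}})
}
```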

View File

@@ -318,10 +318,10 @@ spec:
- containerPort: 443
name: https
dnsPolicy: ClusterFirst
nodeName: virtual-kubelet-myconnector-linux
tolerations:
- key: virtual-kubelet.io/provider
value: azure
operator: Exists
- key: azure.com/aci
effect: NoSchedule
```
@@ -330,7 +330,8 @@ Notice that Virtual-Kubelet nodes are tainted by default to avoid unexpected pod
```
tolerations:
- key: virtual-kubelet.io/provider
value: azure
operator: Exists
- key: azure.com/aci
effect: NoSchedule
```
@@ -395,7 +396,11 @@ spec:
- containerPort: 443
name: https
dnsPolicy: ClusterFirst
nodeName: virtual-kubelet
tolerations:
- key: virtual-kubelet.io/provider
operator: Exists
- key: azure.com/aci
effect: NoSchedule
```
To confirm the Azure Container Instance received and bound the DNS Name specified, use the [az container show][az-container-show] Azure CLI command. Virtual Kubelet's naming

View File

@@ -2,19 +2,24 @@ package azure
import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"reflect"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/gorilla/websocket"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
client "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
@@ -24,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
// The service account secret mount path.
@@ -45,6 +51,10 @@ type ACIProvider struct {
internalIP string
daemonEndpointPort int32
diagnostics *aci.ContainerGroupDiagnostics
metricsSync sync.Mutex
metricsSyncTime time.Time
lastMetric *stats.Summary
}
// AuthConfig is the secret returned from an ImageRegistryCredential
@@ -294,13 +304,17 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error {
// TODO(BJK) containergrouprestartpolicy??
_, err = p.aciClient.CreateContainerGroup(
p.resourceGroup,
fmt.Sprintf("%s-%s", pod.Namespace, pod.Name),
containerGroupName(pod),
containerGroup,
)
return err
}
func containerGroupName(pod *v1.Pod) string {
return fmt.Sprintf("%s-%s", pod.Namespace, pod.Name)
}
// UpdatePod is a noop, ACI currently does not support live updates of a pod.
func (p *ACIProvider) UpdatePod(pod *v1.Pod) error {
return nil
@@ -345,7 +359,7 @@ func (p *ACIProvider) GetContainerLogs(namespace, podName, containerName string,
for i := 0; i < retry; i++ {
cLogs, err := p.aciClient.GetContainerLogs(p.resourceGroup, cg.Name, containerName, tail)
if err != nil {
log.Println(err)
log.G(context.TODO()).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying")
time.Sleep(5000 * time.Millisecond)
} else {
logContent = cLogs.Content
@@ -469,7 +483,11 @@ func (p *ACIProvider) GetPods() ([]*v1.Pod, error) {
p, err := containerGroupToPod(&c)
if err != nil {
log.Println(err)
log.G(context.TODO()).WithFields(logrus.Fields{
"name": c.Name,
"id": c.ID,
}).WithError(err).Error("error converting container group to pod")
continue
}
pods = append(pods, p)
@@ -958,7 +976,7 @@ func containerGroupToPod(cg *aci.ContainerGroup) (*v1.Pod, error) {
RestartCount: c.InstanceView.RestartCount,
Image: c.Image,
ImageID: "",
ContainerID: "",
ContainerID: getContainerID(cg.ID, c.Name),
}
// Add to containerStatuses
@@ -1002,6 +1020,19 @@ func containerGroupToPod(cg *aci.ContainerGroup) (*v1.Pod, error) {
return &p, nil
}
func getContainerID(cgID, containerName string) string {
if cgID == "" {
return ""
}
containerResourceID := fmt.Sprintf("%s/containers/%s", cgID, containerName)
h := sha256.New()
h.Write([]byte(strings.ToUpper(containerResourceID)))
hashBytes := h.Sum(nil)
return fmt.Sprintf("aci://%s", hex.EncodeToString(hashBytes))
}
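
A small in-package sketch of what getContainerID yields (the resource ID below is a placeholder):

```go
package azure

import "fmt"

// sketchContainerID is illustrative only: identical inputs always hash to
// the same "aci://<sha256-hex>" ID, and an empty group ID maps to "".
func sketchContainerID() {
	cgID := "/subscriptions/sub/resourceGroups/rg/providers" +
		"/Microsoft.ContainerInstance/containerGroups/default-nginx"
	fmt.Println(getContainerID(cgID, "web")) // aci://<64 hex chars>, stable across calls
	fmt.Println(getContainerID("", "web"))   // "" when the group has no ID yet
}
```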
func aciStateToPodPhase(state string) v1.PodPhase {
switch state {
case "Running":
@@ -1105,7 +1136,8 @@ func filterServiceAccountSecretVolume(osType string, containerGroup *aci.Contain
return
}
log.Printf("Ignoring service account secret volumes '%v' for Windows", reflect.ValueOf(serviceAccountSecretVolumeName).MapKeys())
l := log.G(context.TODO()).WithField("containerGroup", containerGroup.Name)
l.Infof("Ignoring service account secret volumes '%v' for Windows", reflect.ValueOf(serviceAccountSecretVolumeName).MapKeys())
volumes := make([]aci.Volume, 0, len(containerGroup.ContainerGroupProperties.Volumes))
for _, volume := range containerGroup.ContainerGroupProperties.Volumes {

View File

@@ -7,6 +7,7 @@ package azure
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
@@ -364,6 +365,69 @@ func TestGetPodWithoutResourceRequestsLimits(t *testing.T) {
"Containers[0].Resources.Requests.Memory doesn't match")
}
func TestGetPodWithContainerID(t *testing.T) {
_, aciServerMocker, provider, err := prepareMocks()
if err != nil {
t.Fatal("Unable to prepare the mocks", err)
}
podName := "pod-" + uuid.New().String()
podNamespace := "ns-" + uuid.New().String()
containerName := "c-" + uuid.New().String()
containerImage := "ci-" + uuid.New().String()
cgID := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.ContainerInstance/containerGroups/%s-%s", fakeSubscription, fakeResourceGroup, podNamespace, podName)
aciServerMocker.OnGetContainerGroup = func(subscription, resourceGroup, containerGroup string) (int, interface{}) {
assert.Equal(t, fakeSubscription, subscription, "Subscription doesn't match")
assert.Equal(t, fakeResourceGroup, resourceGroup, "Resource group doesn't match")
assert.Equal(t, podNamespace+"-"+podName, containerGroup, "Container group name is not expected")
return http.StatusOK, aci.ContainerGroup{
ID: cgID,
Tags: map[string]string{
"NodeName": fakeNodeName,
},
ContainerGroupProperties: aci.ContainerGroupProperties{
ProvisioningState: "Creating",
Containers: []aci.Container{
aci.Container{
Name: containerName,
ContainerProperties: aci.ContainerProperties{
Image: containerImage,
Command: []string{"nginx", "-g", "daemon off;"},
Ports: []aci.ContainerPort{
{
Protocol: aci.ContainerNetworkProtocolTCP,
Port: 80,
},
},
Resources: aci.ResourceRequirements{
Requests: &aci.ResourceRequests{
CPU: 0.99,
MemoryInGB: 1.5,
},
},
},
},
},
},
}
}
pod, err := provider.GetPod(podNamespace, podName)
if err != nil {
t.Fatal("Failed to get pod", err)
}
assert.NotNil(t, pod, "Response pod should not be nil")
assert.Equal(t, 1, len(pod.Status.ContainerStatuses), "1 container status is expected")
assert.Equal(t, containerName, pod.Status.ContainerStatuses[0].Name, "Container name in the container status doesn't match")
assert.Equal(t, containerImage, pod.Status.ContainerStatuses[0].Image, "Container image in the container status doesn't match")
assert.Equal(t, getContainerID(cgID, containerName), pod.Status.ContainerStatuses[0].ContainerID, "Container ID in the container status is not expected")
}
func TestPodToACISecretEnvVar(t *testing.T) {
testKey := "testVar"
@@ -448,7 +512,10 @@ func prepareMocks() (*AADMock, *ACIMock, *ACIProvider, error) {
os.Setenv("ACI_RESOURCE_GROUP", fakeResourceGroup)
clientset := fake.NewSimpleClientset()
rm := manager.NewResourceManager(clientset)
rm, err := manager.NewResourceManager(clientset)
if err != nil {
return nil, nil, nil, err
}
provider, err := NewACIProvider("example.toml", rm, fakeNodeName, "Linux", "0.0.0.0", 10250)
if err != nil {
@@ -482,45 +549,45 @@ func TestCreatePodWithLivenessProbe(t *testing.T) {
assert.Equal(t, 1, len(cg.ContainerGroupProperties.Containers), "1 Container is expected")
assert.Equal(t, "nginx", cg.ContainerGroupProperties.Containers[0].Name, "Container nginx is expected")
assert.NotNil(t, cg.Containers[0].LivenessProbe, "Liveness probe expected")
assert.Equal(t, cg.Containers[0].LivenessProbe.InitialDelaySeconds, 10, "Initial Probe Delay doesn't match")
assert.Equal(t, cg.Containers[0].LivenessProbe.Period, 5, "Probe Period doesn't match")
assert.Equal(t, cg.Containers[0].LivenessProbe.TimeoutSeconds, 60, "Probe Timeout doesn't match")
assert.Equal(t, cg.Containers[0].LivenessProbe.SuccessThreshold, 3, "Probe Success Threshold doesn't match")
assert.Equal(t, cg.Containers[0].LivenessProbe.FailureThreshold, 5, "Probe Failure Threshold doesn't match")
assert.Equal(t, int32(10), cg.Containers[0].LivenessProbe.InitialDelaySeconds, "Initial Probe Delay doesn't match")
assert.Equal(t, int32(5), cg.Containers[0].LivenessProbe.Period, "Probe Period doesn't match")
assert.Equal(t, int32(60), cg.Containers[0].LivenessProbe.TimeoutSeconds, "Probe Timeout doesn't match")
assert.Equal(t, int32(3), cg.Containers[0].LivenessProbe.SuccessThreshold, "Probe Success Threshold doesn't match")
assert.Equal(t, int32(5), cg.Containers[0].LivenessProbe.FailureThreshold, "Probe Failure Threshold doesn't match")
assert.NotNil(t, cg.Containers[0].LivenessProbe.HTTPGet, "Expected an HTTP Get Probe")
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "nginx",
LivenessProbe: &v1.Probe{
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Port: intstr.FromString("8080"),
Path: "/",
},
return http.StatusOK, cg
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "nginx",
LivenessProbe: &v1.Probe{
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Port: intstr.FromString("8080"),
Path: "/",
},
InitialDelaySeconds: 10,
PeriodSeconds: 5,
TimeoutSeconds: 60,
SuccessThreshold: 3,
FailureThreshold: 5,
},
InitialDelaySeconds: 10,
PeriodSeconds: 5,
TimeoutSeconds: 60,
SuccessThreshold: 3,
FailureThreshold: 5,
},
},
},
}
},
}
if err := provider.CreatePod(pod); err != nil {
t.Fatal("Failed to create pod", err)
}
return http.StatusOK, cg
if err := provider.CreatePod(pod); err != nil {
t.Fatal("Failed to create pod", err)
}
}
@@ -544,44 +611,44 @@ func TestCreatePodWithReadinessProbe(t *testing.T) {
assert.Equal(t, 1, len(cg.ContainerGroupProperties.Containers), "1 Container is expected")
assert.Equal(t, "nginx", cg.ContainerGroupProperties.Containers[0].Name, "Container nginx is expected")
assert.NotNil(t, cg.Containers[0].ReadinessProbe, "Readiness probe expected")
assert.Equal(t, cg.Containers[0].ReadinessProbe.InitialDelaySeconds, 10, "Initial Probe Delay doesn't match")
assert.Equal(t, cg.Containers[0].ReadinessProbe.Period, 5, "Probe Period doesn't match")
assert.Equal(t, cg.Containers[0].ReadinessProbe.TimeoutSeconds, 60, "Probe Timeout doesn't match")
assert.Equal(t, cg.Containers[0].ReadinessProbe.SuccessThreshold, 3, "Probe Success Threshold doesn't match")
assert.Equal(t, cg.Containers[0].ReadinessProbe.FailureThreshold, 5, "Probe Failure Threshold doesn't match")
assert.Equal(t, int32(10), cg.Containers[0].ReadinessProbe.InitialDelaySeconds, "Initial Probe Delay doesn't match")
assert.Equal(t, int32(5), cg.Containers[0].ReadinessProbe.Period, "Probe Period doesn't match")
assert.Equal(t, int32(60), cg.Containers[0].ReadinessProbe.TimeoutSeconds, "Probe Timeout doesn't match")
assert.Equal(t, int32(3), cg.Containers[0].ReadinessProbe.SuccessThreshold, "Probe Success Threshold doesn't match")
assert.Equal(t, int32(5), cg.Containers[0].ReadinessProbe.FailureThreshold, "Probe Failure Threshold doesn't match")
assert.NotNil(t, cg.Containers[0].ReadinessProbe.HTTPGet, "Expected an HTTP Get Probe")
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "nginx",
ReadinessProbe: &v1.Probe{
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Port: intstr.FromString("8080"),
Path: "/",
},
},
InitialDelaySeconds: 10,
PeriodSeconds: 5,
TimeoutSeconds: 60,
SuccessThreshold: 3,
FailureThreshold: 5,
},
},
},
},
}
if err := provider.CreatePod(pod); err != nil {
t.Fatal("Failed to create pod", err)
}
return http.StatusOK, cg
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "nginx",
ReadinessProbe: &v1.Probe{
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Port: intstr.FromString("8080"),
Path: "/",
},
},
InitialDelaySeconds: 10,
PeriodSeconds: 5,
TimeoutSeconds: 60,
SuccessThreshold: 3,
FailureThreshold: 5,
},
},
},
},
}
if err := provider.CreatePod(pod); err != nil {
t.Fatal("Failed to create pod", err)
}
}

View File

@@ -1,10 +1,12 @@
package azure
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"github.com/virtual-kubelet/virtual-kubelet/log"
)
// AcsCredential represents the credential file for ACS
@@ -19,12 +21,13 @@ type AcsCredential struct {
}
// NewAcsCredential returns an AcsCredential struct from file path
func NewAcsCredential(filepath string) (*AcsCredential, error) {
log.Printf("Reading ACS credential file %q", filepath)
func NewAcsCredential(p string) (*AcsCredential, error) {
logger := log.G(context.TODO()).WithField("method", "NewAcsCredential").WithField("file", p)
log.Trace(logger, "Reading ACS credential file")
b, err := ioutil.ReadFile(filepath)
b, err := ioutil.ReadFile(p)
if err != nil {
return nil, fmt.Errorf("Reading ACS credential file %q failed: %v", filepath, err)
return nil, fmt.Errorf("Reading ACS credential file %q failed: %v", p, err)
}
// Unmarshal the authentication file.
@@ -33,6 +36,6 @@ func NewAcsCredential(filepath string) (*AcsCredential, error) {
return nil, err
}
log.Printf("Load ACS credential file %q successfully", filepath)
log.Trace(logger, "Load ACS credential file successfully")
return &cred, nil
}

View File

@@ -18,6 +18,7 @@ const (
containerGroupListByResourceGroupURLPath = "subscriptions/{{.subscriptionId}}/resourceGroups/{{.resourceGroup}}/providers/Microsoft.ContainerInstance/containerGroups"
containerLogsURLPath = containerGroupURLPath + "/containers/{{.containerName}}/logs"
containerExecURLPath = containerGroupURLPath + "/containers/{{.containerName}}/exec"
containerGroupMetricsURLPath = containerGroupURLPath + "/providers/microsoft.Insights/metrics"
)
// Client is a client for interacting with Azure Container Instances.

View File

@@ -0,0 +1,97 @@
package aci
import (
"context"
"encoding/json"
"net/http"
"net/url"
"path"
"time"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/api"
)
// GetContainerGroupMetrics gets metrics for the provided container group
func (c *Client) GetContainerGroupMetrics(ctx context.Context, resourceGroup, containerGroup string, options MetricsRequest) (*ContainerGroupMetricsResult, error) {
if len(options.Types) == 0 {
return nil, errors.New("must provide metrics types to fetch")
}
if options.Start.After(options.End) || options.Start.Equal(options.End) && !options.Start.IsZero() {
return nil, errors.Errorf("end parameter must be after start: start=%s, end=%s", options.Start, options.End)
}
var metricNames string
for _, t := range options.Types {
if len(metricNames) > 0 {
metricNames += ","
}
metricNames += string(t)
}
var ag string
for _, a := range options.Aggregations {
if len(ag) > 0 {
ag += ","
}
ag += string(a)
}
urlParams := url.Values{
"api-version": []string{"2018-01-01"},
"aggregation": []string{ag},
"metricnames": []string{metricNames},
"interval": []string{"PT1M"}, // TODO: make configurable?
}
if options.Dimension != "" {
urlParams.Add("$filter", options.Dimension)
}
if !options.Start.IsZero() || !options.End.IsZero() {
urlParams.Add("timespan", path.Join(options.Start.Format(time.RFC3339), options.End.Format(time.RFC3339)))
}
// Create the url.
uri := api.ResolveRelative(c.auth.ResourceManagerEndpoint, containerGroupMetricsURLPath)
uri += "?" + url.Values(urlParams).Encode()
// Create the request.
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, errors.Wrap(err, "creating get container group metrics uri request failed")
}
req = req.WithContext(ctx)
// Add the parameters to the url.
if err := api.ExpandURL(req.URL, map[string]string{
"subscriptionId": c.auth.SubscriptionID,
"resourceGroup": resourceGroup,
"containerGroupName": containerGroup,
}); err != nil {
return nil, errors.Wrap(err, "expanding URL with parameters failed")
}
// Send the request.
resp, err := c.hc.Do(req)
if err != nil {
return nil, errors.Wrap(err, "sending get container group metrics request failed")
}
defer resp.Body.Close()
// 200 (OK) is a success response.
if err := api.CheckResponse(resp); err != nil {
return nil, err
}
// Decode the body from the response.
if resp.Body == nil {
return nil, errors.New("container group metrics returned an empty body in the response")
}
var metrics ContainerGroupMetricsResult
if err := json.NewDecoder(resp.Body).Decode(&metrics); err != nil {
return nil, errors.Wrap(err, "decoding get container group metrics response body failed")
}
return &metrics, nil
}
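
A sketch of the call shape from the caller's side (the client value and names are placeholders; the real call is in providers/azure/metrics.go below):

```go
package azure

import (
	"context"
	"fmt"
	"time"

	"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
)

// sketchFetchMetrics assumes c is an already-configured *aci.Client.
func sketchFetchMetrics(ctx context.Context, c *aci.Client) error {
	end := time.Now()
	res, err := c.GetContainerGroupMetrics(ctx, "my-rg", "default-nginx", aci.MetricsRequest{
		Start:        end.Add(-time.Minute),
		End:          end,
		Types:        []aci.MetricType{aci.MetricTypeCPUUsage, aci.MetricTypeMemoryUsage},
		Aggregations: []aci.AggregationType{aci.AggregationTypeAverage},
		Dimension:    "containerName eq '*'", // per-container breakdown
	})
	if err != nil {
		return err
	}
	for _, v := range res.Value {
		fmt.Println(v.Desc.Value, v.Unit, len(v.Timeseries))
	}
	return nil
}
```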

View File

@@ -1,6 +1,8 @@
package aci
import (
"time"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/api"
)
@@ -332,3 +334,85 @@ type LogAnalyticsWorkspace struct {
WorkspaceID string `json:"workspaceID,omitempty"`
WorkspaceKey string `json:"workspaceKey,omitempty"`
}
// ContainerGroupMetricsResult stores all the results for a container group metrics request.
type ContainerGroupMetricsResult struct {
Value []MetricValue `json:"value"`
}
// MetricValue stores metrics results
type MetricValue struct {
ID string `json:"id"`
Desc MetricDescriptor `json:"name"`
Timeseries []MetricTimeSeries `json:"timeseries"`
Type string `json:"type"`
Unit string `json:"unit"`
}
// MetricDescriptor stores the name for a given metric and the localized version of that name.
type MetricDescriptor struct {
Value MetricType `json:"value"`
LocalizedValue string `json:"localizedValue"`
}
// MetricTimeSeries is the time series for a given metric
// It contains all the metrics values and other details for the dimension the metrics are aggregated on.
type MetricTimeSeries struct {
Data []TimeSeriesEntry `json:"data"`
MetadataValues []MetricMetadataValue `json:"metadatavalues,omitempty"`
}
// MetricMetadataValue stores extra metadata about a metric
// In particular it is used to provide details about the breakdown of a metric dimension.
type MetricMetadataValue struct {
Name ValueDescriptor `json:"name"`
Value string `json:"value"`
}
// ValueDescriptor describes a generic value.
// It is used to describe metadata fields.
type ValueDescriptor struct {
Value string `json:"value"`
LocalizedValue string `json:"localizedValue"`
}
// TimeSeriesEntry is the metric data for a given timestamp/metric type
type TimeSeriesEntry struct {
Timestamp time.Time `json:"timestamp"`
Average float64 `json:"average"`
Total float64 `json:"total"`
Count float64 `json:"count"`
}
// MetricsRequest is an options struct used when getting container group metrics
type MetricsRequest struct {
Start time.Time
End time.Time
Types []MetricType
Aggregations []AggregationType
// Note that a dimension may not be available for certain metrics.
// In such cases, you will need to make separate requests.
Dimension string
}
// MetricType is an enum type for defining supported metric types.
type MetricType string
// Supported metric types
const (
MetricTypeCPUUsage MetricType = "CpuUsage"
MetricTypeMemoryUsage MetricType = "MemoryUsage"
MetricTyperNetworkBytesRecievedPerSecond MetricType = "NetworkBytesReceivedPerSecond"
MetricTyperNetworkBytesTransmittedPerSecond MetricType = "NetworkBytesTransmittedPerSecond"
)
// AggregationType is an enum type for defining supported aggregation types
type AggregationType string
// Supported metric aggregation types
const (
AggregationTypeCount AggregationType = "count"
AggregationTypeAverage AggregationType = "average"
AggregationTypeTotal AggregationType = "total"
)
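
For reference, a sketch of the wire shape these structs are built to decode (all payload values are invented):

```go
package aci

import (
	"encoding/json"
	"fmt"
)

// sketchDecode unmarshals a made-up metrics payload into the types above.
func sketchDecode() error {
	const sample = `{
	  "value": [{
	    "id": "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.ContainerInstance/containerGroups/default-nginx/providers/microsoft.Insights/metrics/CpuUsage",
	    "name": {"value": "CpuUsage", "localizedValue": "CPU Usage"},
	    "type": "Microsoft.Insights/metrics",
	    "unit": "Count",
	    "timeseries": [{
	      "metadatavalues": [{"name": {"value": "containerName"}, "value": "web"}],
	      "data": [{"timestamp": "2018-08-20T11:00:00Z", "average": 400}]
	    }]
	  }]
	}`

	var res ContainerGroupMetricsResult
	if err := json.Unmarshal([]byte(sample), &res); err != nil {
		return err
	}
	fmt.Println(res.Value[0].Desc.Value, res.Value[0].Timeseries[0].Data[0].Average) // CpuUsage 400
	return nil
}
```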

219
providers/azure/metrics.go Normal file
View File

@@ -0,0 +1,219 @@
package azure
import (
"context"
"strings"
"time"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
"golang.org/x/sync/errgroup"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
// GetStatsSummary returns the stats summary for pods running on ACI
func (p *ACIProvider) GetStatsSummary(ctx context.Context) (summary *stats.Summary, err error) {
p.metricsSync.Lock()
defer p.metricsSync.Unlock()
if time.Now().Sub(p.metricsSyncTime) < time.Minute {
return p.lastMetric, nil
}
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
defer func() {
if err != nil {
return
}
p.lastMetric = summary
p.metricsSyncTime = time.Now()
}()
pods := p.resourceManager.GetPods()
var errGroup errgroup.Group
chResult := make(chan stats.PodStats, len(pods))
end := time.Now()
start := end.Add(-1 * time.Minute)
sema := make(chan struct{}, 10)
for _, pod := range pods {
if pod.Status.Phase != v1.PodRunning {
continue
}
pod := pod // capture the range variable; each closure runs in its own goroutine
errGroup.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
case sema <- struct{}{}:
}
defer func() {
<-sema
}()
cgName := containerGroupName(pod)
// cpu/mem and net stats are split because net stats do not support container level detail
systemStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{
Dimension: "containerName eq '*'",
Start: start,
End: end,
Aggregations: []aci.AggregationType{aci.AggregationTypeAverage},
Types: []aci.MetricType{aci.MetricTypeCPUUsage, aci.MetricTypeMemoryUsage},
})
if err != nil {
return errors.Wrapf(err, "error fetching cpu/mem stats for container group %s", cgName)
}
netStats, err := p.aciClient.GetContainerGroupMetrics(ctx, p.resourceGroup, cgName, aci.MetricsRequest{
Start: start,
End: end,
Aggregations: []aci.AggregationType{aci.AggregationTypeAverage},
Types: []aci.MetricType{aci.MetricTyperNetworkBytesRecievedPerSecond, aci.MetricTyperNetworkBytesTransmittedPerSecond},
})
if err != nil {
return errors.Wrapf(err, "error fetching network stats for container group %s", cgName)
}
chResult <- collectMetrics(pod, systemStats, netStats)
return nil
})
}
if err := errGroup.Wait(); err != nil {
return nil, errors.Wrap(err, "error in request to fetch container group metrics")
}
close(chResult)
var s stats.Summary
s.Node = stats.NodeStats{
NodeName: p.nodeName,
}
s.Pods = make([]stats.PodStats, 0, len(chResult))
for stat := range chResult {
s.Pods = append(s.Pods, stat)
}
return &s, nil
}
func collectMetrics(pod *v1.Pod, system, net *aci.ContainerGroupMetricsResult) stats.PodStats {
var stat stats.PodStats
containerStats := make(map[string]*stats.ContainerStats, len(pod.Status.ContainerStatuses))
stat.StartTime = pod.CreationTimestamp
for _, m := range system.Value {
// cpu/mem stats are per container, so each entry in the time series is for a container, not the container group.
for _, entry := range m.Timeseries {
if len(entry.Data) == 0 {
continue
}
var cs *stats.ContainerStats
for _, v := range entry.MetadataValues {
if strings.ToLower(v.Name.Value) != "containername" {
continue
}
if cs = containerStats[v.Value]; cs == nil {
cs = &stats.ContainerStats{Name: v.Value, StartTime: stat.StartTime}
containerStats[v.Value] = cs
}
}
if cs == nil {
continue
}
if stat.Containers == nil {
stat.Containers = make([]stats.ContainerStats, 0, len(containerStats))
}
data := entry.Data[len(entry.Data)-1] // get only the last entry
switch m.Desc.Value {
case aci.MetricTypeCPUUsage:
if cs.CPU == nil {
cs.CPU = &stats.CPUStats{}
}
// average is the average number of millicores over a 1 minute interval (which is the interval we are pulling the stats for)
nanoCores := uint64(data.Average * 1000000)
usageNanoSeconds := nanoCores * 60
cs.CPU.Time = metav1.NewTime(data.Timestamp)
cs.CPU.UsageCoreNanoSeconds = &usageNanoSeconds
cs.CPU.UsageNanoCores = &nanoCores
if stat.CPU == nil {
var zero uint64
stat.CPU = &stats.CPUStats{UsageNanoCores: &zero, UsageCoreNanoSeconds: &zero, Time: metav1.NewTime(data.Timestamp)}
}
podCPUSec := *stat.CPU.UsageCoreNanoSeconds
podCPUSec += usageNanoSeconds
stat.CPU.UsageCoreNanoSeconds = &podCPUSec
podCPUCore := *stat.CPU.UsageNanoCores
podCPUCore += nanoCores
stat.CPU.UsageNanoCores = &podCPUCore
case aci.MetricTypeMemoryUsage:
if cs.Memory == nil {
cs.Memory = &stats.MemoryStats{}
}
cs.Memory.Time = metav1.NewTime(data.Timestamp)
bytes := uint64(data.Average)
cs.Memory.UsageBytes = &bytes
cs.Memory.WorkingSetBytes = &bytes
if stat.Memory == nil {
var zero uint64
stat.Memory = &stats.MemoryStats{UsageBytes: &zero, WorkingSetBytes: &zero, Time: metav1.NewTime(data.Timestamp)}
}
podMem := *stat.Memory.UsageBytes
podMem += bytes
stat.Memory.UsageBytes = &podMem
stat.Memory.WorkingSetBytes = &podMem
}
}
}
for _, m := range net.Value {
if stat.Network == nil {
stat.Network = &stats.NetworkStats{}
}
// network stats are for the whole container group, so there should only be one entry here.
if len(m.Timeseries) == 0 {
continue
}
entry := m.Timeseries[0]
if len(entry.Data) == 0 {
continue
}
data := entry.Data[len(entry.Data)-1] // get only the last entry
bytes := uint64(data.Average)
switch m.Desc.Value {
case aci.MetricTyperNetworkBytesRecievedPerSecond:
stat.Network.RxBytes = &bytes
case aci.MetricTyperNetworkBytesTransmittedPerSecond:
stat.Network.TxBytes = &bytes
}
stat.Network.Time = metav1.NewTime(data.Timestamp)
stat.Network.InterfaceStats.Name = "eth0"
}
for _, cs := range containerStats {
stat.Containers = append(stat.Containers, *cs)
}
stat.PodRef = stats.PodReference{
Name: pod.Name,
Namespace: pod.Namespace,
UID: string(pod.UID),
}
return stat
}
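
A worked check of the CPU conversion above (the 400-millicore figure is invented): the ACI average is in millicores, so scaling by 1e6 gives nano-cores, and multiplying by the 60-second sampling window gives core-nanoseconds.

```go
package main

import "fmt"

func main() {
	avg := 400.0 // ACI average over the 1-minute window, in millicores (0.4 cores)

	nanoCores := uint64(avg * 1000000) // 400,000,000 nano-cores
	usageNanoSeconds := nanoCores * 60 // 24,000,000,000 core-nanoseconds for the minute

	fmt.Println(nanoCores, usageNanoSeconds)
}
```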

View File

@@ -0,0 +1,163 @@
package azure
import (
"path"
"reflect"
"strconv"
"testing"
"time"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
func TestCollectMetrics(t *testing.T) {
cases := []metricTestCase{
{desc: "no containers"}, // this is just for sort of fuzzing things, make sure there's no panics
{desc: "zeroed stats", stats: [][2]float64{{0, 0}}, rx: 0, tx: 0, collected: time.Now()},
{desc: "normal", stats: [][2]float64{{400.0, 1000.0}}, rx: 100.0, tx: 5000.0, collected: time.Now()},
{desc: "multiple containers", stats: [][2]float64{{100.0, 250.0}, {400.0, 1000.0}}, rx: 100.0, tx: 439833.0, collected: time.Now()},
}
for _, test := range cases {
t.Run(test.desc, func(t *testing.T) {
pod := fakePod(t, len(test.stats), time.Now())
expected := podStatFromTestCase(t, pod, test)
system, net := fakeACIMetrics(pod, test)
actual := collectMetrics(pod, system, net)
if !reflect.DeepEqual(expected, actual) {
t.Fatalf("got unexpected results\nexpected:\n%+v\nactual:\n%+v", expected, actual)
}
})
}
}
type metricTestCase struct {
desc string
stats [][2]float64
rx, tx float64
collected time.Time
}
func fakeACIMetrics(pod *v1.Pod, testCase metricTestCase) (*aci.ContainerGroupMetricsResult, *aci.ContainerGroupMetricsResult) {
newMetricValue := func(mt aci.MetricType) aci.MetricValue {
return aci.MetricValue{
Desc: aci.MetricDescriptor{
Value: mt,
},
}
}
newNetMetric := func(collected time.Time, value float64) aci.MetricTimeSeries {
return aci.MetricTimeSeries{
Data: []aci.TimeSeriesEntry{
{Timestamp: collected, Average: value},
},
}
}
newSystemMetric := func(c v1.ContainerStatus, collected time.Time, value float64) aci.MetricTimeSeries {
return aci.MetricTimeSeries{
Data: []aci.TimeSeriesEntry{
{Timestamp: collected, Average: value},
},
MetadataValues: []aci.MetricMetadataValue{
{Name: aci.ValueDescriptor{Value: "containerName"}, Value: c.Name},
},
}
}
// create fake aci metrics for the container group and test data
cpuV := newMetricValue(aci.MetricTypeCPUUsage)
memV := newMetricValue(aci.MetricTypeMemoryUsage)
for i, c := range pod.Status.ContainerStatuses {
cpuV.Timeseries = append(cpuV.Timeseries, newSystemMetric(c, testCase.collected, testCase.stats[i][0]))
memV.Timeseries = append(memV.Timeseries, newSystemMetric(c, testCase.collected, testCase.stats[i][1]))
}
system := &aci.ContainerGroupMetricsResult{
Value: []aci.MetricValue{cpuV, memV},
}
rxV := newMetricValue(aci.MetricTyperNetworkBytesRecievedPerSecond)
txV := newMetricValue(aci.MetricTyperNetworkBytesTransmittedPerSecond)
rxV.Timeseries = append(rxV.Timeseries, newNetMetric(testCase.collected, testCase.rx))
txV.Timeseries = append(txV.Timeseries, newNetMetric(testCase.collected, testCase.tx))
net := &aci.ContainerGroupMetricsResult{
Value: []aci.MetricValue{rxV, txV},
}
return system, net
}
func fakePod(t *testing.T, size int, created time.Time) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: path.Base(t.Name()),
Namespace: path.Dir(t.Name()),
UID: types.UID(t.Name()),
CreationTimestamp: metav1.NewTime(created),
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: make([]v1.ContainerStatus, 0, size),
},
}
for i := 0; i < size; i++ {
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
Name: "c" + strconv.Itoa(i),
})
}
return pod
}
func podStatFromTestCase(t *testing.T, pod *v1.Pod, test metricTestCase) stats.PodStats {
rx := uint64(test.rx)
tx := uint64(test.tx)
expected := stats.PodStats{
StartTime: pod.CreationTimestamp,
PodRef: stats.PodReference{
Name: pod.Name,
Namespace: pod.Namespace,
UID: string(pod.UID),
},
Network: &stats.NetworkStats{
Time: metav1.NewTime(test.collected),
InterfaceStats: stats.InterfaceStats{
Name: "eth0",
RxBytes: &rx,
TxBytes: &tx,
},
},
}
var (
nodeCPU uint64
nodeMem uint64
)
for i := range test.stats {
cpu := uint64(test.stats[i][0] * 1000000)
cpuNanoSeconds := cpu * 60
mem := uint64(test.stats[i][1])
expected.Containers = append(expected.Containers, stats.ContainerStats{
StartTime: pod.CreationTimestamp,
Name: pod.Status.ContainerStatuses[i].Name,
CPU: &stats.CPUStats{Time: metav1.NewTime(test.collected), UsageNanoCores: &cpu, UsageCoreNanoSeconds: &cpuNanoSeconds},
Memory: &stats.MemoryStats{Time: metav1.NewTime(test.collected), UsageBytes: &mem, WorkingSetBytes: &mem},
})
nodeCPU += cpu
nodeMem += mem
}
if len(expected.Containers) > 0 {
nanoCPUSeconds := nodeCPU * 60
expected.CPU = &stats.CPUStats{UsageNanoCores: &nodeCPU, UsageCoreNanoSeconds: &nanoCPUSeconds, Time: metav1.NewTime(test.collected)}
expected.Memory = &stats.MemoryStats{UsageBytes: &nodeMem, WorkingSetBytes: &nodeMem, Time: metav1.NewTime(test.collected)}
}
return expected
}

View File

@@ -443,7 +443,7 @@ func (p *HyperProvider) NodeAddresses() []v1.NodeAddress {
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *HyperProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
return nil
return &v1.NodeDaemonEndpoints{}
}
// OperatingSystem returns the operating system for this provider.

3
vendor/golang.org/x/sync/AUTHORS generated vendored Normal file
View File

@@ -0,0 +1,3 @@
# This source code refers to The Go Authors for copyright purposes.
# The master list of authors is in the main Go distribution,
# visible at http://tip.golang.org/AUTHORS.

3
vendor/golang.org/x/sync/CONTRIBUTORS generated vendored Normal file
View File

@@ -0,0 +1,3 @@
# This source code was written by the Go contributors.
# The master list of contributors is in the main Go distribution,
# visible at http://tip.golang.org/CONTRIBUTORS.

27
vendor/golang.org/x/sync/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,27 @@
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

22
vendor/golang.org/x/sync/PATENTS generated vendored Normal file
View File

@@ -0,0 +1,22 @@
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the Go project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.

67
vendor/golang.org/x/sync/errgroup/errgroup.go generated vendored Normal file
View File

@@ -0,0 +1,67 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
package errgroup
import (
"sync"
"golang.org/x/net/context"
)
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
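
For reference, a minimal usage sketch of the vendored package (the URLs are invented); this is the same pattern GetStatsSummary uses above, with a semaphore added there to bound concurrency:

```go
package main

import (
	"fmt"
	"net/http"

	"golang.org/x/net/context"
	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())

	for _, u := range []string{"https://example.com", "https://example.org"} {
		u := u // capture the range variable for the goroutine
		g.Go(func() error {
			req, err := http.NewRequest("GET", u, nil)
			if err != nil {
				return err
			}
			resp, err := http.DefaultClient.Do(req.WithContext(ctx))
			if err != nil {
				return err // the first error cancels ctx for the others
			}
			return resp.Body.Close()
		})
	}

	// Wait returns the first non-nil error from the group, if any.
	if err := g.Wait(); err != nil {
		fmt.Println("fetch failed:", err)
	}
}
```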

View File

@@ -0,0 +1,335 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Summary is a top-level container for holding NodeStats and PodStats.
type Summary struct {
// Overall node stats.
Node NodeStats `json:"node"`
// Per-pod stats.
Pods []PodStats `json:"pods"`
}
// NodeStats holds node-level unprocessed sample stats.
type NodeStats struct {
// Reference to the measured Node.
NodeName string `json:"nodeName"`
// Stats of system daemons tracked as raw containers.
// The system containers are named according to the SystemContainer* constants.
// +optional
// +patchMergeKey=name
// +patchStrategy=merge
SystemContainers []ContainerStats `json:"systemContainers,omitempty" patchStrategy:"merge" patchMergeKey:"name"`
// The time at which data collection for the node-scoped (i.e. aggregate) stats was (re)started.
StartTime metav1.Time `json:"startTime"`
// Stats pertaining to CPU resources.
// +optional
CPU *CPUStats `json:"cpu,omitempty"`
// Stats pertaining to memory (RAM) resources.
// +optional
Memory *MemoryStats `json:"memory,omitempty"`
// Stats pertaining to network resources.
// +optional
Network *NetworkStats `json:"network,omitempty"`
// Stats pertaining to total usage of filesystem resources on the rootfs used by node k8s components.
// NodeFs.Used is the total bytes used on the filesystem.
// +optional
Fs *FsStats `json:"fs,omitempty"`
// Stats about the underlying container runtime.
// +optional
Runtime *RuntimeStats `json:"runtime,omitempty"`
// Stats about the rlimit of system.
// +optional
Rlimit *RlimitStats `json:"rlimit,omitempty"`
}
// RlimitStats are stats rlimit of OS.
type RlimitStats struct {
Time metav1.Time `json:"time"`
// The max PID of OS.
MaxPID *int64 `json:"maxpid,omitempty"`
// The number of running process in the OS.
NumOfRunningProcesses *int64 `json:"curproc,omitempty"`
}
// RuntimeStats are stats pertaining to the underlying container runtime.
type RuntimeStats struct {
// Stats about the underlying filesystem where container images are stored.
// This filesystem could be the same as the primary (root) filesystem.
// Usage here refers to the total number of bytes occupied by images on the filesystem.
// +optional
ImageFs *FsStats `json:"imageFs,omitempty"`
}
const (
// SystemContainerKubelet is the container name for the system container tracking Kubelet usage.
SystemContainerKubelet = "kubelet"
// SystemContainerRuntime is the container name for the system container tracking the runtime (e.g. docker or rkt) usage.
SystemContainerRuntime = "runtime"
// SystemContainerMisc is the container name for the system container tracking non-kubernetes processes.
SystemContainerMisc = "misc"
// SystemContainerPods is the container name for the system container tracking user pods.
SystemContainerPods = "pods"
)
// PodStats holds pod-level unprocessed sample stats.
type PodStats struct {
// Reference to the measured Pod.
PodRef PodReference `json:"podRef"`
// The time at which data collection for the pod-scoped (e.g. network) stats was (re)started.
StartTime metav1.Time `json:"startTime"`
// Stats of containers in the measured pod.
// +patchMergeKey=name
// +patchStrategy=merge
Containers []ContainerStats `json:"containers" patchStrategy:"merge" patchMergeKey:"name"`
// Stats pertaining to CPU resources consumed by pod cgroup (which includes all containers' resource usage and pod overhead).
// +optional
CPU *CPUStats `json:"cpu,omitempty"`
// Stats pertaining to memory (RAM) resources consumed by pod cgroup (which includes all containers' resource usage and pod overhead).
// +optional
Memory *MemoryStats `json:"memory,omitempty"`
// Stats pertaining to network resources.
// +optional
Network *NetworkStats `json:"network,omitempty"`
// Stats pertaining to volume usage of filesystem resources.
// VolumeStats.UsedBytes is the number of bytes used by the Volume
// +optional
// +patchMergeKey=name
// +patchStrategy=merge
VolumeStats []VolumeStats `json:"volume,omitempty" patchStrategy:"merge" patchMergeKey:"name"`
// EphemeralStorage reports the total filesystem usage for the containers and emptyDir-backed volumes in the measured Pod.
// +optional
EphemeralStorage *FsStats `json:"ephemeral-storage,omitempty"`
}
// ContainerStats holds container-level unprocessed sample stats.
type ContainerStats struct {
// Reference to the measured container.
Name string `json:"name"`
// The time at which data collection for this container was (re)started.
StartTime metav1.Time `json:"startTime"`
// Stats pertaining to CPU resources.
// +optional
CPU *CPUStats `json:"cpu,omitempty"`
// Stats pertaining to memory (RAM) resources.
// +optional
Memory *MemoryStats `json:"memory,omitempty"`
// Metrics for Accelerators. Each Accelerator corresponds to one element in the array.
Accelerators []AcceleratorStats `json:"accelerators,omitempty"`
// Stats pertaining to container rootfs usage of filesystem resources.
// Rootfs.UsedBytes is the number of bytes used for the container write layer.
// +optional
Rootfs *FsStats `json:"rootfs,omitempty"`
// Stats pertaining to container logs usage of filesystem resources.
// Logs.UsedBytes is the number of bytes used for the container logs.
// +optional
Logs *FsStats `json:"logs,omitempty"`
// User defined metrics that are exposed by containers in the pod. Typically, we expect only one container in the pod to be exposing user defined metrics. In the event of multiple containers exposing metrics, they will be combined here.
// +patchMergeKey=name
// +patchStrategy=merge
UserDefinedMetrics []UserDefinedMetric `json:"userDefinedMetrics,omitmepty" patchStrategy:"merge" patchMergeKey:"name"`
}
// PodReference contains enough information to locate the referenced pod.
type PodReference struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
UID string `json:"uid"`
}
// InterfaceStats contains resource value data about interface.
type InterfaceStats struct {
// The name of the interface
Name string `json:"name"`
// Cumulative count of bytes received.
// +optional
RxBytes *uint64 `json:"rxBytes,omitempty"`
// Cumulative count of receive errors encountered.
// +optional
RxErrors *uint64 `json:"rxErrors,omitempty"`
// Cumulative count of bytes transmitted.
// +optional
TxBytes *uint64 `json:"txBytes,omitempty"`
// Cumulative count of transmit errors encountered.
// +optional
TxErrors *uint64 `json:"txErrors,omitempty"`
}
// NetworkStats contains data about network resources.
type NetworkStats struct {
// The time at which these stats were updated.
Time metav1.Time `json:"time"`
// Stats for the default interface, if found
InterfaceStats `json:",inline"`
Interfaces []InterfaceStats `json:"interfaces,omitempty"`
}
// CPUStats contains data about CPU usage.
type CPUStats struct {
// The time at which these stats were updated.
Time metav1.Time `json:"time"`
// Total CPU usage (sum of all cores) averaged over the sample window.
// The "core" unit can be interpreted as CPU core-nanoseconds per second.
// +optional
UsageNanoCores *uint64 `json:"usageNanoCores,omitempty"`
// Cumulative CPU usage (sum of all cores) since object creation.
// +optional
UsageCoreNanoSeconds *uint64 `json:"usageCoreNanoSeconds,omitempty"`
}
// MemoryStats contains data about memory usage.
type MemoryStats struct {
// The time at which these stats were updated.
Time metav1.Time `json:"time"`
// Available memory for use. This is defined as the memory limit - workingSetBytes.
// If the memory limit is undefined, this field is omitted.
// +optional
AvailableBytes *uint64 `json:"availableBytes,omitempty"`
// Total memory in use. This includes all memory regardless of when it was accessed.
// +optional
UsageBytes *uint64 `json:"usageBytes,omitempty"`
// The amount of working set memory. This includes recently accessed memory,
// dirty memory, and kernel memory. WorkingSetBytes is <= UsageBytes
// +optional
WorkingSetBytes *uint64 `json:"workingSetBytes,omitempty"`
// The amount of anonymous and swap cache memory (includes transparent
// hugepages).
// +optional
RSSBytes *uint64 `json:"rssBytes,omitempty"`
// Cumulative number of minor page faults.
// +optional
PageFaults *uint64 `json:"pageFaults,omitempty"`
// Cumulative number of major page faults.
// +optional
MajorPageFaults *uint64 `json:"majorPageFaults,omitempty"`
}
// AcceleratorStats contains stats for accelerators attached to the container.
type AcceleratorStats struct {
// Make of the accelerator (nvidia, amd, google etc.)
Make string `json:"make"`
// Model of the accelerator (tesla-p100, tesla-k80 etc.)
Model string `json:"model"`
// ID of the accelerator.
ID string `json:"id"`
// Total accelerator memory.
// unit: bytes
MemoryTotal uint64 `json:"memoryTotal"`
// Total accelerator memory allocated.
// unit: bytes
MemoryUsed uint64 `json:"memoryUsed"`
// Percent of time over the past sample period (10s) during which
// the accelerator was actively processing.
DutyCycle uint64 `json:"dutyCycle"`
}
// VolumeStats contains data about Volume filesystem usage.
type VolumeStats struct {
// Embedded FsStats
FsStats
// Name is the name given to the Volume
// +optional
Name string `json:"name,omitempty"`
// Reference to the PVC, if one exists
// +optional
PVCRef *PVCReference `json:"pvcRef,omitempty"`
}
// PVCReference contains enough information to describe the referenced PVC.
type PVCReference struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
}
// FsStats contains data about filesystem usage.
type FsStats struct {
// The time at which these stats were updated.
Time metav1.Time `json:"time"`
// AvailableBytes represents the storage space available (bytes) for the filesystem.
// +optional
AvailableBytes *uint64 `json:"availableBytes,omitempty"`
// CapacityBytes represents the total capacity (bytes) of the filesystem's underlying storage.
// +optional
CapacityBytes *uint64 `json:"capacityBytes,omitempty"`
// UsedBytes represents the bytes used for a specific task on the filesystem.
// This may differ from the total bytes used on the filesystem and may not equal CapacityBytes - AvailableBytes.
// e.g. For ContainerStats.Rootfs this is the bytes used by the container rootfs on the filesystem.
// +optional
UsedBytes *uint64 `json:"usedBytes,omitempty"`
// InodesFree represents the free inodes in the filesystem.
// +optional
InodesFree *uint64 `json:"inodesFree,omitempty"`
// Inodes represents the total inodes in the filesystem.
// +optional
Inodes *uint64 `json:"inodes,omitempty"`
// InodesUsed represents the inodes used by the filesystem
// This may not equal Inodes - InodesFree because this filesystem may share inodes with other "filesystems"
// e.g. For ContainerStats.Rootfs, this is the inodes used only by that container, and does not count inodes used by other containers.
InodesUsed *uint64 `json:"inodesUsed,omitempty"`
}
// UserDefinedMetricType defines how the metric should be interpreted by the user.
type UserDefinedMetricType string
const (
// MetricGauge is an instantaneous value. May increase or decrease.
MetricGauge UserDefinedMetricType = "gauge"
// MetricCumulative is a counter-like value that is only expected to increase.
MetricCumulative UserDefinedMetricType = "cumulative"
// MetricDelta is a rate over a time period.
MetricDelta UserDefinedMetricType = "delta"
)
// UserDefinedMetricDescriptor contains metadata that describes a user defined metric.
type UserDefinedMetricDescriptor struct {
// The name of the metric.
Name string `json:"name"`
// Type of the metric.
Type UserDefinedMetricType `json:"type"`
// Display Units for the stats.
Units string `json:"units"`
// Metadata labels associated with this metric.
// +optional
Labels map[string]string `json:"labels,omitempty"`
}
// UserDefinedMetric represents a metric defined and generated by users.
type UserDefinedMetric struct {
UserDefinedMetricDescriptor `json:",inline"`
// The time at which these stats were updated.
Time metav1.Time `json:"time"`
// Value of the metric. Float64s have 53 bit precision.
// We do not foresee any metrics exceeding that value.
Value float64 `json:"value"`
}
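Taken together, these types make up the pod-level portion of the kubelet summary payload. What follows is a minimal sketch of assembling them from a provider's point of view; it assumes the Summary and NodeStats definitions from earlier in this file, and every name and number in it is illustrative rather than taken from a real workload.

package example

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

// buildSummary assembles a stats.Summary for one pod running one container.
func buildSummary() *stats.Summary {
	now := metav1.NewTime(time.Now())
	started := metav1.NewTime(now.Add(-5 * time.Minute))
	usageNanoCores := uint64(250000000)    // roughly a quarter of a core
	workingSet := uint64(64 * 1024 * 1024) // 64 MiB

	return &stats.Summary{
		Node: stats.NodeStats{NodeName: "virtual-kubelet", StartTime: started},
		Pods: []stats.PodStats{{
			PodRef:    stats.PodReference{Name: "demo", Namespace: "default", UID: "uid-1234"},
			StartTime: started,
			Containers: []stats.ContainerStats{{
				Name:      "main",
				StartTime: started,
				CPU:       &stats.CPUStats{Time: now, UsageNanoCores: &usageNanoCores},
				Memory:    &stats.MemoryStats{Time: now, WorkingSetBytes: &workingSet},
			}},
		}},
	}
}

Note that the optional fields are pointers throughout, so a provider that cannot measure a given quantity simply leaves the pointer nil and the field is dropped from the JSON.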

View File

@@ -1,75 +1,152 @@
package vkubelet
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
)
var p Provider
var r mux.Router
func loggingContext(r *http.Request) context.Context {
ctx := r.Context()
logger := log.G(ctx).WithFields(logrus.Fields{
"uri": r.RequestURI,
"vars": mux.Vars(r),
})
return log.WithLogger(ctx, logger)
}
// NotFound provides a handler for cases where the requested endpoint doesn't exist
func NotFound(w http.ResponseWriter, r *http.Request) {
log.Printf("404 request not found. \n %v", mux.Vars(r))
logger := log.G(loggingContext(r))
log.Trace(logger, "404 request not found")
http.Error(w, "404 request not found", http.StatusNotFound)
}
func ApiserverStart(provider Provider) {
p = provider
// KubeletServer implements HTTP endpoints for serving kubelet APIs.
type KubeletServer struct {
p Provider
}
// KubeletServerStart starts the virtual kubelet HTTP server.
func KubeletServerStart(p Provider) {
certFilePath := os.Getenv("APISERVER_CERT_LOCATION")
keyFilePath := os.Getenv("APISERVER_KEY_LOCATION")
port := os.Getenv("KUBELET_PORT")
addr := fmt.Sprintf(":%s", port)
r := mux.NewRouter()
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", ApiServerHandler).Methods("GET")
r.HandleFunc("/exec/{namespace}/{pod}/{container}", ApiServerHandlerExec).Methods("POST")
s := &KubeletServer{p: p}
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", s.ApiServerHandler).Methods("GET")
r.HandleFunc("/exec/{namespace}/{pod}/{container}", s.ApiServerHandlerExec).Methods("POST")
r.NotFoundHandler = http.HandlerFunc(NotFound)
if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil {
log.Println(err)
log.G(context.TODO()).WithError(err).Error("error setting up http server")
}
}
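As the function shows, KubeletServerStart takes all of its configuration from the environment rather than from arguments. A hedged sketch of the wiring, with invented file paths and the conventional kubelet port, purely for illustration:

package vkubelet

import "os"

// startKubeletServer shows the environment contract only; in a real
// deployment these variables are injected via the pod spec, not set in code.
func startKubeletServer(p Provider) {
	os.Setenv("APISERVER_CERT_LOCATION", "/etc/virtual-kubelet/cert.pem") // invented path
	os.Setenv("APISERVER_KEY_LOCATION", "/etc/virtual-kubelet/key.pem")   // invented path
	os.Setenv("KUBELET_PORT", "10250") // conventional kubelet port, still an assumption here
	go KubeletServerStart(p)           // serves /containerLogs/... and /exec/... over TLS
}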
func ApiServerHandler(w http.ResponseWriter, req *http.Request) {
// MetricsServerStart starts an HTTP server on the provided addr for serving the kubelet summary stats API.
// TLS is never enabled on this endpoint.
func MetricsServerStart(p Provider, addr string) {
r := mux.NewRouter()
s := &MetricsServer{p: p}
r.HandleFunc("/stats/summary", s.MetricsSummaryHandler).Methods("GET")
r.HandleFunc("/stats/summary/", s.MetricsSummaryHandler).Methods("GET")
r.NotFoundHandler = http.HandlerFunc(NotFound)
if err := http.ListenAndServe(addr, r); err != nil {
log.G(context.TODO()).WithError(err).Error("Error starting http server")
}
}
// MetricsServer provides an HTTP endpoint for accessing pod metrics.
type MetricsServer struct {
p Provider
}
// MetricsSummaryHandler is an HTTP handler for implementing the kubelet summary stats endpoint
func (s *MetricsServer) MetricsSummaryHandler(w http.ResponseWriter, req *http.Request) {
ctx := loggingContext(req)
mp, ok := s.p.(MetricsProvider)
if !ok {
log.G(ctx).Debug("stats not implemented for provider")
http.Error(w, "not implememnted", http.StatusNotImplemented)
return
}
stats, err := mp.GetStatsSummary(req.Context())
if err != nil {
if errors.Cause(err) == context.Canceled {
return
}
log.G(ctx).Error("Error getting stats from provider:", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
b, err := json.Marshal(stats)
if err != nil {
log.G(ctx).WithError(err).Error("Could not marshal stats")
http.Error(w, "could not marshal stats: "+err.Error(), http.StatusInternalServerError)
return
}
if _, err := w.Write(b); err != nil {
log.G(ctx).WithError(err).Debug("Could not write to client")
}
}
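Since this endpoint is plain HTTP with a JSON body, it can be exercised with any client. A minimal smoke test, assuming the metrics server was started on the illustrative address :10255:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"

	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

func main() {
	// ":10255" must match whatever addr was passed to MetricsServerStart.
	resp, err := http.Get("http://localhost:10255/stats/summary")
	if err != nil {
		panic(err) // illustration only
	}
	defer resp.Body.Close()

	var summary stats.Summary
	if err := json.NewDecoder(resp.Body).Decode(&summary); err != nil {
		panic(err) // illustration only
	}
	fmt.Printf("pods reporting stats: %d\n", len(summary.Pods))
}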
func (s *KubeletServer) ApiServerHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
if len(vars) == 3 {
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
tail := 10
q := req.URL.Query()
queryTail := q.Get("tailLines")
if queryTail != "" {
t, err := strconv.Atoi(queryTail)
if err != nil {
log.Println(err)
io.WriteString(w, err.Error())
} else {
tail = t
}
}
podsLogs, err := p.GetContainerLogs(namespace, pod, container, tail)
if err != nil {
log.Println(err)
io.WriteString(w, err.Error())
} else {
io.WriteString(w, podsLogs)
}
} else {
if len(vars) != 3 {
NotFound(w, req)
return
}
ctx := loggingContext(req)
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
tail := 10
q := req.URL.Query()
if queryTail := q.Get("tailLines"); queryTail != "" {
t, err := strconv.Atoi(queryTail)
if err != nil {
logger := log.G(ctx).WithError(err)
log.Trace(logger, "could not parse tailLines")
http.Error(w, fmt.Sprintf("could not parse \"tailLines\": %v", err), http.StatusBadRequest)
return
}
tail = t
}
podsLogs, err := s.p.GetContainerLogs(namespace, pod, container, tail)
if err != nil {
log.G(ctx).WithError(err).Error("error getting container logs")
http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError)
return
}
if _, err := io.WriteString(w, podsLogs); err != nil {
log.G(ctx).WithError(err).Warn("error writing response to client")
}
}
func ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
func (s *KubeletServer) ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
namespace := vars["namespace"]
@@ -99,5 +176,5 @@ func ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
idleTimeout := time.Second * 30
streamCreationTimeout := time.Second * 30
remotecommand.ServeExec(w, req, p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
remotecommand.ServeExec(w, req, s.p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
}

View File

@@ -3,8 +3,7 @@
package vkubelet
import (
"fmt"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers/aws"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure"
@@ -56,8 +55,6 @@ func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager
case "vic":
return vic.NewVicProvider(providerConfig, rm, nodeName, operatingSystem)
default:
fmt.Printf("Provider '%s' is not supported\n", provider)
return nil, errors.New("provider not supported")
}
var p Provider
return p, nil
}

View File

@@ -1,8 +1,7 @@
package vkubelet
import (
"fmt"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers/aws"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure"
@@ -43,8 +42,6 @@ func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager
case "sfmesh":
return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
default:
fmt.Printf("Provider '%s' is not supported\n", provider)
return nil, errors.New("provider is not supported")
}
var p Provider
return p, nil
}

View File

@@ -1,8 +1,7 @@
package vkubelet
import (
"fmt"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers/aws"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure"
@@ -43,8 +42,6 @@ func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager
case "sfmesh":
return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
default:
fmt.Printf("Provider '%s' is not supported\n", provider)
return nil, errors.New("provider not supported")
}
var p Provider
return p, nil
}

View File

@@ -1,12 +1,14 @@
package vkubelet
import (
"context"
"io"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
// Provider contains the methods required to implement a virtual-kubelet provider.
@@ -54,3 +56,8 @@ type Provider interface {
// OperatingSystem returns the operating system the provider is for.
OperatingSystem() string
}
// MetricsProvider is an optional interface that providers can implement to expose pod stats
type MetricsProvider interface {
GetStatsSummary(context.Context) (*stats.Summary, error)
}
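A provider opts in simply by adding this one method; the summary handler discovers support with a type assertion, so nothing else about the provider changes. A minimal sketch, with a hypothetical type standing in for a real provider implementation:

package vkubelet

import (
	"context"

	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

// statsCapableProvider is hypothetical; a real provider also satisfies the
// full Provider interface above.
type statsCapableProvider struct{}

func (p *statsCapableProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
	// Honor cancellation: the HTTP handler treats context.Canceled as a
	// client disconnect and suppresses the error response.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	// A real implementation would translate backend metrics into the
	// stats types; an empty summary keeps the sketch compile-ready.
	return &stats.Summary{}, nil
}

Providers that skip this method automatically get a 501 Not Implemented from /stats/summary, so the interface stays truly optional.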

View File

@@ -1,8 +1,8 @@
package vkubelet
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strconv"
@@ -10,6 +10,8 @@ import (
"syscall"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -46,7 +48,7 @@ func getEnv(key, defaultValue string) string {
}
// New creates a new virtual-kubelet server.
func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerConfig, taintKey string, disableTaint bool) (*Server, error) {
func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerConfig, taintKey string, disableTaint bool, metricsAddr string) (*Server, error) {
var config *rest.Config
// Check if the kubeConfig file exists.
@@ -69,7 +71,10 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
return nil, err
}
rm := manager.NewResourceManager(clientset)
rm, err := manager.NewResourceManager(clientset)
if err != nil {
return nil, pkgerrors.Wrap(err, "error creating resource manager")
}
daemonEndpointPortEnv := os.Getenv("KUBELET_PORT")
if daemonEndpointPortEnv == "" {
@@ -101,7 +106,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
case "PreferNoSchedule":
vkTaintEffect = corev1.TaintEffectPreferNoSchedule
default:
fmt.Printf("Taint effect '%s' is not supported\n", vkTaintEffectEnv)
return nil, pkgerrors.Errorf("taint effect %q is not supported", vkTaintEffectEnv)
}
taint := corev1.Taint{
@@ -110,7 +115,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
Effect: vkTaintEffect,
}
p, err = lookupProvider(provider, providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
p, err := lookupProvider(provider, providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
if err != nil {
return nil, err
}
@@ -125,17 +130,27 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
provider: p,
}
if err = s.registerNode(); err != nil {
ctx := context.TODO()
ctx = log.WithLogger(ctx, log.G(ctx))
if err = s.registerNode(ctx); err != nil {
return s, err
}
go ApiserverStart(p)
go KubeletServerStart(p)
if metricsAddr != "" {
go MetricsServerStart(p, metricsAddr)
} else {
log.G(ctx).Info("Skipping metrics server startup since no address was provided")
}
tick := time.Tick(5 * time.Second)
go func() {
for range tick {
s.updateNode()
s.updatePodStatuses()
s.updateNode(ctx)
s.updatePodStatuses(ctx)
}
}()
@@ -143,7 +158,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
}
// registerNode registers this virtual node with the Kubernetes API.
func (s *Server) registerNode() error {
func (s *Server) registerNode(ctx context.Context) error {
taints := make([]corev1.Taint, 0)
if !s.disableTaint {
@@ -182,14 +197,14 @@ func (s *Server) registerNode() error {
return err
}
log.Printf("Node '%s' with OS type '%s' registered\n", node.Name, node.Status.NodeInfo.OperatingSystem)
log.G(ctx).Info("Registered node")
return nil
}
// Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster.
// Run will block until Stop is called or a SIGINT or SIGTERM signal is received.
func (s *Server) Run() error {
func (s *Server) Run(ctx context.Context) error {
shouldStop := false
sig := make(chan os.Signal, 1)
@@ -207,15 +222,15 @@ func (s *Server) Run() error {
pods, err := s.k8sClient.CoreV1().Pods(s.namespace).List(opts)
if err != nil {
log.Fatal("Failed to list pods", err)
return pkgerrors.Wrap(err, "error getting pod list")
}
s.resourceManager.SetPods(pods)
s.reconcile()
s.reconcile(ctx)
opts.ResourceVersion = pods.ResourceVersion
s.podWatcher, err = s.k8sClient.CoreV1().Pods(s.namespace).Watch(opts)
if err != nil {
log.Fatal("Failed to watch pods", err)
return pkgerrors.Wrap(err, "failed to watch pods")
}
loop:
@@ -224,15 +239,15 @@ func (s *Server) Run() error {
case ev, ok := <-s.podWatcher.ResultChan():
if !ok {
if shouldStop {
log.Println("Pod watcher is stopped.")
log.G(ctx).Info("Pod watcher is stopped")
return nil
}
log.Println("Pod watcher connection is closed unexpectedly.")
log.G(ctx).Error("Pod watcher connection is closed unexpectedly")
break loop
}
log.Println("Pod watcher event is received:", ev.Type)
log.G(ctx).WithField("type", ev.Type).Debug("Pod watcher event is received")
reconcile := false
switch ev.Type {
case watch.Added:
@@ -244,7 +259,7 @@ func (s *Server) Run() error {
}
if reconcile {
s.reconcile()
s.reconcile(ctx)
}
}
}
@@ -262,19 +277,19 @@ func (s *Server) Stop() {
}
// updateNode updates the node status within Kubernetes with updated NodeConditions.
func (s *Server) updateNode() {
func (s *Server) updateNode(ctx context.Context) {
opts := metav1.GetOptions{}
n, err := s.k8sClient.CoreV1().Nodes().Get(s.nodeName, opts)
if err != nil && !errors.IsNotFound(err) {
log.Println("Failed to retrieve node:", err)
log.G(ctx).WithError(err).Error("Failed to retrieve node")
return
}
if errors.IsNotFound(err) {
if err = s.registerNode(); err != nil {
log.Println("Failed to register node:", err)
return
if err = s.registerNode(ctx); err != nil {
log.G(ctx).WithError(err).Error("Failed to register node")
}
return
}
n.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
@@ -288,27 +303,31 @@ func (s *Server) updateNode() {
n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n)
if err != nil {
log.Println("Failed to update node:", err)
log.G(ctx).WithError(err).Error("Failed to update node")
return
}
}
// reconcile is the main reconciliation loop that compares differences between Kubernetes and
// the active provider and reconciles the differences.
func (s *Server) reconcile() {
log.Println("Start reconcile.")
func (s *Server) reconcile(ctx context.Context) {
logger := log.G(ctx)
logger.Debug("Start reconcile")
defer logger.Debug("End reconcile")
providerPods, err := s.provider.GetPods()
if err != nil {
log.Println(err)
logger.WithError(err).Error("Error getting pod list from provider")
return
}
for _, pod := range providerPods {
// Delete pods that don't exist in Kubernetes
if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil || p.DeletionTimestamp != nil {
log.Printf("Deleting pod '%s'\n", pod.Name)
if err := s.deletePod(pod); err != nil {
log.Printf("Error deleting pod '%s': %s\n", pod.Name, err)
logger := logger.WithField("pod", pod.Name)
logger.Debug("Deleting pod '%s'\n", pod.Name)
if err := s.deletePod(ctx, pod); err != nil {
logger.WithError(err).Error("Error deleting pod")
continue
}
}
@@ -317,6 +336,7 @@ func (s *Server) reconcile() {
// Create any pods for k8s pods that don't exist in the provider
pods := s.resourceManager.GetPods()
for _, pod := range pods {
logger := logger.WithField("pod", pod.Name)
var providerPod *corev1.Pod
for _, p := range providerPods {
if p.Namespace == pod.Namespace && p.Name == pod.Name {
@@ -326,30 +346,33 @@ func (s *Server) reconcile() {
}
if pod.DeletionTimestamp == nil && pod.Status.Phase != corev1.PodFailed && providerPod == nil {
log.Printf("Creating pod '%s'\n", pod.Name)
if err := s.createPod(pod); err != nil {
log.Printf("Error creating pod '%s': %s\n", pod.Name, err)
logger.Debug("Creating pod")
if err := s.createPod(ctx, pod); err != nil {
logger.WithError(err).Error("Error creating pod")
continue
}
}
// Delete pod if DeletionTimestamp is set
if pod.DeletionTimestamp != nil {
log.Printf("Pod '%s' is pending deletion.\n", pod.Name)
log.Trace(logger, "Pod pending deletion")
var err error
if err = s.deletePod(pod); err != nil {
log.Printf("Error deleting pod '%s': %s\n", pod.Name, err)
if err = s.deletePod(ctx, pod); err != nil {
logger.WithError(err).Error("Error deleting pod")
continue
}
log.Trace(logger, "Pod deletion complete")
}
}
}
func (s *Server) createPod(pod *corev1.Pod) error {
func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
if err := s.populateSecretsAndConfigMapsInEnv(pod); err != nil {
return err
}
logger := log.G(ctx).WithField("pod", pod.Name)
if origErr := s.provider.CreatePod(pod); origErr != nil {
pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
pod.Status.Phase = corev1.PodFailed
@@ -358,29 +381,29 @@ func (s *Server) createPod(pod *corev1.Pod) error {
_, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
if err != nil {
log.Println("Failed to update pod status:", err)
return origErr
logger.WithError(err).Warn("Failed to update pod status")
}
return origErr
}
log.Printf("Pod '%s' created.\n", pod.Name)
logger.Info("Pod created")
return nil
}
func (s *Server) deletePod(pod *corev1.Pod) error {
func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error {
var delErr error
if delErr = s.provider.DeletePod(pod); delErr != nil && errors.IsNotFound(delErr) {
return delErr
}
logger := log.G(ctx).WithField("pod", pod.Name)
if !errors.IsNotFound(delErr) {
var grace int64
if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) {
if errors.IsNotFound(err) {
log.Printf("Pod '%s' doesn't exist.\n", pod.Name)
logger.Error("Pod doesn't exist")
return nil
}
@@ -388,15 +411,14 @@ func (s *Server) deletePod(pod *corev1.Pod) error {
}
s.resourceManager.DeletePod(pod)
log.Printf("Pod '%s' deleted.\n", pod.Name)
logger.Info("Pod deleted")
}
return nil
}
// updatePodStatuses syncs the providers pod status with the kubernetes pod status.
func (s *Server) updatePodStatuses() {
func (s *Server) updatePodStatuses(ctx context.Context) {
// Update all the pods with the provider status.
pods := s.resourceManager.GetPods()
for _, pod := range pods {
@@ -406,7 +428,7 @@ func (s *Server) updatePodStatuses() {
status, err := s.provider.GetPodStatus(pod.Namespace, pod.Name)
if err != nil {
log.Printf("Error retrieving pod '%s' in namespace '%s' status from provider: %s\n", pod.Name, pod.Namespace, err)
log.G(ctx).WithField("pod", pod.Name).Error("Error retrieving pod status")
return
}