574 lines
18 KiB
Go
574 lines
18 KiB
Go
package mock
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/cpuguy83/strongerrors"
|
|
"github.com/cpuguy83/strongerrors/status/ocstatus"
|
|
"github.com/virtual-kubelet/virtual-kubelet/log"
|
|
"github.com/virtual-kubelet/virtual-kubelet/providers"
|
|
"github.com/virtual-kubelet/virtual-kubelet/trace"
|
|
"github.com/virtual-kubelet/virtual-kubelet/vkubelet"
|
|
"github.com/virtual-kubelet/virtual-kubelet/vkubelet/api"
|
|
v1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
|
|
)
|
|
|
|
const (
|
|
// Provider configuration defaults.
|
|
defaultCPUCapacity = "20"
|
|
defaultMemoryCapacity = "100Gi"
|
|
defaultPodCapacity = "20"
|
|
|
|
// Values used in tracing as attribute keys.
|
|
namespaceKey = "namespace"
|
|
nameKey = "name"
|
|
containerNameKey = "containerName"
|
|
)
|
|
|
|
var (
|
|
_ providers.Provider = (*MockLegacyProvider)(nil)
|
|
_ providers.PodMetricsProvider = (*MockLegacyProvider)(nil)
|
|
_ vkubelet.PodNotifier = (*MockProvider)(nil)
|
|
)
|
|
|
|
// MockLegacyProvider implements the virtual-kubelet provider interface and stores pods in memory.
|
|
type MockLegacyProvider struct {
|
|
nodeName string
|
|
operatingSystem string
|
|
internalIP string
|
|
daemonEndpointPort int32
|
|
pods map[string]*v1.Pod
|
|
config MockConfig
|
|
startTime time.Time
|
|
notifier func(*v1.Pod)
|
|
}
|
|
|
|
// MockProvider is like MockLegacyProvider, but implements the PodNotifier interface
|
|
type MockProvider struct {
|
|
*MockLegacyProvider
|
|
}
|
|
|
|
// MockConfig contains a mock virtual-kubelet's configurable parameters.
|
|
type MockConfig struct {
|
|
CPU string `json:"cpu,omitempty"`
|
|
Memory string `json:"memory,omitempty"`
|
|
Pods string `json:"pods,omitempty"`
|
|
}
|
|
|
|
// NewMockProviderMockConfig creates a new MockLegacyProvider. Mock legacy provider does not implement the new asynchronous podnotifier interface
|
|
func NewMockLegacyProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockLegacyProvider, error) {
|
|
//set defaults
|
|
if config.CPU == "" {
|
|
config.CPU = defaultCPUCapacity
|
|
}
|
|
if config.Memory == "" {
|
|
config.Memory = defaultMemoryCapacity
|
|
}
|
|
if config.Pods == "" {
|
|
config.Pods = defaultPodCapacity
|
|
}
|
|
provider := MockLegacyProvider{
|
|
nodeName: nodeName,
|
|
operatingSystem: operatingSystem,
|
|
internalIP: internalIP,
|
|
daemonEndpointPort: daemonEndpointPort,
|
|
pods: make(map[string]*v1.Pod),
|
|
config: config,
|
|
startTime: time.Now(),
|
|
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
|
|
// it will be set, and then we'll call a real underlying implementation.
|
|
// This makes it easier in the sense we don't need to wrap each method.
|
|
notifier: func(*v1.Pod) {},
|
|
}
|
|
|
|
return &provider, nil
|
|
}
|
|
|
|
// NewMockLegacyProvider creates a new MockLegacyProvider
|
|
func NewMockLegacyProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockLegacyProvider, error) {
|
|
config, err := loadConfig(providerConfig, nodeName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return NewMockLegacyProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
|
|
}
|
|
|
|
// NewMockProviderMockConfig creates a new MockProvider with the given config
|
|
func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
|
|
p, err := NewMockLegacyProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
|
|
|
|
return &MockProvider{MockLegacyProvider: p}, err
|
|
}
|
|
|
|
// NewMockProvider creates a new MockProvider, which implements the PodNotifier interface
|
|
func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
|
|
config, err := loadConfig(providerConfig, nodeName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return NewMockProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
|
|
}
|
|
|
|
// loadConfig loads the given json configuration files.
|
|
func loadConfig(providerConfig, nodeName string) (config MockConfig, err error) {
|
|
data, err := ioutil.ReadFile(providerConfig)
|
|
if err != nil {
|
|
return config, err
|
|
}
|
|
configMap := map[string]MockConfig{}
|
|
err = json.Unmarshal(data, &configMap)
|
|
if err != nil {
|
|
return config, err
|
|
}
|
|
if _, exist := configMap[nodeName]; exist {
|
|
config = configMap[nodeName]
|
|
if config.CPU == "" {
|
|
config.CPU = defaultCPUCapacity
|
|
}
|
|
if config.Memory == "" {
|
|
config.Memory = defaultMemoryCapacity
|
|
}
|
|
if config.Pods == "" {
|
|
config.Pods = defaultPodCapacity
|
|
}
|
|
}
|
|
|
|
if _, err = resource.ParseQuantity(config.CPU); err != nil {
|
|
return config, fmt.Errorf("Invalid CPU value %v", config.CPU)
|
|
}
|
|
if _, err = resource.ParseQuantity(config.Memory); err != nil {
|
|
return config, fmt.Errorf("Invalid memory value %v", config.Memory)
|
|
}
|
|
if _, err = resource.ParseQuantity(config.Pods); err != nil {
|
|
return config, fmt.Errorf("Invalid pods value %v", config.Pods)
|
|
}
|
|
return config, nil
|
|
}
|
|
|
|
// CreatePod accepts a Pod definition and stores it in memory.
|
|
func (p *MockLegacyProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
|
|
ctx, span := trace.StartSpan(ctx, "CreatePod")
|
|
defer span.End()
|
|
|
|
// Add the pod's coordinates to the current span.
|
|
ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name)
|
|
|
|
log.G(ctx).Infof("receive CreatePod %q", pod.Name)
|
|
|
|
key, err := buildKey(pod)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
now := metav1.NewTime(time.Now())
|
|
pod.Status = v1.PodStatus{
|
|
Phase: v1.PodRunning,
|
|
HostIP: "1.2.3.4",
|
|
PodIP: "5.6.7.8",
|
|
StartTime: &now,
|
|
Conditions: []v1.PodCondition{
|
|
{
|
|
Type: v1.PodInitialized,
|
|
Status: v1.ConditionTrue,
|
|
},
|
|
{
|
|
Type: v1.PodReady,
|
|
Status: v1.ConditionTrue,
|
|
},
|
|
{
|
|
Type: v1.PodScheduled,
|
|
Status: v1.ConditionTrue,
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, container := range pod.Spec.Containers {
|
|
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
|
|
Name: container.Name,
|
|
Image: container.Image,
|
|
Ready: true,
|
|
RestartCount: 0,
|
|
State: v1.ContainerState{
|
|
Running: &v1.ContainerStateRunning{
|
|
StartedAt: now,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
p.pods[key] = pod
|
|
p.notifier(pod)
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdatePod accepts a Pod definition and updates its reference.
|
|
func (p *MockLegacyProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
|
|
ctx, span := trace.StartSpan(ctx, "UpdatePod")
|
|
defer span.End()
|
|
|
|
// Add the pod's coordinates to the current span.
|
|
ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name)
|
|
|
|
log.G(ctx).Infof("receive UpdatePod %q", pod.Name)
|
|
|
|
key, err := buildKey(pod)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
p.pods[key] = pod
|
|
p.notifier(pod)
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeletePod deletes the specified pod out of memory.
|
|
func (p *MockLegacyProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
|
|
ctx, span := trace.StartSpan(ctx, "DeletePod")
|
|
defer span.End()
|
|
|
|
// Add the pod's coordinates to the current span.
|
|
ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name)
|
|
|
|
log.G(ctx).Infof("receive DeletePod %q", pod.Name)
|
|
|
|
key, err := buildKey(pod)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, exists := p.pods[key]; !exists {
|
|
return strongerrors.NotFound(fmt.Errorf("pod not found"))
|
|
}
|
|
|
|
now := metav1.Now()
|
|
delete(p.pods, key)
|
|
pod.Status.Phase = v1.PodSucceeded
|
|
pod.Status.Reason = "MockProviderPodDeleted"
|
|
|
|
for idx := range pod.Status.ContainerStatuses {
|
|
pod.Status.ContainerStatuses[idx].Ready = false
|
|
pod.Status.ContainerStatuses[idx].State = v1.ContainerState{
|
|
Terminated: &v1.ContainerStateTerminated{
|
|
Message: "Mock provider terminated container upon deletion",
|
|
FinishedAt: now,
|
|
Reason: "MockProviderPodContainerDeleted",
|
|
StartedAt: pod.Status.ContainerStatuses[idx].State.Running.StartedAt,
|
|
},
|
|
}
|
|
}
|
|
|
|
p.notifier(pod)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetPod returns a pod by name that is stored in memory.
|
|
func (p *MockLegacyProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
|
|
ctx, span := trace.StartSpan(ctx, "GetPod")
|
|
defer func() {
|
|
span.SetStatus(ocstatus.FromError(err))
|
|
span.End()
|
|
}()
|
|
|
|
// Add the pod's coordinates to the current span.
|
|
ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, name)
|
|
|
|
log.G(ctx).Infof("receive GetPod %q", name)
|
|
|
|
key, err := buildKeyFromNames(namespace, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if pod, ok := p.pods[key]; ok {
|
|
return pod, nil
|
|
}
|
|
return nil, strongerrors.NotFound(fmt.Errorf("pod \"%s/%s\" is not known to the provider", namespace, name))
|
|
}
|
|
|
|
// GetContainerLogs retrieves the logs of a container by name from the provider.
|
|
func (p *MockLegacyProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
|
|
ctx, span := trace.StartSpan(ctx, "GetContainerLogs")
|
|
defer span.End()
|
|
|
|
// Add pod and container attributes to the current span.
|
|
ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, podName, containerNameKey, containerName)
|
|
|
|
log.G(ctx).Info("receive GetContainerLogs %q", podName)
|
|
return ioutil.NopCloser(strings.NewReader("")), nil
|
|
}
|
|
|
|
// Get full pod name as defined in the provider context
|
|
// TODO: Implementation
|
|
func (p *MockLegacyProvider) GetPodFullName(namespace string, pod string) string {
|
|
return ""
|
|
}
|
|
|
|
// RunInContainer executes a command in a container in the pod, copying data
|
|
// between in/out/err and the container's stdin/stdout/stderr.
|
|
func (p *MockLegacyProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
|
|
log.G(context.TODO()).Infof("receive ExecInContainer %q", container)
|
|
return nil
|
|
}
|
|
|
|
// GetPodStatus returns the status of a pod by name that is "running".
|
|
// returns nil if a pod by that name is not found.
|
|
func (p *MockLegacyProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
|
|
ctx, span := trace.StartSpan(ctx, "GetPodStatus")
|
|
defer span.End()
|
|
|
|
// Add namespace and name as attributes to the current span.
|
|
ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, name)
|
|
|
|
log.G(ctx).Infof("receive GetPodStatus %q", name)
|
|
|
|
pod, err := p.GetPod(ctx, namespace, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &pod.Status, nil
|
|
}
|
|
|
|
// GetPods returns a list of all pods known to be "running".
|
|
func (p *MockLegacyProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
|
|
ctx, span := trace.StartSpan(ctx, "GetPods")
|
|
defer span.End()
|
|
|
|
log.G(ctx).Info("receive GetPods")
|
|
|
|
var pods []*v1.Pod
|
|
|
|
for _, pod := range p.pods {
|
|
pods = append(pods, pod)
|
|
}
|
|
|
|
return pods, nil
|
|
}
|
|
|
|
// Capacity returns a resource list containing the capacity limits.
|
|
func (p *MockLegacyProvider) Capacity(ctx context.Context) v1.ResourceList {
|
|
ctx, span := trace.StartSpan(ctx, "Capacity")
|
|
defer span.End()
|
|
|
|
return v1.ResourceList{
|
|
"cpu": resource.MustParse(p.config.CPU),
|
|
"memory": resource.MustParse(p.config.Memory),
|
|
"pods": resource.MustParse(p.config.Pods),
|
|
}
|
|
}
|
|
|
|
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
|
|
// within Kubernetes.
|
|
func (p *MockLegacyProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
|
|
ctx, span := trace.StartSpan(ctx, "NodeConditions")
|
|
defer span.End()
|
|
|
|
// TODO: Make this configurable
|
|
return []v1.NodeCondition{
|
|
{
|
|
Type: "Ready",
|
|
Status: v1.ConditionTrue,
|
|
LastHeartbeatTime: metav1.Now(),
|
|
LastTransitionTime: metav1.Now(),
|
|
Reason: "KubeletReady",
|
|
Message: "kubelet is ready.",
|
|
},
|
|
{
|
|
Type: "OutOfDisk",
|
|
Status: v1.ConditionFalse,
|
|
LastHeartbeatTime: metav1.Now(),
|
|
LastTransitionTime: metav1.Now(),
|
|
Reason: "KubeletHasSufficientDisk",
|
|
Message: "kubelet has sufficient disk space available",
|
|
},
|
|
{
|
|
Type: "MemoryPressure",
|
|
Status: v1.ConditionFalse,
|
|
LastHeartbeatTime: metav1.Now(),
|
|
LastTransitionTime: metav1.Now(),
|
|
Reason: "KubeletHasSufficientMemory",
|
|
Message: "kubelet has sufficient memory available",
|
|
},
|
|
{
|
|
Type: "DiskPressure",
|
|
Status: v1.ConditionFalse,
|
|
LastHeartbeatTime: metav1.Now(),
|
|
LastTransitionTime: metav1.Now(),
|
|
Reason: "KubeletHasNoDiskPressure",
|
|
Message: "kubelet has no disk pressure",
|
|
},
|
|
{
|
|
Type: "NetworkUnavailable",
|
|
Status: v1.ConditionFalse,
|
|
LastHeartbeatTime: metav1.Now(),
|
|
LastTransitionTime: metav1.Now(),
|
|
Reason: "RouteCreated",
|
|
Message: "RouteController created a route",
|
|
},
|
|
}
|
|
|
|
}
|
|
|
|
// NodeAddresses returns a list of addresses for the node status
|
|
// within Kubernetes.
|
|
func (p *MockLegacyProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
|
|
ctx, span := trace.StartSpan(ctx, "NodeAddresses")
|
|
defer span.End()
|
|
|
|
return []v1.NodeAddress{
|
|
{
|
|
Type: "InternalIP",
|
|
Address: p.internalIP,
|
|
},
|
|
}
|
|
}
|
|
|
|
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
|
|
// within Kubernetes.
|
|
func (p *MockLegacyProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
|
|
ctx, span := trace.StartSpan(ctx, "NodeDaemonEndpoints")
|
|
defer span.End()
|
|
|
|
return &v1.NodeDaemonEndpoints{
|
|
KubeletEndpoint: v1.DaemonEndpoint{
|
|
Port: p.daemonEndpointPort,
|
|
},
|
|
}
|
|
}
|
|
|
|
// OperatingSystem returns the operating system for this provider.
|
|
// This is a noop to default to Linux for now.
|
|
|
|
func (p *MockLegacyProvider) OperatingSystem() string {
|
|
return providers.OperatingSystemLinux
|
|
}
|
|
|
|
// GetStatsSummary returns dummy stats for all pods known by this provider.
|
|
func (p *MockLegacyProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
|
|
ctx, span := trace.StartSpan(ctx, "GetStatsSummary")
|
|
defer span.End()
|
|
|
|
// Grab the current timestamp so we can report it as the time the stats were generated.
|
|
time := metav1.NewTime(time.Now())
|
|
|
|
// Create the Summary object that will later be populated with node and pod stats.
|
|
res := &stats.Summary{}
|
|
|
|
// Populate the Summary object with basic node stats.
|
|
res.Node = stats.NodeStats{
|
|
NodeName: p.nodeName,
|
|
StartTime: metav1.NewTime(p.startTime),
|
|
}
|
|
|
|
// Populate the Summary object with dummy stats for each pod known by this provider.
|
|
for _, pod := range p.pods {
|
|
var (
|
|
// totalUsageNanoCores will be populated with the sum of the values of UsageNanoCores computes across all containers in the pod.
|
|
totalUsageNanoCores uint64
|
|
// totalUsageBytes will be populated with the sum of the values of UsageBytes computed across all containers in the pod.
|
|
totalUsageBytes uint64
|
|
)
|
|
|
|
// Create a PodStats object to populate with pod stats.
|
|
pss := stats.PodStats{
|
|
PodRef: stats.PodReference{
|
|
Name: pod.Name,
|
|
Namespace: pod.Namespace,
|
|
UID: string(pod.UID),
|
|
},
|
|
StartTime: pod.CreationTimestamp,
|
|
}
|
|
|
|
// Iterate over all containers in the current pod to compute dummy stats.
|
|
for _, container := range pod.Spec.Containers {
|
|
// Grab a dummy value to be used as the total CPU usage.
|
|
// The value should fit a uint32 in order to avoid overflows later on when computing pod stats.
|
|
dummyUsageNanoCores := uint64(rand.Uint32())
|
|
totalUsageNanoCores += dummyUsageNanoCores
|
|
// Create a dummy value to be used as the total RAM usage.
|
|
// The value should fit a uint32 in order to avoid overflows later on when computing pod stats.
|
|
dummyUsageBytes := uint64(rand.Uint32())
|
|
totalUsageBytes += dummyUsageBytes
|
|
// Append a ContainerStats object containing the dummy stats to the PodStats object.
|
|
pss.Containers = append(pss.Containers, stats.ContainerStats{
|
|
Name: container.Name,
|
|
StartTime: pod.CreationTimestamp,
|
|
CPU: &stats.CPUStats{
|
|
Time: time,
|
|
UsageNanoCores: &dummyUsageNanoCores,
|
|
},
|
|
Memory: &stats.MemoryStats{
|
|
Time: time,
|
|
UsageBytes: &dummyUsageBytes,
|
|
},
|
|
})
|
|
}
|
|
|
|
// Populate the CPU and RAM stats for the pod and append the PodsStats object to the Summary object to be returned.
|
|
pss.CPU = &stats.CPUStats{
|
|
Time: time,
|
|
UsageNanoCores: &totalUsageNanoCores,
|
|
}
|
|
pss.Memory = &stats.MemoryStats{
|
|
Time: time,
|
|
UsageBytes: &totalUsageBytes,
|
|
}
|
|
res.Pods = append(res.Pods, pss)
|
|
}
|
|
|
|
// Return the dummy stats.
|
|
return res, nil
|
|
}
|
|
|
|
// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
|
|
// within the provider.
|
|
func (p *MockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
|
|
p.notifier = notifier
|
|
}
|
|
|
|
func buildKeyFromNames(namespace string, name string) (string, error) {
|
|
return fmt.Sprintf("%s-%s", namespace, name), nil
|
|
}
|
|
|
|
// buildKey is a helper for building the "key" for the providers pod store.
|
|
func buildKey(pod *v1.Pod) (string, error) {
|
|
if pod.ObjectMeta.Namespace == "" {
|
|
return "", fmt.Errorf("pod namespace not found")
|
|
}
|
|
|
|
if pod.ObjectMeta.Name == "" {
|
|
return "", fmt.Errorf("pod name not found")
|
|
}
|
|
|
|
return buildKeyFromNames(pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
|
|
}
|
|
|
|
// addAttributes adds the specified attributes to the provided span.
|
|
// attrs must be an even-sized list of string arguments.
|
|
// Otherwise, the span won't be modified.
|
|
// TODO: Refactor and move to a "tracing utilities" package.
|
|
func addAttributes(ctx context.Context, span trace.Span, attrs ...string) context.Context {
|
|
if len(attrs)%2 == 1 {
|
|
return ctx
|
|
}
|
|
for i := 0; i < len(attrs); i += 2 {
|
|
ctx = span.WithField(ctx, attrs[i], attrs[i+1])
|
|
}
|
|
return ctx
|
|
}
|