package mock

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"math/rand"
	"strings"
	"time"

	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
	"github.com/virtual-kubelet/virtual-kubelet/log"
	"github.com/virtual-kubelet/virtual-kubelet/node/api"
	"github.com/virtual-kubelet/virtual-kubelet/trace"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

const (
	// Provider configuration defaults.
	defaultCPUCapacity    = "20"
	defaultMemoryCapacity = "100Gi"
	defaultPodCapacity    = "20"

	// Values used in tracing as attribute keys.
	namespaceKey     = "namespace"
	nameKey          = "name"
	containerNameKey = "containerName"
)

// See: https://github.com/virtual-kubelet/virtual-kubelet/issues/632
/*
var (
	_ providers.Provider           = (*MockV0Provider)(nil)
	_ providers.PodMetricsProvider = (*MockV0Provider)(nil)
	_ node.PodNotifier             = (*MockProvider)(nil)
)
*/

// MockV0Provider implements the virtual-kubelet provider interface and stores pods in memory.
type MockV0Provider struct {
	nodeName           string
	operatingSystem    string
	internalIP         string
	daemonEndpointPort int32
	pods               map[string]*v1.Pod
	config             MockConfig
	startTime          time.Time
	notifier           func(*v1.Pod)
}

// MockProvider is like MockV0Provider, but implements the PodNotifier interface.
type MockProvider struct {
	*MockV0Provider
}

// MockConfig contains a mock virtual-kubelet's configurable parameters.
type MockConfig struct {
	CPU    string `json:"cpu,omitempty"`
	Memory string `json:"memory,omitempty"`
	Pods   string `json:"pods,omitempty"`
}
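// loadConfig (below) reads a JSON file that maps node names to MockConfig values.
// A minimal sketch of producing an equivalent document with the types above; the
// node name "vkubelet-mock-0" and the output path are only illustrative:
/*
configMap := map[string]MockConfig{
	"vkubelet-mock-0": {CPU: "20", Memory: "100Gi", Pods: "20"},
}
data, _ := json.MarshalIndent(configMap, "", "  ")
// data now holds: {"vkubelet-mock-0": {"cpu": "20", "memory": "100Gi", "pods": "20"}}
_ = ioutil.WriteFile("/tmp/mock-config.json", data, 0600)
*/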
// NewMockV0ProviderMockConfig creates a new MockV0Provider with the given config.
// The legacy mock provider does not implement the newer, asynchronous PodNotifier interface.
func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
	// set defaults
	if config.CPU == "" {
		config.CPU = defaultCPUCapacity
	}
	if config.Memory == "" {
		config.Memory = defaultMemoryCapacity
	}
	if config.Pods == "" {
		config.Pods = defaultPodCapacity
	}
	provider := MockV0Provider{
		nodeName:           nodeName,
		operatingSystem:    operatingSystem,
		internalIP:         internalIP,
		daemonEndpointPort: daemonEndpointPort,
		pods:               make(map[string]*v1.Pod),
		config:             config,
		startTime:          time.Now(),
		// By default, notifier is a no-op. If the PodNotifier interface is implemented,
		// NotifyPods replaces it with the real underlying implementation, so we do not
		// need to wrap each method.
		notifier: func(*v1.Pod) {},
	}

	return &provider, nil
}

// NewMockV0Provider creates a new MockV0Provider.
func NewMockV0Provider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
	config, err := loadConfig(providerConfig, nodeName)
	if err != nil {
		return nil, err
	}

	return NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
}

// NewMockProviderMockConfig creates a new MockProvider with the given config.
func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
	p, err := NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
	return &MockProvider{MockV0Provider: p}, err
}

// NewMockProvider creates a new MockProvider, which implements the PodNotifier interface.
func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
	config, err := loadConfig(providerConfig, nodeName)
	if err != nil {
		return nil, err
	}

	return NewMockProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
}

// loadConfig loads the given JSON configuration file.
func loadConfig(providerConfig, nodeName string) (config MockConfig, err error) {
	data, err := ioutil.ReadFile(providerConfig)
	if err != nil {
		return config, err
	}
	configMap := map[string]MockConfig{}
	err = json.Unmarshal(data, &configMap)
	if err != nil {
		return config, err
	}
	if _, exist := configMap[nodeName]; exist {
		config = configMap[nodeName]
		if config.CPU == "" {
			config.CPU = defaultCPUCapacity
		}
		if config.Memory == "" {
			config.Memory = defaultMemoryCapacity
		}
		if config.Pods == "" {
			config.Pods = defaultPodCapacity
		}
	}

	if _, err = resource.ParseQuantity(config.CPU); err != nil {
		return config, fmt.Errorf("invalid CPU value %v", config.CPU)
	}
	if _, err = resource.ParseQuantity(config.Memory); err != nil {
		return config, fmt.Errorf("invalid memory value %v", config.Memory)
	}
	if _, err = resource.ParseQuantity(config.Pods); err != nil {
		return config, fmt.Errorf("invalid pods value %v", config.Pods)
	}
	return config, nil
}

// CreatePod accepts a Pod definition and stores it in memory.
func (p *MockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
	ctx, span := trace.StartSpan(ctx, "CreatePod")
	defer span.End()

	// Add the pod's coordinates to the current span.
	ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name)

	log.G(ctx).Infof("receive CreatePod %q", pod.Name)

	key, err := buildKey(pod)
	if err != nil {
		return err
	}

	now := metav1.NewTime(time.Now())
	pod.Status = v1.PodStatus{
		Phase:     v1.PodRunning,
		HostIP:    "1.2.3.4",
		PodIP:     "5.6.7.8",
		StartTime: &now,
		Conditions: []v1.PodCondition{
			{
				Type:   v1.PodInitialized,
				Status: v1.ConditionTrue,
			},
			{
				Type:   v1.PodReady,
				Status: v1.ConditionTrue,
			},
			{
				Type:   v1.PodScheduled,
				Status: v1.ConditionTrue,
			},
		},
	}

	for _, container := range pod.Spec.Containers {
		pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
			Name:         container.Name,
			Image:        container.Image,
			Ready:        true,
			RestartCount: 0,
			State: v1.ContainerState{
				Running: &v1.ContainerStateRunning{
					StartedAt: now,
				},
			},
		})
	}

	p.pods[key] = pod
	p.notifier(pod)

	return nil
}
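// A minimal usage sketch for the code above, assuming an illustrative node name
// and a caller-built pod; error handling is elided. NotifyPods (defined further
// below) registers the callback that CreatePod invokes via p.notifier:
/*
p, _ := NewMockProviderMockConfig(MockConfig{}, "vkubelet-mock-0", "Linux", "1.2.3.4", 10250)
p.NotifyPods(context.Background(), func(pod *v1.Pod) {
	log.G(context.Background()).Infof("pod %q moved to phase %q", pod.Name, pod.Status.Phase)
})
_ = p.CreatePod(context.Background(), &v1.Pod{
	ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"},
	Spec:       v1.PodSpec{Containers: []v1.Container{{Name: "nginx", Image: "nginx"}}},
})
*/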
// UpdatePod accepts a Pod definition and updates its reference.
func (p *MockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
	ctx, span := trace.StartSpan(ctx, "UpdatePod")
	defer span.End()

	// Add the pod's coordinates to the current span.
	ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name)

	log.G(ctx).Infof("receive UpdatePod %q", pod.Name)

	key, err := buildKey(pod)
	if err != nil {
		return err
	}

	p.pods[key] = pod
	p.notifier(pod)

	return nil
}

// DeletePod deletes the specified pod out of memory.
func (p *MockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
	ctx, span := trace.StartSpan(ctx, "DeletePod")
	defer span.End()

	// Add the pod's coordinates to the current span.
	ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name)

	log.G(ctx).Infof("receive DeletePod %q", pod.Name)

	key, err := buildKey(pod)
	if err != nil {
		return err
	}

	if _, exists := p.pods[key]; !exists {
		return errdefs.NotFound("pod not found")
	}

	now := metav1.Now()
	delete(p.pods, key)
	pod.Status.Phase = v1.PodSucceeded
	pod.Status.Reason = "MockProviderPodDeleted"

	for idx := range pod.Status.ContainerStatuses {
		pod.Status.ContainerStatuses[idx].Ready = false
		pod.Status.ContainerStatuses[idx].State = v1.ContainerState{
			Terminated: &v1.ContainerStateTerminated{
				Message:    "Mock provider terminated container upon deletion",
				FinishedAt: now,
				Reason:     "MockProviderPodContainerDeleted",
				StartedAt:  pod.Status.ContainerStatuses[idx].State.Running.StartedAt,
			},
		}
	}

	p.notifier(pod)

	return nil
}

// GetPod returns a pod by name that is stored in memory.
func (p *MockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
	ctx, span := trace.StartSpan(ctx, "GetPod")
	defer func() {
		span.SetStatus(err)
		span.End()
	}()

	// Add the pod's coordinates to the current span.
	ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, name)

	log.G(ctx).Infof("receive GetPod %q", name)

	key, err := buildKeyFromNames(namespace, name)
	if err != nil {
		return nil, err
	}

	if pod, ok := p.pods[key]; ok {
		return pod, nil
	}
	return nil, errdefs.NotFoundf("pod \"%s/%s\" is not known to the provider", namespace, name)
}

// GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *MockV0Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
	ctx, span := trace.StartSpan(ctx, "GetContainerLogs")
	defer span.End()

	// Add pod and container attributes to the current span.
	ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, podName, containerNameKey, containerName)

	log.G(ctx).Infof("receive GetContainerLogs %q", podName)
	return ioutil.NopCloser(strings.NewReader("")), nil
}

// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *MockV0Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
	log.G(ctx).Infof("receive ExecInContainer %q", container)
	return nil
}
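// Continuing the sketch above: pods are looked up by namespace and name, and
// unknown pods surface as errdefs "not found" errors callers can test for:
/*
pod, err := p.GetPod(context.Background(), "default", "nginx")
if errdefs.IsNotFound(err) {
	// The pod was never created (or has already been deleted).
}
if err == nil {
	_ = p.DeletePod(context.Background(), pod)
}
*/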
// GetPodStatus returns the status of a pod by name that is "running".
// It returns an error if a pod by that name is not known to the provider.
func (p *MockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
	ctx, span := trace.StartSpan(ctx, "GetPodStatus")
	defer span.End()

	// Add namespace and name as attributes to the current span.
	ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, name)

	log.G(ctx).Infof("receive GetPodStatus %q", name)

	pod, err := p.GetPod(ctx, namespace, name)
	if err != nil {
		return nil, err
	}

	return &pod.Status, nil
}

// GetPods returns a list of all pods known to be "running".
func (p *MockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
	ctx, span := trace.StartSpan(ctx, "GetPods")
	defer span.End()

	log.G(ctx).Info("receive GetPods")

	var pods []*v1.Pod

	for _, pod := range p.pods {
		pods = append(pods, pod)
	}

	return pods, nil
}

// ConfigureNode populates the capacity, conditions, addresses, daemon endpoints,
// and node info of the node object used to represent this provider in Kubernetes.
func (p *MockV0Provider) ConfigureNode(ctx context.Context, n *v1.Node) {
	ctx, span := trace.StartSpan(ctx, "mock.ConfigureNode")
	defer span.End()

	n.Status.Capacity = p.capacity()
	n.Status.Allocatable = p.capacity()
	n.Status.Conditions = p.nodeConditions()
	n.Status.Addresses = p.nodeAddresses()
	n.Status.DaemonEndpoints = p.nodeDaemonEndpoints()
	os := p.operatingSystem
	if os == "" {
		os = "Linux"
	}
	n.Status.NodeInfo.OperatingSystem = os
	n.Status.NodeInfo.Architecture = "amd64"
	n.ObjectMeta.Labels["alpha.service-controller.kubernetes.io/exclude-balancer"] = "true"
}

// capacity returns a resource list containing the capacity limits.
func (p *MockV0Provider) capacity() v1.ResourceList {
	return v1.ResourceList{
		"cpu":    resource.MustParse(p.config.CPU),
		"memory": resource.MustParse(p.config.Memory),
		"pods":   resource.MustParse(p.config.Pods),
	}
}

// nodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
// within Kubernetes.
func (p *MockV0Provider) nodeConditions() []v1.NodeCondition {
	// TODO: Make this configurable
	return []v1.NodeCondition{
		{
			Type:               "Ready",
			Status:             v1.ConditionTrue,
			LastHeartbeatTime:  metav1.Now(),
			LastTransitionTime: metav1.Now(),
			Reason:             "KubeletReady",
			Message:            "kubelet is ready.",
		},
		{
			Type:               "OutOfDisk",
			Status:             v1.ConditionFalse,
			LastHeartbeatTime:  metav1.Now(),
			LastTransitionTime: metav1.Now(),
			Reason:             "KubeletHasSufficientDisk",
			Message:            "kubelet has sufficient disk space available",
		},
		{
			Type:               "MemoryPressure",
			Status:             v1.ConditionFalse,
			LastHeartbeatTime:  metav1.Now(),
			LastTransitionTime: metav1.Now(),
			Reason:             "KubeletHasSufficientMemory",
			Message:            "kubelet has sufficient memory available",
		},
		{
			Type:               "DiskPressure",
			Status:             v1.ConditionFalse,
			LastHeartbeatTime:  metav1.Now(),
			LastTransitionTime: metav1.Now(),
			Reason:             "KubeletHasNoDiskPressure",
			Message:            "kubelet has no disk pressure",
		},
		{
			Type:               "NetworkUnavailable",
			Status:             v1.ConditionFalse,
			LastHeartbeatTime:  metav1.Now(),
			LastTransitionTime: metav1.Now(),
			Reason:             "RouteCreated",
			Message:            "RouteController created a route",
		},
	}
}

// nodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *MockV0Provider) nodeAddresses() []v1.NodeAddress {
	return []v1.NodeAddress{
		{
			Type:    "InternalIP",
			Address: p.internalIP,
		},
	}
}

// nodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *MockV0Provider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
	return v1.NodeDaemonEndpoints{
		KubeletEndpoint: v1.DaemonEndpoint{
			Port: p.daemonEndpointPort,
		},
	}
}
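// A sketch of applying ConfigureNode above to a node object before registering
// it. The node name and label are illustrative; the label map must be non-nil,
// since ConfigureNode writes the exclude-balancer label into it:
/*
node := &v1.Node{
	ObjectMeta: metav1.ObjectMeta{
		Name:   "vkubelet-mock-0",
		Labels: map[string]string{"type": "virtual-kubelet"},
	},
}
p.ConfigureNode(context.Background(), node)
*/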
// GetStatsSummary returns dummy stats for all pods known by this provider.
func (p *MockV0Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
	ctx, span := trace.StartSpan(ctx, "GetStatsSummary")
	defer span.End()

	// Grab the current timestamp so we can report it as the time the stats were generated.
	now := metav1.NewTime(time.Now())

	// Create the Summary object that will later be populated with node and pod stats.
	res := &stats.Summary{}

	// Populate the Summary object with basic node stats.
	res.Node = stats.NodeStats{
		NodeName:  p.nodeName,
		StartTime: metav1.NewTime(p.startTime),
	}

	// Populate the Summary object with dummy stats for each pod known by this provider.
	for _, pod := range p.pods {
		var (
			// totalUsageNanoCores will be populated with the sum of the values of UsageNanoCores computed across all containers in the pod.
			totalUsageNanoCores uint64
			// totalUsageBytes will be populated with the sum of the values of UsageBytes computed across all containers in the pod.
			totalUsageBytes uint64
		)

		// Create a PodStats object to populate with pod stats.
		pss := stats.PodStats{
			PodRef: stats.PodReference{
				Name:      pod.Name,
				Namespace: pod.Namespace,
				UID:       string(pod.UID),
			},
			StartTime: pod.CreationTimestamp,
		}

		// Iterate over all containers in the current pod to compute dummy stats.
		for _, container := range pod.Spec.Containers {
			// Grab a dummy value to be used as the total CPU usage.
			// The value should fit a uint32 in order to avoid overflows later on when computing pod stats.
			dummyUsageNanoCores := uint64(rand.Uint32())
			totalUsageNanoCores += dummyUsageNanoCores
			// Create a dummy value to be used as the total RAM usage.
			// The value should fit a uint32 in order to avoid overflows later on when computing pod stats.
			dummyUsageBytes := uint64(rand.Uint32())
			totalUsageBytes += dummyUsageBytes
			// Append a ContainerStats object containing the dummy stats to the PodStats object.
			pss.Containers = append(pss.Containers, stats.ContainerStats{
				Name:      container.Name,
				StartTime: pod.CreationTimestamp,
				CPU: &stats.CPUStats{
					Time:           now,
					UsageNanoCores: &dummyUsageNanoCores,
				},
				Memory: &stats.MemoryStats{
					Time:       now,
					UsageBytes: &dummyUsageBytes,
				},
			})
		}

		// Populate the CPU and RAM stats for the pod and append the PodStats object to the Summary object to be returned.
		pss.CPU = &stats.CPUStats{
			Time:           now,
			UsageNanoCores: &totalUsageNanoCores,
		}
		pss.Memory = &stats.MemoryStats{
			Time:       now,
			UsageBytes: &totalUsageBytes,
		}
		res.Pods = append(res.Pods, pss)
	}

	// Return the dummy stats.
	return res, nil
}

// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
// within the provider.
func (p *MockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
	p.notifier = notifier
}

func buildKeyFromNames(namespace string, name string) (string, error) {
	return fmt.Sprintf("%s-%s", namespace, name), nil
}

// buildKey is a helper for building the "key" for the provider's pod store.
func buildKey(pod *v1.Pod) (string, error) {
	if pod.ObjectMeta.Namespace == "" {
		return "", fmt.Errorf("pod namespace not found")
	}

	if pod.ObjectMeta.Name == "" {
		return "", fmt.Errorf("pod name not found")
	}

	return buildKeyFromNames(pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
}

// addAttributes adds the specified attributes to the provided span.
// attrs must be an even-sized list of string arguments.
// Otherwise, the span won't be modified.
// TODO: Refactor and move to a "tracing utilities" package.
func addAttributes(ctx context.Context, span trace.Span, attrs ...string) context.Context {
	if len(attrs)%2 == 1 {
		return ctx
	}
	for i := 0; i < len(attrs); i += 2 {
		ctx = span.WithField(ctx, attrs[i], attrs[i+1])
	}
	return ctx
}
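// A brief sketch of consuming the dummy stats produced by GetStatsSummary,
// using the field names from the stats.Summary types referenced above:
/*
summary, err := p.GetStatsSummary(context.Background())
if err == nil {
	for _, ps := range summary.Pods {
		if ps.CPU != nil && ps.CPU.UsageNanoCores != nil {
			log.G(context.Background()).Infof("pod %s/%s: %d nanocores", ps.PodRef.Namespace, ps.PodRef.Name, *ps.CPU.UsageNanoCores)
		}
	}
}
*/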