Move CLI related packages into internal (#697)
We don't want people to import these packages, so move these out into private packages.
This commit is contained in:
@@ -19,12 +19,12 @@ import (
|
||||
"os"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
|
||||
)
|
||||
|
||||
// NewCommand creates a new providers subcommand
|
||||
// This subcommand is used to determine which providers are registered.
|
||||
func NewCommand(s *providers.Store) *cobra.Command {
|
||||
func NewCommand(s *provider.Store) *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "providers",
|
||||
Short: "Show the list of supported providers",
|
||||
@@ -24,9 +24,9 @@ import (
|
||||
"os"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node/api"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers"
|
||||
)
|
||||
|
||||
// AcceptedCiphers is the list of accepted TLS ciphers, with known weak ciphers elided
|
||||
@@ -57,7 +57,7 @@ func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func setupHTTPServer(ctx context.Context, p providers.Provider, cfg *apiServerConfig) (_ func(), retErr error) {
|
||||
func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerConfig) (_ func(), retErr error) {
|
||||
var closers []io.Closer
|
||||
cancel := func() {
|
||||
for _, c := range closers {
|
||||
@@ -113,7 +113,7 @@ func setupHTTPServer(ctx context.Context, p providers.Provider, cfg *apiServerCo
|
||||
mux := http.NewServeMux()
|
||||
|
||||
var summaryHandlerFunc api.PodStatsSummaryHandlerFunc
|
||||
if mp, ok := p.(providers.PodMetricsProvider); ok {
|
||||
if mp, ok := p.(provider.PodMetricsProvider); ok {
|
||||
summaryHandlerFunc = mp.GetStatsSummary
|
||||
}
|
||||
podMetricsRoutes := api.PodMetricsConfig{
|
||||
@@ -18,8 +18,8 @@ import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -29,7 +29,7 @@ const osLabel = "beta.kubernetes.io/os"
|
||||
|
||||
// NodeFromProvider builds a kubernetes node object from a provider
|
||||
// This is a temporary solution until node stuff actually split off from the provider interface itself.
|
||||
func NodeFromProvider(ctx context.Context, name string, taint *v1.Taint, p providers.Provider, version string) *v1.Node {
|
||||
func NodeFromProvider(ctx context.Context, name string, taint *v1.Taint, p provider.Provider, version string) *v1.Node {
|
||||
taints := make([]v1.Taint, 0)
|
||||
|
||||
if taint != nil {
|
||||
@@ -22,11 +22,11 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/manager"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -43,7 +43,7 @@ import (
|
||||
|
||||
// NewCommand creates a new top-level command.
|
||||
// This command is used to start the virtual-kubelet daemon
|
||||
func NewCommand(ctx context.Context, name string, s *providers.Store, c Opts) *cobra.Command {
|
||||
func NewCommand(ctx context.Context, name string, s *provider.Store, c Opts) *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: name,
|
||||
Short: name + " provides a virtual kubelet interface for your kubernetes cluster.",
|
||||
@@ -59,11 +59,11 @@ This allows users to schedule kubernetes workloads on nodes that aren't running
|
||||
return cmd
|
||||
}
|
||||
|
||||
func runRootCommand(ctx context.Context, s *providers.Store, c Opts) error {
|
||||
func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if ok := providers.ValidOperatingSystems[c.OperatingSystem]; !ok {
|
||||
if ok := provider.ValidOperatingSystems[c.OperatingSystem]; !ok {
|
||||
return errdefs.InvalidInputf("operating system %q is not supported", c.OperatingSystem)
|
||||
}
|
||||
|
||||
@@ -119,13 +119,13 @@ func runRootCommand(ctx context.Context, s *providers.Store, c Opts) error {
|
||||
return err
|
||||
}
|
||||
|
||||
initConfig := providers.InitConfig{
|
||||
ConfigPath: c.ProviderConfigPath,
|
||||
NodeName: c.NodeName,
|
||||
OperatingSystem: c.OperatingSystem,
|
||||
ResourceManager: rm,
|
||||
DaemonPort: int32(c.ListenPort),
|
||||
InternalIP: os.Getenv("VKUBELET_POD_IP"),
|
||||
initConfig := provider.InitConfig{
|
||||
ConfigPath: c.ProviderConfigPath,
|
||||
NodeName: c.NodeName,
|
||||
OperatingSystem: c.OperatingSystem,
|
||||
ResourceManager: rm,
|
||||
DaemonPort: int32(c.ListenPort),
|
||||
InternalIP: os.Getenv("VKUBELET_POD_IP"),
|
||||
KubeClusterDomain: c.KubeClusterDomain,
|
||||
}
|
||||
|
||||
10
cmd/virtual-kubelet/internal/provider/README.md
Normal file
10
cmd/virtual-kubelet/internal/provider/README.md
Normal file
@@ -0,0 +1,10 @@
|
||||
Follow these steps to be accepted as a provider within the Virtual Kubelet repo.
|
||||
|
||||
1. Replicate the life-cycle of a pod for example creation and deletion of a pod and how that maps to your service.
|
||||
2. Create a new provider folder with a descriptive name and the necessary code.
|
||||
3. When committing your code add a README.md, helm chart, dockerfile and specify a maintainer of the provider.
|
||||
4. Within the PR itself add a justification for why the provider should be accepted, as well as customer use cases if applicable.
|
||||
|
||||
Some providers are translations of Virtual Kubelet to allow others to adapt their service or applications that are written in other languages.
|
||||
|
||||
|
||||
566
cmd/virtual-kubelet/internal/provider/mock/mock.go
Normal file
566
cmd/virtual-kubelet/internal/provider/mock/mock.go
Normal file
@@ -0,0 +1,566 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node/api"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/trace"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
|
||||
)
|
||||
|
||||
const (
|
||||
// Provider configuration defaults.
|
||||
defaultCPUCapacity = "20"
|
||||
defaultMemoryCapacity = "100Gi"
|
||||
defaultPodCapacity = "20"
|
||||
|
||||
// Values used in tracing as attribute keys.
|
||||
namespaceKey = "namespace"
|
||||
nameKey = "name"
|
||||
containerNameKey = "containerName"
|
||||
)
|
||||
|
||||
// See: https://github.com/virtual-kubelet/virtual-kubelet/issues/632
|
||||
/*
|
||||
var (
|
||||
_ providers.Provider = (*MockV0Provider)(nil)
|
||||
_ providers.PodMetricsProvider = (*MockV0Provider)(nil)
|
||||
_ node.PodNotifier = (*MockProvider)(nil)
|
||||
)
|
||||
*/
|
||||
|
||||
// MockV0Provider implements the virtual-kubelet provider interface and stores pods in memory.
|
||||
type MockV0Provider struct {
|
||||
nodeName string
|
||||
operatingSystem string
|
||||
internalIP string
|
||||
daemonEndpointPort int32
|
||||
pods map[string]*v1.Pod
|
||||
config MockConfig
|
||||
startTime time.Time
|
||||
notifier func(*v1.Pod)
|
||||
}
|
||||
|
||||
// MockProvider is like MockV0Provider, but implements the PodNotifier interface
|
||||
type MockProvider struct {
|
||||
*MockV0Provider
|
||||
}
|
||||
|
||||
// MockConfig contains a mock virtual-kubelet's configurable parameters.
|
||||
type MockConfig struct {
|
||||
CPU string `json:"cpu,omitempty"`
|
||||
Memory string `json:"memory,omitempty"`
|
||||
Pods string `json:"pods,omitempty"`
|
||||
}
|
||||
|
||||
// NewMockProviderMockConfig creates a new MockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface
|
||||
func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
|
||||
//set defaults
|
||||
if config.CPU == "" {
|
||||
config.CPU = defaultCPUCapacity
|
||||
}
|
||||
if config.Memory == "" {
|
||||
config.Memory = defaultMemoryCapacity
|
||||
}
|
||||
if config.Pods == "" {
|
||||
config.Pods = defaultPodCapacity
|
||||
}
|
||||
provider := MockV0Provider{
|
||||
nodeName: nodeName,
|
||||
operatingSystem: operatingSystem,
|
||||
internalIP: internalIP,
|
||||
daemonEndpointPort: daemonEndpointPort,
|
||||
pods: make(map[string]*v1.Pod),
|
||||
config: config,
|
||||
startTime: time.Now(),
|
||||
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
|
||||
// it will be set, and then we'll call a real underlying implementation.
|
||||
// This makes it easier in the sense we don't need to wrap each method.
|
||||
notifier: func(*v1.Pod) {},
|
||||
}
|
||||
|
||||
return &provider, nil
|
||||
}
|
||||
|
||||
// NewMockV0Provider creates a new MockV0Provider
|
||||
func NewMockV0Provider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
|
||||
config, err := loadConfig(providerConfig, nodeName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
|
||||
}
|
||||
|
||||
// NewMockProviderMockConfig creates a new MockProvider with the given config
|
||||
func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
|
||||
p, err := NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
|
||||
|
||||
return &MockProvider{MockV0Provider: p}, err
|
||||
}
|
||||
|
||||
// NewMockProvider creates a new MockProvider, which implements the PodNotifier interface
|
||||
func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
|
||||
config, err := loadConfig(providerConfig, nodeName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewMockProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
|
||||
}
|
||||
|
||||
// loadConfig loads the given json configuration files.
|
||||
func loadConfig(providerConfig, nodeName string) (config MockConfig, err error) {
|
||||
data, err := ioutil.ReadFile(providerConfig)
|
||||
if err != nil {
|
||||
return config, err
|
||||
}
|
||||
configMap := map[string]MockConfig{}
|
||||
err = json.Unmarshal(data, &configMap)
|
||||
if err != nil {
|
||||
return config, err
|
||||
}
|
||||
if _, exist := configMap[nodeName]; exist {
|
||||
config = configMap[nodeName]
|
||||
if config.CPU == "" {
|
||||
config.CPU = defaultCPUCapacity
|
||||
}
|
||||
if config.Memory == "" {
|
||||
config.Memory = defaultMemoryCapacity
|
||||
}
|
||||
if config.Pods == "" {
|
||||
config.Pods = defaultPodCapacity
|
||||
}
|
||||
}
|
||||
|
||||
if _, err = resource.ParseQuantity(config.CPU); err != nil {
|
||||
return config, fmt.Errorf("Invalid CPU value %v", config.CPU)
|
||||
}
|
||||
if _, err = resource.ParseQuantity(config.Memory); err != nil {
|
||||
return config, fmt.Errorf("Invalid memory value %v", config.Memory)
|
||||
}
|
||||
if _, err = resource.ParseQuantity(config.Pods); err != nil {
|
||||
return config, fmt.Errorf("Invalid pods value %v", config.Pods)
|
||||
}
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// CreatePod accepts a Pod definition and stores it in memory.
|
||||
func (p *MockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
|
||||
ctx, span := trace.StartSpan(ctx, "CreatePod")
|
||||
defer span.End()
|
||||
|
||||
// Add the pod's coordinates to the current span.
|
||||
ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name)
|
||||
|
||||
log.G(ctx).Infof("receive CreatePod %q", pod.Name)
|
||||
|
||||
key, err := buildKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
now := metav1.NewTime(time.Now())
|
||||
pod.Status = v1.PodStatus{
|
||||
Phase: v1.PodRunning,
|
||||
HostIP: "1.2.3.4",
|
||||
PodIP: "5.6.7.8",
|
||||
StartTime: &now,
|
||||
Conditions: []v1.PodCondition{
|
||||
{
|
||||
Type: v1.PodInitialized,
|
||||
Status: v1.ConditionTrue,
|
||||
},
|
||||
{
|
||||
Type: v1.PodReady,
|
||||
Status: v1.ConditionTrue,
|
||||
},
|
||||
{
|
||||
Type: v1.PodScheduled,
|
||||
Status: v1.ConditionTrue,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, container := range pod.Spec.Containers {
|
||||
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
|
||||
Name: container.Name,
|
||||
Image: container.Image,
|
||||
Ready: true,
|
||||
RestartCount: 0,
|
||||
State: v1.ContainerState{
|
||||
Running: &v1.ContainerStateRunning{
|
||||
StartedAt: now,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
p.pods[key] = pod
|
||||
p.notifier(pod)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdatePod accepts a Pod definition and updates its reference.
|
||||
func (p *MockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
|
||||
ctx, span := trace.StartSpan(ctx, "UpdatePod")
|
||||
defer span.End()
|
||||
|
||||
// Add the pod's coordinates to the current span.
|
||||
ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name)
|
||||
|
||||
log.G(ctx).Infof("receive UpdatePod %q", pod.Name)
|
||||
|
||||
key, err := buildKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.pods[key] = pod
|
||||
p.notifier(pod)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeletePod deletes the specified pod out of memory.
|
||||
func (p *MockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
|
||||
ctx, span := trace.StartSpan(ctx, "DeletePod")
|
||||
defer span.End()
|
||||
|
||||
// Add the pod's coordinates to the current span.
|
||||
ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name)
|
||||
|
||||
log.G(ctx).Infof("receive DeletePod %q", pod.Name)
|
||||
|
||||
key, err := buildKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, exists := p.pods[key]; !exists {
|
||||
return errdefs.NotFound("pod not found")
|
||||
}
|
||||
|
||||
now := metav1.Now()
|
||||
delete(p.pods, key)
|
||||
pod.Status.Phase = v1.PodSucceeded
|
||||
pod.Status.Reason = "MockProviderPodDeleted"
|
||||
|
||||
for idx := range pod.Status.ContainerStatuses {
|
||||
pod.Status.ContainerStatuses[idx].Ready = false
|
||||
pod.Status.ContainerStatuses[idx].State = v1.ContainerState{
|
||||
Terminated: &v1.ContainerStateTerminated{
|
||||
Message: "Mock provider terminated container upon deletion",
|
||||
FinishedAt: now,
|
||||
Reason: "MockProviderPodContainerDeleted",
|
||||
StartedAt: pod.Status.ContainerStatuses[idx].State.Running.StartedAt,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
p.notifier(pod)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetPod returns a pod by name that is stored in memory.
|
||||
func (p *MockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
|
||||
ctx, span := trace.StartSpan(ctx, "GetPod")
|
||||
defer func() {
|
||||
span.SetStatus(err)
|
||||
span.End()
|
||||
}()
|
||||
|
||||
// Add the pod's coordinates to the current span.
|
||||
ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, name)
|
||||
|
||||
log.G(ctx).Infof("receive GetPod %q", name)
|
||||
|
||||
key, err := buildKeyFromNames(namespace, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if pod, ok := p.pods[key]; ok {
|
||||
return pod, nil
|
||||
}
|
||||
return nil, errdefs.NotFoundf("pod \"%s/%s\" is not known to the provider", namespace, name)
|
||||
}
|
||||
|
||||
// GetContainerLogs retrieves the logs of a container by name from the provider.
|
||||
func (p *MockV0Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "GetContainerLogs")
|
||||
defer span.End()
|
||||
|
||||
// Add pod and container attributes to the current span.
|
||||
ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, podName, containerNameKey, containerName)
|
||||
|
||||
log.G(ctx).Info("receive GetContainerLogs %q", podName)
|
||||
return ioutil.NopCloser(strings.NewReader("")), nil
|
||||
}
|
||||
|
||||
// RunInContainer executes a command in a container in the pod, copying data
|
||||
// between in/out/err and the container's stdin/stdout/stderr.
|
||||
func (p *MockV0Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
|
||||
log.G(context.TODO()).Infof("receive ExecInContainer %q", container)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetPodStatus returns the status of a pod by name that is "running".
|
||||
// returns nil if a pod by that name is not found.
|
||||
func (p *MockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "GetPodStatus")
|
||||
defer span.End()
|
||||
|
||||
// Add namespace and name as attributes to the current span.
|
||||
ctx = addAttributes(ctx, span, namespaceKey, namespace, nameKey, name)
|
||||
|
||||
log.G(ctx).Infof("receive GetPodStatus %q", name)
|
||||
|
||||
pod, err := p.GetPod(ctx, namespace, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &pod.Status, nil
|
||||
}
|
||||
|
||||
// GetPods returns a list of all pods known to be "running".
|
||||
func (p *MockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "GetPods")
|
||||
defer span.End()
|
||||
|
||||
log.G(ctx).Info("receive GetPods")
|
||||
|
||||
var pods []*v1.Pod
|
||||
|
||||
for _, pod := range p.pods {
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
|
||||
return pods, nil
|
||||
}
|
||||
|
||||
func (p *MockV0Provider) ConfigureNode(ctx context.Context, n *v1.Node) {
|
||||
ctx, span := trace.StartSpan(ctx, "mock.ConfigureNode")
|
||||
defer span.End()
|
||||
|
||||
n.Status.Capacity = p.capacity()
|
||||
n.Status.Allocatable = p.capacity()
|
||||
n.Status.Conditions = p.nodeConditions()
|
||||
n.Status.Addresses = p.nodeAddresses()
|
||||
n.Status.DaemonEndpoints = p.nodeDaemonEndpoints()
|
||||
os := p.operatingSystem
|
||||
if os == "" {
|
||||
os = "Linux"
|
||||
}
|
||||
n.Status.NodeInfo.OperatingSystem = os
|
||||
n.Status.NodeInfo.Architecture = "amd64"
|
||||
n.ObjectMeta.Labels["alpha.service-controller.kubernetes.io/exclude-balancer"] = "true"
|
||||
}
|
||||
|
||||
// Capacity returns a resource list containing the capacity limits.
|
||||
func (p *MockV0Provider) capacity() v1.ResourceList {
|
||||
return v1.ResourceList{
|
||||
"cpu": resource.MustParse(p.config.CPU),
|
||||
"memory": resource.MustParse(p.config.Memory),
|
||||
"pods": resource.MustParse(p.config.Pods),
|
||||
}
|
||||
}
|
||||
|
||||
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
|
||||
// within Kubernetes.
|
||||
func (p *MockV0Provider) nodeConditions() []v1.NodeCondition {
|
||||
// TODO: Make this configurable
|
||||
return []v1.NodeCondition{
|
||||
{
|
||||
Type: "Ready",
|
||||
Status: v1.ConditionTrue,
|
||||
LastHeartbeatTime: metav1.Now(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: "KubeletReady",
|
||||
Message: "kubelet is ready.",
|
||||
},
|
||||
{
|
||||
Type: "OutOfDisk",
|
||||
Status: v1.ConditionFalse,
|
||||
LastHeartbeatTime: metav1.Now(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: "KubeletHasSufficientDisk",
|
||||
Message: "kubelet has sufficient disk space available",
|
||||
},
|
||||
{
|
||||
Type: "MemoryPressure",
|
||||
Status: v1.ConditionFalse,
|
||||
LastHeartbeatTime: metav1.Now(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: "KubeletHasSufficientMemory",
|
||||
Message: "kubelet has sufficient memory available",
|
||||
},
|
||||
{
|
||||
Type: "DiskPressure",
|
||||
Status: v1.ConditionFalse,
|
||||
LastHeartbeatTime: metav1.Now(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: "KubeletHasNoDiskPressure",
|
||||
Message: "kubelet has no disk pressure",
|
||||
},
|
||||
{
|
||||
Type: "NetworkUnavailable",
|
||||
Status: v1.ConditionFalse,
|
||||
LastHeartbeatTime: metav1.Now(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: "RouteCreated",
|
||||
Message: "RouteController created a route",
|
||||
},
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// NodeAddresses returns a list of addresses for the node status
|
||||
// within Kubernetes.
|
||||
func (p *MockV0Provider) nodeAddresses() []v1.NodeAddress {
|
||||
return []v1.NodeAddress{
|
||||
{
|
||||
Type: "InternalIP",
|
||||
Address: p.internalIP,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
|
||||
// within Kubernetes.
|
||||
func (p *MockV0Provider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
|
||||
return v1.NodeDaemonEndpoints{
|
||||
KubeletEndpoint: v1.DaemonEndpoint{
|
||||
Port: p.daemonEndpointPort,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// GetStatsSummary returns dummy stats for all pods known by this provider.
|
||||
func (p *MockV0Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "GetStatsSummary")
|
||||
defer span.End()
|
||||
|
||||
// Grab the current timestamp so we can report it as the time the stats were generated.
|
||||
time := metav1.NewTime(time.Now())
|
||||
|
||||
// Create the Summary object that will later be populated with node and pod stats.
|
||||
res := &stats.Summary{}
|
||||
|
||||
// Populate the Summary object with basic node stats.
|
||||
res.Node = stats.NodeStats{
|
||||
NodeName: p.nodeName,
|
||||
StartTime: metav1.NewTime(p.startTime),
|
||||
}
|
||||
|
||||
// Populate the Summary object with dummy stats for each pod known by this provider.
|
||||
for _, pod := range p.pods {
|
||||
var (
|
||||
// totalUsageNanoCores will be populated with the sum of the values of UsageNanoCores computes across all containers in the pod.
|
||||
totalUsageNanoCores uint64
|
||||
// totalUsageBytes will be populated with the sum of the values of UsageBytes computed across all containers in the pod.
|
||||
totalUsageBytes uint64
|
||||
)
|
||||
|
||||
// Create a PodStats object to populate with pod stats.
|
||||
pss := stats.PodStats{
|
||||
PodRef: stats.PodReference{
|
||||
Name: pod.Name,
|
||||
Namespace: pod.Namespace,
|
||||
UID: string(pod.UID),
|
||||
},
|
||||
StartTime: pod.CreationTimestamp,
|
||||
}
|
||||
|
||||
// Iterate over all containers in the current pod to compute dummy stats.
|
||||
for _, container := range pod.Spec.Containers {
|
||||
// Grab a dummy value to be used as the total CPU usage.
|
||||
// The value should fit a uint32 in order to avoid overflows later on when computing pod stats.
|
||||
dummyUsageNanoCores := uint64(rand.Uint32())
|
||||
totalUsageNanoCores += dummyUsageNanoCores
|
||||
// Create a dummy value to be used as the total RAM usage.
|
||||
// The value should fit a uint32 in order to avoid overflows later on when computing pod stats.
|
||||
dummyUsageBytes := uint64(rand.Uint32())
|
||||
totalUsageBytes += dummyUsageBytes
|
||||
// Append a ContainerStats object containing the dummy stats to the PodStats object.
|
||||
pss.Containers = append(pss.Containers, stats.ContainerStats{
|
||||
Name: container.Name,
|
||||
StartTime: pod.CreationTimestamp,
|
||||
CPU: &stats.CPUStats{
|
||||
Time: time,
|
||||
UsageNanoCores: &dummyUsageNanoCores,
|
||||
},
|
||||
Memory: &stats.MemoryStats{
|
||||
Time: time,
|
||||
UsageBytes: &dummyUsageBytes,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Populate the CPU and RAM stats for the pod and append the PodsStats object to the Summary object to be returned.
|
||||
pss.CPU = &stats.CPUStats{
|
||||
Time: time,
|
||||
UsageNanoCores: &totalUsageNanoCores,
|
||||
}
|
||||
pss.Memory = &stats.MemoryStats{
|
||||
Time: time,
|
||||
UsageBytes: &totalUsageBytes,
|
||||
}
|
||||
res.Pods = append(res.Pods, pss)
|
||||
}
|
||||
|
||||
// Return the dummy stats.
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
|
||||
// within the provider.
|
||||
func (p *MockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
|
||||
p.notifier = notifier
|
||||
}
|
||||
|
||||
func buildKeyFromNames(namespace string, name string) (string, error) {
|
||||
return fmt.Sprintf("%s-%s", namespace, name), nil
|
||||
}
|
||||
|
||||
// buildKey is a helper for building the "key" for the providers pod store.
|
||||
func buildKey(pod *v1.Pod) (string, error) {
|
||||
if pod.ObjectMeta.Namespace == "" {
|
||||
return "", fmt.Errorf("pod namespace not found")
|
||||
}
|
||||
|
||||
if pod.ObjectMeta.Name == "" {
|
||||
return "", fmt.Errorf("pod name not found")
|
||||
}
|
||||
|
||||
return buildKeyFromNames(pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
|
||||
}
|
||||
|
||||
// addAttributes adds the specified attributes to the provided span.
|
||||
// attrs must be an even-sized list of string arguments.
|
||||
// Otherwise, the span won't be modified.
|
||||
// TODO: Refactor and move to a "tracing utilities" package.
|
||||
func addAttributes(ctx context.Context, span trace.Span, attrs ...string) context.Context {
|
||||
if len(attrs)%2 == 1 {
|
||||
return ctx
|
||||
}
|
||||
for i := 0; i < len(attrs); i += 2 {
|
||||
ctx = span.WithField(ctx, attrs[i], attrs[i+1])
|
||||
}
|
||||
return ctx
|
||||
}
|
||||
12
cmd/virtual-kubelet/internal/provider/mock/mock_test.go
Normal file
12
cmd/virtual-kubelet/internal/provider/mock/mock_test.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package mock
|
||||
|
||||
// We can guarantee the right interfaces are implemented inside of by putting casts in place. We must do the verification
|
||||
// that a given type *does not* implement a given interface in this test.
|
||||
// Cannot implement this due to: https://github.com/virtual-kubelet/virtual-kubelet/issues/632
|
||||
/*
|
||||
func TestMockLegacyInterface(t *testing.T) {
|
||||
var mlp providers.Provider = &MockLegacyProvider{}
|
||||
_, ok := mlp.(node.PodNotifier)
|
||||
assert.Assert(t, !ok)
|
||||
}
|
||||
*/
|
||||
36
cmd/virtual-kubelet/internal/provider/provider.go
Normal file
36
cmd/virtual-kubelet/internal/provider/provider.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/node/api"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
|
||||
)
|
||||
|
||||
// Provider contains the methods required to implement a virtual-kubelet provider.
|
||||
//
|
||||
// Errors produced by these methods should implement an interface from
|
||||
// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the
|
||||
// core logic to be able to understand the type of failure.
|
||||
type Provider interface {
|
||||
node.PodLifecycleHandler
|
||||
|
||||
// GetContainerLogs retrieves the logs of a container by name from the provider.
|
||||
GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error)
|
||||
|
||||
// RunInContainer executes a command in a container in the pod, copying data
|
||||
// between in/out/err and the container's stdin/stdout/stderr.
|
||||
RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error
|
||||
|
||||
// ConfigureNode enables a provider to configure the node object that
|
||||
// will be used for Kubernetes.
|
||||
ConfigureNode(context.Context, *v1.Node)
|
||||
}
|
||||
|
||||
// PodMetricsProvider is an optional interface that providers can implement to expose pod stats
|
||||
type PodMetricsProvider interface {
|
||||
GetStatsSummary(context.Context) (*stats.Summary, error)
|
||||
}
|
||||
74
cmd/virtual-kubelet/internal/provider/store.go
Normal file
74
cmd/virtual-kubelet/internal/provider/store.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
|
||||
)
|
||||
|
||||
// Store is used for registering/fetching providers
|
||||
type Store struct {
|
||||
mu sync.Mutex
|
||||
ls map[string]InitFunc
|
||||
}
|
||||
|
||||
func NewStore() *Store {
|
||||
return &Store{
|
||||
ls: make(map[string]InitFunc),
|
||||
}
|
||||
}
|
||||
|
||||
// Register registers a providers init func by name
|
||||
func (s *Store) Register(name string, f InitFunc) error {
|
||||
if f == nil {
|
||||
return errdefs.InvalidInput("provided init function cannot not be nil")
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.ls[name] = f
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get gets the registered init func for the given name
|
||||
// The returned function may be nil if the given name is not registered.
|
||||
func (s *Store) Get(name string) InitFunc {
|
||||
s.mu.Lock()
|
||||
f := s.ls[name]
|
||||
s.mu.Unlock()
|
||||
return f
|
||||
}
|
||||
|
||||
// List lists all the registered providers
|
||||
func (s *Store) List() []string {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
ls := make([]string, 0, len(s.ls))
|
||||
for p := range s.ls {
|
||||
ls = append(ls, p)
|
||||
}
|
||||
|
||||
return ls
|
||||
}
|
||||
|
||||
// Exists returns if there is an init function registered under the provided name
|
||||
func (s *Store) Exists(name string) bool {
|
||||
s.mu.Lock()
|
||||
_, ok := s.ls[name]
|
||||
s.mu.Unlock()
|
||||
return ok
|
||||
}
|
||||
|
||||
// InitConfig is the config passed to initialize a registered provider.
|
||||
type InitConfig struct {
|
||||
ConfigPath string
|
||||
NodeName string
|
||||
OperatingSystem string
|
||||
InternalIP string
|
||||
DaemonPort int32
|
||||
KubeClusterDomain string
|
||||
ResourceManager *manager.ResourceManager
|
||||
}
|
||||
|
||||
type InitFunc func(InitConfig) (Provider, error)
|
||||
27
cmd/virtual-kubelet/internal/provider/types.go
Normal file
27
cmd/virtual-kubelet/internal/provider/types.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package provider
|
||||
|
||||
const (
|
||||
// OperatingSystemLinux is the configuration value for defining Linux.
|
||||
OperatingSystemLinux = "Linux"
|
||||
// OperatingSystemWindows is the configuration value for defining Windows.
|
||||
OperatingSystemWindows = "Windows"
|
||||
)
|
||||
|
||||
type OperatingSystems map[string]bool
|
||||
|
||||
var (
|
||||
// ValidOperatingSystems defines the group of operating systems
|
||||
// that can be used as a kubelet node.
|
||||
ValidOperatingSystems = OperatingSystems{
|
||||
OperatingSystemLinux: true,
|
||||
OperatingSystemWindows: true,
|
||||
}
|
||||
)
|
||||
|
||||
func (o OperatingSystems) Names() []string {
|
||||
keys := make([]string, 0, len(o))
|
||||
for k := range o {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
return keys
|
||||
}
|
||||
@@ -25,12 +25,12 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
cmdproviders "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/commands/providers"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/commands/root"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/commands/version"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/commands/providers"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/commands/root"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/commands/version"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/trace"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/trace/opencensus"
|
||||
)
|
||||
@@ -57,11 +57,11 @@ func main() {
|
||||
optsErr := root.SetDefaultOpts(&opts)
|
||||
opts.Version = strings.Join([]string{k8sVersion, "vk", buildVersion}, "-")
|
||||
|
||||
s := providers.NewStore()
|
||||
s := provider.NewStore()
|
||||
registerMock(s)
|
||||
|
||||
rootCmd := root.NewCommand(ctx, filepath.Base(os.Args[0]), s, opts)
|
||||
rootCmd.AddCommand(version.NewCommand(buildVersion, buildTime), cmdproviders.NewCommand(s))
|
||||
rootCmd.AddCommand(version.NewCommand(buildVersion, buildTime), providers.NewCommand(s))
|
||||
preRun := rootCmd.PreRunE
|
||||
|
||||
var logLevel string
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/mock"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider/mock"
|
||||
)
|
||||
|
||||
func registerMock(s *providers.Store) {
|
||||
s.Register("mock", func(cfg providers.InitConfig) (providers.Provider, error) {
|
||||
func registerMock(s *provider.Store) {
|
||||
s.Register("mock", func(cfg provider.InitConfig) (provider.Provider, error) {
|
||||
return mock.NewMockProvider(
|
||||
cfg.ConfigPath,
|
||||
cfg.NodeName,
|
||||
@@ -16,7 +16,7 @@ func registerMock(s *providers.Store) {
|
||||
)
|
||||
})
|
||||
|
||||
s.Register("mockV0", func(cfg providers.InitConfig) (providers.Provider, error) {
|
||||
s.Register("mockV0", func(cfg provider.InitConfig) (provider.Provider, error) {
|
||||
return mock.NewMockProvider(
|
||||
cfg.ConfigPath,
|
||||
cfg.NodeName,
|
||||
|
||||
Reference in New Issue
Block a user