Merge pull request #627 from sargun/make-mock-provider-async2

Convert mock provider to async provider
This commit is contained in:
Brian Goff
2019-06-05 15:06:35 -07:00
committed by GitHub
3 changed files with 156 additions and 81 deletions

View File

@@ -32,8 +32,17 @@ const (
containerNameKey = "containerName"
)
// MockProvider implements the virtual-kubelet provider interface and stores pods in memory.
type MockProvider struct {
// See: https://github.com/virtual-kubelet/virtual-kubelet/issues/632
/*
var (
_ providers.Provider = (*MockLegacyProvider)(nil)
_ providers.PodMetricsProvider = (*MockLegacyProvider)(nil)
_ vkubelet.PodNotifier = (*MockProvider)(nil)
)
*/
// MockLegacyProvider implements the virtual-kubelet provider interface and stores pods in memory.
type MockLegacyProvider struct {
nodeName string
operatingSystem string
internalIP string
@@ -41,6 +50,12 @@ type MockProvider struct {
pods map[string]*v1.Pod
config MockConfig
startTime time.Time
notifier func(*v1.Pod)
}
// MockProvider is like MockLegacyProvider, but implements the PodNotifier interface
type MockProvider struct {
*MockLegacyProvider
}
// MockConfig contains a mock virtual-kubelet's configurable parameters.
@@ -50,28 +65,8 @@ type MockConfig struct {
Pods string `json:"pods,omitempty"`
}
// NewMockProvider creates a new MockProvider
func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
config, err := loadConfig(providerConfig, nodeName)
if err != nil {
return nil, err
}
provider := MockProvider{
nodeName: nodeName,
operatingSystem: operatingSystem,
internalIP: internalIP,
daemonEndpointPort: daemonEndpointPort,
pods: make(map[string]*v1.Pod),
config: config,
startTime: time.Now(),
}
return &provider, nil
}
// NewMockProviderMockConfig creates a new MockProvider with the given Mock Config
func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
// NewMockProviderMockConfig creates a new MockLegacyProvider. Mock legacy provider does not implement the new asynchronous podnotifier interface
func NewMockLegacyProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockLegacyProvider, error) {
//set defaults
if config.CPU == "" {
config.CPU = defaultCPUCapacity
@@ -82,8 +77,7 @@ func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem stri
if config.Pods == "" {
config.Pods = defaultPodCapacity
}
provider := MockProvider{
provider := MockLegacyProvider{
nodeName: nodeName,
operatingSystem: operatingSystem,
internalIP: internalIP,
@@ -91,12 +85,43 @@ func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem stri
pods: make(map[string]*v1.Pod),
config: config,
startTime: time.Now(),
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
// it will be set, and then we'll call a real underlying implementation.
// This makes it easier in the sense we don't need to wrap each method.
notifier: func(*v1.Pod) {},
}
return &provider, nil
}
// loadConfig loads the given json configuration files.
// NewMockLegacyProvider creates a new MockLegacyProvider
func NewMockLegacyProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockLegacyProvider, error) {
config, err := loadConfig(providerConfig, nodeName)
if err != nil {
return nil, err
}
return NewMockLegacyProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
}
// NewMockProviderMockConfig creates a new MockProvider with the given config
func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
p, err := NewMockLegacyProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
return &MockProvider{MockLegacyProvider: p}, err
}
// NewMockProvider creates a new MockProvider, which implements the PodNotifier interface
func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
config, err := loadConfig(providerConfig, nodeName)
if err != nil {
return nil, err
}
return NewMockProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
}
// loadConfig loads the given json configuration files.
func loadConfig(providerConfig, nodeName string) (config MockConfig, err error) {
data, err := ioutil.ReadFile(providerConfig)
if err != nil {
@@ -133,27 +158,64 @@ func loadConfig(providerConfig, nodeName string) (config MockConfig, err error)
}
// CreatePod accepts a Pod definition and stores it in memory.
func (p *MockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
func (p *MockLegacyProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
ctx, span := trace.StartSpan(ctx, "CreatePod")
defer span.End()
// Add the pod's coordinates to the current span.
ctx = addAttributes(ctx, span, namespaceKey, pod.Namespace, nameKey, pod.Name)
log.G(ctx).Info("receive CreatePod %q", pod.Name)
log.G(ctx).Infof("receive CreatePod %q", pod.Name)
key, err := buildKey(pod)
if err != nil {
return err
}
now := metav1.NewTime(time.Now())
pod.Status = v1.PodStatus{
Phase: v1.PodRunning,
HostIP: "1.2.3.4",
PodIP: "5.6.7.8",
StartTime: &now,
Conditions: []v1.PodCondition{
{
Type: v1.PodInitialized,
Status: v1.ConditionTrue,
},
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
},
}
for _, container := range pod.Spec.Containers {
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
Name: container.Name,
Image: container.Image,
Ready: true,
RestartCount: 0,
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{
StartedAt: now,
},
},
})
}
p.pods[key] = pod
p.notifier(pod)
return nil
}
// UpdatePod accepts a Pod definition and updates its reference.
func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
func (p *MockLegacyProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
ctx, span := trace.StartSpan(ctx, "UpdatePod")
defer span.End()
@@ -168,12 +230,13 @@ func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
}
p.pods[key] = pod
p.notifier(pod)
return nil
}
// DeletePod deletes the specified pod out of memory.
func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
func (p *MockLegacyProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
ctx, span := trace.StartSpan(ctx, "DeletePod")
defer span.End()
@@ -191,13 +254,30 @@ func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
return errdefs.NotFound("pod not found")
}
now := metav1.Now()
delete(p.pods, key)
pod.Status.Phase = v1.PodSucceeded
pod.Status.Reason = "MockProviderPodDeleted"
for idx := range pod.Status.ContainerStatuses {
pod.Status.ContainerStatuses[idx].Ready = false
pod.Status.ContainerStatuses[idx].State = v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
Message: "Mock provider terminated container upon deletion",
FinishedAt: now,
Reason: "MockProviderPodContainerDeleted",
StartedAt: pod.Status.ContainerStatuses[idx].State.Running.StartedAt,
},
}
}
p.notifier(pod)
return nil
}
// GetPod returns a pod by name that is stored in memory.
func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
func (p *MockLegacyProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
ctx, span := trace.StartSpan(ctx, "GetPod")
defer func() {
span.SetStatus(err)
@@ -221,7 +301,7 @@ func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod
}
// GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
func (p *MockLegacyProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
ctx, span := trace.StartSpan(ctx, "GetContainerLogs")
defer span.End()
@@ -234,20 +314,20 @@ func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName,
// Get full pod name as defined in the provider context
// TODO: Implementation
func (p *MockProvider) GetPodFullName(namespace string, pod string) string {
func (p *MockLegacyProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *MockProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
func (p *MockLegacyProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
log.G(context.TODO()).Infof("receive ExecInContainer %q", container)
return nil
}
// GetPodStatus returns the status of a pod by name that is "running".
// returns nil if a pod by that name is not found.
func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
func (p *MockLegacyProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
ctx, span := trace.StartSpan(ctx, "GetPodStatus")
defer span.End()
@@ -256,53 +336,16 @@ func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string)
log.G(ctx).Infof("receive GetPodStatus %q", name)
now := metav1.NewTime(time.Now())
status := &v1.PodStatus{
Phase: v1.PodRunning,
HostIP: "1.2.3.4",
PodIP: "5.6.7.8",
StartTime: &now,
Conditions: []v1.PodCondition{
{
Type: v1.PodInitialized,
Status: v1.ConditionTrue,
},
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
},
},
}
pod, err := p.GetPod(ctx, namespace, name)
if err != nil {
return nil, err
}
for _, container := range pod.Spec.Containers {
status.ContainerStatuses = append(status.ContainerStatuses, v1.ContainerStatus{
Name: container.Name,
Image: container.Image,
Ready: true,
RestartCount: 0,
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{
StartedAt: now,
},
},
})
}
return status, nil
return &pod.Status, nil
}
// GetPods returns a list of all pods known to be "running".
func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
func (p *MockLegacyProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
ctx, span := trace.StartSpan(ctx, "GetPods")
defer span.End()
@@ -318,7 +361,7 @@ func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
}
// Capacity returns a resource list containing the capacity limits.
func (p *MockProvider) Capacity(ctx context.Context) v1.ResourceList {
func (p *MockLegacyProvider) Capacity(ctx context.Context) v1.ResourceList {
ctx, span := trace.StartSpan(ctx, "Capacity")
defer span.End()
@@ -331,7 +374,7 @@ func (p *MockProvider) Capacity(ctx context.Context) v1.ResourceList {
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
// within Kubernetes.
func (p *MockProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
func (p *MockLegacyProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
ctx, span := trace.StartSpan(ctx, "NodeConditions")
defer span.End()
@@ -383,7 +426,7 @@ func (p *MockProvider) NodeConditions(ctx context.Context) []v1.NodeCondition {
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *MockProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
func (p *MockLegacyProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
ctx, span := trace.StartSpan(ctx, "NodeAddresses")
defer span.End()
@@ -397,7 +440,7 @@ func (p *MockProvider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *MockProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
func (p *MockLegacyProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
ctx, span := trace.StartSpan(ctx, "NodeDaemonEndpoints")
defer span.End()
@@ -410,12 +453,14 @@ func (p *MockProvider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEn
// OperatingSystem returns the operating system for this provider.
// This is a noop to default to Linux for now.
func (p *MockProvider) OperatingSystem() string {
func (p *MockLegacyProvider) OperatingSystem() string {
// This is harcoded due to: https://github.com/virtual-kubelet/virtual-kubelet/issues/632
return "Linux"
}
// GetStatsSummary returns dummy stats for all pods known by this provider.
func (p *MockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
func (p *MockLegacyProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
ctx, span := trace.StartSpan(ctx, "GetStatsSummary")
defer span.End()
@@ -491,6 +536,12 @@ func (p *MockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, err
return res, nil
}
// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
// within the provider.
func (p *MockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
p.notifier = notifier
}
func buildKeyFromNames(namespace string, name string) (string, error) {
return fmt.Sprintf("%s-%s", namespace, name), nil
}

View File

@@ -0,0 +1,12 @@
package mock
// We can guarantee the right interfaces are implemented inside of by putting casts in place. We must do the verification
// that a given type *does not* implement a given interface in this test.
// Cannot implement this due to: https://github.com/virtual-kubelet/virtual-kubelet/issues/632
/*
func TestMockLegacyInterface(t *testing.T) {
var mlp providers.Provider = &MockLegacyProvider{}
_, ok := mlp.(vkubelet.PodNotifier)
assert.Assert(t, !ok)
}
*/

View File

@@ -9,6 +9,8 @@ import (
func init() {
register("mock", initMock)
register("mocklegacy", initMockLegacy)
}
func initMock(cfg InitConfig) (providers.Provider, error) {
@@ -20,3 +22,13 @@ func initMock(cfg InitConfig) (providers.Provider, error) {
cfg.DaemonPort,
)
}
func initMockLegacy(cfg InitConfig) (providers.Provider, error) {
return mock.NewMockLegacyProvider(
cfg.ConfigPath,
cfg.NodeName,
cfg.OperatingSystem,
cfg.InternalIP,
cfg.DaemonPort,
)
}