From 4202b03cda64137259d9bc8c293a41fc2acc5ee8 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Wed, 2 Oct 2019 09:28:09 -0700 Subject: [PATCH] Remove sync provider support This removes the legacy sync provider interface. All new providers are expected to implement the async NotifyPods interface. The legacy sync provider interface creates complexities around how the deletion flow works, and the mixed sync and async APIs block us from evolving functionality. This collapses in the NotifyPods interface into the PodLifecycleHandler interface. --- .../internal/provider/mock/mock.go | 62 +++------ cmd/virtual-kubelet/register.go | 10 -- node/lifecycle_test.go | 129 +----------------- node/mock_test.go | 44 ++---- node/pod.go | 81 +---------- node/pod_test.go | 103 -------------- node/podcontroller.go | 14 +- node/queue.go | 39 ------ 8 files changed, 41 insertions(+), 441 deletions(-) diff --git a/cmd/virtual-kubelet/internal/provider/mock/mock.go b/cmd/virtual-kubelet/internal/provider/mock/mock.go index 48b9e8322..31296e9ae 100644 --- a/cmd/virtual-kubelet/internal/provider/mock/mock.go +++ b/cmd/virtual-kubelet/internal/provider/mock/mock.go @@ -41,8 +41,8 @@ var ( ) */ -// MockV0Provider implements the virtual-kubelet provider interface and stores pods in memory. -type MockV0Provider struct { //nolint:golint +// MockProvider implements the virtual-kubelet provider interface and stores pods in memory. +type MockProvider struct { // nolint:golint nodeName string operatingSystem string internalIP string @@ -53,11 +53,6 @@ type MockV0Provider struct { //nolint:golint notifier func(*v1.Pod) } -// MockProvider is like MockV0Provider, but implements the PodNotifier interface -type MockProvider struct { //nolint:golint - *MockV0Provider -} - // MockConfig contains a mock virtual-kubelet's configurable parameters. type MockConfig struct { //nolint:golint CPU string `json:"cpu,omitempty"` @@ -66,7 +61,7 @@ type MockConfig struct { //nolint:golint } // NewMockProviderMockConfig creates a new MockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface -func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) { +func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) { //set defaults if config.CPU == "" { config.CPU = defaultCPUCapacity @@ -77,7 +72,7 @@ func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem st if config.Pods == "" { config.Pods = defaultPodCapacity } - provider := MockV0Provider{ + provider := MockProvider{ nodeName: nodeName, operatingSystem: operatingSystem, internalIP: internalIP, @@ -85,32 +80,11 @@ func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem st pods: make(map[string]*v1.Pod), config: config, startTime: time.Now(), - // By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface, - // it will be set, and then we'll call a real underlying implementation. - // This makes it easier in the sense we don't need to wrap each method. 
- notifier: func(*v1.Pod) {}, } return &provider, nil } -// NewMockV0Provider creates a new MockV0Provider -func NewMockV0Provider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) { - config, err := loadConfig(providerConfig, nodeName) - if err != nil { - return nil, err - } - - return NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort) -} - -// NewMockProviderMockConfig creates a new MockProvider with the given config -func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) { - p, err := NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort) - - return &MockProvider{MockV0Provider: p}, err -} - // NewMockProvider creates a new MockProvider, which implements the PodNotifier interface func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) { config, err := loadConfig(providerConfig, nodeName) @@ -158,7 +132,7 @@ func loadConfig(providerConfig, nodeName string) (config MockConfig, err error) } // CreatePod accepts a Pod definition and stores it in memory. -func (p *MockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { +func (p *MockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error { ctx, span := trace.StartSpan(ctx, "CreatePod") defer span.End() @@ -215,7 +189,7 @@ func (p *MockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { } // UpdatePod accepts a Pod definition and updates its reference. -func (p *MockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error { +func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error { ctx, span := trace.StartSpan(ctx, "UpdatePod") defer span.End() @@ -236,7 +210,7 @@ func (p *MockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error { } // DeletePod deletes the specified pod out of memory. -func (p *MockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { +func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { ctx, span := trace.StartSpan(ctx, "DeletePod") defer span.End() @@ -277,7 +251,7 @@ func (p *MockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) } // GetPod returns a pod by name that is stored in memory. -func (p *MockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) { +func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) { ctx, span := trace.StartSpan(ctx, "GetPod") defer func() { span.SetStatus(err) @@ -301,7 +275,7 @@ func (p *MockV0Provider) GetPod(ctx context.Context, namespace, name string) (po } // GetContainerLogs retrieves the logs of a container by name from the provider. -func (p *MockV0Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { +func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { ctx, span := trace.StartSpan(ctx, "GetContainerLogs") defer span.End() @@ -314,14 +288,14 @@ func (p *MockV0Provider) GetContainerLogs(ctx context.Context, namespace, podNam // RunInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. 
-func (p *MockV0Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error { +func (p *MockProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error { log.G(context.TODO()).Infof("receive ExecInContainer %q", container) return nil } // GetPodStatus returns the status of a pod by name that is "running". // returns nil if a pod by that name is not found. -func (p *MockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { +func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { ctx, span := trace.StartSpan(ctx, "GetPodStatus") defer span.End() @@ -339,7 +313,7 @@ func (p *MockV0Provider) GetPodStatus(ctx context.Context, namespace, name strin } // GetPods returns a list of all pods known to be "running". -func (p *MockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) { +func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { ctx, span := trace.StartSpan(ctx, "GetPods") defer span.End() @@ -354,7 +328,7 @@ func (p *MockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) { return pods, nil } -func (p *MockV0Provider) ConfigureNode(ctx context.Context, n *v1.Node) { +func (p *MockProvider) ConfigureNode(ctx context.Context, n *v1.Node) { ctx, span := trace.StartSpan(ctx, "mock.ConfigureNode") //nolint:ineffassign defer span.End() @@ -373,7 +347,7 @@ func (p *MockV0Provider) ConfigureNode(ctx context.Context, n *v1.Node) { } // Capacity returns a resource list containing the capacity limits. -func (p *MockV0Provider) capacity() v1.ResourceList { +func (p *MockProvider) capacity() v1.ResourceList { return v1.ResourceList{ "cpu": resource.MustParse(p.config.CPU), "memory": resource.MustParse(p.config.Memory), @@ -383,7 +357,7 @@ func (p *MockV0Provider) capacity() v1.ResourceList { // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status // within Kubernetes. -func (p *MockV0Provider) nodeConditions() []v1.NodeCondition { +func (p *MockProvider) nodeConditions() []v1.NodeCondition { // TODO: Make this configurable return []v1.NodeCondition{ { @@ -432,7 +406,7 @@ func (p *MockV0Provider) nodeConditions() []v1.NodeCondition { // NodeAddresses returns a list of addresses for the node status // within Kubernetes. -func (p *MockV0Provider) nodeAddresses() []v1.NodeAddress { +func (p *MockProvider) nodeAddresses() []v1.NodeAddress { return []v1.NodeAddress{ { Type: "InternalIP", @@ -443,7 +417,7 @@ func (p *MockV0Provider) nodeAddresses() []v1.NodeAddress { // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status // within Kubernetes. -func (p *MockV0Provider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints { +func (p *MockProvider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints { return v1.NodeDaemonEndpoints{ KubeletEndpoint: v1.DaemonEndpoint{ Port: p.daemonEndpointPort, @@ -452,7 +426,7 @@ func (p *MockV0Provider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints { } // GetStatsSummary returns dummy stats for all pods known by this provider. 
-func (p *MockV0Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) { +func (p *MockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) { var span trace.Span ctx, span = trace.StartSpan(ctx, "GetStatsSummary") //nolint: ineffassign defer span.End() diff --git a/cmd/virtual-kubelet/register.go b/cmd/virtual-kubelet/register.go index e845e5642..f315338a3 100644 --- a/cmd/virtual-kubelet/register.go +++ b/cmd/virtual-kubelet/register.go @@ -15,14 +15,4 @@ func registerMock(s *provider.Store) { cfg.DaemonPort, ) }) - - s.Register("mockV0", func(cfg provider.InitConfig) (provider.Provider, error) { //nolint:errcheck - return mock.NewMockProvider( - cfg.ConfigPath, - cfg.NodeName, - cfg.OperatingSystem, - cfg.InternalIP, - cfg.DaemonPort, - ) - }) } diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go index 6a4ea0325..9b0d0971d 100644 --- a/node/lifecycle_test.go +++ b/node/lifecycle_test.go @@ -116,15 +116,6 @@ func TestPodLifecycle(t *testing.T) { testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc) })) }) - - if testing.Short() { - return - } - t.Run("mockV0Provider", func(t *testing.T) { - assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) { - testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc) - })) - }) }) // createStartDeleteScenarioWithDeletionErrorNotFound tests the flow if the pod was not found in the provider @@ -160,20 +151,13 @@ func TestPodLifecycle(t *testing.T) { t.Run("mockProvider", func(t *testing.T) { mp := newMockProvider() assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testDanglingPodScenario(ctx, t, s, mp.mockV0Provider) + testDanglingPodScenario(ctx, t, s, mp) })) }) if testing.Short() { return } - - t.Run("mockV0Provider", func(t *testing.T) { - mp := newMockV0Provider() - assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testDanglingPodScenario(ctx, t, s, mp) - })) - }) }) // failedPodScenario ensures that the VK ignores failed pods that were failed prior to the PC starting up @@ -184,16 +168,6 @@ func TestPodLifecycle(t *testing.T) { testFailedPodScenario(ctx, t, s) })) }) - - if testing.Short() { - return - } - - t.Run("mockV0Provider", func(t *testing.T) { - assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) { - testFailedPodScenario(ctx, t, s) - })) - }) }) // succeededPodScenario ensures that the VK ignores succeeded pods that were succeeded prior to the PC starting up. @@ -204,14 +178,6 @@ func TestPodLifecycle(t *testing.T) { testSucceededPodScenario(ctx, t, s) })) }) - if testing.Short() { - return - } - t.Run("mockV0Provider", func(t *testing.T) { - assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) { - testSucceededPodScenario(ctx, t, s) - })) - }) }) // updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated @@ -224,15 +190,6 @@ func TestPodLifecycle(t *testing.T) { })) }) }) - - // podStatusMissingWhileRunningScenario waits for the pod to go into the running state, with a V0 style provider, - // and then makes the pod disappear! 
- t.Run("podStatusMissingWhileRunningScenario", func(t *testing.T) { - mp := newMockV0Provider() - assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testPodStatusMissingWhileRunningScenario(ctx, t, s, mp) - })) - }) } type testFunction func(ctx context.Context, s *system) @@ -358,7 +315,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system, assert.DeepEqual(t, p1, p2) } -func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mockV0Provider) { +func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) { t.Parallel() pod := newPod() @@ -372,7 +329,6 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo } func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error) { - ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -530,87 +486,6 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys assert.NilError(t, m.updates.until(ctx, func(v int) bool { return v > 0 })) } -func testPodStatusMissingWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockV0Provider) { - t.Parallel() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - p := newPod() - key, err := buildKey(p) - assert.NilError(t, err) - - listOptions := metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(), - } - - watchErrCh := make(chan error) - - // Create a Pod - _, e := s.client.CoreV1().Pods(testNamespace).Create(p) - assert.NilError(t, e) - - // Setup a watch to check if the pod is in running - watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions) - assert.NilError(t, err) - defer watcher.Stop() - go func() { - newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, - // Wait for the pod to be started - func(ev watch.Event) (bool, error) { - pod := ev.Object.(*corev1.Pod) - return pod.Status.Phase == corev1.PodRunning, nil - }) - // This deepcopy is required to please the race detector - p = newPod.Object.(*corev1.Pod).DeepCopy() - watchErrCh <- watchErr - }() - - // Start the pod controller - assert.NilError(t, s.start(ctx)) - - // Wait for pod to be in running - select { - case <-ctx.Done(): - t.Fatalf("Context ended early: %s", ctx.Err().Error()) - case <-s.pc.Done(): - assert.NilError(t, s.pc.Err()) - t.Fatal("Pod controller exited prematurely without error") - case err = <-watchErrCh: - assert.NilError(t, err) - - } - - // Setup a watch to check if the pod is in failed due to provider issues - watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions) - assert.NilError(t, err) - defer watcher.Stop() - go func() { - newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, - // Wait for the pod to be failed - func(ev watch.Event) (bool, error) { - pod := ev.Object.(*corev1.Pod) - return pod.Status.Phase == corev1.PodFailed, nil - }) - // This deepcopy is required to please the race detector - p = newPod.Object.(*corev1.Pod).DeepCopy() - watchErrCh <- watchErr - }() - - // delete the pod from the mock provider - m.pods.Delete(key) - select { - case <-ctx.Done(): - t.Fatalf("Context ended early: %s", ctx.Err().Error()) - case <-s.pc.Done(): - assert.NilError(t, s.pc.Err()) - t.Fatal("Pod controller exited prematurely without error") - case err = <-watchErrCh: - assert.NilError(t, err) - } - - assert.Equal(t, p.Status.Reason, 
podStatusReasonNotFound) -} - func BenchmarkCreatePods(b *testing.B) { sl := logrus.StandardLogger() sl.SetLevel(logrus.ErrorLevel) diff --git a/node/mock_test.go b/node/mock_test.go index 3ff7cebb7..8554a20cf 100644 --- a/node/mock_test.go +++ b/node/mock_test.go @@ -13,8 +13,7 @@ import ( ) var ( - _ PodLifecycleHandler = (*mockV0Provider)(nil) - _ PodNotifier = (*mockProvider)(nil) + _ PodLifecycleHandler = (*mockProvider)(nil) ) type waitableInt struct { @@ -62,7 +61,7 @@ func (w *waitableInt) increment() { w.cond.Broadcast() } -type mockV0Provider struct { +type mockProvider struct { creates *waitableInt updates *waitableInt deletes *waitableInt @@ -75,13 +74,9 @@ type mockV0Provider struct { realNotifier func(*v1.Pod) } -type mockProvider struct { - *mockV0Provider -} - -// NewMockProviderMockConfig creates a new mockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface -func newMockV0Provider() *mockV0Provider { - provider := mockV0Provider{ +// newMockProvider creates a new mockProvider. +func newMockProvider() *mockProvider { + provider := mockProvider{ startTime: time.Now(), creates: newWaitableInt(), updates: newWaitableInt(), @@ -95,20 +90,15 @@ func newMockV0Provider() *mockV0Provider { return &provider } -// NewMockProviderMockConfig creates a new MockProvider with the given config -func newMockProvider() *mockProvider { - return &mockProvider{mockV0Provider: newMockV0Provider()} -} - // notifier calls the callback that we got from the pod controller to notify it of updates (if it is set) -func (p *mockV0Provider) notifier(pod *v1.Pod) { +func (p *mockProvider) notifier(pod *v1.Pod) { if p.realNotifier != nil { p.realNotifier(pod) } } // CreatePod accepts a Pod definition and stores it in memory. -func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { +func (p *mockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error { log.G(ctx).Infof("receive CreatePod %q", pod.Name) p.creates.increment() @@ -160,7 +150,7 @@ func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { } // UpdatePod accepts a Pod definition and updates its reference. -func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error { +func (p *mockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error { log.G(ctx).Infof("receive UpdatePod %q", pod.Name) p.updates.increment() @@ -177,7 +167,7 @@ func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error { // DeletePod deletes the specified pod out of memory. The PodController deepcopies the pod object // for us, so we don't have to worry about mutation. -func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { +func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { log.G(ctx).Infof("receive DeletePod %q", pod.Name) p.attemptedDeletes.increment() @@ -213,21 +203,13 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) } p.notifier(pod) - p.pods.Store(key, pod) - if pod.DeletionGracePeriodSeconds == nil || *pod.DeletionGracePeriodSeconds == 0 { - p.pods.Delete(key) - } else { - time.AfterFunc(time.Duration(*pod.DeletionGracePeriodSeconds)*time.Second, func() { - p.pods.Delete(key) - }) - - } + p.pods.Delete(key) return nil } // GetPod returns a pod by name that is stored in memory. 
-func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) { +func (p *mockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) { log.G(ctx).Infof("receive GetPod %q", name) key, err := buildKeyFromNames(namespace, name) @@ -243,7 +225,7 @@ func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (po // GetPodStatus returns the status of a pod by name that is "running". // returns nil if a pod by that name is not found. -func (p *mockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { +func (p *mockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { log.G(ctx).Infof("receive GetPodStatus %q", name) pod, err := p.GetPod(ctx, namespace, name) @@ -255,7 +237,7 @@ func (p *mockV0Provider) GetPodStatus(ctx context.Context, namespace, name strin } // GetPods returns a list of all pods known to be "running". -func (p *mockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) { +func (p *mockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { log.G(ctx).Info("receive GetPods") var pods []*v1.Pod diff --git a/node/pod.go b/node/pod.go index eb2294f17..bd2a901e9 100644 --- a/node/pod.go +++ b/node/pod.go @@ -16,7 +16,6 @@ package node import ( "context" - "time" "github.com/google/go-cmp/cmp" pkgerrors "github.com/pkg/errors" @@ -26,18 +25,12 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) const ( - podStatusReasonProviderFailed = "ProviderFailed" - podStatusReasonNotFound = "NotFound" - podStatusMessageNotFound = "The pod status was not found and may have been deleted from the provider" - containerStatusReasonNotFound = "NotFound" - containerStatusMessageNotFound = "Container was not found and was likely deleted" - containerStatusExitCodeNotFound = -137 + podStatusReasonProviderFailed = "ProviderFailed" ) func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context { @@ -198,78 +191,6 @@ func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace, return nil } -// fetchPodStatusesFromProvider syncs the providers pod status with the kubernetes pod status. -func (pc *PodController) fetchPodStatusesFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) { - ctx, span := trace.StartSpan(ctx, "fetchPodStatusesFromProvider") - defer span.End() - - // Update all the pods with the provider status. - pods, err := pc.podsLister.List(labels.Everything()) - if err != nil { - err = pkgerrors.Wrap(err, "error getting pod list from kubernetes") - span.SetStatus(err) - log.G(ctx).WithError(err).Error("Error updating pod statuses") - return - } - ctx = span.WithField(ctx, "nPods", int64(len(pods))) - - for _, pod := range pods { - if !shouldSkipPodStatusUpdate(pod) { - enrichedPod, err := pc.fetchPodStatusFromProvider(ctx, q, pod) - if err != nil { - log.G(ctx).WithFields(map[string]interface{}{ - "name": pod.Name, - "namespace": pod.Namespace, - }).WithError(err).Error("Could not fetch pod status") - } else if enrichedPod != nil { - pc.enqueuePodStatusUpdate(ctx, q, enrichedPod) - } - } - } -} - -// fetchPodStatusFromProvider returns a pod (the pod we pass in) enriched with the pod status from the provider. 
If the pod is not found, -// and it has been 1 minute since the pod was created, or the pod was previously running, it will be marked as failed. -// If a valid pod status cannot be generated, for example, if a pod is not found in the provider, and it has been less than 1 minute -// since pod creation, we will return nil for the pod. -func (pc *PodController) fetchPodStatusFromProvider(ctx context.Context, q workqueue.RateLimitingInterface, podFromKubernetes *corev1.Pod) (*corev1.Pod, error) { - podStatus, err := pc.provider.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name) - if errdefs.IsNotFound(err) || (err == nil && podStatus == nil) { - // Only change the status when the pod was already up - // Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation. - if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute { - // Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created. - podStatus = podFromKubernetes.Status.DeepCopy() - podStatus.Phase = corev1.PodFailed - podStatus.Reason = podStatusReasonNotFound - podStatus.Message = podStatusMessageNotFound - now := metav1.NewTime(time.Now()) - for i, c := range podStatus.ContainerStatuses { - if c.State.Running == nil { - continue - } - podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{ - ExitCode: containerStatusExitCodeNotFound, - Reason: containerStatusReasonNotFound, - Message: containerStatusMessageNotFound, - FinishedAt: now, - StartedAt: c.State.Running.StartedAt, - ContainerID: c.ContainerID, - } - podStatus.ContainerStatuses[i].State.Running = nil - } - } else if err != nil { - return nil, nil - } - } else if err != nil { - return nil, err - } - - pod := podFromKubernetes.DeepCopy() - podStatus.DeepCopyInto(&pod.Status) - return pod, nil -} - func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool { return pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed || diff --git a/node/pod_test.go b/node/pod_test.go index 0d7df7523..abf0ec515 100644 --- a/node/pod_test.go +++ b/node/pod_test.go @@ -225,109 +225,6 @@ func TestPodDelete(t *testing.T) { } } -func TestFetchPodStatusFromProvider(t *testing.T) { - startedAt := metav1.NewTime(time.Now()) - finishedAt := metav1.NewTime(startedAt.Add(time.Second * 10)) - containerStateRunning := &corev1.ContainerStateRunning{StartedAt: startedAt} - containerStateTerminated := &corev1.ContainerStateTerminated{StartedAt: startedAt, FinishedAt: finishedAt} - containerStateWaiting := &corev1.ContainerStateWaiting{} - - testCases := []struct { - desc string - running *corev1.ContainerStateRunning - terminated *corev1.ContainerStateTerminated - waiting *corev1.ContainerStateWaiting - expectedStartedAt metav1.Time - expectedFinishedAt metav1.Time - }{ - {desc: "container in running state", running: containerStateRunning, expectedStartedAt: startedAt}, - {desc: "container in terminated state", terminated: containerStateTerminated, expectedStartedAt: startedAt, expectedFinishedAt: finishedAt}, - {desc: "container in waiting state", waiting: containerStateWaiting}, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - c := newTestController() - pod := &corev1.Pod{} - pod.ObjectMeta.Namespace = "default" - pod.ObjectMeta.Name = "nginx" - pod.Status.Phase = corev1.PodRunning - containerStatus := 
corev1.ContainerStatus{} - if tc.running != nil { - containerStatus.State.Running = tc.running - } else if tc.terminated != nil { - containerStatus.State.Terminated = tc.terminated - } else if tc.waiting != nil { - containerStatus.State.Waiting = tc.waiting - } - pod.Status.ContainerStatuses = []corev1.ContainerStatus{containerStatus} - - pc := c.client.CoreV1().Pods("default") - p, err := pc.Create(pod) - assert.NilError(t, err) - - ctx := context.Background() - updatedPod, err := c.fetchPodStatusFromProvider(ctx, nil, p) - assert.NilError(t, err) - - assert.Equal(t, updatedPod.Status.Phase, corev1.PodFailed) - assert.Assert(t, is.Len(updatedPod.Status.ContainerStatuses, 1)) - assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Running == nil) - - // Test cases for running and terminated container state - if tc.running != nil || tc.terminated != nil { - // Ensure that the container is in terminated state and other states are nil - assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated != nil) - assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting == nil) - - terminated := updatedPod.Status.ContainerStatuses[0].State.Terminated - assert.Equal(t, terminated.StartedAt, tc.expectedStartedAt) - assert.Assert(t, terminated.StartedAt.Before(&terminated.FinishedAt)) - if tc.terminated != nil { - assert.Equal(t, terminated.FinishedAt, tc.expectedFinishedAt) - } - } else { - // Test case for waiting container state - assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated == nil) - assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting != nil) - } - }) - } -} - -func TestFetchPodStatusFromProviderWithExpiredPod(t *testing.T) { - c := newTestController() - pod := &corev1.Pod{} - pod.ObjectMeta.Namespace = "default" - pod.ObjectMeta.Name = "nginx" - pod.Status.Phase = corev1.PodRunning - containerStatus := corev1.ContainerStatus{} - - // We should terminate containers in a pod that has not provided pod status update for more than a minute - startedAt := time.Now().Add(-(time.Minute + time.Second)) - containerStatus.State.Running = &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(startedAt)} - pod.ObjectMeta.CreationTimestamp.Time = startedAt - pod.Status.ContainerStatuses = []corev1.ContainerStatus{containerStatus} - - pc := c.client.CoreV1().Pods("default") - p, err := pc.Create(pod) - assert.NilError(t, err) - - ctx := context.Background() - updatedPod, err := c.fetchPodStatusFromProvider(ctx, nil, p) - assert.NilError(t, err) - - assert.Equal(t, updatedPod.Status.Phase, corev1.PodFailed) - assert.Assert(t, is.Len(updatedPod.Status.ContainerStatuses, 1)) - assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Running == nil) - assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated != nil) - assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting == nil) - - terminated := updatedPod.Status.ContainerStatuses[0].State.Terminated - assert.Equal(t, terminated.StartedAt, metav1.NewTime(startedAt)) - assert.Assert(t, terminated.StartedAt.Before(&terminated.FinishedAt)) -} - func newPodSpec() corev1.PodSpec { return corev1.PodSpec{ Containers: []corev1.Container{ diff --git a/node/podcontroller.go b/node/podcontroller.go index d70e90360..05dc7b88b 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -49,7 +49,9 @@ type PodLifecycleHandler interface { // UpdatePod takes a Kubernetes Pod and updates it within the provider. 
UpdatePod(ctx context.Context, pod *corev1.Pod) error - // DeletePod takes a Kubernetes Pod and deletes it from the provider. + // DeletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is + // expected to call the NotifyPods callback with a terminal pod status where all the containers are in a terminal + // state, as well as the pod. DeletePod(ctx context.Context, pod *corev1.Pod) error // GetPod retrieves a pod by name from the provider (can be cached). @@ -69,12 +71,7 @@ type PodLifecycleHandler interface { // concurrently outside of the calling goroutine. Therefore it is recommended // to return a version after DeepCopy. GetPods(context.Context) ([]*corev1.Pod, error) -} -// PodNotifier notifies callers of pod changes. -// Providers should implement this interface to enable callers to be notified -// of pod status updates asynchronously. -type PodNotifier interface { // NotifyPods instructs the notifier to call the passed in function when // the pod status changes. It should be called when a pod's status changes. // @@ -218,7 +215,10 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er }() podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider") - pc.runSyncFromProvider(ctx, podStatusQueue) + pc.provider.NotifyPods(ctx, func(pod *corev1.Pod) { + pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy()) + }) + defer podStatusQueue.ShutDown() // Wait for the caches to be synced *before* starting to do work. diff --git a/node/queue.go b/node/queue.go index 540d6a4ad..12572b5de 100644 --- a/node/queue.go +++ b/node/queue.go @@ -16,12 +16,10 @@ package node import ( "context" - "time" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" - corev1 "k8s.io/api/core/v1" "k8s.io/client-go/util/workqueue" ) @@ -105,40 +103,3 @@ func (pc *PodController) processPodStatusUpdate(ctx context.Context, workerID st return handleQueueItem(ctx, q, pc.podStatusHandler) } - -// providerSyncLoop synchronizes pod states from the provider back to kubernetes -// Deprecated: This is only used when the provider does not support async updates -// Providers should implement async update support, even if it just means copying -// something like this in. -func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) { - const sleepTime = 5 * time.Second - - t := time.NewTimer(sleepTime) - defer t.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-t.C: - t.Stop() - - ctx, span := trace.StartSpan(ctx, "syncActualState") - pc.fetchPodStatusesFromProvider(ctx, q) - span.End() - - // restart the timer - t.Reset(sleepTime) - } - } -} - -func (pc *PodController) runSyncFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) { - if pn, ok := pc.provider.(PodNotifier); ok { - pn.NotifyPods(ctx, func(pod *corev1.Pod) { - pc.enqueuePodStatusUpdate(ctx, q, pod.DeepCopy()) - }) - } else { - go pc.providerSyncLoop(ctx, q) - } -}
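For illustration, a minimal provider under the collapsed PodLifecycleHandler contract could look like the sketch below. Only the methods whose contract this patch changes are shown; the NotifyPods signature follows the call in the podcontroller.go hunk above, while inMemoryProvider and its pod map are hypothetical, modeled on the mock provider in this patch.

// Sketch of a provider after this change: NotifyPods is now mandatory, and
// DeletePod is expected to push a terminal pod status through the callback.
// inMemoryProvider is illustrative only; it implements just the methods
// whose behavior this patch changes.
package sketch

import (
	"context"
	"sync"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type inMemoryProvider struct {
	mu       sync.Mutex
	pods     map[string]*corev1.Pod
	notifier func(*corev1.Pod)
}

// NotifyPods stores the callback supplied by the PodController. With the
// legacy sync loop removed, this callback is the only path by which pod
// status reaches Kubernetes.
func (p *inMemoryProvider) NotifyPods(_ context.Context, cb func(*corev1.Pod)) {
	p.notifier = cb
}

// notify guards against the callback not being set yet, mirroring the
// realNotifier check in the test mock above.
func (p *inMemoryProvider) notify(pod *corev1.Pod) {
	if p.notifier != nil {
		p.notifier(pod.DeepCopy())
	}
}

// CreatePod stores the pod, marks it running, and reports the new status
// asynchronously via the notifier.
func (p *inMemoryProvider) CreatePod(_ context.Context, pod *corev1.Pod) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	pod.Status.Phase = corev1.PodRunning
	p.pods[pod.Namespace+"/"+pod.Name] = pod
	p.notify(pod)
	return nil
}

// DeletePod moves every container to a terminated state and notifies the
// controller with that terminal status, as the new DeletePod contract
// requires, before dropping the pod from memory.
func (p *inMemoryProvider) DeletePod(_ context.Context, pod *corev1.Pod) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	now := metav1.NewTime(time.Now())
	pod.Status.Phase = corev1.PodSucceeded
	for i := range pod.Status.ContainerStatuses {
		pod.Status.ContainerStatuses[i].State = corev1.ContainerState{
			Terminated: &corev1.ContainerStateTerminated{
				Reason:     "Deleted",
				FinishedAt: now,
			},
		}
	}
	p.notify(pod)
	delete(p.pods, pod.Namespace+"/"+pod.Name)
	return nil
}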