diff --git a/cmd/virtual-kubelet/internal/provider/mock/mock.go b/cmd/virtual-kubelet/internal/provider/mock/mock.go
index 48b9e8322..31296e9ae 100644
--- a/cmd/virtual-kubelet/internal/provider/mock/mock.go
+++ b/cmd/virtual-kubelet/internal/provider/mock/mock.go
@@ -41,8 +41,8 @@ var (
 )
 */
 
-// MockV0Provider implements the virtual-kubelet provider interface and stores pods in memory.
-type MockV0Provider struct { //nolint:golint
+// MockProvider implements the virtual-kubelet provider interface and stores pods in memory.
+type MockProvider struct { //nolint:golint
 	nodeName           string
 	operatingSystem    string
 	internalIP         string
@@ -53,11 +53,6 @@ type MockV0Provider struct { //nolint:golint
 	notifier           func(*v1.Pod)
 }
 
-// MockProvider is like MockV0Provider, but implements the PodNotifier interface
-type MockProvider struct { //nolint:golint
-	*MockV0Provider
-}
-
 // MockConfig contains a mock virtual-kubelet's configurable parameters.
 type MockConfig struct { //nolint:golint
 	CPU    string `json:"cpu,omitempty"`
@@ -66,7 +61,7 @@ type MockConfig struct { //nolint:golint
 }
 
-// NewMockProviderMockConfig creates a new MockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface
-func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
+// NewMockProviderMockConfig creates a new MockProvider with the given config.
+func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
 	//set defaults
 	if config.CPU == "" {
 		config.CPU = defaultCPUCapacity
@@ -77,7 +72,7 @@ func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem st
 	if config.Pods == "" {
 		config.Pods = defaultPodCapacity
 	}
-	provider := MockV0Provider{
+	provider := MockProvider{
 		nodeName:           nodeName,
 		operatingSystem:    operatingSystem,
 		internalIP:         internalIP,
@@ -85,32 +80,11 @@ func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem st
 		pods:               make(map[string]*v1.Pod),
 		config:             config,
 		startTime:          time.Now(),
-		// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
-		// it will be set, and then we'll call a real underlying implementation.
-		// This makes it easier in the sense we don't need to wrap each method.
-		notifier:           func(*v1.Pod) {},
 	}
 	return &provider, nil
 }
 
-// NewMockV0Provider creates a new MockV0Provider
-func NewMockV0Provider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
-	config, err := loadConfig(providerConfig, nodeName)
-	if err != nil {
-		return nil, err
-	}
-
-	return NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
-}
-
-// NewMockProviderMockConfig creates a new MockProvider with the given config
-func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
-	p, err := NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
-
-	return &MockProvider{MockV0Provider: p}, err
-}
-
-// NewMockProvider creates a new MockProvider, which implements the PodNotifier interface
+// NewMockProvider creates a new MockProvider.
 func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
 	config, err := loadConfig(providerConfig, nodeName)
@@ -158,7 +132,7 @@ func loadConfig(providerConfig, nodeName string) (config MockConfig, err error)
 }
 
 // CreatePod accepts a Pod definition and stores it in memory.
-func (p *MockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
+func (p *MockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
 	ctx, span := trace.StartSpan(ctx, "CreatePod")
 	defer span.End()
 
@@ -215,7 +189,7 @@ func (p *MockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
 }
 
 // UpdatePod accepts a Pod definition and updates its reference.
-func (p *MockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
+func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
 	ctx, span := trace.StartSpan(ctx, "UpdatePod")
 	defer span.End()
 
@@ -236,7 +210,7 @@ func (p *MockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
 }
 
 // DeletePod deletes the specified pod out of memory.
-func (p *MockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
+func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
 	ctx, span := trace.StartSpan(ctx, "DeletePod")
 	defer span.End()
 
@@ -277,7 +251,7 @@ func (p *MockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
 }
 
 // GetPod returns a pod by name that is stored in memory.
-func (p *MockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
+func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
 	ctx, span := trace.StartSpan(ctx, "GetPod")
 	defer func() {
 		span.SetStatus(err)
@@ -301,7 +275,7 @@ func (p *MockV0Provider) GetPod(ctx context.Context, namespace, name string) (po
 }
 
 // GetContainerLogs retrieves the logs of a container by name from the provider.
-func (p *MockV0Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
+func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
 	ctx, span := trace.StartSpan(ctx, "GetContainerLogs")
 	defer span.End()
 
@@ -314,14 +288,14 @@ func (p *MockV0Provider) GetContainerLogs(ctx context.Context, namespace, podNam
 
 // RunInContainer executes a command in a container in the pod, copying data
 // between in/out/err and the container's stdin/stdout/stderr.
-func (p *MockV0Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
+func (p *MockProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
 	log.G(context.TODO()).Infof("receive ExecInContainer %q", container)
 	return nil
 }
 
 // GetPodStatus returns the status of a pod by name that is "running".
 // returns nil if a pod by that name is not found.
-func (p *MockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
+func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
 	ctx, span := trace.StartSpan(ctx, "GetPodStatus")
 	defer span.End()
 
@@ -339,7 +313,7 @@ func (p *MockV0Provider) GetPodStatus(ctx context.Context, namespace, name strin
 }
 
 // GetPods returns a list of all pods known to be "running".
-func (p *MockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
+func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
 	ctx, span := trace.StartSpan(ctx, "GetPods")
 	defer span.End()
 
@@ -354,7 +328,7 @@ func (p *MockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
 	return pods, nil
 }
 
-func (p *MockV0Provider) ConfigureNode(ctx context.Context, n *v1.Node) {
+func (p *MockProvider) ConfigureNode(ctx context.Context, n *v1.Node) {
 	ctx, span := trace.StartSpan(ctx, "mock.ConfigureNode") //nolint:ineffassign
 	defer span.End()
 
@@ -373,7 +347,7 @@ func (p *MockV0Provider) ConfigureNode(ctx context.Context, n *v1.Node) {
 }
 
 // Capacity returns a resource list containing the capacity limits.
-func (p *MockV0Provider) capacity() v1.ResourceList {
+func (p *MockProvider) capacity() v1.ResourceList {
 	return v1.ResourceList{
 		"cpu":    resource.MustParse(p.config.CPU),
 		"memory": resource.MustParse(p.config.Memory),
@@ -383,7 +357,7 @@ func (p *MockV0Provider) capacity() v1.ResourceList {
 
 // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
 // within Kubernetes.
-func (p *MockV0Provider) nodeConditions() []v1.NodeCondition {
+func (p *MockProvider) nodeConditions() []v1.NodeCondition {
 	// TODO: Make this configurable
 	return []v1.NodeCondition{
 		{
@@ -432,7 +406,7 @@ func (p *MockV0Provider) nodeConditions() []v1.NodeCondition {
 
 // NodeAddresses returns a list of addresses for the node status
 // within Kubernetes.
-func (p *MockV0Provider) nodeAddresses() []v1.NodeAddress {
+func (p *MockProvider) nodeAddresses() []v1.NodeAddress {
 	return []v1.NodeAddress{
 		{
 			Type:    "InternalIP",
@@ -443,7 +417,7 @@ func (p *MockV0Provider) nodeAddresses() []v1.NodeAddress {
 
 // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
 // within Kubernetes.
-func (p *MockV0Provider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
+func (p *MockProvider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
 	return v1.NodeDaemonEndpoints{
 		KubeletEndpoint: v1.DaemonEndpoint{
 			Port: p.daemonEndpointPort,
@@ -452,7 +426,7 @@ func (p *MockV0Provider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
 }
 
 // GetStatsSummary returns dummy stats for all pods known by this provider.
-func (p *MockV0Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
+func (p *MockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
 	var span trace.Span
 	ctx, span = trace.StartSpan(ctx, "GetStatsSummary") //nolint: ineffassign
 	defer span.End()
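
Note: the surviving MockProvider keeps the notifier field shown above, and this diff assumes the provider's NotifyPods method (unchanged here, so not part of any hunk) stores the controller's callback. A minimal sketch of what that method is assumed to look like:

    // Sketch only -- NotifyPods is not shown in this diff; it is assumed to
    // persist the controller's callback so CreatePod/UpdatePod/DeletePod can
    // push status changes asynchronously through p.notifier.
    func (p *MockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
        p.notifier = notifier
    }
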
diff --git a/cmd/virtual-kubelet/register.go b/cmd/virtual-kubelet/register.go
index e845e5642..f315338a3 100644
--- a/cmd/virtual-kubelet/register.go
+++ b/cmd/virtual-kubelet/register.go
@@ -15,14 +15,4 @@ func registerMock(s *provider.Store) {
 			cfg.DaemonPort,
 		)
 	})
-
-	s.Register("mockV0", func(cfg provider.InitConfig) (provider.Provider, error) { //nolint:errcheck
-		return mock.NewMockProvider(
-			cfg.ConfigPath,
-			cfg.NodeName,
-			cfg.OperatingSystem,
-			cfg.InternalIP,
-			cfg.DaemonPort,
-		)
-	})
 }
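
With the legacy entry gone, registerMock carries a single "mock" registration. Its body has the same shape as the removed "mockV0" block; for reference, a sketch of the surviving call (the exact code lives in register.go and is unchanged by this diff):

    s.Register("mock", func(cfg provider.InitConfig) (provider.Provider, error) { //nolint:errcheck
        return mock.NewMockProvider(
            cfg.ConfigPath,
            cfg.NodeName,
            cfg.OperatingSystem,
            cfg.InternalIP,
            cfg.DaemonPort,
        )
    })
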
- t.Run("podStatusMissingWhileRunningScenario", func(t *testing.T) { - mp := newMockV0Provider() - assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { - testPodStatusMissingWhileRunningScenario(ctx, t, s, mp) - })) - }) } type testFunction func(ctx context.Context, s *system) @@ -358,7 +315,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system, assert.DeepEqual(t, p1, p2) } -func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mockV0Provider) { +func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) { t.Parallel() pod := newPod() @@ -372,7 +329,6 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo } func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error) { - ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -530,87 +486,6 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys assert.NilError(t, m.updates.until(ctx, func(v int) bool { return v > 0 })) } -func testPodStatusMissingWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockV0Provider) { - t.Parallel() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - p := newPod() - key, err := buildKey(p) - assert.NilError(t, err) - - listOptions := metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(), - } - - watchErrCh := make(chan error) - - // Create a Pod - _, e := s.client.CoreV1().Pods(testNamespace).Create(p) - assert.NilError(t, e) - - // Setup a watch to check if the pod is in running - watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions) - assert.NilError(t, err) - defer watcher.Stop() - go func() { - newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, - // Wait for the pod to be started - func(ev watch.Event) (bool, error) { - pod := ev.Object.(*corev1.Pod) - return pod.Status.Phase == corev1.PodRunning, nil - }) - // This deepcopy is required to please the race detector - p = newPod.Object.(*corev1.Pod).DeepCopy() - watchErrCh <- watchErr - }() - - // Start the pod controller - assert.NilError(t, s.start(ctx)) - - // Wait for pod to be in running - select { - case <-ctx.Done(): - t.Fatalf("Context ended early: %s", ctx.Err().Error()) - case <-s.pc.Done(): - assert.NilError(t, s.pc.Err()) - t.Fatal("Pod controller exited prematurely without error") - case err = <-watchErrCh: - assert.NilError(t, err) - - } - - // Setup a watch to check if the pod is in failed due to provider issues - watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions) - assert.NilError(t, err) - defer watcher.Stop() - go func() { - newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, - // Wait for the pod to be failed - func(ev watch.Event) (bool, error) { - pod := ev.Object.(*corev1.Pod) - return pod.Status.Phase == corev1.PodFailed, nil - }) - // This deepcopy is required to please the race detector - p = newPod.Object.(*corev1.Pod).DeepCopy() - watchErrCh <- watchErr - }() - - // delete the pod from the mock provider - m.pods.Delete(key) - select { - case <-ctx.Done(): - t.Fatalf("Context ended early: %s", ctx.Err().Error()) - case <-s.pc.Done(): - assert.NilError(t, s.pc.Err()) - t.Fatal("Pod controller exited prematurely without error") - case err = <-watchErrCh: - assert.NilError(t, err) - } - - assert.Equal(t, p.Status.Reason, 
podStatusReasonNotFound) -} - func BenchmarkCreatePods(b *testing.B) { sl := logrus.StandardLogger() sl.SetLevel(logrus.ErrorLevel) diff --git a/node/mock_test.go b/node/mock_test.go index 3ff7cebb7..8554a20cf 100644 --- a/node/mock_test.go +++ b/node/mock_test.go @@ -13,8 +13,7 @@ import ( ) var ( - _ PodLifecycleHandler = (*mockV0Provider)(nil) - _ PodNotifier = (*mockProvider)(nil) + _ PodLifecycleHandler = (*mockProvider)(nil) ) type waitableInt struct { @@ -62,7 +61,7 @@ func (w *waitableInt) increment() { w.cond.Broadcast() } -type mockV0Provider struct { +type mockProvider struct { creates *waitableInt updates *waitableInt deletes *waitableInt @@ -75,13 +74,9 @@ type mockV0Provider struct { realNotifier func(*v1.Pod) } -type mockProvider struct { - *mockV0Provider -} - -// NewMockProviderMockConfig creates a new mockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface -func newMockV0Provider() *mockV0Provider { - provider := mockV0Provider{ +// newMockProvider creates a new mockProvider. +func newMockProvider() *mockProvider { + provider := mockProvider{ startTime: time.Now(), creates: newWaitableInt(), updates: newWaitableInt(), @@ -95,20 +90,15 @@ func newMockV0Provider() *mockV0Provider { return &provider } -// NewMockProviderMockConfig creates a new MockProvider with the given config -func newMockProvider() *mockProvider { - return &mockProvider{mockV0Provider: newMockV0Provider()} -} - // notifier calls the callback that we got from the pod controller to notify it of updates (if it is set) -func (p *mockV0Provider) notifier(pod *v1.Pod) { +func (p *mockProvider) notifier(pod *v1.Pod) { if p.realNotifier != nil { p.realNotifier(pod) } } // CreatePod accepts a Pod definition and stores it in memory. -func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { +func (p *mockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error { log.G(ctx).Infof("receive CreatePod %q", pod.Name) p.creates.increment() @@ -160,7 +150,7 @@ func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { } // UpdatePod accepts a Pod definition and updates its reference. -func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error { +func (p *mockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error { log.G(ctx).Infof("receive UpdatePod %q", pod.Name) p.updates.increment() @@ -177,7 +167,7 @@ func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error { // DeletePod deletes the specified pod out of memory. The PodController deepcopies the pod object // for us, so we don't have to worry about mutation. -func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { +func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { log.G(ctx).Infof("receive DeletePod %q", pod.Name) p.attemptedDeletes.increment() @@ -213,21 +203,13 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) } p.notifier(pod) - p.pods.Store(key, pod) - if pod.DeletionGracePeriodSeconds == nil || *pod.DeletionGracePeriodSeconds == 0 { - p.pods.Delete(key) - } else { - time.AfterFunc(time.Duration(*pod.DeletionGracePeriodSeconds)*time.Second, func() { - p.pods.Delete(key) - }) - - } + p.pods.Delete(key) return nil } // GetPod returns a pod by name that is stored in memory. 
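
The remaining scenarios synchronize on provider activity through the waitableInt counters defined in mock_test.go rather than polling; for example, testUpdatePodWhileRunningScenario (above) blocks until the provider has observed at least one UpdatePod call:

    // m is the *mockProvider wired into the system under test.
    assert.NilError(t, m.updates.until(ctx, func(v int) bool { return v > 0 }))
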
diff --git a/node/mock_test.go b/node/mock_test.go
index 3ff7cebb7..8554a20cf 100644
--- a/node/mock_test.go
+++ b/node/mock_test.go
@@ -13,8 +13,7 @@ import (
 )
 
 var (
-	_ PodLifecycleHandler = (*mockV0Provider)(nil)
-	_ PodNotifier         = (*mockProvider)(nil)
+	_ PodLifecycleHandler = (*mockProvider)(nil)
 )
 
 type waitableInt struct {
@@ -62,7 +61,7 @@ func (w *waitableInt) increment() {
 	w.cond.Broadcast()
 }
 
-type mockV0Provider struct {
+type mockProvider struct {
 	creates          *waitableInt
 	updates          *waitableInt
 	deletes          *waitableInt
@@ -75,13 +74,9 @@ type mockV0Provider struct {
 	realNotifier func(*v1.Pod)
 }
 
-type mockProvider struct {
-	*mockV0Provider
-}
-
-// NewMockProviderMockConfig creates a new mockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface
-func newMockV0Provider() *mockV0Provider {
-	provider := mockV0Provider{
+// newMockProvider creates a new mockProvider.
+func newMockProvider() *mockProvider {
+	provider := mockProvider{
 		startTime: time.Now(),
 		creates:   newWaitableInt(),
 		updates:   newWaitableInt(),
@@ -95,20 +90,15 @@ func newMockV0Provider() *mockV0Provider {
 	return &provider
 }
 
-// NewMockProviderMockConfig creates a new MockProvider with the given config
-func newMockProvider() *mockProvider {
-	return &mockProvider{mockV0Provider: newMockV0Provider()}
-}
-
 // notifier calls the callback that we got from the pod controller to notify it of updates (if it is set)
-func (p *mockV0Provider) notifier(pod *v1.Pod) {
+func (p *mockProvider) notifier(pod *v1.Pod) {
 	if p.realNotifier != nil {
 		p.realNotifier(pod)
 	}
 }
 
 // CreatePod accepts a Pod definition and stores it in memory.
-func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
+func (p *mockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
 	log.G(ctx).Infof("receive CreatePod %q", pod.Name)
 
 	p.creates.increment()
@@ -160,7 +150,7 @@ func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
 }
 
 // UpdatePod accepts a Pod definition and updates its reference.
-func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
+func (p *mockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
 	log.G(ctx).Infof("receive UpdatePod %q", pod.Name)
 
 	p.updates.increment()
@@ -177,7 +167,7 @@ func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
 
 // DeletePod deletes the specified pod out of memory. The PodController deepcopies the pod object
 // for us, so we don't have to worry about mutation.
-func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
+func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
 	log.G(ctx).Infof("receive DeletePod %q", pod.Name)
 
 	p.attemptedDeletes.increment()
@@ -213,21 +203,13 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
 	}
 
 	p.notifier(pod)
-	p.pods.Store(key, pod)
-	if pod.DeletionGracePeriodSeconds == nil || *pod.DeletionGracePeriodSeconds == 0 {
-		p.pods.Delete(key)
-	} else {
-		time.AfterFunc(time.Duration(*pod.DeletionGracePeriodSeconds)*time.Second, func() {
-			p.pods.Delete(key)
-		})
-
-	}
+	p.pods.Delete(key)
 
 	return nil
 }
 
 // GetPod returns a pod by name that is stored in memory.
-func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
+func (p *mockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
 	log.G(ctx).Infof("receive GetPod %q", name)
 
 	key, err := buildKeyFromNames(namespace, name)
@@ -243,7 +225,7 @@ func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (po
 
 // GetPodStatus returns the status of a pod by name that is "running".
 // returns nil if a pod by that name is not found.
-func (p *mockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
+func (p *mockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
 	log.G(ctx).Infof("receive GetPodStatus %q", name)
 
 	pod, err := p.GetPod(ctx, namespace, name)
@@ -255,7 +237,7 @@ func (p *mockV0Provider) GetPodStatus(ctx context.Context, namespace, name strin
 }
 
 // GetPods returns a list of all pods known to be "running".
-func (p *mockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
+func (p *mockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
 	log.G(ctx).Info("receive GetPods")
 
 	var pods []*v1.Pod
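
Note that the mock's DeletePod now drops the pod from its map immediately. A provider that wants to keep honoring DeletionGracePeriodSeconds could retain the removed pattern, roughly:

    // Sketch of the behavior removed above: delay the in-memory removal until
    // the grace period has elapsed instead of deleting right away.
    if pod.DeletionGracePeriodSeconds == nil || *pod.DeletionGracePeriodSeconds == 0 {
        p.pods.Delete(key)
    } else {
        time.AfterFunc(time.Duration(*pod.DeletionGracePeriodSeconds)*time.Second, func() {
            p.pods.Delete(key)
        })
    }
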
diff --git a/node/pod.go b/node/pod.go
index eb2294f17..bd2a901e9 100644
--- a/node/pod.go
+++ b/node/pod.go
@@ -16,7 +16,6 @@ package node
 
 import (
 	"context"
-	"time"
 
 	"github.com/google/go-cmp/cmp"
 	pkgerrors "github.com/pkg/errors"
@@ -26,18 +25,12 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 )
 
 const (
-	podStatusReasonProviderFailed   = "ProviderFailed"
-	podStatusReasonNotFound         = "NotFound"
-	podStatusMessageNotFound        = "The pod status was not found and may have been deleted from the provider"
-	containerStatusReasonNotFound   = "NotFound"
-	containerStatusMessageNotFound  = "Container was not found and was likely deleted"
-	containerStatusExitCodeNotFound = -137
+	podStatusReasonProviderFailed = "ProviderFailed"
 )
 
 func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
@@ -198,78 +191,6 @@ func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace,
 	return nil
 }
 
-// fetchPodStatusesFromProvider syncs the providers pod status with the kubernetes pod status.
-func (pc *PodController) fetchPodStatusesFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
-	ctx, span := trace.StartSpan(ctx, "fetchPodStatusesFromProvider")
-	defer span.End()
-
-	// Update all the pods with the provider status.
-	pods, err := pc.podsLister.List(labels.Everything())
-	if err != nil {
-		err = pkgerrors.Wrap(err, "error getting pod list from kubernetes")
-		span.SetStatus(err)
-		log.G(ctx).WithError(err).Error("Error updating pod statuses")
-		return
-	}
-	ctx = span.WithField(ctx, "nPods", int64(len(pods)))
-
-	for _, pod := range pods {
-		if !shouldSkipPodStatusUpdate(pod) {
-			enrichedPod, err := pc.fetchPodStatusFromProvider(ctx, q, pod)
-			if err != nil {
-				log.G(ctx).WithFields(map[string]interface{}{
-					"name":      pod.Name,
-					"namespace": pod.Namespace,
-				}).WithError(err).Error("Could not fetch pod status")
-			} else if enrichedPod != nil {
-				pc.enqueuePodStatusUpdate(ctx, q, enrichedPod)
-			}
-		}
-	}
-}
-
-// fetchPodStatusFromProvider returns a pod (the pod we pass in) enriched with the pod status from the provider. If the pod is not found,
-// and it has been 1 minute since the pod was created, or the pod was previously running, it will be marked as failed.
-// If a valid pod status cannot be generated, for example, if a pod is not found in the provider, and it has been less than 1 minute
-// since pod creation, we will return nil for the pod.
-func (pc *PodController) fetchPodStatusFromProvider(ctx context.Context, q workqueue.RateLimitingInterface, podFromKubernetes *corev1.Pod) (*corev1.Pod, error) {
-	podStatus, err := pc.provider.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
-	if errdefs.IsNotFound(err) || (err == nil && podStatus == nil) {
-		// Only change the status when the pod was already up
-		// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
-		if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
-			// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
-			podStatus = podFromKubernetes.Status.DeepCopy()
-			podStatus.Phase = corev1.PodFailed
-			podStatus.Reason = podStatusReasonNotFound
-			podStatus.Message = podStatusMessageNotFound
-			now := metav1.NewTime(time.Now())
-			for i, c := range podStatus.ContainerStatuses {
-				if c.State.Running == nil {
-					continue
-				}
-				podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
-					ExitCode:    containerStatusExitCodeNotFound,
-					Reason:      containerStatusReasonNotFound,
-					Message:     containerStatusMessageNotFound,
-					FinishedAt:  now,
-					StartedAt:   c.State.Running.StartedAt,
-					ContainerID: c.ContainerID,
-				}
-				podStatus.ContainerStatuses[i].State.Running = nil
-			}
-		} else if err != nil {
-			return nil, nil
-		}
-	} else if err != nil {
-		return nil, err
-	}
-
-	pod := podFromKubernetes.DeepCopy()
-	podStatus.DeepCopyInto(&pod.Status)
-	return pod, nil
-}
-
 func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool {
 	return pod.Status.Phase == corev1.PodSucceeded ||
 		pod.Status.Phase == corev1.PodFailed ||
diff --git a/node/pod_test.go b/node/pod_test.go
index 0d7df7523..abf0ec515 100644
--- a/node/pod_test.go
+++ b/node/pod_test.go
@@ -225,109 +225,6 @@ func TestPodDelete(t *testing.T) {
 	}
 }
 
-func TestFetchPodStatusFromProvider(t *testing.T) {
-	startedAt := metav1.NewTime(time.Now())
-	finishedAt := metav1.NewTime(startedAt.Add(time.Second * 10))
-	containerStateRunning := &corev1.ContainerStateRunning{StartedAt: startedAt}
-	containerStateTerminated := &corev1.ContainerStateTerminated{StartedAt: startedAt, FinishedAt: finishedAt}
-	containerStateWaiting := &corev1.ContainerStateWaiting{}
-
-	testCases := []struct {
-		desc               string
-		running            *corev1.ContainerStateRunning
-		terminated         *corev1.ContainerStateTerminated
-		waiting            *corev1.ContainerStateWaiting
-		expectedStartedAt  metav1.Time
-		expectedFinishedAt metav1.Time
-	}{
-		{desc: "container in running state", running: containerStateRunning, expectedStartedAt: startedAt},
-		{desc: "container in terminated state", terminated: containerStateTerminated, expectedStartedAt: startedAt, expectedFinishedAt: finishedAt},
-		{desc: "container in waiting state", waiting: containerStateWaiting},
-	}
-
-	for _, tc := range testCases {
-		t.Run(tc.desc, func(t *testing.T) {
-			c := newTestController()
-			pod := &corev1.Pod{}
-			pod.ObjectMeta.Namespace = "default"
-			pod.ObjectMeta.Name = "nginx"
-			pod.Status.Phase = corev1.PodRunning
-			containerStatus := corev1.ContainerStatus{}
-			if tc.running != nil {
-				containerStatus.State.Running = tc.running
-			} else if tc.terminated != nil {
-				containerStatus.State.Terminated = tc.terminated
-			} else if tc.waiting != nil {
-				containerStatus.State.Waiting = tc.waiting
-			}
-			pod.Status.ContainerStatuses = []corev1.ContainerStatus{containerStatus}
-
-			pc := c.client.CoreV1().Pods("default")
-			p, err := pc.Create(pod)
-			assert.NilError(t, err)
-
-			ctx := context.Background()
-			updatedPod, err := c.fetchPodStatusFromProvider(ctx, nil, p)
-			assert.NilError(t, err)
-
-			assert.Equal(t, updatedPod.Status.Phase, corev1.PodFailed)
-			assert.Assert(t, is.Len(updatedPod.Status.ContainerStatuses, 1))
-			assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Running == nil)
-
-			// Test cases for running and terminated container state
-			if tc.running != nil || tc.terminated != nil {
-				// Ensure that the container is in terminated state and other states are nil
-				assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated != nil)
-				assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting == nil)
-
-				terminated := updatedPod.Status.ContainerStatuses[0].State.Terminated
-				assert.Equal(t, terminated.StartedAt, tc.expectedStartedAt)
-				assert.Assert(t, terminated.StartedAt.Before(&terminated.FinishedAt))
-				if tc.terminated != nil {
-					assert.Equal(t, terminated.FinishedAt, tc.expectedFinishedAt)
-				}
-			} else {
-				// Test case for waiting container state
-				assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated == nil)
-				assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting != nil)
-			}
-		})
-	}
-}
-
-func TestFetchPodStatusFromProviderWithExpiredPod(t *testing.T) {
-	c := newTestController()
-	pod := &corev1.Pod{}
-	pod.ObjectMeta.Namespace = "default"
-	pod.ObjectMeta.Name = "nginx"
-	pod.Status.Phase = corev1.PodRunning
-	containerStatus := corev1.ContainerStatus{}
-
-	// We should terminate containers in a pod that has not provided pod status update for more than a minute
-	startedAt := time.Now().Add(-(time.Minute + time.Second))
-	containerStatus.State.Running = &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(startedAt)}
-	pod.ObjectMeta.CreationTimestamp.Time = startedAt
-	pod.Status.ContainerStatuses = []corev1.ContainerStatus{containerStatus}
-
-	pc := c.client.CoreV1().Pods("default")
-	p, err := pc.Create(pod)
-	assert.NilError(t, err)
-
-	ctx := context.Background()
-	updatedPod, err := c.fetchPodStatusFromProvider(ctx, nil, p)
-	assert.NilError(t, err)
-
-	assert.Equal(t, updatedPod.Status.Phase, corev1.PodFailed)
-	assert.Assert(t, is.Len(updatedPod.Status.ContainerStatuses, 1))
-	assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Running == nil)
-	assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated != nil)
-	assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting == nil)
-
-	terminated := updatedPod.Status.ContainerStatuses[0].State.Terminated
-	assert.Equal(t, terminated.StartedAt, metav1.NewTime(startedAt))
-	assert.Assert(t, terminated.StartedAt.Before(&terminated.FinishedAt))
-}
-
 func newPodSpec() corev1.PodSpec {
 	return corev1.PodSpec{
 		Containers: []corev1.Container{
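
The podcontroller.go change below spells out the new DeletePod contract: the provider must report the deletion back through the NotifyPods callback with a terminal status. A sketch of how a provider's DeletePod might end (the PodSucceeded phase and Reason string are illustrative choices, not mandated by the interface):

    // Sketch: after tearing down the pod, push a terminal status so the
    // controller can complete the Kubernetes-side deletion.
    pod.Status.Phase = corev1.PodSucceeded
    now := metav1.Now()
    for i := range pod.Status.ContainerStatuses {
        pod.Status.ContainerStatuses[i].Ready = false
        pod.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
            Reason:     "Deleted", // hypothetical reason string
            FinishedAt: now,
        }
    }
    p.notifier(pod) // the callback previously handed to NotifyPods
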
diff --git a/node/podcontroller.go b/node/podcontroller.go
index d70e90360..05dc7b88b 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -49,7 +49,9 @@ type PodLifecycleHandler interface {
 	// UpdatePod takes a Kubernetes Pod and updates it within the provider.
 	UpdatePod(ctx context.Context, pod *corev1.Pod) error
 
-	// DeletePod takes a Kubernetes Pod and deletes it from the provider.
+	// DeletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is
+	// expected to call the NotifyPods callback with a terminal pod status, in which the pod and all of its
+	// containers are in a terminal state.
 	DeletePod(ctx context.Context, pod *corev1.Pod) error
 
 	// GetPod retrieves a pod by name from the provider (can be cached).
@@ -69,12 +71,7 @@ type PodLifecycleHandler interface {
 	// concurrently outside of the calling goroutine. Therefore it is recommended
 	// to return a version after DeepCopy.
 	GetPods(context.Context) ([]*corev1.Pod, error)
-}
 
-// PodNotifier notifies callers of pod changes.
-// Providers should implement this interface to enable callers to be notified
-// of pod status updates asynchronously.
-type PodNotifier interface {
 	// NotifyPods instructs the notifier to call the passed in function when
 	// the pod status changes. It should be called when a pod's status changes.
 	//
@@ -218,7 +215,10 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 	}()
 
 	podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
-	pc.runSyncFromProvider(ctx, podStatusQueue)
+	pc.provider.NotifyPods(ctx, func(pod *corev1.Pod) {
+		pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
+	})
+
 	defer podStatusQueue.ShutDown()
 
 	// Wait for the caches to be synced *before* starting to do work.
diff --git a/node/queue.go b/node/queue.go
index 540d6a4ad..12572b5de 100644
--- a/node/queue.go
+++ b/node/queue.go
@@ -16,12 +16,10 @@ package node
 
 import (
 	"context"
-	"time"
 
 	pkgerrors "github.com/pkg/errors"
 	"github.com/virtual-kubelet/virtual-kubelet/log"
 	"github.com/virtual-kubelet/virtual-kubelet/trace"
-	corev1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/util/workqueue"
 )
 
@@ -105,40 +103,3 @@ func (pc *PodController) processPodStatusUpdate(ctx context.Context, workerID st
 
 	return handleQueueItem(ctx, q, pc.podStatusHandler)
 }
-
-// providerSyncLoop synchronizes pod states from the provider back to kubernetes
-// Deprecated: This is only used when the provider does not support async updates
-// Providers should implement async update support, even if it just means copying
-// something like this in.
-func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) {
-	const sleepTime = 5 * time.Second
-
-	t := time.NewTimer(sleepTime)
-	defer t.Stop()
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-t.C:
-			t.Stop()
-
-			ctx, span := trace.StartSpan(ctx, "syncActualState")
-			pc.fetchPodStatusesFromProvider(ctx, q)
-			span.End()
-
-			// restart the timer
-			t.Reset(sleepTime)
-		}
-	}
-}
-
-func (pc *PodController) runSyncFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
-	if pn, ok := pc.provider.(PodNotifier); ok {
-		pn.NotifyPods(ctx, func(pod *corev1.Pod) {
-			pc.enqueuePodStatusUpdate(ctx, q, pod.DeepCopy())
-		})
-	} else {
-		go pc.providerSyncLoop(ctx, q)
-	}
-}
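
With providerSyncLoop and runSyncFromProvider gone, NotifyPods is mandatory for every PodLifecycleHandler. A provider that still derives status by polling can adapt by moving the loop's shape into its own NotifyPods, much as the removed comment suggested; a minimal sketch (pollingProvider and getChangedPods are hypothetical names):

    // Sketch: poll an internal source every few seconds and push each changed
    // pod through the controller's callback, replacing the removed
    // providerSyncLoop fallback.
    func (p *pollingProvider) NotifyPods(ctx context.Context, cb func(*corev1.Pod)) {
        go func() {
            t := time.NewTicker(5 * time.Second)
            defer t.Stop()
            for {
                select {
                case <-ctx.Done():
                    return
                case <-t.C:
                    for _, pod := range p.getChangedPods() {
                        cb(pod.DeepCopy())
                    }
                }
            }
        }()
    }
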