Merge pull request #770 from sargun/remove-sync-providers
Remove sync providers
@@ -41,8 +41,8 @@ var (
    )
*/

// MockV0Provider implements the virtual-kubelet provider interface and stores pods in memory.
type MockV0Provider struct { //nolint:golint
// MockProvider implements the virtual-kubelet provider interface and stores pods in memory.
type MockProvider struct { // nolint:golint
    nodeName        string
    operatingSystem string
    internalIP      string
@@ -53,11 +53,6 @@ type MockV0Provider struct { //nolint:golint
    notifier func(*v1.Pod)
}

// MockProvider is like MockV0Provider, but implements the PodNotifier interface
type MockProvider struct { //nolint:golint
    *MockV0Provider
}

// MockConfig contains a mock virtual-kubelet's configurable parameters.
type MockConfig struct { //nolint:golint
    CPU string `json:"cpu,omitempty"`
@@ -66,7 +61,7 @@ type MockConfig struct { //nolint:golint
}

// NewMockProviderMockConfig creates a new MockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface
func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
    //set defaults
    if config.CPU == "" {
        config.CPU = defaultCPUCapacity
@@ -77,7 +72,7 @@ func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem st
    if config.Pods == "" {
        config.Pods = defaultPodCapacity
    }
    provider := MockV0Provider{
    provider := MockProvider{
        nodeName:        nodeName,
        operatingSystem: operatingSystem,
        internalIP:      internalIP,
@@ -85,32 +80,11 @@ func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem st
        pods:      make(map[string]*v1.Pod),
        config:    config,
        startTime: time.Now(),
        // By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
        // it will be set, and then we'll call a real underlying implementation.
        // This makes it easier in the sense we don't need to wrap each method.
        notifier: func(*v1.Pod) {},
    }

    return &provider, nil
}

// NewMockV0Provider creates a new MockV0Provider
func NewMockV0Provider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
    config, err := loadConfig(providerConfig, nodeName)
    if err != nil {
        return nil, err
    }

    return NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
}

// NewMockProviderMockConfig creates a new MockProvider with the given config
func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
    p, err := NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)

    return &MockProvider{MockV0Provider: p}, err
}

// NewMockProvider creates a new MockProvider, which implements the PodNotifier interface
func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
    config, err := loadConfig(providerConfig, nodeName)
@@ -158,7 +132,7 @@ func loadConfig(providerConfig, nodeName string) (config MockConfig, err error)
}

// CreatePod accepts a Pod definition and stores it in memory.
func (p *MockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
func (p *MockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
    ctx, span := trace.StartSpan(ctx, "CreatePod")
    defer span.End()

@@ -215,7 +189,7 @@ func (p *MockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
}

// UpdatePod accepts a Pod definition and updates its reference.
func (p *MockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
    ctx, span := trace.StartSpan(ctx, "UpdatePod")
    defer span.End()

@@ -236,7 +210,7 @@ func (p *MockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
}

// DeletePod deletes the specified pod out of memory.
func (p *MockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
    ctx, span := trace.StartSpan(ctx, "DeletePod")
    defer span.End()

@@ -277,7 +251,7 @@ func (p *MockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
}

// GetPod returns a pod by name that is stored in memory.
func (p *MockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
    ctx, span := trace.StartSpan(ctx, "GetPod")
    defer func() {
        span.SetStatus(err)
@@ -301,7 +275,7 @@ func (p *MockV0Provider) GetPod(ctx context.Context, namespace, name string) (po
}

// GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *MockV0Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
    ctx, span := trace.StartSpan(ctx, "GetContainerLogs")
    defer span.End()

@@ -314,14 +288,14 @@ func (p *MockV0Provider) GetContainerLogs(ctx context.Context, namespace, podNam

// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *MockV0Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
func (p *MockProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
    log.G(context.TODO()).Infof("receive ExecInContainer %q", container)
    return nil
}

// GetPodStatus returns the status of a pod by name that is "running".
// returns nil if a pod by that name is not found.
func (p *MockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
    ctx, span := trace.StartSpan(ctx, "GetPodStatus")
    defer span.End()

@@ -339,7 +313,7 @@ func (p *MockV0Provider) GetPodStatus(ctx context.Context, namespace, name strin
}

// GetPods returns a list of all pods known to be "running".
func (p *MockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
    ctx, span := trace.StartSpan(ctx, "GetPods")
    defer span.End()

@@ -354,7 +328,7 @@ func (p *MockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
    return pods, nil
}

func (p *MockV0Provider) ConfigureNode(ctx context.Context, n *v1.Node) {
func (p *MockProvider) ConfigureNode(ctx context.Context, n *v1.Node) {
    ctx, span := trace.StartSpan(ctx, "mock.ConfigureNode") //nolint:ineffassign
    defer span.End()

@@ -373,7 +347,7 @@ func (p *MockV0Provider) ConfigureNode(ctx context.Context, n *v1.Node) {
}

// Capacity returns a resource list containing the capacity limits.
func (p *MockV0Provider) capacity() v1.ResourceList {
func (p *MockProvider) capacity() v1.ResourceList {
    return v1.ResourceList{
        "cpu":    resource.MustParse(p.config.CPU),
        "memory": resource.MustParse(p.config.Memory),
@@ -383,7 +357,7 @@ func (p *MockV0Provider) capacity() v1.ResourceList {

// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
// within Kubernetes.
func (p *MockV0Provider) nodeConditions() []v1.NodeCondition {
func (p *MockProvider) nodeConditions() []v1.NodeCondition {
    // TODO: Make this configurable
    return []v1.NodeCondition{
        {
@@ -432,7 +406,7 @@ func (p *MockV0Provider) nodeConditions() []v1.NodeCondition {

// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *MockV0Provider) nodeAddresses() []v1.NodeAddress {
func (p *MockProvider) nodeAddresses() []v1.NodeAddress {
    return []v1.NodeAddress{
        {
            Type: "InternalIP",
@@ -443,7 +417,7 @@ func (p *MockV0Provider) nodeAddresses() []v1.NodeAddress {

// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *MockV0Provider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
func (p *MockProvider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
    return v1.NodeDaemonEndpoints{
        KubeletEndpoint: v1.DaemonEndpoint{
            Port: p.daemonEndpointPort,
@@ -452,7 +426,7 @@ func (p *MockV0Provider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
}

// GetStatsSummary returns dummy stats for all pods known by this provider.
func (p *MockV0Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
func (p *MockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
    var span trace.Span
    ctx, span = trace.StartSpan(ctx, "GetStatsSummary") //nolint: ineffassign
    defer span.End()

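The hunks above fold the old MockV0Provider into a single MockProvider, but they do not show the PodNotifier side of that type. In a provider structured like this, NotifyPods is typically nothing more than storing the callback handed over by the pod controller and replacing the no-op notifier set in the constructor. A minimal sketch of that pattern, with an illustrative stand-in type rather than the real MockProvider:

package mock

import (
    "context"

    v1 "k8s.io/api/core/v1"
)

// sketchProvider is an illustrative stand-in for the unified MockProvider.
type sketchProvider struct {
    notifier func(*v1.Pod) // defaults to a no-op, as in the constructor above
}

func newSketchProvider() *sketchProvider {
    return &sketchProvider{notifier: func(*v1.Pod) {}}
}

// NotifyPods saves the callback passed in by the PodController; every later
// pod mutation is reported asynchronously through it.
func (p *sketchProvider) NotifyPods(_ context.Context, notifier func(*v1.Pod)) {
    p.notifier = notifier
}

// markRunning shows the asynchronous flow: mutate the in-memory pod, then
// push the new status back through the stored callback.
func (p *sketchProvider) markRunning(pod *v1.Pod) {
    pod.Status.Phase = v1.PodRunning
    p.notifier(pod)
}

With the callback wired in this way, the provider pushes every change itself and no controller-side polling is required.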
@@ -15,14 +15,4 @@ func registerMock(s *provider.Store) {
            cfg.DaemonPort,
        )
    })

    s.Register("mockV0", func(cfg provider.InitConfig) (provider.Provider, error) { //nolint:errcheck
        return mock.NewMockProvider(
            cfg.ConfigPath,
            cfg.NodeName,
            cfg.OperatingSystem,
            cfg.InternalIP,
            cfg.DaemonPort,
        )
    })
}

@@ -116,15 +116,6 @@ func TestPodLifecycle(t *testing.T) {
            testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
        }))
    })

    if testing.Short() {
        return
    }
    t.Run("mockV0Provider", func(t *testing.T) {
        assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
            testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
        }))
    })
})

// createStartDeleteScenarioWithDeletionErrorNotFound tests the flow if the pod was not found in the provider
@@ -160,20 +151,13 @@ func TestPodLifecycle(t *testing.T) {
    t.Run("mockProvider", func(t *testing.T) {
        mp := newMockProvider()
        assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
            testDanglingPodScenario(ctx, t, s, mp.mockV0Provider)
            testDanglingPodScenario(ctx, t, s, mp)
        }))
    })

    if testing.Short() {
        return
    }

    t.Run("mockV0Provider", func(t *testing.T) {
        mp := newMockV0Provider()
        assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
            testDanglingPodScenario(ctx, t, s, mp)
        }))
    })
})

// failedPodScenario ensures that the VK ignores failed pods that were failed prior to the PC starting up
@@ -184,16 +168,6 @@ func TestPodLifecycle(t *testing.T) {
        testFailedPodScenario(ctx, t, s)
    }))
})

if testing.Short() {
    return
}

t.Run("mockV0Provider", func(t *testing.T) {
    assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
        testFailedPodScenario(ctx, t, s)
    }))
})
})

// succeededPodScenario ensures that the VK ignores succeeded pods that were succeeded prior to the PC starting up.
@@ -204,14 +178,6 @@ func TestPodLifecycle(t *testing.T) {
        testSucceededPodScenario(ctx, t, s)
    }))
})
if testing.Short() {
    return
}
t.Run("mockV0Provider", func(t *testing.T) {
    assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
        testSucceededPodScenario(ctx, t, s)
    }))
})
})

// updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated
@@ -224,15 +190,6 @@ func TestPodLifecycle(t *testing.T) {
        }))
    })
})

// podStatusMissingWhileRunningScenario waits for the pod to go into the running state, with a V0 style provider,
// and then makes the pod disappear!
t.Run("podStatusMissingWhileRunningScenario", func(t *testing.T) {
    mp := newMockV0Provider()
    assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
        testPodStatusMissingWhileRunningScenario(ctx, t, s, mp)
    }))
})
}

type testFunction func(ctx context.Context, s *system)
@@ -358,7 +315,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
    assert.DeepEqual(t, p1, p2)
}

func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mockV0Provider) {
func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) {
    t.Parallel()

    pod := newPod()
@@ -372,7 +329,6 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo
}

func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error) {

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

@@ -530,87 +486,6 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
    assert.NilError(t, m.updates.until(ctx, func(v int) bool { return v > 0 }))
}

func testPodStatusMissingWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockV0Provider) {
    t.Parallel()
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    p := newPod()
    key, err := buildKey(p)
    assert.NilError(t, err)

    listOptions := metav1.ListOptions{
        FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(),
    }

    watchErrCh := make(chan error)

    // Create a Pod
    _, e := s.client.CoreV1().Pods(testNamespace).Create(p)
    assert.NilError(t, e)

    // Setup a watch to check if the pod is in running
    watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
    assert.NilError(t, err)
    defer watcher.Stop()
    go func() {
        newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
            // Wait for the pod to be started
            func(ev watch.Event) (bool, error) {
                pod := ev.Object.(*corev1.Pod)
                return pod.Status.Phase == corev1.PodRunning, nil
            })
        // This deepcopy is required to please the race detector
        p = newPod.Object.(*corev1.Pod).DeepCopy()
        watchErrCh <- watchErr
    }()

    // Start the pod controller
    assert.NilError(t, s.start(ctx))

    // Wait for pod to be in running
    select {
    case <-ctx.Done():
        t.Fatalf("Context ended early: %s", ctx.Err().Error())
    case <-s.pc.Done():
        assert.NilError(t, s.pc.Err())
        t.Fatal("Pod controller exited prematurely without error")
    case err = <-watchErrCh:
        assert.NilError(t, err)

    }

    // Setup a watch to check if the pod is in failed due to provider issues
    watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
    assert.NilError(t, err)
    defer watcher.Stop()
    go func() {
        newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
            // Wait for the pod to be failed
            func(ev watch.Event) (bool, error) {
                pod := ev.Object.(*corev1.Pod)
                return pod.Status.Phase == corev1.PodFailed, nil
            })
        // This deepcopy is required to please the race detector
        p = newPod.Object.(*corev1.Pod).DeepCopy()
        watchErrCh <- watchErr
    }()

    // delete the pod from the mock provider
    m.pods.Delete(key)
    select {
    case <-ctx.Done():
        t.Fatalf("Context ended early: %s", ctx.Err().Error())
    case <-s.pc.Done():
        assert.NilError(t, s.pc.Err())
        t.Fatal("Pod controller exited prematurely without error")
    case err = <-watchErrCh:
        assert.NilError(t, err)
    }

    assert.Equal(t, p.Status.Reason, podStatusReasonNotFound)
}

func BenchmarkCreatePods(b *testing.B) {
    sl := logrus.StandardLogger()
    sl.SetLevel(logrus.ErrorLevel)

@@ -13,8 +13,7 @@ import (
)

var (
    _ PodLifecycleHandler = (*mockV0Provider)(nil)
    _ PodNotifier         = (*mockProvider)(nil)
    _ PodLifecycleHandler = (*mockProvider)(nil)
)

type waitableInt struct {
@@ -62,7 +61,7 @@ func (w *waitableInt) increment() {
    w.cond.Broadcast()
}

type mockV0Provider struct {
type mockProvider struct {
    creates *waitableInt
    updates *waitableInt
    deletes *waitableInt
@@ -75,13 +74,9 @@ type mockV0Provider struct {
    realNotifier func(*v1.Pod)
}

type mockProvider struct {
    *mockV0Provider
}

// NewMockProviderMockConfig creates a new mockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface
func newMockV0Provider() *mockV0Provider {
    provider := mockV0Provider{
// newMockProvider creates a new mockProvider.
func newMockProvider() *mockProvider {
    provider := mockProvider{
        startTime: time.Now(),
        creates:   newWaitableInt(),
        updates:   newWaitableInt(),
@@ -95,20 +90,15 @@ func newMockV0Provider() *mockV0Provider {
    return &provider
}

// NewMockProviderMockConfig creates a new MockProvider with the given config
func newMockProvider() *mockProvider {
    return &mockProvider{mockV0Provider: newMockV0Provider()}
}

// notifier calls the callback that we got from the pod controller to notify it of updates (if it is set)
func (p *mockV0Provider) notifier(pod *v1.Pod) {
func (p *mockProvider) notifier(pod *v1.Pod) {
    if p.realNotifier != nil {
        p.realNotifier(pod)
    }
}

// CreatePod accepts a Pod definition and stores it in memory.
func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
func (p *mockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
    log.G(ctx).Infof("receive CreatePod %q", pod.Name)

    p.creates.increment()
@@ -160,7 +150,7 @@ func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
}

// UpdatePod accepts a Pod definition and updates its reference.
func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
func (p *mockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
    log.G(ctx).Infof("receive UpdatePod %q", pod.Name)

    p.updates.increment()
@@ -177,7 +167,7 @@ func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {

// DeletePod deletes the specified pod out of memory. The PodController deepcopies the pod object
// for us, so we don't have to worry about mutation.
func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
    log.G(ctx).Infof("receive DeletePod %q", pod.Name)

    p.attemptedDeletes.increment()
@@ -213,21 +203,13 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
    }

    p.notifier(pod)
    p.pods.Store(key, pod)
    if pod.DeletionGracePeriodSeconds == nil || *pod.DeletionGracePeriodSeconds == 0 {
        p.pods.Delete(key)
    } else {
        time.AfterFunc(time.Duration(*pod.DeletionGracePeriodSeconds)*time.Second, func() {
            p.pods.Delete(key)
        })

    }
    p.pods.Delete(key)

    return nil
}

// GetPod returns a pod by name that is stored in memory.
func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
func (p *mockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
    log.G(ctx).Infof("receive GetPod %q", name)

    key, err := buildKeyFromNames(namespace, name)
@@ -243,7 +225,7 @@ func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (po

// GetPodStatus returns the status of a pod by name that is "running".
// returns nil if a pod by that name is not found.
func (p *mockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
func (p *mockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
    log.G(ctx).Infof("receive GetPodStatus %q", name)

    pod, err := p.GetPod(ctx, namespace, name)
@@ -255,7 +237,7 @@ func (p *mockV0Provider) GetPodStatus(ctx context.Context, namespace, name strin
}

// GetPods returns a list of all pods known to be "running".
func (p *mockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
func (p *mockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
    log.G(ctx).Info("receive GetPods")

    var pods []*v1.Pod

node/pod.go (81 changed lines)
@@ -16,7 +16,6 @@ package node

import (
    "context"
    "time"

    "github.com/google/go-cmp/cmp"
    pkgerrors "github.com/pkg/errors"
@@ -26,18 +25,12 @@ import (
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

const (
    podStatusReasonProviderFailed   = "ProviderFailed"
    podStatusReasonNotFound         = "NotFound"
    podStatusMessageNotFound        = "The pod status was not found and may have been deleted from the provider"
    containerStatusReasonNotFound   = "NotFound"
    containerStatusMessageNotFound  = "Container was not found and was likely deleted"
    containerStatusExitCodeNotFound = -137
    podStatusReasonProviderFailed = "ProviderFailed"
)

func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
@@ -198,78 +191,6 @@ func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace,
    return nil
}

// fetchPodStatusesFromProvider syncs the providers pod status with the kubernetes pod status.
func (pc *PodController) fetchPodStatusesFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
    ctx, span := trace.StartSpan(ctx, "fetchPodStatusesFromProvider")
    defer span.End()

    // Update all the pods with the provider status.
    pods, err := pc.podsLister.List(labels.Everything())
    if err != nil {
        err = pkgerrors.Wrap(err, "error getting pod list from kubernetes")
        span.SetStatus(err)
        log.G(ctx).WithError(err).Error("Error updating pod statuses")
        return
    }
    ctx = span.WithField(ctx, "nPods", int64(len(pods)))

    for _, pod := range pods {
        if !shouldSkipPodStatusUpdate(pod) {
            enrichedPod, err := pc.fetchPodStatusFromProvider(ctx, q, pod)
            if err != nil {
                log.G(ctx).WithFields(map[string]interface{}{
                    "name":      pod.Name,
                    "namespace": pod.Namespace,
                }).WithError(err).Error("Could not fetch pod status")
            } else if enrichedPod != nil {
                pc.enqueuePodStatusUpdate(ctx, q, enrichedPod)
            }
        }
    }
}

// fetchPodStatusFromProvider returns a pod (the pod we pass in) enriched with the pod status from the provider. If the pod is not found,
// and it has been 1 minute since the pod was created, or the pod was previously running, it will be marked as failed.
// If a valid pod status cannot be generated, for example, if a pod is not found in the provider, and it has been less than 1 minute
// since pod creation, we will return nil for the pod.
func (pc *PodController) fetchPodStatusFromProvider(ctx context.Context, q workqueue.RateLimitingInterface, podFromKubernetes *corev1.Pod) (*corev1.Pod, error) {
    podStatus, err := pc.provider.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
    if errdefs.IsNotFound(err) || (err == nil && podStatus == nil) {
        // Only change the status when the pod was already up
        // Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
        if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
            // Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
            podStatus = podFromKubernetes.Status.DeepCopy()
            podStatus.Phase = corev1.PodFailed
            podStatus.Reason = podStatusReasonNotFound
            podStatus.Message = podStatusMessageNotFound
            now := metav1.NewTime(time.Now())
            for i, c := range podStatus.ContainerStatuses {
                if c.State.Running == nil {
                    continue
                }
                podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
                    ExitCode:    containerStatusExitCodeNotFound,
                    Reason:      containerStatusReasonNotFound,
                    Message:     containerStatusMessageNotFound,
                    FinishedAt:  now,
                    StartedAt:   c.State.Running.StartedAt,
                    ContainerID: c.ContainerID,
                }
                podStatus.ContainerStatuses[i].State.Running = nil
            }
        } else if err != nil {
            return nil, nil
        }
    } else if err != nil {
        return nil, err
    }

    pod := podFromKubernetes.DeepCopy()
    podStatus.DeepCopyInto(&pod.Status)
    return pod, nil
}

func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool {
    return pod.Status.Phase == corev1.PodSucceeded ||
        pod.Status.Phase == corev1.PodFailed ||

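The comment on fetchPodStatusFromProvider above states the rule in prose; condensed, the decision to mark a pod missing from the provider as failed depends only on the phase Kubernetes last saw and a one-minute grace period after creation. A small illustrative helper (not part of the tree) that expresses just that predicate:

package example

import (
    "time"

    corev1 "k8s.io/api/core/v1"
)

// shouldMarkFailed reports whether a pod the provider no longer knows about
// should be flipped to Failed: either it was already Running, or it is older
// than the one-minute creation grace period used by the controller above.
func shouldMarkFailed(pod *corev1.Pod, now time.Time) bool {
    if pod.Status.Phase == corev1.PodRunning {
        return true
    }
    return now.Sub(pod.ObjectMeta.CreationTimestamp.Time) > time.Minute
}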
node/pod_test.go (103 changed lines)
@@ -225,109 +225,6 @@ func TestPodDelete(t *testing.T) {
    }
}

func TestFetchPodStatusFromProvider(t *testing.T) {
    startedAt := metav1.NewTime(time.Now())
    finishedAt := metav1.NewTime(startedAt.Add(time.Second * 10))
    containerStateRunning := &corev1.ContainerStateRunning{StartedAt: startedAt}
    containerStateTerminated := &corev1.ContainerStateTerminated{StartedAt: startedAt, FinishedAt: finishedAt}
    containerStateWaiting := &corev1.ContainerStateWaiting{}

    testCases := []struct {
        desc               string
        running            *corev1.ContainerStateRunning
        terminated         *corev1.ContainerStateTerminated
        waiting            *corev1.ContainerStateWaiting
        expectedStartedAt  metav1.Time
        expectedFinishedAt metav1.Time
    }{
        {desc: "container in running state", running: containerStateRunning, expectedStartedAt: startedAt},
        {desc: "container in terminated state", terminated: containerStateTerminated, expectedStartedAt: startedAt, expectedFinishedAt: finishedAt},
        {desc: "container in waiting state", waiting: containerStateWaiting},
    }

    for _, tc := range testCases {
        t.Run(tc.desc, func(t *testing.T) {
            c := newTestController()
            pod := &corev1.Pod{}
            pod.ObjectMeta.Namespace = "default"
            pod.ObjectMeta.Name = "nginx"
            pod.Status.Phase = corev1.PodRunning
            containerStatus := corev1.ContainerStatus{}
            if tc.running != nil {
                containerStatus.State.Running = tc.running
            } else if tc.terminated != nil {
                containerStatus.State.Terminated = tc.terminated
            } else if tc.waiting != nil {
                containerStatus.State.Waiting = tc.waiting
            }
            pod.Status.ContainerStatuses = []corev1.ContainerStatus{containerStatus}

            pc := c.client.CoreV1().Pods("default")
            p, err := pc.Create(pod)
            assert.NilError(t, err)

            ctx := context.Background()
            updatedPod, err := c.fetchPodStatusFromProvider(ctx, nil, p)
            assert.NilError(t, err)

            assert.Equal(t, updatedPod.Status.Phase, corev1.PodFailed)
            assert.Assert(t, is.Len(updatedPod.Status.ContainerStatuses, 1))
            assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Running == nil)

            // Test cases for running and terminated container state
            if tc.running != nil || tc.terminated != nil {
                // Ensure that the container is in terminated state and other states are nil
                assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated != nil)
                assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting == nil)

                terminated := updatedPod.Status.ContainerStatuses[0].State.Terminated
                assert.Equal(t, terminated.StartedAt, tc.expectedStartedAt)
                assert.Assert(t, terminated.StartedAt.Before(&terminated.FinishedAt))
                if tc.terminated != nil {
                    assert.Equal(t, terminated.FinishedAt, tc.expectedFinishedAt)
                }
            } else {
                // Test case for waiting container state
                assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated == nil)
                assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting != nil)
            }
        })
    }
}

func TestFetchPodStatusFromProviderWithExpiredPod(t *testing.T) {
    c := newTestController()
    pod := &corev1.Pod{}
    pod.ObjectMeta.Namespace = "default"
    pod.ObjectMeta.Name = "nginx"
    pod.Status.Phase = corev1.PodRunning
    containerStatus := corev1.ContainerStatus{}

    // We should terminate containers in a pod that has not provided pod status update for more than a minute
    startedAt := time.Now().Add(-(time.Minute + time.Second))
    containerStatus.State.Running = &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(startedAt)}
    pod.ObjectMeta.CreationTimestamp.Time = startedAt
    pod.Status.ContainerStatuses = []corev1.ContainerStatus{containerStatus}

    pc := c.client.CoreV1().Pods("default")
    p, err := pc.Create(pod)
    assert.NilError(t, err)

    ctx := context.Background()
    updatedPod, err := c.fetchPodStatusFromProvider(ctx, nil, p)
    assert.NilError(t, err)

    assert.Equal(t, updatedPod.Status.Phase, corev1.PodFailed)
    assert.Assert(t, is.Len(updatedPod.Status.ContainerStatuses, 1))
    assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Running == nil)
    assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated != nil)
    assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting == nil)

    terminated := updatedPod.Status.ContainerStatuses[0].State.Terminated
    assert.Equal(t, terminated.StartedAt, metav1.NewTime(startedAt))
    assert.Assert(t, terminated.StartedAt.Before(&terminated.FinishedAt))
}

func newPodSpec() corev1.PodSpec {
    return corev1.PodSpec{
        Containers: []corev1.Container{

@@ -49,7 +49,9 @@ type PodLifecycleHandler interface {
    // UpdatePod takes a Kubernetes Pod and updates it within the provider.
    UpdatePod(ctx context.Context, pod *corev1.Pod) error

    // DeletePod takes a Kubernetes Pod and deletes it from the provider.
    // DeletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is
    // expected to call the NotifyPods callback with a terminal pod status where all the containers are in a terminal
    // state, as well as the pod.
    DeletePod(ctx context.Context, pod *corev1.Pod) error

    // GetPod retrieves a pod by name from the provider (can be cached).
@@ -69,12 +71,7 @@ type PodLifecycleHandler interface {
    // concurrently outside of the calling goroutine. Therefore it is recommended
    // to return a version after DeepCopy.
    GetPods(context.Context) ([]*corev1.Pod, error)
}

// PodNotifier notifies callers of pod changes.
// Providers should implement this interface to enable callers to be notified
// of pod status updates asynchronously.
type PodNotifier interface {
    // NotifyPods instructs the notifier to call the passed in function when
    // the pod status changes. It should be called when a pod's status changes.
    //
@@ -218,7 +215,10 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
    }()

    podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
    pc.runSyncFromProvider(ctx, podStatusQueue)
    pc.provider.NotifyPods(ctx, func(pod *corev1.Pod) {
        pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
    })

    defer podStatusQueue.ShutDown()

    // Wait for the caches to be synced *before* starting to do work.

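The updated DeletePod comment above shifts responsibility for the final state onto the provider: once a pod is deleted, the provider should push a terminal status for the pod and each of its containers through the callback it received in NotifyPods. A hedged sketch of what that can look like; the type, field, and reason strings here are illustrative, not taken from the tree:

package example

import (
    "context"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// deletingProvider stands in for any provider that keeps pods in some backend
// and holds on to the callback it was given in NotifyPods.
type deletingProvider struct {
    notify func(*corev1.Pod)
}

// DeletePod removes the pod from the backend, then reports a terminal status
// so the PodController can finish the deletion on the Kubernetes side.
func (p *deletingProvider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
    now := metav1.NewTime(time.Now())
    pod.Status.Phase = corev1.PodSucceeded
    pod.Status.Reason = "ProviderPodDeleted" // illustrative reason string
    for i := range pod.Status.ContainerStatuses {
        pod.Status.ContainerStatuses[i].Ready = false
        pod.Status.ContainerStatuses[i].State = corev1.ContainerState{
            Terminated: &corev1.ContainerStateTerminated{
                Reason:     "ProviderPodContainerDeleted", // illustrative
                FinishedAt: now,
            },
        }
    }
    p.notify(pod) // terminal status for the pod and all of its containers
    return nil
}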
@@ -16,12 +16,10 @@ package node

import (
    "context"
    "time"

    pkgerrors "github.com/pkg/errors"
    "github.com/virtual-kubelet/virtual-kubelet/log"
    "github.com/virtual-kubelet/virtual-kubelet/trace"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/util/workqueue"
)

@@ -105,40 +103,3 @@ func (pc *PodController) processPodStatusUpdate(ctx context.Context, workerID st

    return handleQueueItem(ctx, q, pc.podStatusHandler)
}

// providerSyncLoop synchronizes pod states from the provider back to kubernetes
// Deprecated: This is only used when the provider does not support async updates
// Providers should implement async update support, even if it just means copying
// something like this in.
func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) {
    const sleepTime = 5 * time.Second

    t := time.NewTimer(sleepTime)
    defer t.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-t.C:
            t.Stop()

            ctx, span := trace.StartSpan(ctx, "syncActualState")
            pc.fetchPodStatusesFromProvider(ctx, q)
            span.End()

            // restart the timer
            t.Reset(sleepTime)
        }
    }
}

func (pc *PodController) runSyncFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
    if pn, ok := pc.provider.(PodNotifier); ok {
        pn.NotifyPods(ctx, func(pod *corev1.Pod) {
            pc.enqueuePodStatusUpdate(ctx, q, pod.DeepCopy())
        })
    } else {
        go pc.providerSyncLoop(ctx, q)
    }
}
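The removed providerSyncLoop was the controller's polling fallback for providers without async support, and its own comment already advised providers to copy something like it in. A hedged sketch of such a provider-side loop, assuming a provider that can snapshot its own pods and that has stored the NotifyPods callback (all names illustrative):

package example

import (
    "context"
    "time"

    corev1 "k8s.io/api/core/v1"
)

// pollingProvider stands in for a backend that cannot push status changes
// natively, so it polls its own state now that the controller no longer does.
type pollingProvider struct {
    notify   func(*corev1.Pod)
    snapshot func(context.Context) []*corev1.Pod // reads provider state
}

// NotifyPods stores the controller callback and starts the internal loop.
func (p *pollingProvider) NotifyPods(ctx context.Context, notifier func(*corev1.Pod)) {
    p.notify = notifier
    go p.loop(ctx)
}

// loop mirrors the deleted providerSyncLoop: on a fixed interval it re-reads
// the provider's pods and pushes each one through the callback.
func (p *pollingProvider) loop(ctx context.Context) {
    const interval = 5 * time.Second
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            for _, pod := range p.snapshot(ctx) {
                p.notify(pod.DeepCopy()) // DeepCopy, as the old loop did
            }
        }
    }
}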