Compare commits

..

24 Commits

Author SHA1 Message Date
Brian Goff
ba940a9739 Merge pull request #786 from cpuguy83/add_sync_provider_support
Re-add support for sync providers
2019-11-01 09:23:38 -07:00
Brian Goff
0ccf5059e4 Put sync lifecycle tests being -short flag.
This lets you skip tests for the slower sync provider.
2019-10-29 15:05:35 -07:00
Brian Goff
31c8fbaa41 Apply suggestions from code review
Typos and punctuation fixes.

Co-Authored-By: Pires <1752631+pires@users.noreply.github.com>
2019-10-24 09:23:33 -07:00
Brian Goff
4ee2c4d370 Re-add support for sync providers
This brings back support for sync providers by wrapping them in a
provider that handles async notifications.
2019-10-24 09:23:28 -07:00
Sargun Dhillon
c314045d60 Ensure that delete dangling pods which are still deleting at startup (#784)
If a pod is being gracefully deleted at podcontroller startup,
it will not get deleted via the deletedanglingpods code. This
ensures the normal deletion loop covers the case.
2019-10-22 06:45:36 -04:00
Brian Goff
d455bd16fc Merge pull request #760 from sargun/notify-pods-v7
Do not delete pods in a non-graceful manner
2019-10-18 11:21:31 -07:00
Sargun Dhillon
d22265e5f5 Do not delete pods in a non-graceful manner
This moves from forcefully deleting pods to deleting pods in a
graceful manner from the API Server. It waits for the pod to
get to a terminal status prior to deleting the pod from api
server.
2019-10-17 09:58:21 -07:00
Sargun Dhillon
871424368f Fix pod status updates for when pod is updated outside of VK
Pods can be updated outside of VK. Right now, if this happens, pod
status updates are dropped because the resourceversion from the
provider will mismatch with what's on the server, breaking
pod status updates.

Since we're the only ones writing to the pod status, we
can do a blind overwrite.
2019-10-11 16:32:48 -07:00
Sargun Dhillon
cdc261a08d Use go-cmp to compare pods to suppress duplicate updates
Rather than copying the pods, this uses go-cmp and filters out
the paths which should not be compared.
2019-10-10 13:25:27 -07:00
Brian Goff
d878af3262 Merge pull request #770 from sargun/remove-sync-providers
Remove sync providers
2019-10-07 11:02:22 -07:00
Sargun Dhillon
4202b03cda Remove sync provider support
This removes the legacy sync provider interface. All new providers
are expected to implement the async NotifyPods interface.

The legacy sync provider interface creates complexities around
how the deletion flow works, and the mixed sync and async APIs
block us from evolving functionality.

This collapses in the NotifyPods interface into the PodLifecycleHandler
interface.
2019-10-02 09:28:09 -07:00
Brian Goff
b3aa0f577b Merge pull request #776 from Uzuku/fix-log-format
Fix log format
2019-09-27 11:05:25 -07:00
Uzuku
f80f823e8b Fix log format
Correctly expand the log args
2019-09-28 01:54:46 +08:00
Brian Goff
1bd53c15d1 Merge pull request #774 from toshi0607/feature/fix-lint-warnings
fix lint warnings
2019-09-26 21:48:04 -07:00
Brian Goff
6f6b92ba57 Merge pull request #772 from sargun/add-linters
Add varcheck, deadcode, and mispell linters
2019-09-26 21:47:02 -07:00
toshi0607
bcfc2accf8 misspell 2019-09-26 20:52:06 +09:00
toshi0607
b712751c6d gofmt 2019-09-26 20:50:36 +09:00
Brian Goff
11321d5092 Merge pull request #771 from virtual-kubelet/rbitia-patch-1
Update ADOPTERS.md
2019-09-25 14:31:55 -07:00
Sargun Dhillon
e02c4d9e1e Add varcheck, deadcode, and mispell linters 2019-09-25 09:03:34 -07:00
Ria Bhatia
eda3e27c9f Update ADOPTERS.md
adding adopters
2019-09-25 08:59:01 -07:00
Ria Bhatia
e37a5cebca Update ADOPTERS.md
adding public end-users
2019-09-25 08:57:55 -07:00
Brian Goff
c0746372ad Merge pull request #769 from sargun/add-unused-linter
Add unused code linter
2019-09-24 22:13:39 -07:00
Sargun Dhillon
82a430ccf7 Add unused code linter 2019-09-24 12:55:52 -07:00
Ria Bhatia
8a5f4af171 readme updates (#766) 2019-09-19 11:33:47 -07:00
18 changed files with 783 additions and 701 deletions

View File

@@ -11,3 +11,7 @@ linters:
- golint
- goconst
- goimports
- unused
- varcheck
- deadcode
- misspell

View File

@@ -1,3 +1,12 @@
## Virtual Kubelet adopters
Are you currently using Virtual Kubelet in production? Please let us know by adding your company name and a description of your use case to this document!
* Microsoft Azure
* AWS
* Alibaba
* VMWare
* Netflix
* Hashi Corp
Since end-users are specific per provider within VK we have many end-user customers that we don't have permission to list publically. Please contact ribhatia@microsoft.com for more informtation.
Are you currently using Virtual Kubelet in production? Please let us know by adding your company name and a description of your use case to this document!

View File

@@ -99,10 +99,6 @@ You can find detailed instructions on how to set it up and how to test it in the
The Azure connector can use a configuration file specified by the `--provider-config` flag.
The config file is in TOML format, and an example lives in `providers/azure/example.toml`.
#### More Details
See the [ACI Readme](https://github.com/virtual-kubelet/alibabacloud-eci/blob/master/eci.toml)
### AWS Fargate Provider
[AWS Fargate](https://aws.amazon.com/fargate/) is a technology that allows you to run containers
@@ -114,7 +110,7 @@ IP addresses to connect to the internet, private IP addresses to connect to your
security groups, IAM roles, CloudWatch Logs and many other AWS services. Pods on Fargate can
co-exist with pods on regular worker nodes in the same Kubernetes cluster.
Easy instructions and a sample configuration file is available in the [AWS Fargate provider documentation](https://github.com/virtual-kubelet/aws-fargate/blob/master/README.md).
Easy instructions and a sample configuration file is available in the [AWS Fargate provider documentation](https://github.com/virtual-kubelet/aws-fargate). Please note that this provider is not currently supported.
### HashiCorp Nomad Provider
@@ -280,8 +276,10 @@ Enable the ServiceNodeExclusion flag, by modifying the Controller Manager manife
Virtual Kubelet follows the [CNCF Code of Conduct](https://github.com/cncf/foundation/blob/master/code-of-conduct.md).
Sign the [CNCF CLA](https://github.com/kubernetes/community/blob/master/CLA.md) to be able to make Pull Requests to this repo.
Bi-weekly Virtual Kubelet Architecture meetings are held at 11am PST in this [zoom meeting room](https://zoom.us/j/245165908). Check out the calendar [here](https://calendar.google.com/calendar?cid=bjRtbGMxYWNtNXR0NXQ1a2hqZmRkNTRncGNAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ).
Bi-weekly Virtual Kubelet Architecture meetings are held at 11am PST every other Wednesday in this [zoom meeting room](https://zoom.us/j/245165908). Check out the calendar [here](https://calendar.google.com/calendar?cid=bjRtbGMxYWNtNXR0NXQ1a2hqZmRkNTRncGNAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ).
Our google drive with design specifications and meeting notes are [here](https://drive.google.com/drive/folders/19Ndu11WBCCBDowo9CrrGUHoIfd2L8Ueg?usp=sharing).
We also have a community slack channel named virtual-kubelet in the Kubernetes slack.

View File

@@ -29,7 +29,7 @@ var (
)
// TracingExporterInitFunc is the function that is called to initialize an exporter.
// This is used when registering an exporter and called when a user specifed they want to use the exporter.
// This is used when registering an exporter and called when a user specified they want to use the exporter.
type TracingExporterInitFunc func(TracingExporterOptions) (trace.Exporter, error)
// RegisterTracingExporter registers a tracing exporter.

View File

@@ -41,8 +41,8 @@ var (
)
*/
// MockV0Provider implements the virtual-kubelet provider interface and stores pods in memory.
type MockV0Provider struct { //nolint:golint
// MockProvider implements the virtual-kubelet provider interface and stores pods in memory.
type MockProvider struct { // nolint:golint
nodeName string
operatingSystem string
internalIP string
@@ -53,11 +53,6 @@ type MockV0Provider struct { //nolint:golint
notifier func(*v1.Pod)
}
// MockProvider is like MockV0Provider, but implements the PodNotifier interface
type MockProvider struct { //nolint:golint
*MockV0Provider
}
// MockConfig contains a mock virtual-kubelet's configurable parameters.
type MockConfig struct { //nolint:golint
CPU string `json:"cpu,omitempty"`
@@ -66,7 +61,7 @@ type MockConfig struct { //nolint:golint
}
// NewMockProviderMockConfig creates a new MockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface
func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
//set defaults
if config.CPU == "" {
config.CPU = defaultCPUCapacity
@@ -77,7 +72,7 @@ func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem st
if config.Pods == "" {
config.Pods = defaultPodCapacity
}
provider := MockV0Provider{
provider := MockProvider{
nodeName: nodeName,
operatingSystem: operatingSystem,
internalIP: internalIP,
@@ -85,32 +80,11 @@ func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem st
pods: make(map[string]*v1.Pod),
config: config,
startTime: time.Now(),
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
// it will be set, and then we'll call a real underlying implementation.
// This makes it easier in the sense we don't need to wrap each method.
notifier: func(*v1.Pod) {},
}
return &provider, nil
}
// NewMockV0Provider creates a new MockV0Provider
func NewMockV0Provider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
config, err := loadConfig(providerConfig, nodeName)
if err != nil {
return nil, err
}
return NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
}
// NewMockProviderMockConfig creates a new MockProvider with the given config
func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
p, err := NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
return &MockProvider{MockV0Provider: p}, err
}
// NewMockProvider creates a new MockProvider, which implements the PodNotifier interface
func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
config, err := loadConfig(providerConfig, nodeName)
@@ -158,7 +132,7 @@ func loadConfig(providerConfig, nodeName string) (config MockConfig, err error)
}
// CreatePod accepts a Pod definition and stores it in memory.
func (p *MockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
func (p *MockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
ctx, span := trace.StartSpan(ctx, "CreatePod")
defer span.End()
@@ -215,7 +189,7 @@ func (p *MockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
}
// UpdatePod accepts a Pod definition and updates its reference.
func (p *MockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
ctx, span := trace.StartSpan(ctx, "UpdatePod")
defer span.End()
@@ -236,7 +210,7 @@ func (p *MockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
}
// DeletePod deletes the specified pod out of memory.
func (p *MockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
ctx, span := trace.StartSpan(ctx, "DeletePod")
defer span.End()
@@ -277,7 +251,7 @@ func (p *MockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
}
// GetPod returns a pod by name that is stored in memory.
func (p *MockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
ctx, span := trace.StartSpan(ctx, "GetPod")
defer func() {
span.SetStatus(err)
@@ -301,7 +275,7 @@ func (p *MockV0Provider) GetPod(ctx context.Context, namespace, name string) (po
}
// GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *MockV0Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
ctx, span := trace.StartSpan(ctx, "GetContainerLogs")
defer span.End()
@@ -314,14 +288,14 @@ func (p *MockV0Provider) GetContainerLogs(ctx context.Context, namespace, podNam
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *MockV0Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
func (p *MockProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
log.G(context.TODO()).Infof("receive ExecInContainer %q", container)
return nil
}
// GetPodStatus returns the status of a pod by name that is "running".
// returns nil if a pod by that name is not found.
func (p *MockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
ctx, span := trace.StartSpan(ctx, "GetPodStatus")
defer span.End()
@@ -339,7 +313,7 @@ func (p *MockV0Provider) GetPodStatus(ctx context.Context, namespace, name strin
}
// GetPods returns a list of all pods known to be "running".
func (p *MockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
ctx, span := trace.StartSpan(ctx, "GetPods")
defer span.End()
@@ -354,7 +328,7 @@ func (p *MockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
return pods, nil
}
func (p *MockV0Provider) ConfigureNode(ctx context.Context, n *v1.Node) {
func (p *MockProvider) ConfigureNode(ctx context.Context, n *v1.Node) {
ctx, span := trace.StartSpan(ctx, "mock.ConfigureNode") //nolint:ineffassign
defer span.End()
@@ -373,7 +347,7 @@ func (p *MockV0Provider) ConfigureNode(ctx context.Context, n *v1.Node) {
}
// Capacity returns a resource list containing the capacity limits.
func (p *MockV0Provider) capacity() v1.ResourceList {
func (p *MockProvider) capacity() v1.ResourceList {
return v1.ResourceList{
"cpu": resource.MustParse(p.config.CPU),
"memory": resource.MustParse(p.config.Memory),
@@ -383,7 +357,7 @@ func (p *MockV0Provider) capacity() v1.ResourceList {
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
// within Kubernetes.
func (p *MockV0Provider) nodeConditions() []v1.NodeCondition {
func (p *MockProvider) nodeConditions() []v1.NodeCondition {
// TODO: Make this configurable
return []v1.NodeCondition{
{
@@ -432,7 +406,7 @@ func (p *MockV0Provider) nodeConditions() []v1.NodeCondition {
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *MockV0Provider) nodeAddresses() []v1.NodeAddress {
func (p *MockProvider) nodeAddresses() []v1.NodeAddress {
return []v1.NodeAddress{
{
Type: "InternalIP",
@@ -443,7 +417,7 @@ func (p *MockV0Provider) nodeAddresses() []v1.NodeAddress {
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *MockV0Provider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
func (p *MockProvider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
return v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{
Port: p.daemonEndpointPort,
@@ -452,7 +426,7 @@ func (p *MockV0Provider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
}
// GetStatsSummary returns dummy stats for all pods known by this provider.
func (p *MockV0Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
func (p *MockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
var span trace.Span
ctx, span = trace.StartSpan(ctx, "GetStatsSummary") //nolint: ineffassign
defer span.End()

View File

@@ -15,14 +15,4 @@ func registerMock(s *provider.Store) {
cfg.DaemonPort,
)
})
s.Register("mockV0", func(cfg provider.InitConfig) (provider.Provider, error) { //nolint:errcheck
return mock.NewMockProvider(
cfg.ConfigPath,
cfg.NodeName,
cfg.OperatingSystem,
cfg.InternalIP,
cfg.DaemonPort,
)
})
}

1
go.mod
View File

@@ -5,7 +5,6 @@ go 1.12
require (
contrib.go.opencensus.io/exporter/jaeger v0.1.0
contrib.go.opencensus.io/exporter/ocagent v0.4.12
github.com/davecgh/go-spew v1.1.1
github.com/docker/spdystream v0.0.0-20170912183627-bc6354cbbc29 // indirect
github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f // indirect
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2 // indirect

View File

@@ -109,7 +109,6 @@ func getExecOptions(req *http.Request) (*remotecommand.Options, error) {
type containerExecContext struct {
h ContainerExecHandlerFunc
eio *execIO
namespace, pod, container string
ctx context.Context
}

View File

@@ -663,7 +663,7 @@ func TestEnvFromConfigMapAndSecretWithInvalidKeys(t *testing.T) {
}
// TestEnvOverridesEnvFrom populates the environment of a container from a configmap, and from another configmap's key with a "conflicting" key.
// Then, it checks that the value of the "conflicting" key has been correctly overriden.
// Then, it checks that the value of the "conflicting" key has been correctly overridden.
func TestEnvOverridesEnvFrom(t *testing.T) {
rm := testutil.FakeResourceManager(configMap3)
er := testutil.FakeEventRecorder(defaultEventRecorderBufferSize)
@@ -672,7 +672,7 @@ func TestEnvOverridesEnvFrom(t *testing.T) {
override := "__override__"
// Create a pod object having a single container.
// The container's environment is to be populated from a configmap, and later overriden with a value provided directly.
// The container's environment is to be populated from a configmap, and later overridden with a value provided directly.
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,

View File

@@ -94,8 +94,7 @@ func TestPodLifecycle(t *testing.T) {
ctx = log.WithLogger(ctx, log.L)
// isPodDeletedPermanentlyFunc is a condition func that waits until the pod is _deleted_, which is the VK's
// action when the pod is deleted from the provider
// isPodDeletedPermanentlyFunc is a condition func that waits until the pod is _deleted_
isPodDeletedPermanentlyFunc := func(ctx context.Context, watcher watch.Interface) error {
_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) {
log.G(ctx).WithField("event", ev).Info("got event")
@@ -107,132 +106,121 @@ func TestPodLifecycle(t *testing.T) {
return watchErr
}
// createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting
// it gracefully
t.Run("createStartDeleteScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
}))
// isPodDeletedGracefullyFunc is a condition func that waits until the pod is in a terminal state, which is the VK's
// action when the pod is deleted from the provider.
isPodDeletedGracefullyFunc := func(ctx context.Context, watcher watch.Interface) error {
_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) {
log.G(ctx).WithField("event", ev).Info("got event")
pod := ev.Object.(*corev1.Pod)
return (pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded) && (pod.Status.Reason == mockProviderPodDeletedReason || pod.Status.Reason == statusTerminatedReason), nil
})
return watchErr
}
if testing.Short() {
return
}
t.Run("mockV0Provider", func(t *testing.T) {
assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
}))
envs := map[string]func(*testing.T) testingProvider{
"Async": func(t *testing.T) testingProvider {
return newMockProvider()
},
"Sync": func(t *testing.T) testingProvider {
if testing.Short() {
t.Skip()
}
return newSyncMockProvider()
},
}
for env, h := range envs {
t.Run(env, func(t *testing.T) {
// createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting
// it gracefully.
t.Run("createStartDeleteScenario", func(t *testing.T) {
assert.NilError(t, wireUpSystem(ctx, h(t), func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, isPodDeletedGracefullyFunc, true)
}))
})
// createStartDeleteScenarioWithDeletionErrorNotFound tests the flow if the pod was not found in the provider
// for some reason.
t.Run("createStartDeleteScenarioWithDeletionErrorNotFound", func(t *testing.T) {
mp := h(t)
mp.setErrorOnDelete(errdefs.NotFound("not found"))
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc, false)
}))
})
// createStartDeleteScenarioWithDeletionRandomError tests the flow if the pod was unable to be deleted in the
// provider.
t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) {
mp := h(t)
deletionFunc := func(ctx context.Context, watcher watch.Interface) error {
return mp.getAttemptedDeletes().until(ctx, func(v int) bool { return v >= 2 })
}
mp.setErrorOnDelete(errors.New("random error"))
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false)
pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
assert.NilError(t, err)
assert.Assert(t, is.Len(pods.Items, 1))
assert.Assert(t, pods.Items[0].DeletionTimestamp != nil)
}))
})
// danglingPodScenario tests if a pod is created in the provider prior to the pod controller starting,
// and ensures the pod controller deletes the pod prior to continuing.
t.Run("danglingPodScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := h(t)
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testDanglingPodScenario(ctx, t, s, mp)
}))
})
})
// testDanglingPodScenarioWithDeletionTimestamp tests if a pod is created in the provider and on api server it had
// deletiontimestamp set. It ensures deletion occurs.
t.Run("testDanglingPodScenarioWithDeletionTimestamp", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := h(t)
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testDanglingPodScenarioWithDeletionTimestamp(ctx, t, s, mp)
}))
})
})
// failedPodScenario ensures that the VK ignores failed pods that were failed prior to the pod controller starting up.
t.Run("failedPodScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := h(t)
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testFailedPodScenario(ctx, t, s)
}))
})
})
// succeededPodScenario ensures that the VK ignores succeeded pods that were succeeded prior to the pod controller starting up.
t.Run("succeededPodScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := h(t)
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testSucceededPodScenario(ctx, t, s)
}))
})
})
// updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated
// to the provider.
t.Run("updatePodWhileRunningScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := h(t)
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testUpdatePodWhileRunningScenario(ctx, t, s, mp)
}))
})
})
})
})
// createStartDeleteScenarioWithDeletionErrorNotFound tests the flow if the pod was not found in the provider
// for some reason
t.Run("createStartDeleteScenarioWithDeletionErrorNotFound", func(t *testing.T) {
mp := newMockProvider()
mp.errorOnDelete = errdefs.NotFound("not found")
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
}))
})
// createStartDeleteScenarioWithDeletionRandomError tests the flow if the pod was unable to be deleted in the
// provider
t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) {
mp := newMockProvider()
deletionFunc := func(ctx context.Context, watcher watch.Interface) error {
return mp.attemptedDeletes.until(ctx, func(v int) bool { return v >= 2 })
}
mp.errorOnDelete = errors.New("random error")
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, deletionFunc)
pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
assert.NilError(t, err)
assert.Assert(t, is.Len(pods.Items, 1))
assert.Assert(t, pods.Items[0].DeletionTimestamp != nil)
}))
})
// danglingPodScenario tests if a pod is created in the provider prior to the pod controller starting,
// and ensures the pod controller deletes the pod prior to continuing.
t.Run("danglingPodScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := newMockProvider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testDanglingPodScenario(ctx, t, s, mp.mockV0Provider)
}))
})
if testing.Short() {
return
}
t.Run("mockV0Provider", func(t *testing.T) {
mp := newMockV0Provider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testDanglingPodScenario(ctx, t, s, mp)
}))
})
})
// failedPodScenario ensures that the VK ignores failed pods that were failed prior to the PC starting up
t.Run("failedPodScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := newMockProvider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testFailedPodScenario(ctx, t, s)
}))
})
if testing.Short() {
return
}
t.Run("mockV0Provider", func(t *testing.T) {
assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
testFailedPodScenario(ctx, t, s)
}))
})
})
// succeededPodScenario ensures that the VK ignores succeeded pods that were succeeded prior to the PC starting up.
t.Run("succeededPodScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := newMockProvider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testSucceededPodScenario(ctx, t, s)
}))
})
if testing.Short() {
return
}
t.Run("mockV0Provider", func(t *testing.T) {
assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
testSucceededPodScenario(ctx, t, s)
}))
})
})
// updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated
// to the provider
t.Run("updatePodWhileRunningScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := newMockProvider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testUpdatePodWhileRunningScenario(ctx, t, s, mp)
}))
})
})
// podStatusMissingWhileRunningScenario waits for the pod to go into the running state, with a V0 style provider,
// and then makes the pod disappear!
t.Run("podStatusMissingWhileRunningScenario", func(t *testing.T) {
mp := newMockV0Provider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testPodStatusMissingWhileRunningScenario(ctx, t, s, mp)
}))
})
}
}
type testFunction func(ctx context.Context, s *system)
@@ -358,7 +346,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
assert.DeepEqual(t, p1, p2)
}
func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mockV0Provider) {
func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m testingProvider) {
t.Parallel()
pod := newPod()
@@ -367,12 +355,54 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo
// Start the pod controller
assert.NilError(t, s.start(ctx))
assert.Assert(t, is.Equal(m.deletes.read(), 1))
assert.Assert(t, is.Equal(m.getDeletes().read(), 1))
}
func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error) {
func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testing.T, s *system, m testingProvider) {
t.Parallel()
pod := newPod()
listOptions := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.ObjectMeta.Name).String(),
}
// Setup a watch (prior to pod creation, and pod controller startup)
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
assert.NilError(t, m.CreatePod(ctx, pod))
// Create the Pod
podCopyWithDeletionTimestamp := pod.DeepCopy()
var deletionGracePeriod int64 = 10
podCopyWithDeletionTimestamp.DeletionGracePeriodSeconds = &deletionGracePeriod
deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
podCopyWithDeletionTimestamp.DeletionTimestamp = &deletionTimestamp
_, e := s.client.CoreV1().Pods(testNamespace).Create(podCopyWithDeletionTimestamp)
assert.NilError(t, e)
// Start the pod controller
assert.NilError(t, s.start(ctx))
watchErrCh := make(chan error)
go func() {
_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
func(ev watch.Event) (bool, error) {
return ev.Type == watch.Deleted, nil
})
watchErrCh <- watchErr
}()
select {
case <-ctx.Done():
t.Fatalf("Context ended early: %s", ctx.Err().Error())
case err = <-watchErrCh:
assert.NilError(t, err)
}
}
func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error, waitForPermanentDeletion bool) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -440,6 +470,18 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
assert.NilError(t, err)
}
// Setup a watch to look for the pod eventually going away completely
watcher2, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher2.Stop()
waitDeleteCh := make(chan error)
go func() {
_, watchDeleteErr := watchutils.UntilWithoutRetry(ctx, watcher2, func(ev watch.Event) (bool, error) {
return ev.Type == watch.Deleted, nil
})
waitDeleteCh <- watchDeleteErr
}()
// Setup a watch prior to pod deletion
watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
@@ -454,7 +496,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(p.Name, metav1.GetOptions{})
assert.NilError(t, err)
// 2. Set the pod's deletion timestamp, version, and so on
var deletionGracePeriod int64 = 30
var deletionGracePeriod int64 = 10
currentPod.DeletionGracePeriodSeconds = &deletionGracePeriod
deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
currentPod.DeletionTimestamp = &deletionTimestamp
@@ -468,9 +510,18 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
case err = <-watchErrCh:
assert.NilError(t, err)
}
if waitForPermanentDeletion {
select {
case <-ctx.Done():
t.Fatalf("Context ended early: %s", ctx.Err().Error())
case err = <-waitDeleteCh:
assert.NilError(t, err)
}
}
}
func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) {
func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m testingProvider) {
t.Parallel()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -527,88 +578,7 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
log.G(ctx).WithField("pod", p).Info("Updating pod")
_, err = s.client.CoreV1().Pods(p.Namespace).Update(p)
assert.NilError(t, err)
assert.NilError(t, m.updates.until(ctx, func(v int) bool { return v > 0 }))
}
func testPodStatusMissingWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockV0Provider) {
t.Parallel()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
p := newPod()
key, err := buildKey(p)
assert.NilError(t, err)
listOptions := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(),
}
watchErrCh := make(chan error)
// Create a Pod
_, e := s.client.CoreV1().Pods(testNamespace).Create(p)
assert.NilError(t, e)
// Setup a watch to check if the pod is in running
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
go func() {
newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
// Wait for the pod to be started
func(ev watch.Event) (bool, error) {
pod := ev.Object.(*corev1.Pod)
return pod.Status.Phase == corev1.PodRunning, nil
})
// This deepcopy is required to please the race detector
p = newPod.Object.(*corev1.Pod).DeepCopy()
watchErrCh <- watchErr
}()
// Start the pod controller
assert.NilError(t, s.start(ctx))
// Wait for pod to be in running
select {
case <-ctx.Done():
t.Fatalf("Context ended early: %s", ctx.Err().Error())
case <-s.pc.Done():
assert.NilError(t, s.pc.Err())
t.Fatal("Pod controller exited prematurely without error")
case err = <-watchErrCh:
assert.NilError(t, err)
}
// Setup a watch to check if the pod is in failed due to provider issues
watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
go func() {
newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
// Wait for the pod to be failed
func(ev watch.Event) (bool, error) {
pod := ev.Object.(*corev1.Pod)
return pod.Status.Phase == corev1.PodFailed, nil
})
// This deepcopy is required to please the race detector
p = newPod.Object.(*corev1.Pod).DeepCopy()
watchErrCh <- watchErr
}()
// delete the pod from the mock provider
m.pods.Delete(key)
select {
case <-ctx.Done():
t.Fatalf("Context ended early: %s", ctx.Err().Error())
case <-s.pc.Done():
assert.NilError(t, s.pc.Err())
t.Fatal("Pod controller exited prematurely without error")
case err = <-watchErrCh:
assert.NilError(t, err)
}
assert.Equal(t, p.Status.Reason, podStatusReasonNotFound)
assert.NilError(t, m.getUpdates().until(ctx, func(v int) bool { return v > 0 }))
}
func BenchmarkCreatePods(b *testing.B) {
@@ -651,6 +621,7 @@ func randomizeName(pod *corev1.Pod) {
}
func newPod(podmodifiers ...podModifier) *corev1.Pod {
var terminationGracePeriodSeconds int64 = 30
pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
@@ -663,7 +634,14 @@ func newPod(podmodifiers ...podModifier) *corev1.Pod {
ResourceVersion: "100",
},
Spec: corev1.PodSpec{
NodeName: testNodeName,
NodeName: testNodeName,
TerminationGracePeriodSeconds: &terminationGracePeriodSeconds,
Containers: []corev1.Container{
{
Name: "my-container",
Image: "nginx",
},
},
},
Status: corev1.PodStatus{
Phase: corev1.PodPending,

View File

@@ -12,9 +12,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
mockProviderPodDeletedReason = "MockProviderPodDeleted"
)
var (
_ PodLifecycleHandler = (*mockV0Provider)(nil)
_ PodNotifier = (*mockProvider)(nil)
_ PodLifecycleHandler = (*mockProvider)(nil)
)
type waitableInt struct {
@@ -62,7 +65,7 @@ func (w *waitableInt) increment() {
w.cond.Broadcast()
}
type mockV0Provider struct {
type mockProvider struct {
creates *waitableInt
updates *waitableInt
deletes *waitableInt
@@ -75,40 +78,35 @@ type mockV0Provider struct {
realNotifier func(*v1.Pod)
}
type mockProvider struct {
*mockV0Provider
// newMockProvider creates a new mockProvider.
func newMockProvider() *mockProviderAsync {
provider := newSyncMockProvider()
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
// it will be set, and then we'll call a real underlying implementation.
// This makes it easier in the sense we don't need to wrap each method.
return &mockProviderAsync{provider}
}
// NewMockProviderMockConfig creates a new mockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface
func newMockV0Provider() *mockV0Provider {
provider := mockV0Provider{
func newSyncMockProvider() *mockProvider {
provider := mockProvider{
startTime: time.Now(),
creates: newWaitableInt(),
updates: newWaitableInt(),
deletes: newWaitableInt(),
attemptedDeletes: newWaitableInt(),
}
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
// it will be set, and then we'll call a real underlying implementation.
// This makes it easier in the sense we don't need to wrap each method.
return &provider
}
// NewMockProviderMockConfig creates a new MockProvider with the given config
func newMockProvider() *mockProvider {
return &mockProvider{mockV0Provider: newMockV0Provider()}
}
// notifier calls the callback that we got from the pod controller to notify it of updates (if it is set)
func (p *mockV0Provider) notifier(pod *v1.Pod) {
func (p *mockProvider) notifier(pod *v1.Pod) {
if p.realNotifier != nil {
p.realNotifier(pod)
}
}
// CreatePod accepts a Pod definition and stores it in memory.
func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
func (p *mockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
log.G(ctx).Infof("receive CreatePod %q", pod.Name)
p.creates.increment()
@@ -160,7 +158,7 @@ func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
}
// UpdatePod accepts a Pod definition and updates its reference.
func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
func (p *mockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
log.G(ctx).Infof("receive UpdatePod %q", pod.Name)
p.updates.increment()
@@ -177,19 +175,23 @@ func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
// DeletePod deletes the specified pod out of memory. The PodController deepcopies the pod object
// for us, so we don't have to worry about mutation.
func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
log.G(ctx).Infof("receive DeletePod %q", pod.Name)
p.attemptedDeletes.increment()
key, err := buildKey(pod)
if err != nil {
return err
}
if errdefs.IsNotFound(p.errorOnDelete) {
p.pods.Delete(key)
}
if p.errorOnDelete != nil {
return p.errorOnDelete
}
p.deletes.increment()
key, err := buildKey(pod)
if err != nil {
return err
}
if _, exists := p.pods.Load(key); !exists {
return errdefs.NotFound("pod not found")
@@ -198,7 +200,7 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
now := metav1.Now()
pod.Status.Phase = v1.PodSucceeded
pod.Status.Reason = "MockProviderPodDeleted"
pod.Status.Reason = mockProviderPodDeletedReason
for idx := range pod.Status.ContainerStatuses {
pod.Status.ContainerStatuses[idx].Ready = false
@@ -213,21 +215,13 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
}
p.notifier(pod)
p.pods.Store(key, pod)
if pod.DeletionGracePeriodSeconds == nil || *pod.DeletionGracePeriodSeconds == 0 {
p.pods.Delete(key)
} else {
time.AfterFunc(time.Duration(*pod.DeletionGracePeriodSeconds)*time.Second, func() {
p.pods.Delete(key)
})
}
p.pods.Delete(key)
return nil
}
// GetPod returns a pod by name that is stored in memory.
func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
func (p *mockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
log.G(ctx).Infof("receive GetPod %q", name)
key, err := buildKeyFromNames(namespace, name)
@@ -243,7 +237,7 @@ func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (po
// GetPodStatus returns the status of a pod by name that is "running".
// returns nil if a pod by that name is not found.
func (p *mockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
func (p *mockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
log.G(ctx).Infof("receive GetPodStatus %q", name)
pod, err := p.GetPod(ctx, namespace, name)
@@ -255,7 +249,7 @@ func (p *mockV0Provider) GetPodStatus(ctx context.Context, namespace, name strin
}
// GetPods returns a list of all pods known to be "running".
func (p *mockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
func (p *mockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
log.G(ctx).Info("receive GetPods")
var pods []*v1.Pod
@@ -268,10 +262,24 @@ func (p *mockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
return pods, nil
}
// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
// within the provider.
func (p *mockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
p.realNotifier = notifier
func (p *mockProvider) setErrorOnDelete(err error) {
p.errorOnDelete = err
}
func (p *mockProvider) getAttemptedDeletes() *waitableInt {
return p.attemptedDeletes
}
func (p *mockProvider) getCreates() *waitableInt {
return p.creates
}
func (p *mockProvider) getDeletes() *waitableInt {
return p.deletes
}
func (p *mockProvider) getUpdates() *waitableInt {
return p.updates
}
func buildKeyFromNames(namespace string, name string) (string, error) {
@@ -290,3 +298,22 @@ func buildKey(pod *v1.Pod) (string, error) {
return buildKeyFromNames(pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
}
type mockProviderAsync struct {
*mockProvider
}
// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
// within the provider.
func (p *mockProviderAsync) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
p.realNotifier = notifier
}
type testingProvider interface {
PodLifecycleHandler
setErrorOnDelete(error)
getAttemptedDeletes() *waitableInt
getDeletes() *waitableInt
getCreates() *waitableInt
getUpdates() *waitableInt
}

View File

@@ -16,30 +16,20 @@ package node
import (
"context"
"hash/fnv"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
const (
podStatusReasonProviderFailed = "ProviderFailed"
podStatusReasonNotFound = "NotFound"
podStatusMessageNotFound = "The pod status was not found and may have been deleted from the provider"
containerStatusReasonNotFound = "NotFound"
containerStatusMessageNotFound = "Container was not found and was likely deleted"
containerStatusExitCodeNotFound = -137
podStatusReasonProviderFailed = "ProviderFailed"
)
func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
@@ -117,20 +107,6 @@ func podsEqual(pod1, pod2 *corev1.Pod) bool {
}
// This is basically the kube runtime's hash container functionality.
// VK only operates at the Pod level so this is adapted for that
func hashPodSpec(spec corev1.PodSpec) uint64 {
hash := fnv.New32a()
printer := spew.ConfigState{
Indent: " ",
SortKeys: true,
DisableMethods: true,
SpewKeys: true,
}
printer.Fprintf(hash, "%#v", spec)
return uint64(hash.Sum32())
}
func (pc *PodController) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) {
podPhase := corev1.PodPending
if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
@@ -156,136 +132,22 @@ func (pc *PodController) handleProviderError(ctx context.Context, span trace.Spa
span.SetStatus(origErr)
}
func (pc *PodController) deletePod(ctx context.Context, namespace, name string) error {
func (pc *PodController) deletePod(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "deletePod")
defer span.End()
pod, err := pc.provider.GetPod(ctx, namespace, name)
if err != nil {
if errdefs.IsNotFound(err) {
// The provider is not aware of the pod, but we must still delete the Kubernetes API resource.
return pc.forceDeletePodResource(ctx, namespace, name)
}
return err
}
// NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't.
if pod == nil {
// The provider is not aware of the pod, but we must still delete the Kubernetes API resource.
return pc.forceDeletePodResource(ctx, namespace, name)
}
ctx = addPodAttributes(ctx, span, pod)
var delErr error
if delErr = pc.provider.DeletePod(ctx, pod.DeepCopy()); delErr != nil && !errdefs.IsNotFound(delErr) {
span.SetStatus(delErr)
return delErr
err := pc.provider.DeletePod(ctx, pod.DeepCopy())
if err != nil {
span.SetStatus(err)
return err
}
log.G(ctx).Debug("Deleted pod from provider")
if err := pc.forceDeletePodResource(ctx, namespace, name); err != nil {
span.SetStatus(err)
return err
}
log.G(ctx).Info("Deleted pod from Kubernetes")
return nil
}
func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace, name string) error {
ctx, span := trace.StartSpan(ctx, "forceDeletePodResource")
defer span.End()
ctx = span.WithFields(ctx, log.Fields{
"namespace": namespace,
"name": name,
})
var grace int64
if err := pc.client.Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil {
if errors.IsNotFound(err) {
log.G(ctx).Debug("Pod does not exist in Kubernetes, nothing to delete")
return nil
}
span.SetStatus(err)
return pkgerrors.Wrap(err, "Failed to delete Kubernetes pod")
}
return nil
}
// fetchPodStatusesFromProvider syncs the providers pod status with the kubernetes pod status.
func (pc *PodController) fetchPodStatusesFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
ctx, span := trace.StartSpan(ctx, "fetchPodStatusesFromProvider")
defer span.End()
// Update all the pods with the provider status.
pods, err := pc.podsLister.List(labels.Everything())
if err != nil {
err = pkgerrors.Wrap(err, "error getting pod list from kubernetes")
span.SetStatus(err)
log.G(ctx).WithError(err).Error("Error updating pod statuses")
return
}
ctx = span.WithField(ctx, "nPods", int64(len(pods)))
for _, pod := range pods {
if !shouldSkipPodStatusUpdate(pod) {
enrichedPod, err := pc.fetchPodStatusFromProvider(ctx, q, pod)
if err != nil {
log.G(ctx).WithFields(map[string]interface{}{
"name": pod.Name,
"namespace": pod.Namespace,
}).WithError(err).Error("Could not fetch pod status")
} else if enrichedPod != nil {
pc.enqueuePodStatusUpdate(ctx, q, enrichedPod)
}
}
}
}
// fetchPodStatusFromProvider returns a pod (the pod we pass in) enriched with the pod status from the provider. If the pod is not found,
// and it has been 1 minute since the pod was created, or the pod was previously running, it will be marked as failed.
// If a valid pod status cannot be generated, for example, if a pod is not found in the provider, and it has been less than 1 minute
// since pod creation, we will return nil for the pod.
func (pc *PodController) fetchPodStatusFromProvider(ctx context.Context, q workqueue.RateLimitingInterface, podFromKubernetes *corev1.Pod) (*corev1.Pod, error) {
podStatus, err := pc.provider.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
if errdefs.IsNotFound(err) || (err == nil && podStatus == nil) {
// Only change the status when the pod was already up
// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
podStatus = podFromKubernetes.Status.DeepCopy()
podStatus.Phase = corev1.PodFailed
podStatus.Reason = podStatusReasonNotFound
podStatus.Message = podStatusMessageNotFound
now := metav1.NewTime(time.Now())
for i, c := range podStatus.ContainerStatuses {
if c.State.Running == nil {
continue
}
podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
ExitCode: containerStatusExitCodeNotFound,
Reason: containerStatusReasonNotFound,
Message: containerStatusMessageNotFound,
FinishedAt: now,
StartedAt: c.State.Running.StartedAt,
ContainerID: c.ContainerID,
}
podStatus.ContainerStatuses[i].State.Running = nil
}
} else if err != nil {
return nil, nil
}
} else if err != nil {
return nil, err
}
pod := podFromKubernetes.DeepCopy()
podStatus.DeepCopyInto(&pod.Status)
return pod, nil
}
func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodSucceeded ||
pod.Status.Phase == corev1.PodFailed ||
@@ -308,8 +170,12 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
}
kPod := obj.(*knownPod)
kPod.Lock()
podFromProvider := kPod.lastPodStatusReceivedFromProvider
podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
kPod.Unlock()
// We need to do this because the other parts of the pod can be updated elsewhere. Since we're only updating
// the pod status, and we should be the sole writers of the pod status, we can blind overwrite it. Therefore
// we need to copy the pod and set ResourceVersion to 0.
podFromProvider.ResourceVersion = "0"
if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(podFromProvider); err != nil {
span.SetStatus(err)
@@ -371,3 +237,55 @@ func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retE
return pc.updatePodStatus(ctx, pod, key)
}
func (pc *PodController) deletePodHandler(ctx context.Context, key string) (retErr error) {
ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem")
defer span.End()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
ctx = span.WithFields(ctx, log.Fields{
"namespace": namespace,
"name": name,
})
if err != nil {
// Log the error as a warning, but do not requeue the key as it is invalid.
log.G(ctx).Warn(pkgerrors.Wrapf(err, "invalid resource key: %q", key))
span.SetStatus(err)
return nil
}
defer func() {
if retErr == nil {
if w, ok := pc.provider.(syncWrapper); ok {
w._deletePodKey(ctx, key)
}
}
}()
// If the pod has been deleted from API server, we don't need to do anything.
k8sPod, err := pc.podsLister.Pods(namespace).Get(name)
if errors.IsNotFound(err) {
return nil
}
if err != nil {
span.SetStatus(err)
return err
}
if running(&k8sPod.Status) {
log.G(ctx).Error("Force deleting pod in running state")
}
// We don't check with the provider before doing this delete. At this point, even if an outstanding pod status update
// was in progress,
err = pc.client.Pods(namespace).Delete(name, metav1.NewDeleteOptions(0))
if errors.IsNotFound(err) {
return nil
}
if err != nil {
span.SetStatus(err)
return err
}
return nil
}

View File

@@ -19,14 +19,10 @@ import (
"testing"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"
@@ -34,7 +30,7 @@ import (
type TestController struct {
*PodController
mock *mockProvider
mock *mockProviderAsync
client *fake.Clientset
}
@@ -51,6 +47,7 @@ func newTestController() *TestController {
resourceManager: rm,
recorder: testutil.FakeEventRecorder(5),
k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletionQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
done: make(chan struct{}),
ready: make(chan struct{}),
podsInformer: iFactory.Core().V1().Pods(),
@@ -70,11 +67,11 @@ func TestPodsEqual(t *testing.T) {
p1 := &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
{
Name: "nginx",
Image: "nginx:1.15.12-perl",
Ports: []corev1.ContainerPort{
corev1.ContainerPort{
{
ContainerPort: 443,
Protocol: "tcp",
},
@@ -172,170 +169,14 @@ func TestPodNoSpecChange(t *testing.T) {
assert.Check(t, is.Equal(svr.mock.updates.read(), 0))
}
func TestPodDelete(t *testing.T) {
type testCase struct {
desc string
delErr error
}
cases := []testCase{
{desc: "no error on delete", delErr: nil},
{desc: "not found error on delete", delErr: errdefs.NotFound("not found")},
{desc: "unknown error on delete", delErr: pkgerrors.New("random error")},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
c := newTestController()
c.mock.errorOnDelete = tc.delErr
pod := &corev1.Pod{}
pod.ObjectMeta.Namespace = "default"
pod.ObjectMeta.Name = "nginx"
pod.Spec = newPodSpec()
pc := c.client.CoreV1().Pods("default")
p, err := pc.Create(pod)
assert.NilError(t, err)
ctx := context.Background()
err = c.createOrUpdatePod(ctx, p.DeepCopy()) // make sure it's actually created
assert.NilError(t, err)
assert.Check(t, is.Equal(c.mock.creates.read(), 1))
err = c.deletePod(ctx, pod.Namespace, pod.Name)
assert.Equal(t, pkgerrors.Cause(err), err)
var expectDeletes int
if tc.delErr == nil {
expectDeletes = 1
}
assert.Check(t, is.Equal(c.mock.deletes.read(), expectDeletes))
expectDeleted := tc.delErr == nil || errdefs.IsNotFound(tc.delErr)
_, err = pc.Get(pod.Name, metav1.GetOptions{})
if expectDeleted {
assert.Assert(t, errors.IsNotFound(err))
} else {
assert.NilError(t, err)
}
})
}
}
func TestFetchPodStatusFromProvider(t *testing.T) {
startedAt := metav1.NewTime(time.Now())
finishedAt := metav1.NewTime(startedAt.Add(time.Second * 10))
containerStateRunning := &corev1.ContainerStateRunning{StartedAt: startedAt}
containerStateTerminated := &corev1.ContainerStateTerminated{StartedAt: startedAt, FinishedAt: finishedAt}
containerStateWaiting := &corev1.ContainerStateWaiting{}
testCases := []struct {
desc string
running *corev1.ContainerStateRunning
terminated *corev1.ContainerStateTerminated
waiting *corev1.ContainerStateWaiting
expectedStartedAt metav1.Time
expectedFinishedAt metav1.Time
}{
{desc: "container in running state", running: containerStateRunning, expectedStartedAt: startedAt},
{desc: "container in terminated state", terminated: containerStateTerminated, expectedStartedAt: startedAt, expectedFinishedAt: finishedAt},
{desc: "container in waiting state", waiting: containerStateWaiting},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
c := newTestController()
pod := &corev1.Pod{}
pod.ObjectMeta.Namespace = "default"
pod.ObjectMeta.Name = "nginx"
pod.Status.Phase = corev1.PodRunning
containerStatus := corev1.ContainerStatus{}
if tc.running != nil {
containerStatus.State.Running = tc.running
} else if tc.terminated != nil {
containerStatus.State.Terminated = tc.terminated
} else if tc.waiting != nil {
containerStatus.State.Waiting = tc.waiting
}
pod.Status.ContainerStatuses = []corev1.ContainerStatus{containerStatus}
pc := c.client.CoreV1().Pods("default")
p, err := pc.Create(pod)
assert.NilError(t, err)
ctx := context.Background()
updatedPod, err := c.fetchPodStatusFromProvider(ctx, nil, p)
assert.NilError(t, err)
assert.Equal(t, updatedPod.Status.Phase, corev1.PodFailed)
assert.Assert(t, is.Len(updatedPod.Status.ContainerStatuses, 1))
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Running == nil)
// Test cases for running and terminated container state
if tc.running != nil || tc.terminated != nil {
// Ensure that the container is in terminated state and other states are nil
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated != nil)
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting == nil)
terminated := updatedPod.Status.ContainerStatuses[0].State.Terminated
assert.Equal(t, terminated.StartedAt, tc.expectedStartedAt)
assert.Assert(t, terminated.StartedAt.Before(&terminated.FinishedAt))
if tc.terminated != nil {
assert.Equal(t, terminated.FinishedAt, tc.expectedFinishedAt)
}
} else {
// Test case for waiting container state
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated == nil)
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting != nil)
}
})
}
}
func TestFetchPodStatusFromProviderWithExpiredPod(t *testing.T) {
c := newTestController()
pod := &corev1.Pod{}
pod.ObjectMeta.Namespace = "default"
pod.ObjectMeta.Name = "nginx"
pod.Status.Phase = corev1.PodRunning
containerStatus := corev1.ContainerStatus{}
// We should terminate containters in a pod that has not provided pod status update for more than a minute
startedAt := time.Now().Add(-(time.Minute + time.Second))
containerStatus.State.Running = &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(startedAt)}
pod.ObjectMeta.CreationTimestamp.Time = startedAt
pod.Status.ContainerStatuses = []corev1.ContainerStatus{containerStatus}
pc := c.client.CoreV1().Pods("default")
p, err := pc.Create(pod)
assert.NilError(t, err)
ctx := context.Background()
updatedPod, err := c.fetchPodStatusFromProvider(ctx, nil, p)
assert.NilError(t, err)
assert.Equal(t, updatedPod.Status.Phase, corev1.PodFailed)
assert.Assert(t, is.Len(updatedPod.Status.ContainerStatuses, 1))
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Running == nil)
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated != nil)
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting == nil)
terminated := updatedPod.Status.ContainerStatuses[0].State.Terminated
assert.Equal(t, terminated.StartedAt, metav1.NewTime(startedAt))
assert.Assert(t, terminated.StartedAt.Before(&terminated.FinishedAt))
}
func newPodSpec() corev1.PodSpec {
return corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
{
Name: "nginx",
Image: "nginx:1.15.12",
Ports: []corev1.ContainerPort{
corev1.ContainerPort{
{
ContainerPort: 443,
Protocol: "tcp",
},

View File

@@ -17,10 +17,11 @@ package node
import (
"context"
"fmt"
"reflect"
"strconv"
"sync"
"time"
"github.com/google/go-cmp/cmp"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
@@ -49,7 +50,9 @@ type PodLifecycleHandler interface {
// UpdatePod takes a Kubernetes Pod and updates it within the provider.
UpdatePod(ctx context.Context, pod *corev1.Pod) error
// DeletePod takes a Kubernetes Pod and deletes it from the provider.
// DeletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is
// expected to call the NotifyPods callback with a terminal pod status where all the containers are in a terminal
// state, as well as the pod. DeletePod may be called multiple times for the same pod.
DeletePod(ctx context.Context, pod *corev1.Pod) error
// GetPod retrieves a pod by name from the provider (can be cached).
@@ -71,9 +74,7 @@ type PodLifecycleHandler interface {
GetPods(context.Context) ([]*corev1.Pod, error)
}
// PodNotifier notifies callers of pod changes.
// Providers should implement this interface to enable callers to be notified
// of pod status updates asynchronously.
// PodNotifier is used as an extension to PodLifecycleHandler to support async updates of pod statuses.
type PodNotifier interface {
// NotifyPods instructs the notifier to call the passed in function when
// the pod status changes. It should be called when a pod's status changes.
@@ -104,6 +105,9 @@ type PodController struct {
k8sQ workqueue.RateLimitingInterface
// deletionQ is a queue on which pods are reconciled, and we check if pods are in API server after grace period
deletionQ workqueue.RateLimitingInterface
// From the time of creation, to termination the knownPods map will contain the pods key
// (derived from Kubernetes' cache library) -> a *knownPod struct.
knownPods sync.Map
@@ -130,6 +134,7 @@ type knownPod struct {
// should be immutable to avoid having to hold the lock the entire time you're working with them
sync.Mutex
lastPodStatusReceivedFromProvider *corev1.Pod
lastPodUsed *corev1.Pod
}
// PodControllerConfig is used to configure a new PodController.
@@ -155,6 +160,7 @@ type PodControllerConfig struct {
ServiceInformer corev1informers.ServiceInformer
}
// NewPodController creates a new pod controller with the provided config.
func NewPodController(cfg PodControllerConfig) (*PodController, error) {
if cfg.PodClient == nil {
return nil, errdefs.InvalidInput("missing core client")
@@ -193,11 +199,17 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
done: make(chan struct{}),
recorder: cfg.EventRecorder,
k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"),
}
return pc, nil
}
type asyncProvider interface {
PodLifecycleHandler
PodNotifier
}
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until the
// context is cancelled, at which point it will shutdown the work queue and
@@ -210,15 +222,32 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
defer func() {
pc.k8sQ.ShutDown()
pc.deletionQ.ShutDown()
pc.mu.Lock()
pc.err = retErr
close(pc.done)
pc.mu.Unlock()
}()
var provider asyncProvider
runProvider := func(context.Context) {}
if p, ok := pc.provider.(asyncProvider); ok {
provider = p
} else {
wrapped := &syncProviderWrapper{PodLifecycleHandler: pc.provider, l: pc.podsLister}
runProvider = wrapped.run
provider = wrapped
log.G(ctx).Debug("Wrapped non-async provider with async")
}
pc.provider = provider
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
pc.runSyncFromProvider(ctx, podStatusQueue)
provider.NotifyPods(ctx, func(pod *corev1.Pod) {
pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
})
go runProvider(ctx)
defer podStatusQueue.ShutDown()
// Wait for the caches to be synced *before* starting to do work.
@@ -241,16 +270,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
},
UpdateFunc: func(oldObj, newObj interface{}) {
// Create a copy of the old and new pod objects so we don't mutate the cache.
oldPod := oldObj.(*corev1.Pod).DeepCopy()
newPod := newObj.(*corev1.Pod).DeepCopy()
// We want to check if the two objects differ in anything other than their resource versions.
// Hence, we make them equal so that this change isn't picked up by reflect.DeepEqual.
newPod.ResourceVersion = oldPod.ResourceVersion
// Skip adding this pod's key to the work queue if its .metadata (except .metadata.resourceVersion) and .spec fields haven't changed.
// This guarantees that we don't attempt to sync the pod every time its .status field is updated.
if reflect.DeepEqual(oldPod.ObjectMeta, newPod.ObjectMeta) && reflect.DeepEqual(oldPod.Spec, newPod.Spec) {
return
}
newPod := newObj.(*corev1.Pod)
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
log.G(ctx).Error(err)
@@ -264,6 +285,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
} else {
pc.knownPods.Delete(key)
pc.k8sQ.AddRateLimited(key)
// If this pod was in the deletion queue, forget about it
pc.deletionQ.Forget(key)
}
},
})
@@ -295,6 +318,15 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
}()
}
for id := 0; id < podSyncWorkers; id++ {
wg.Add(1)
workerID := strconv.Itoa(id)
go func() {
defer wg.Done()
pc.runDeletionReconcilationWorker(ctx, workerID, pc.deletionQ)
}()
}
close(pc.ready)
log.G(ctx).Info("started workers")
@@ -302,6 +334,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
log.G(ctx).Info("shutting down workers")
pc.k8sQ.ShutDown()
podStatusQueue.ShutDown()
pc.deletionQ.ShutDown()
wg.Wait()
return nil
@@ -353,6 +386,7 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
// Add the current key as an attribute to the current span.
ctx = span.WithField(ctx, "key", key)
log.G(ctx).WithField("key", key).Debug("sync handled")
// Convert the namespace/name string into a distinct namespace and name.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -372,35 +406,81 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
span.SetStatus(err)
return err
}
// At this point we know the Pod resource doesn't exist, which most probably means it was deleted.
// Hence, we must delete it from the provider if it still exists there.
if err := pc.deletePod(ctx, namespace, name); err != nil {
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name))
pod, err = pc.provider.GetPod(ctx, namespace, name)
if err != nil && !errdefs.IsNotFound(err) {
err = pkgerrors.Wrapf(err, "failed to fetch pod with key %q from provider", key)
span.SetStatus(err)
return err
}
return nil
if errdefs.IsNotFound(err) || pod == nil {
return nil
}
err = pc.provider.DeletePod(ctx, pod)
if errdefs.IsNotFound(err) {
return nil
}
if err != nil {
err = pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name))
span.SetStatus(err)
}
return err
}
// At this point we know the Pod resource has either been created or updated (which includes being marked for deletion).
return pc.syncPodInProvider(ctx, pod)
return pc.syncPodInProvider(ctx, pod, key)
}
// syncPodInProvider tries and reconciles the state of a pod by comparing its Kubernetes representation and the provider's representation.
func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) error {
func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod, key string) (retErr error) {
ctx, span := trace.StartSpan(ctx, "syncPodInProvider")
defer span.End()
// Add the pod's attributes to the current span.
ctx = addPodAttributes(ctx, span, pod)
// If the pod('s containers) is no longer in a running state then we force-delete the pod from API server
// more context is here: https://github.com/virtual-kubelet/virtual-kubelet/pull/760
if pod.DeletionTimestamp != nil && !running(&pod.Status) {
log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running")
pc.deletionQ.Add(key)
return nil
}
obj, ok := pc.knownPods.Load(key)
if !ok {
// That means the pod was deleted while we were working
return nil
}
kPod := obj.(*knownPod)
kPod.Lock()
if kPod.lastPodUsed != nil && podsEffectivelyEqual(kPod.lastPodUsed, pod) {
kPod.Unlock()
return nil
}
kPod.Unlock()
defer func() {
if retErr == nil {
kPod.Lock()
defer kPod.Unlock()
kPod.lastPodUsed = pod
}
}()
// Check whether the pod has been marked for deletion.
// If it does, guarantee it is deleted in the provider and Kubernetes.
if pod.DeletionTimestamp != nil {
if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
log.G(ctx).Debug("Deleting pod in provider")
if err := pc.deletePod(ctx, pod); errdefs.IsNotFound(err) {
log.G(ctx).Debug("Pod not found in provider")
} else if err != nil {
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod))
span.SetStatus(err)
return err
}
pc.deletionQ.AddAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
return nil
}
@@ -419,6 +499,25 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod)
return nil
}
// runDeletionReconcilationWorker is a long-running function that will continually call the processDeletionReconcilationWorkItem
// function in order to read and process an item on the work queue that is generated by the pod informer.
func (pc *PodController) runDeletionReconcilationWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
for pc.processDeletionReconcilationWorkItem(ctx, workerID, q) {
}
}
// processDeletionReconcilationWorkItem will read a single work item off the work queue and attempt to process it,by calling the deletionReconcilation.
func (pc *PodController) processDeletionReconcilationWorkItem(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
// We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem")
defer span.End()
// Add the ID of the current worker as an attribute to the current span.
ctx = span.WithField(ctx, "workerId", workerID)
return handleQueueItem(ctx, q, pc.deletePodHandler)
}
// deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them.
func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int) {
ctx, span := trace.StartSpan(ctx, "deleteDanglingPods")
@@ -474,7 +573,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int
// Add the pod's attributes to the current span.
ctx = addPodAttributes(ctx, span, pod)
// Actually delete the pod.
if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
if err := pc.provider.DeletePod(ctx, pod.DeepCopy()); err != nil && !errdefs.IsNotFound(err) {
span.SetStatus(err)
log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod))
} else {
@@ -503,3 +602,31 @@ func loggablePodName(pod *corev1.Pod) string {
func loggablePodNameFromCoordinates(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}
// podsEffectivelyEqual compares two pods, and ignores the pod status, and the resource version
func podsEffectivelyEqual(p1, p2 *corev1.Pod) bool {
filterForResourceVersion := func(p cmp.Path) bool {
if p.String() == "ObjectMeta.ResourceVersion" {
return true
}
if p.String() == "Status" {
return true
}
return false
}
return cmp.Equal(p1, p2, cmp.FilterPath(filterForResourceVersion, cmp.Ignore()))
}
// borrowed from https://github.com/kubernetes/kubernetes/blob/f64c631cd7aea58d2552ae2038c1225067d30dde/pkg/kubelet/kubelet_pods.go#L944-L953
// running returns true, unless if every status is terminated or waiting, or the status list
// is empty.
func running(podStatus *corev1.PodStatus) bool {
statuses := podStatus.ContainerStatuses
for _, status := range statuses {
if status.State.Terminated == nil && status.State.Waiting == nil {
return true
}
}
return false
}

View File

@@ -6,6 +6,8 @@ import (
"time"
"gotest.tools/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestPodControllerExitOnContextCancel(t *testing.T) {
@@ -42,3 +44,49 @@ func TestPodControllerExitOnContextCancel(t *testing.T) {
}
assert.NilError(t, tc.Err())
}
func TestCompareResourceVersion(t *testing.T) {
p1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
}
p2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
},
}
assert.Assert(t, podsEffectivelyEqual(p1, p2))
}
func TestCompareStatus(t *testing.T) {
p1 := &corev1.Pod{
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
}
p2 := &corev1.Pod{
Status: corev1.PodStatus{
Phase: corev1.PodFailed,
},
}
assert.Assert(t, podsEffectivelyEqual(p1, p2))
}
func TestCompareLabels(t *testing.T) {
p1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar1",
},
},
}
p2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar2",
},
},
}
assert.Assert(t, !podsEffectivelyEqual(p1, p2))
}

View File

@@ -16,12 +16,10 @@ package node
import (
"context"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
)
@@ -105,40 +103,3 @@ func (pc *PodController) processPodStatusUpdate(ctx context.Context, workerID st
return handleQueueItem(ctx, q, pc.podStatusHandler)
}
// providerSyncLoop synchronizes pod states from the provider back to kubernetes
// Deprecated: This is only used when the provider does not support async updates
// Providers should implement async update support, even if it just means copying
// something like this in.
func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) {
const sleepTime = 5 * time.Second
t := time.NewTimer(sleepTime)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
t.Stop()
ctx, span := trace.StartSpan(ctx, "syncActualState")
pc.fetchPodStatusesFromProvider(ctx, q)
span.End()
// restart the timer
t.Reset(sleepTime)
}
}
}
func (pc *PodController) runSyncFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
if pn, ok := pc.provider.(PodNotifier); ok {
pn.NotifyPods(ctx, func(pod *corev1.Pod) {
pc.enqueuePodStatusUpdate(ctx, q, pod.DeepCopy())
})
} else {
go pc.providerSyncLoop(ctx, q)
}
}

209
node/sync.go Normal file
View File

@@ -0,0 +1,209 @@
package node
import (
"context"
"sync"
"time"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)
const (
podStatusReasonNotFound = "NotFound"
podStatusMessageNotFound = "The pod status was not found and may have been deleted from the provider"
containerStatusReasonNotFound = "NotFound"
containerStatusMessageNotFound = "Container was not found and was likely deleted"
containerStatusExitCodeNotFound = -137
statusTerminatedReason = "Terminated"
containerStatusTerminatedMessage = "Container was terminated. The exit code may not reflect the real exit code"
)
// syncProviderWrapper wraps a PodLifecycleHandler to give it async-like pod status notification behavior.
type syncProviderWrapper struct {
PodLifecycleHandler
notify func(*corev1.Pod)
l corev1listers.PodLister
// deletedPods makes sure we don't set the "NotFound" status
// for pods which have been requested to be deleted.
// This is needed for our loop which just grabs pod statuses every 5 seconds.
deletedPods sync.Map
}
// This is used to clean up keys for deleted pods after they have been fully deleted in the API server.
type syncWrapper interface {
_deletePodKey(context.Context, string)
}
func (p *syncProviderWrapper) NotifyPods(ctx context.Context, f func(*corev1.Pod)) {
p.notify = f
}
func (p *syncProviderWrapper) _deletePodKey(ctx context.Context, key string) {
log.G(ctx).WithField("key", key).Debug("Cleaning up pod from deletion cache")
p.deletedPods.Delete(key)
}
func (p *syncProviderWrapper) DeletePod(ctx context.Context, pod *corev1.Pod) error {
log.G(ctx).Debug("syncProviderWrappper.DeletePod")
key, err := cache.MetaNamespaceKeyFunc(pod)
if err != nil {
return err
}
p.deletedPods.Store(key, pod)
if err := p.PodLifecycleHandler.DeletePod(ctx, pod.DeepCopy()); err != nil {
log.G(ctx).WithField("key", key).WithError(err).Debug("Removed key from deleted pods cache")
// We aren't going to actually delete the pod from the provider since there is an error so delete it from our cache,
// otherwise we could end up leaking pods in our deletion cache.
// Delete will be retried by the pod controller.
p.deletedPods.Delete(key)
return err
}
if shouldSkipPodStatusUpdate(pod) {
log.G(ctx).Debug("skipping pod status update for terminated pod")
return nil
}
updated := pod.DeepCopy()
updated.Status.Phase = corev1.PodSucceeded
now := metav1.NewTime(time.Now())
for i, cs := range updated.Status.ContainerStatuses {
updated.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
Reason: statusTerminatedReason,
Message: containerStatusTerminatedMessage,
FinishedAt: now,
}
if cs.State.Running != nil {
updated.Status.ContainerStatuses[i].State.Terminated.StartedAt = cs.State.Running.StartedAt
}
}
updated.Status.Reason = statusTerminatedReason
p.notify(updated)
log.G(ctx).Debug("Notified pod terminal pod status")
return nil
}
func (p *syncProviderWrapper) run(ctx context.Context) {
interval := 5 * time.Second
timer := time.NewTimer(interval)
defer timer.Stop()
if !timer.Stop() {
<-timer.C
}
for {
log.G(ctx).Debug("Pod status update loop start")
timer.Reset(interval)
select {
case <-ctx.Done():
log.G(ctx).WithError(ctx.Err()).Debug("sync wrapper loop exiting")
return
case <-timer.C:
}
p.syncPodStatuses(ctx)
}
}
func (p *syncProviderWrapper) syncPodStatuses(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "syncProviderWrapper.syncPodStatuses")
defer span.End()
// Update all the pods with the provider status.
pods, err := p.l.List(labels.Everything())
if err != nil {
err = errors.Wrap(err, "error getting pod list from kubernetes")
span.SetStatus(err)
log.G(ctx).WithError(err).Error("Error updating pod statuses")
return
}
ctx = span.WithField(ctx, "nPods", int64(len(pods)))
for _, pod := range pods {
if shouldSkipPodStatusUpdate(pod) {
log.G(ctx).Debug("Skipping pod status update")
continue
}
if err := p.updatePodStatus(ctx, pod); err != nil {
log.G(ctx).WithFields(map[string]interface{}{
"name": pod.Name,
"namespace": pod.Namespace,
}).WithError(err).Error("Could not fetch pod status")
}
}
}
func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubernetes *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "syncProviderWrapper.updatePodStatus")
defer span.End()
ctx = addPodAttributes(ctx, span, podFromKubernetes)
podStatus, err := p.PodLifecycleHandler.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
if err != nil {
if !errdefs.IsNotFound(err) {
span.SetStatus(err)
return err
}
}
if podStatus != nil {
pod := podFromKubernetes.DeepCopy()
podStatus.DeepCopyInto(&pod.Status)
p.notify(pod)
return nil
}
key, err := cache.MetaNamespaceKeyFunc(podFromKubernetes)
if err != nil {
return err
}
if _, exists := p.deletedPods.Load(key); exists {
log.G(ctx).Debug("pod is in known deleted state, ignoring")
return nil
}
// Only change the status when the pod was already up.
// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
podStatus = podFromKubernetes.Status.DeepCopy()
podStatus.Phase = corev1.PodFailed
podStatus.Reason = podStatusReasonNotFound
podStatus.Message = podStatusMessageNotFound
now := metav1.NewTime(time.Now())
for i, c := range podStatus.ContainerStatuses {
if c.State.Running == nil {
continue
}
podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
ExitCode: containerStatusExitCodeNotFound,
Reason: containerStatusReasonNotFound,
Message: containerStatusMessageNotFound,
FinishedAt: now,
StartedAt: c.State.Running.StartedAt,
ContainerID: c.ContainerID,
}
podStatus.ContainerStatuses[i].State.Running = nil
}
log.G(ctx).Debug("Setting pod not found on pod status")
}
pod := podFromKubernetes.DeepCopy()
podStatus.DeepCopyInto(&pod.Status)
p.notify(pod)
return nil
}

View File

@@ -140,7 +140,7 @@ func (l *logger) Debug(args ...interface{}) {
}
func (l *logger) Debugf(f string, args ...interface{}) {
l.l.Debugf(f, args)
l.l.Debugf(f, args...)
l.s.Annotatef(withLevel(lDebug, l.a), f, args...)
}
@@ -156,7 +156,7 @@ func (l *logger) Info(args ...interface{}) {
}
func (l *logger) Infof(f string, args ...interface{}) {
l.l.Infof(f, args)
l.l.Infof(f, args...)
l.s.Annotatef(withLevel(lInfo, l.a), f, args...)
}
@@ -172,7 +172,7 @@ func (l *logger) Warn(args ...interface{}) {
}
func (l *logger) Warnf(f string, args ...interface{}) {
l.l.Warnf(f, args)
l.l.Warnf(f, args...)
l.s.Annotatef(withLevel(lWarn, l.a), f, args...)
}
@@ -188,7 +188,7 @@ func (l *logger) Error(args ...interface{}) {
}
func (l *logger) Errorf(f string, args ...interface{}) {
l.l.Errorf(f, args)
l.l.Errorf(f, args...)
l.s.Annotatef(withLevel(lErr, l.a), f, args...)
}
@@ -205,7 +205,7 @@ func (l *logger) Fatal(args ...interface{}) {
func (l *logger) Fatalf(f string, args ...interface{}) {
l.s.Annotatef(withLevel(lFatal, l.a), f, args...)
l.l.Fatalf(f, args)
l.l.Fatalf(f, args...)
}
func (l *logger) WithError(err error) log.Logger {