Compare commits

...

28 Commits

Author SHA1 Message Date
Brian Goff
e6e1dbed87 Merge pull request #794 from cpuguy83/cherry_picks_1.2.1
Cherry picks 1.2.1
2019-11-15 15:19:11 -08:00
Thomas Hartland
eb9498cdde Add test for node ping interval
(cherry picked from commit 3783a39b26)
2019-11-15 14:31:04 -08:00
Thomas Hartland
df16317a89 After handling status update, reset update timer with correct duration
If the ping timer is being used, it should be reset with the ping update
interval. If the status update interval is used then Ping stops being
called for long enough to cause kubernetes to mark the node as NotReady.

(cherry picked from commit c258614d8f)
2019-11-15 14:30:54 -08:00
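As a rough illustration of the timer-reset pattern this commit describes (interval values and variable names below are invented; the project's actual change appears in the node controller diff further down):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	pingInterval := 10 * time.Millisecond
	statusInterval := 60 * time.Millisecond
	disableLease := true

	// Pick the reset duration once. When leases are disabled the single
	// timer doubles as the ping timer, so it must be reset with the ping
	// interval rather than the much longer status update interval.
	resetDuration := statusInterval
	if disableLease {
		resetDuration = pingInterval
	}

	t := time.NewTimer(pingInterval)
	defer t.Stop()

	for i := 0; i < 3; i++ {
		<-t.C
		fmt.Println("ping", i)
		// Resetting with statusInterval here is the bug class the commit fixes.
		t.Reset(resetDuration)
	}
}
```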
Brian Goff
7585e11542 [Sync Provider] Fix panic on not found pod status
(cherry picked from commit 6e33b0f084)
2019-11-15 14:30:22 -08:00
Brian Goff
ba940a9739 Merge pull request #786 from cpuguy83/add_sync_provider_support
Re-add support for sync providers
2019-11-01 09:23:38 -07:00
Brian Goff
0ccf5059e4 Put sync lifecycle tests behind -short flag.
This lets you skip tests for the slower sync provider.
2019-10-29 15:05:35 -07:00
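The `-short` gating mentioned above follows the standard `testing.Short()` pattern; a hypothetical example (package and test names are made up):

```go
package lifecycle

import "testing"

func TestSyncProviderLifecycle(t *testing.T) {
	if testing.Short() {
		// `go test -short` skips the slower sync-provider path.
		t.Skip("skipping slow sync provider lifecycle test in -short mode")
	}
	// ... exercise the sync provider here
}
```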
Brian Goff
31c8fbaa41 Apply suggestions from code review
Typos and punctuation fixes.

Co-Authored-By: Pires <1752631+pires@users.noreply.github.com>
2019-10-24 09:23:33 -07:00
Brian Goff
4ee2c4d370 Re-add support for sync providers
This brings back support for sync providers by wrapping them in a
provider that handles async notifications.
2019-10-24 09:23:28 -07:00
Sargun Dhillon
c314045d60 Ensure that dangling pods which are still deleting at startup are deleted (#784)
If a pod is being gracefully deleted at podcontroller startup,
it will not get deleted via the deletedanglingpods code. This
ensures the normal deletion loop covers the case.
2019-10-22 06:45:36 -04:00
Brian Goff
d455bd16fc Merge pull request #760 from sargun/notify-pods-v7
Do not delete pods in a non-graceful manner
2019-10-18 11:21:31 -07:00
Sargun Dhillon
d22265e5f5 Do not delete pods in a non-graceful manner
This moves from forcefully deleting pods to deleting pods in a
graceful manner from the API Server. It waits for the pod to
get to a terminal status prior to deleting the pod from api
server.
2019-10-17 09:58:21 -07:00
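A hedged sketch of the "wait for a terminal state, then delete" flow this commit describes, using the older client-go call style seen elsewhere in this diff (a real controller watches rather than polls; names and intervals here are illustrative, not the project's code):

```go
package main

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// waitThenDelete waits for the pod to reach a terminal phase before removing
// the API object, instead of force-deleting it immediately.
func waitThenDelete(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
	for {
		pod, err := client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
			break // terminal: safe to remove the API object now
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
	var zero int64
	return client.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &zero})
}

func main() {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo"},
		Status:     corev1.PodStatus{Phase: corev1.PodSucceeded},
	}
	client := fake.NewSimpleClientset(pod)
	fmt.Println(waitThenDelete(context.Background(), client, "default", "demo"))
}
```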
Sargun Dhillon
871424368f Fix pod status updates for when pod is updated outside of VK
Pods can be updated outside of VK. Right now, if this happens, pod
status updates are dropped because the resource version from the
provider will not match what's on the server, breaking pod status
updates.

Since we're the only ones writing to the pod status, we
can do a blind overwrite.
2019-10-11 16:32:48 -07:00
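A hedged sketch of a "blind" status overwrite: re-read the server's copy so the resource version matches, replace only `.Status` with what the provider reported, and write it back. This is an illustration of the idea (older client-go call style), not the PR's code:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

func overwriteStatus(client kubernetes.Interface, providerPod *corev1.Pod) error {
	// Fetch the current API server object; its metadata (including the
	// resource version) may have changed out from under the provider.
	current, err := client.CoreV1().Pods(providerPod.Namespace).Get(providerPod.Name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	// We are the only writer of .status, so overwrite it wholesale rather
	// than dropping the update on a resource-version mismatch.
	current.Status = providerPod.Status
	_, err = client.CoreV1().Pods(current.Namespace).UpdateStatus(current)
	return err
}

func main() {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo"}}
	client := fake.NewSimpleClientset(pod)

	fromProvider := pod.DeepCopy()
	fromProvider.ResourceVersion = "stale" // previously this mismatch caused the update to be dropped
	fromProvider.Status.Phase = corev1.PodRunning

	fmt.Println(overwriteStatus(client, fromProvider))
}
```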
Sargun Dhillon
cdc261a08d Use go-cmp to compare pods to suppress duplicate updates
Rather than copying the pods, this uses go-cmp and filters out
the paths which should not be compared.
2019-10-10 13:25:27 -07:00
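The go-cmp approach described above, shown on a simplified stand-in type; the real code compares `*corev1.Pod` and the exact ignored paths may differ:

```go
package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
)

// miniPod is a stand-in for a pod object: compare everything except the
// fields that legitimately differ between copies.
type miniPod struct {
	Name            string
	ResourceVersion string
	Labels          map[string]string
	Status          string
}

func equivalent(a, b miniPod) bool {
	return cmp.Equal(a, b, cmpopts.IgnoreFields(miniPod{}, "ResourceVersion", "Status"))
}

func main() {
	a := miniPod{Name: "demo", ResourceVersion: "1", Labels: map[string]string{"app": "web"}, Status: "Pending"}
	b := miniPod{Name: "demo", ResourceVersion: "2", Labels: map[string]string{"app": "web"}, Status: "Running"}
	// Only ignored fields differ, so no duplicate update is emitted.
	fmt.Println(equivalent(a, b)) // true
}
```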
Brian Goff
d878af3262 Merge pull request #770 from sargun/remove-sync-providers
Remove sync providers
2019-10-07 11:02:22 -07:00
Sargun Dhillon
4202b03cda Remove sync provider support
This removes the legacy sync provider interface. All new providers
are expected to implement the async NotifyPods interface.

The legacy sync provider interface creates complexities around
how the deletion flow works, and the mixed sync and async APIs
block us from evolving functionality.

This collapses the NotifyPods interface into the PodLifecycleHandler
interface.
2019-10-02 09:28:09 -07:00
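For orientation, a sketch of the two interfaces being discussed, with signatures mirrored from the mock provider later in this diff (the upstream definitions may differ in detail):

```go
package node // illustrative package name

import (
	"context"

	corev1 "k8s.io/api/core/v1"
)

// PodLifecycleHandler is the synchronous CRUD surface a provider implements.
type PodLifecycleHandler interface {
	CreatePod(ctx context.Context, pod *corev1.Pod) error
	UpdatePod(ctx context.Context, pod *corev1.Pod) error
	DeletePod(ctx context.Context, pod *corev1.Pod) error
	GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error)
	GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error)
	GetPods(ctx context.Context) ([]*corev1.Pod, error)
}

// PodNotifier is the async hook: the provider pushes pod status changes to
// the pod controller instead of being polled for them.
type PodNotifier interface {
	NotifyPods(ctx context.Context, cb func(*corev1.Pod))
}
```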
Brian Goff
b3aa0f577b Merge pull request #776 from Uzuku/fix-log-format
Fix log format
2019-09-27 11:05:25 -07:00
Uzuku
f80f823e8b Fix log format
Correctly expand the log args
2019-09-28 01:54:46 +08:00
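A generic, hypothetical illustration of the "log args not expanded" bug class; the actual fix in #776 may be different:

```go
package main

import "fmt"

func main() {
	args := []interface{}{"mypod", 3}
	// Wrong: the slice is passed as a single value, so the verbs don't line up.
	fmt.Println(fmt.Sprintf("pod=%s restarts=%d", args))
	// Right: expand the slice into the variadic parameter list.
	fmt.Println(fmt.Sprintf("pod=%s restarts=%d", args...))
}
```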
Brian Goff
1bd53c15d1 Merge pull request #774 from toshi0607/feature/fix-lint-warnings
fix lint warnings
2019-09-26 21:48:04 -07:00
Brian Goff
6f6b92ba57 Merge pull request #772 from sargun/add-linters
Add varcheck, deadcode, and misspell linters
2019-09-26 21:47:02 -07:00
toshi0607
bcfc2accf8 misspell 2019-09-26 20:52:06 +09:00
toshi0607
b712751c6d gofmt 2019-09-26 20:50:36 +09:00
Brian Goff
11321d5092 Merge pull request #771 from virtual-kubelet/rbitia-patch-1
Update ADOPTERS.md
2019-09-25 14:31:55 -07:00
Sargun Dhillon
e02c4d9e1e Add varcheck, deadcode, and misspell linters 2019-09-25 09:03:34 -07:00
Ria Bhatia
eda3e27c9f Update ADOPTERS.md
adding adopters
2019-09-25 08:59:01 -07:00
Ria Bhatia
e37a5cebca Update ADOPTERS.md
adding public end-users
2019-09-25 08:57:55 -07:00
Brian Goff
c0746372ad Merge pull request #769 from sargun/add-unused-linter
Add unused code linter
2019-09-24 22:13:39 -07:00
Sargun Dhillon
82a430ccf7 Add unused code linter 2019-09-24 12:55:52 -07:00
Ria Bhatia
8a5f4af171 readme updates (#766) 2019-09-19 11:33:47 -07:00
20 changed files with 887 additions and 702 deletions

View File

@@ -11,3 +11,7 @@ linters:
 - golint
 - goconst
 - goimports
+- unused
+- varcheck
+- deadcode
+- misspell

View File

@@ -1,3 +1,12 @@
 ## Virtual Kubelet adopters
-Are you currently using Virtual Kubelet in production? Please let us know by adding your company name and a description of your use case to this document!
+* Microsoft Azure
+* AWS
+* Alibaba
+* VMWare
+* Netflix
+* Hashi Corp
+
+Since end-users are specific per provider within VK we have many end-user customers that we don't have permission to list publically. Please contact ribhatia@microsoft.com for more informtation.
+
+Are you currently using Virtual Kubelet in production? Please let us know by adding your company name and a description of your use case to this document!

View File

@@ -99,10 +99,6 @@ You can find detailed instructions on how to set it up and how to test it in the
 The Azure connector can use a configuration file specified by the `--provider-config` flag.
 The config file is in TOML format, and an example lives in `providers/azure/example.toml`.
-#### More Details
-See the [ACI Readme](https://github.com/virtual-kubelet/alibabacloud-eci/blob/master/eci.toml)
 ### AWS Fargate Provider
 [AWS Fargate](https://aws.amazon.com/fargate/) is a technology that allows you to run containers
@@ -114,7 +110,7 @@ IP addresses to connect to the internet, private IP addresses to connect to your
 security groups, IAM roles, CloudWatch Logs and many other AWS services. Pods on Fargate can
 co-exist with pods on regular worker nodes in the same Kubernetes cluster.
-Easy instructions and a sample configuration file is available in the [AWS Fargate provider documentation](https://github.com/virtual-kubelet/aws-fargate/blob/master/README.md).
+Easy instructions and a sample configuration file is available in the [AWS Fargate provider documentation](https://github.com/virtual-kubelet/aws-fargate). Please note that this provider is not currently supported.
 ### HashiCorp Nomad Provider
@@ -280,8 +276,10 @@ Enable the ServiceNodeExclusion flag, by modifying the Controller Manager manifest
 Virtual Kubelet follows the [CNCF Code of Conduct](https://github.com/cncf/foundation/blob/master/code-of-conduct.md).
 Sign the [CNCF CLA](https://github.com/kubernetes/community/blob/master/CLA.md) to be able to make Pull Requests to this repo.
-Bi-weekly Virtual Kubelet Architecture meetings are held at 11am PST in this [zoom meeting room](https://zoom.us/j/245165908). Check out the calendar [here](https://calendar.google.com/calendar?cid=bjRtbGMxYWNtNXR0NXQ1a2hqZmRkNTRncGNAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ).
+Bi-weekly Virtual Kubelet Architecture meetings are held at 11am PST every other Wednesday in this [zoom meeting room](https://zoom.us/j/245165908). Check out the calendar [here](https://calendar.google.com/calendar?cid=bjRtbGMxYWNtNXR0NXQ1a2hqZmRkNTRncGNAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ).
 Our google drive with design specifications and meeting notes are [here](https://drive.google.com/drive/folders/19Ndu11WBCCBDowo9CrrGUHoIfd2L8Ueg?usp=sharing).
+We also have a community slack channel named virtual-kubelet in the Kubernetes slack.

View File

@@ -29,7 +29,7 @@ var (
 )
 // TracingExporterInitFunc is the function that is called to initialize an exporter.
-// This is used when registering an exporter and called when a user specifed they want to use the exporter.
+// This is used when registering an exporter and called when a user specified they want to use the exporter.
 type TracingExporterInitFunc func(TracingExporterOptions) (trace.Exporter, error)
 // RegisterTracingExporter registers a tracing exporter.

View File

@@ -41,8 +41,8 @@ var (
 )
 */
-// MockV0Provider implements the virtual-kubelet provider interface and stores pods in memory.
-type MockV0Provider struct { //nolint:golint
+// MockProvider implements the virtual-kubelet provider interface and stores pods in memory.
+type MockProvider struct { // nolint:golint
 	nodeName        string
 	operatingSystem string
 	internalIP      string
@@ -53,11 +53,6 @@ type MockV0Provider struct { //nolint:golint
 	notifier func(*v1.Pod)
 }
-// MockProvider is like MockV0Provider, but implements the PodNotifier interface
-type MockProvider struct { //nolint:golint
-	*MockV0Provider
-}
 // MockConfig contains a mock virtual-kubelet's configurable parameters.
 type MockConfig struct { //nolint:golint
 	CPU string `json:"cpu,omitempty"`
@@ -66,7 +61,7 @@ type MockConfig struct { //nolint:golint
 }
 // NewMockProviderMockConfig creates a new MockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface
-func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
+func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
 	//set defaults
 	if config.CPU == "" {
 		config.CPU = defaultCPUCapacity
@@ -77,7 +72,7 @@ func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem st
 	if config.Pods == "" {
 		config.Pods = defaultPodCapacity
 	}
-	provider := MockV0Provider{
+	provider := MockProvider{
 		nodeName:        nodeName,
 		operatingSystem: operatingSystem,
 		internalIP:      internalIP,
@@ -85,32 +80,11 @@ func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem st
 		pods:      make(map[string]*v1.Pod),
 		config:    config,
 		startTime: time.Now(),
-		// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
-		// it will be set, and then we'll call a real underlying implementation.
-		// This makes it easier in the sense we don't need to wrap each method.
-		notifier: func(*v1.Pod) {},
 	}
 	return &provider, nil
 }
-// NewMockV0Provider creates a new MockV0Provider
-func NewMockV0Provider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
-	config, err := loadConfig(providerConfig, nodeName)
-	if err != nil {
-		return nil, err
-	}
-	return NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
-}
-// NewMockProviderMockConfig creates a new MockProvider with the given config
-func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
-	p, err := NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
-	return &MockProvider{MockV0Provider: p}, err
-}
 // NewMockProvider creates a new MockProvider, which implements the PodNotifier interface
 func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
 	config, err := loadConfig(providerConfig, nodeName)
@@ -158,7 +132,7 @@ func loadConfig(providerConfig, nodeName string) (config MockConfig, err error)
 }
 // CreatePod accepts a Pod definition and stores it in memory.
-func (p *MockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
+func (p *MockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
 	ctx, span := trace.StartSpan(ctx, "CreatePod")
 	defer span.End()
@@ -215,7 +189,7 @@ func (p *MockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
 }
 // UpdatePod accepts a Pod definition and updates its reference.
-func (p *MockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
+func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
 	ctx, span := trace.StartSpan(ctx, "UpdatePod")
 	defer span.End()
@@ -236,7 +210,7 @@ func (p *MockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
 }
 // DeletePod deletes the specified pod out of memory.
-func (p *MockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
+func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
 	ctx, span := trace.StartSpan(ctx, "DeletePod")
 	defer span.End()
@@ -277,7 +251,7 @@ func (p *MockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
 }
 // GetPod returns a pod by name that is stored in memory.
-func (p *MockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
+func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
 	ctx, span := trace.StartSpan(ctx, "GetPod")
 	defer func() {
 		span.SetStatus(err)
@@ -301,7 +275,7 @@ func (p *MockV0Provider) GetPod(ctx context.Context, namespace, name string) (po
 }
 // GetContainerLogs retrieves the logs of a container by name from the provider.
-func (p *MockV0Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
+func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
 	ctx, span := trace.StartSpan(ctx, "GetContainerLogs")
 	defer span.End()
@@ -314,14 +288,14 @@ func (p *MockV0Provider) GetContainerLogs(ctx context.Context, namespace, podNam
 // RunInContainer executes a command in a container in the pod, copying data
 // between in/out/err and the container's stdin/stdout/stderr.
-func (p *MockV0Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
+func (p *MockProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
 	log.G(context.TODO()).Infof("receive ExecInContainer %q", container)
 	return nil
 }
 // GetPodStatus returns the status of a pod by name that is "running".
 // returns nil if a pod by that name is not found.
-func (p *MockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
+func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
 	ctx, span := trace.StartSpan(ctx, "GetPodStatus")
 	defer span.End()
@@ -339,7 +313,7 @@ func (p *MockV0Provider) GetPodStatus(ctx context.Context, namespace, name strin
 }
 // GetPods returns a list of all pods known to be "running".
-func (p *MockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
+func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
 	ctx, span := trace.StartSpan(ctx, "GetPods")
 	defer span.End()
@@ -354,7 +328,7 @@ func (p *MockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
 	return pods, nil
 }
-func (p *MockV0Provider) ConfigureNode(ctx context.Context, n *v1.Node) {
+func (p *MockProvider) ConfigureNode(ctx context.Context, n *v1.Node) {
 	ctx, span := trace.StartSpan(ctx, "mock.ConfigureNode") //nolint:ineffassign
 	defer span.End()
@@ -373,7 +347,7 @@ func (p *MockV0Provider) ConfigureNode(ctx context.Context, n *v1.Node) {
 }
 // Capacity returns a resource list containing the capacity limits.
-func (p *MockV0Provider) capacity() v1.ResourceList {
+func (p *MockProvider) capacity() v1.ResourceList {
 	return v1.ResourceList{
 		"cpu":    resource.MustParse(p.config.CPU),
 		"memory": resource.MustParse(p.config.Memory),
@@ -383,7 +357,7 @@ func (p *MockV0Provider) capacity() v1.ResourceList {
 // NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
 // within Kubernetes.
-func (p *MockV0Provider) nodeConditions() []v1.NodeCondition {
+func (p *MockProvider) nodeConditions() []v1.NodeCondition {
 	// TODO: Make this configurable
 	return []v1.NodeCondition{
 		{
@@ -432,7 +406,7 @@ func (p *MockV0Provider) nodeConditions() []v1.NodeCondition {
 // NodeAddresses returns a list of addresses for the node status
 // within Kubernetes.
-func (p *MockV0Provider) nodeAddresses() []v1.NodeAddress {
+func (p *MockProvider) nodeAddresses() []v1.NodeAddress {
 	return []v1.NodeAddress{
 		{
 			Type: "InternalIP",
@@ -443,7 +417,7 @@ func (p *MockV0Provider) nodeAddresses() []v1.NodeAddress {
 // NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
 // within Kubernetes.
-func (p *MockV0Provider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
+func (p *MockProvider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
 	return v1.NodeDaemonEndpoints{
 		KubeletEndpoint: v1.DaemonEndpoint{
 			Port: p.daemonEndpointPort,
@@ -452,7 +426,7 @@ func (p *MockV0Provider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
 }
 // GetStatsSummary returns dummy stats for all pods known by this provider.
-func (p *MockV0Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
+func (p *MockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
 	var span trace.Span
 	ctx, span = trace.StartSpan(ctx, "GetStatsSummary") //nolint: ineffassign
 	defer span.End()

View File

@@ -15,14 +15,4 @@ func registerMock(s *provider.Store) {
 			cfg.DaemonPort,
 		)
 	})
-	s.Register("mockV0", func(cfg provider.InitConfig) (provider.Provider, error) { //nolint:errcheck
-		return mock.NewMockProvider(
-			cfg.ConfigPath,
-			cfg.NodeName,
-			cfg.OperatingSystem,
-			cfg.InternalIP,
-			cfg.DaemonPort,
-		)
-	})
 }

go.mod
View File

@@ -5,7 +5,6 @@ go 1.12
 require (
 	contrib.go.opencensus.io/exporter/jaeger v0.1.0
 	contrib.go.opencensus.io/exporter/ocagent v0.4.12
-	github.com/davecgh/go-spew v1.1.1
 	github.com/docker/spdystream v0.0.0-20170912183627-bc6354cbbc29 // indirect
 	github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f // indirect
 	github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2 // indirect

View File

@@ -109,7 +109,6 @@ func getExecOptions(req *http.Request) (*remotecommand.Options, error) {
 type containerExecContext struct {
 	h ContainerExecHandlerFunc
-	eio *execIO
 	namespace, pod, container string
 	ctx context.Context
 }

View File

@@ -663,7 +663,7 @@ func TestEnvFromConfigMapAndSecretWithInvalidKeys(t *testing.T) {
 }
 // TestEnvOverridesEnvFrom populates the environment of a container from a configmap, and from another configmap's key with a "conflicting" key.
-// Then, it checks that the value of the "conflicting" key has been correctly overriden.
+// Then, it checks that the value of the "conflicting" key has been correctly overridden.
 func TestEnvOverridesEnvFrom(t *testing.T) {
 	rm := testutil.FakeResourceManager(configMap3)
 	er := testutil.FakeEventRecorder(defaultEventRecorderBufferSize)
@@ -672,7 +672,7 @@ func TestEnvOverridesEnvFrom(t *testing.T) {
 	override := "__override__"
 	// Create a pod object having a single container.
-	// The container's environment is to be populated from a configmap, and later overriden with a value provided directly.
+	// The container's environment is to be populated from a configmap, and later overridden with a value provided directly.
 	pod := &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace: namespace,

View File

@@ -94,8 +94,7 @@ func TestPodLifecycle(t *testing.T) {
 	ctx = log.WithLogger(ctx, log.L)
-	// isPodDeletedPermanentlyFunc is a condition func that waits until the pod is _deleted_, which is the VK's
-	// action when the pod is deleted from the provider
+	// isPodDeletedPermanentlyFunc is a condition func that waits until the pod is _deleted_
 	isPodDeletedPermanentlyFunc := func(ctx context.Context, watcher watch.Interface) error {
 		_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) {
 			log.G(ctx).WithField("event", ev).Info("got event")
@@ -107,132 +106,121 @@ func TestPodLifecycle(t *testing.T) {
 		return watchErr
 	}
-	// createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting
-	// it gracefully
-	t.Run("createStartDeleteScenario", func(t *testing.T) {
-		t.Run("mockProvider", func(t *testing.T) {
-			assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) {
-				testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
-			}))
-		})
-		if testing.Short() {
-			return
-		}
-		t.Run("mockV0Provider", func(t *testing.T) {
-			assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
-				testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
-			}))
-		})
-	})
-	// createStartDeleteScenarioWithDeletionErrorNotFound tests the flow if the pod was not found in the provider
-	// for some reason
-	t.Run("createStartDeleteScenarioWithDeletionErrorNotFound", func(t *testing.T) {
-		mp := newMockProvider()
-		mp.errorOnDelete = errdefs.NotFound("not found")
-		assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-			testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
-		}))
-	})
-	// createStartDeleteScenarioWithDeletionRandomError tests the flow if the pod was unable to be deleted in the
-	// provider
-	t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) {
-		mp := newMockProvider()
-		deletionFunc := func(ctx context.Context, watcher watch.Interface) error {
-			return mp.attemptedDeletes.until(ctx, func(v int) bool { return v >= 2 })
-		}
-		mp.errorOnDelete = errors.New("random error")
-		assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-			testCreateStartDeleteScenario(ctx, t, s, deletionFunc)
-			pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
-			assert.NilError(t, err)
-			assert.Assert(t, is.Len(pods.Items, 1))
-			assert.Assert(t, pods.Items[0].DeletionTimestamp != nil)
-		}))
-	})
-	// danglingPodScenario tests if a pod is created in the provider prior to the pod controller starting,
-	// and ensures the pod controller deletes the pod prior to continuing.
-	t.Run("danglingPodScenario", func(t *testing.T) {
-		t.Run("mockProvider", func(t *testing.T) {
-			mp := newMockProvider()
-			assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-				testDanglingPodScenario(ctx, t, s, mp.mockV0Provider)
-			}))
-		})
-		if testing.Short() {
-			return
-		}
-		t.Run("mockV0Provider", func(t *testing.T) {
-			mp := newMockV0Provider()
-			assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-				testDanglingPodScenario(ctx, t, s, mp)
-			}))
-		})
-	})
-	// failedPodScenario ensures that the VK ignores failed pods that were failed prior to the PC starting up
-	t.Run("failedPodScenario", func(t *testing.T) {
-		t.Run("mockProvider", func(t *testing.T) {
-			mp := newMockProvider()
-			assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-				testFailedPodScenario(ctx, t, s)
-			}))
-		})
-		if testing.Short() {
-			return
-		}
-		t.Run("mockV0Provider", func(t *testing.T) {
-			assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
-				testFailedPodScenario(ctx, t, s)
-			}))
-		})
-	})
-	// succeededPodScenario ensures that the VK ignores succeeded pods that were succeeded prior to the PC starting up.
-	t.Run("succeededPodScenario", func(t *testing.T) {
-		t.Run("mockProvider", func(t *testing.T) {
-			mp := newMockProvider()
-			assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-				testSucceededPodScenario(ctx, t, s)
-			}))
-		})
-		if testing.Short() {
-			return
-		}
-		t.Run("mockV0Provider", func(t *testing.T) {
-			assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
-				testSucceededPodScenario(ctx, t, s)
-			}))
-		})
-	})
-	// updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated
-	// to the provider
-	t.Run("updatePodWhileRunningScenario", func(t *testing.T) {
-		t.Run("mockProvider", func(t *testing.T) {
-			mp := newMockProvider()
-			assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-				testUpdatePodWhileRunningScenario(ctx, t, s, mp)
-			}))
-		})
-	})
-	// podStatusMissingWhileRunningScenario waits for the pod to go into the running state, with a V0 style provider,
-	// and then makes the pod disappear!
-	t.Run("podStatusMissingWhileRunningScenario", func(t *testing.T) {
-		mp := newMockV0Provider()
-		assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
-			testPodStatusMissingWhileRunningScenario(ctx, t, s, mp)
-		}))
-	})
+	// isPodDeletedGracefullyFunc is a condition func that waits until the pod is in a terminal state, which is the VK's
+	// action when the pod is deleted from the provider.
+	isPodDeletedGracefullyFunc := func(ctx context.Context, watcher watch.Interface) error {
+		_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) {
+			log.G(ctx).WithField("event", ev).Info("got event")
+			pod := ev.Object.(*corev1.Pod)
+			return (pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded) && (pod.Status.Reason == mockProviderPodDeletedReason || pod.Status.Reason == statusTerminatedReason), nil
+		})
+		return watchErr
+	}
+	envs := map[string]func(*testing.T) testingProvider{
+		"Async": func(t *testing.T) testingProvider {
+			return newMockProvider()
+		},
+		"Sync": func(t *testing.T) testingProvider {
+			if testing.Short() {
+				t.Skip()
+			}
+			return newSyncMockProvider()
+		},
+	}
+	for env, h := range envs {
+		t.Run(env, func(t *testing.T) {
+			// createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting
+			// it gracefully.
+			t.Run("createStartDeleteScenario", func(t *testing.T) {
+				assert.NilError(t, wireUpSystem(ctx, h(t), func(ctx context.Context, s *system) {
+					testCreateStartDeleteScenario(ctx, t, s, isPodDeletedGracefullyFunc, true)
+				}))
+			})
+			// createStartDeleteScenarioWithDeletionErrorNotFound tests the flow if the pod was not found in the provider
+			// for some reason.
+			t.Run("createStartDeleteScenarioWithDeletionErrorNotFound", func(t *testing.T) {
+				mp := h(t)
+				mp.setErrorOnDelete(errdefs.NotFound("not found"))
+				assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+					testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc, false)
+				}))
+			})
+			// createStartDeleteScenarioWithDeletionRandomError tests the flow if the pod was unable to be deleted in the
+			// provider.
+			t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) {
+				mp := h(t)
+				deletionFunc := func(ctx context.Context, watcher watch.Interface) error {
+					return mp.getAttemptedDeletes().until(ctx, func(v int) bool { return v >= 2 })
+				}
+				mp.setErrorOnDelete(errors.New("random error"))
+				assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+					testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false)
+					pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
+					assert.NilError(t, err)
+					assert.Assert(t, is.Len(pods.Items, 1))
+					assert.Assert(t, pods.Items[0].DeletionTimestamp != nil)
+				}))
+			})
+			// danglingPodScenario tests if a pod is created in the provider prior to the pod controller starting,
+			// and ensures the pod controller deletes the pod prior to continuing.
+			t.Run("danglingPodScenario", func(t *testing.T) {
+				t.Run("mockProvider", func(t *testing.T) {
+					mp := h(t)
+					assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+						testDanglingPodScenario(ctx, t, s, mp)
+					}))
+				})
+			})
+			// testDanglingPodScenarioWithDeletionTimestamp tests if a pod is created in the provider and on api server it had
+			// deletiontimestamp set. It ensures deletion occurs.
+			t.Run("testDanglingPodScenarioWithDeletionTimestamp", func(t *testing.T) {
+				t.Run("mockProvider", func(t *testing.T) {
+					mp := h(t)
+					assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+						testDanglingPodScenarioWithDeletionTimestamp(ctx, t, s, mp)
+					}))
+				})
+			})
+			// failedPodScenario ensures that the VK ignores failed pods that were failed prior to the pod controller starting up.
+			t.Run("failedPodScenario", func(t *testing.T) {
+				t.Run("mockProvider", func(t *testing.T) {
+					mp := h(t)
+					assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+						testFailedPodScenario(ctx, t, s)
+					}))
+				})
+			})
+			// succeededPodScenario ensures that the VK ignores succeeded pods that were succeeded prior to the pod controller starting up.
+			t.Run("succeededPodScenario", func(t *testing.T) {
+				t.Run("mockProvider", func(t *testing.T) {
+					mp := h(t)
+					assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+						testSucceededPodScenario(ctx, t, s)
+					}))
+				})
+			})
+			// updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated
+			// to the provider.
+			t.Run("updatePodWhileRunningScenario", func(t *testing.T) {
+				t.Run("mockProvider", func(t *testing.T) {
+					mp := h(t)
+					assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
+						testUpdatePodWhileRunningScenario(ctx, t, s, mp)
+					}))
				})
			})
		})
	}
 }
 type testFunction func(ctx context.Context, s *system)
@@ -358,7 +346,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
 	assert.DeepEqual(t, p1, p2)
 }
-func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mockV0Provider) {
+func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m testingProvider) {
 	t.Parallel()
 	pod := newPod()
@@ -367,12 +355,54 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mo
 	// Start the pod controller
 	assert.NilError(t, s.start(ctx))
-	assert.Assert(t, is.Equal(m.deletes.read(), 1))
+	assert.Assert(t, is.Equal(m.getDeletes().read(), 1))
 }
-func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error) {
+func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testing.T, s *system, m testingProvider) {
+	t.Parallel()
+	pod := newPod()
+	listOptions := metav1.ListOptions{
+		FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.ObjectMeta.Name).String(),
+	}
+	// Setup a watch (prior to pod creation, and pod controller startup)
+	watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
+	assert.NilError(t, err)
+	defer watcher.Stop()
+	assert.NilError(t, m.CreatePod(ctx, pod))
+	// Create the Pod
+	podCopyWithDeletionTimestamp := pod.DeepCopy()
+	var deletionGracePeriod int64 = 10
+	podCopyWithDeletionTimestamp.DeletionGracePeriodSeconds = &deletionGracePeriod
+	deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
+	podCopyWithDeletionTimestamp.DeletionTimestamp = &deletionTimestamp
+	_, e := s.client.CoreV1().Pods(testNamespace).Create(podCopyWithDeletionTimestamp)
+	assert.NilError(t, e)
+	// Start the pod controller
+	assert.NilError(t, s.start(ctx))
+	watchErrCh := make(chan error)
+	go func() {
+		_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
+			func(ev watch.Event) (bool, error) {
+				return ev.Type == watch.Deleted, nil
+			})
+		watchErrCh <- watchErr
+	}()
+	select {
+	case <-ctx.Done():
+		t.Fatalf("Context ended early: %s", ctx.Err().Error())
+	case err = <-watchErrCh:
+		assert.NilError(t, err)
+	}
+}
+func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error, waitForPermanentDeletion bool) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -440,6 +470,18 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
 		assert.NilError(t, err)
 	}
+	// Setup a watch to look for the pod eventually going away completely
+	watcher2, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
+	assert.NilError(t, err)
+	defer watcher2.Stop()
+	waitDeleteCh := make(chan error)
+	go func() {
+		_, watchDeleteErr := watchutils.UntilWithoutRetry(ctx, watcher2, func(ev watch.Event) (bool, error) {
+			return ev.Type == watch.Deleted, nil
+		})
+		waitDeleteCh <- watchDeleteErr
+	}()
 	// Setup a watch prior to pod deletion
 	watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
 	assert.NilError(t, err)
@@ -454,7 +496,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
 	currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(p.Name, metav1.GetOptions{})
 	assert.NilError(t, err)
 	// 2. Set the pod's deletion timestamp, version, and so on
-	var deletionGracePeriod int64 = 30
+	var deletionGracePeriod int64 = 10
 	currentPod.DeletionGracePeriodSeconds = &deletionGracePeriod
 	deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
 	currentPod.DeletionTimestamp = &deletionTimestamp
@@ -468,9 +510,18 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
 	case err = <-watchErrCh:
 		assert.NilError(t, err)
 	}
+	if waitForPermanentDeletion {
+		select {
+		case <-ctx.Done():
+			t.Fatalf("Context ended early: %s", ctx.Err().Error())
+		case err = <-waitDeleteCh:
+			assert.NilError(t, err)
+		}
+	}
 }
-func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) {
+func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m testingProvider) {
 	t.Parallel()
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -527,88 +578,7 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
 	log.G(ctx).WithField("pod", p).Info("Updating pod")
 	_, err = s.client.CoreV1().Pods(p.Namespace).Update(p)
 	assert.NilError(t, err)
-	assert.NilError(t, m.updates.until(ctx, func(v int) bool { return v > 0 }))
+	assert.NilError(t, m.getUpdates().until(ctx, func(v int) bool { return v > 0 }))
 }
-func testPodStatusMissingWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockV0Provider) {
-	t.Parallel()
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-	p := newPod()
-	key, err := buildKey(p)
-	assert.NilError(t, err)
-	listOptions := metav1.ListOptions{
-		FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(),
-	}
-	watchErrCh := make(chan error)
-	// Create a Pod
-	_, e := s.client.CoreV1().Pods(testNamespace).Create(p)
-	assert.NilError(t, e)
-	// Setup a watch to check if the pod is in running
-	watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
-	assert.NilError(t, err)
-	defer watcher.Stop()
-	go func() {
-		newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
-			// Wait for the pod to be started
-			func(ev watch.Event) (bool, error) {
-				pod := ev.Object.(*corev1.Pod)
-				return pod.Status.Phase == corev1.PodRunning, nil
-			})
-		// This deepcopy is required to please the race detector
-		p = newPod.Object.(*corev1.Pod).DeepCopy()
-		watchErrCh <- watchErr
-	}()
-	// Start the pod controller
-	assert.NilError(t, s.start(ctx))
-	// Wait for pod to be in running
-	select {
-	case <-ctx.Done():
-		t.Fatalf("Context ended early: %s", ctx.Err().Error())
-	case <-s.pc.Done():
-		assert.NilError(t, s.pc.Err())
-		t.Fatal("Pod controller exited prematurely without error")
-	case err = <-watchErrCh:
-		assert.NilError(t, err)
-	}
-	// Setup a watch to check if the pod is in failed due to provider issues
-	watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
-	assert.NilError(t, err)
-	defer watcher.Stop()
-	go func() {
-		newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
-			// Wait for the pod to be failed
-			func(ev watch.Event) (bool, error) {
-				pod := ev.Object.(*corev1.Pod)
-				return pod.Status.Phase == corev1.PodFailed, nil
-			})
-		// This deepcopy is required to please the race detector
-		p = newPod.Object.(*corev1.Pod).DeepCopy()
-		watchErrCh <- watchErr
-	}()
-	// delete the pod from the mock provider
-	m.pods.Delete(key)
-	select {
-	case <-ctx.Done():
-		t.Fatalf("Context ended early: %s", ctx.Err().Error())
-	case <-s.pc.Done():
-		assert.NilError(t, s.pc.Err())
-		t.Fatal("Pod controller exited prematurely without error")
-	case err = <-watchErrCh:
-		assert.NilError(t, err)
-	}
-	assert.Equal(t, p.Status.Reason, podStatusReasonNotFound)
-}
 func BenchmarkCreatePods(b *testing.B) {
@@ -651,6 +621,7 @@ func randomizeName(pod *corev1.Pod) {
 }
 func newPod(podmodifiers ...podModifier) *corev1.Pod {
+	var terminationGracePeriodSeconds int64 = 30
 	pod := &corev1.Pod{
 		TypeMeta: metav1.TypeMeta{
 			Kind: "Pod",
@@ -663,7 +634,14 @@ func newPod(podmodifiers ...podModifier) *corev1.Pod {
 			ResourceVersion: "100",
 		},
 		Spec: corev1.PodSpec{
 			NodeName: testNodeName,
+			TerminationGracePeriodSeconds: &terminationGracePeriodSeconds,
+			Containers: []corev1.Container{
+				{
+					Name:  "my-container",
+					Image: "nginx",
+				},
+			},
 		},
 		Status: corev1.PodStatus{
 			Phase: corev1.PodPending,

View File

@@ -12,9 +12,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
const (
mockProviderPodDeletedReason = "MockProviderPodDeleted"
)
var ( var (
_ PodLifecycleHandler = (*mockV0Provider)(nil) _ PodLifecycleHandler = (*mockProvider)(nil)
_ PodNotifier = (*mockProvider)(nil)
) )
type waitableInt struct { type waitableInt struct {
@@ -62,7 +65,7 @@ func (w *waitableInt) increment() {
w.cond.Broadcast() w.cond.Broadcast()
} }
type mockV0Provider struct { type mockProvider struct {
creates *waitableInt creates *waitableInt
updates *waitableInt updates *waitableInt
deletes *waitableInt deletes *waitableInt
@@ -75,40 +78,35 @@ type mockV0Provider struct {
realNotifier func(*v1.Pod) realNotifier func(*v1.Pod)
} }
type mockProvider struct { // newMockProvider creates a new mockProvider.
*mockV0Provider func newMockProvider() *mockProviderAsync {
provider := newSyncMockProvider()
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
// it will be set, and then we'll call a real underlying implementation.
// This makes it easier in the sense we don't need to wrap each method.
return &mockProviderAsync{provider}
} }
// NewMockProviderMockConfig creates a new mockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface func newSyncMockProvider() *mockProvider {
func newMockV0Provider() *mockV0Provider { provider := mockProvider{
provider := mockV0Provider{
startTime: time.Now(), startTime: time.Now(),
creates: newWaitableInt(), creates: newWaitableInt(),
updates: newWaitableInt(), updates: newWaitableInt(),
deletes: newWaitableInt(), deletes: newWaitableInt(),
attemptedDeletes: newWaitableInt(), attemptedDeletes: newWaitableInt(),
} }
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
// it will be set, and then we'll call a real underlying implementation.
// This makes it easier in the sense we don't need to wrap each method.
return &provider return &provider
} }
// NewMockProviderMockConfig creates a new MockProvider with the given config
func newMockProvider() *mockProvider {
return &mockProvider{mockV0Provider: newMockV0Provider()}
}
// notifier calls the callback that we got from the pod controller to notify it of updates (if it is set) // notifier calls the callback that we got from the pod controller to notify it of updates (if it is set)
func (p *mockV0Provider) notifier(pod *v1.Pod) { func (p *mockProvider) notifier(pod *v1.Pod) {
if p.realNotifier != nil { if p.realNotifier != nil {
p.realNotifier(pod) p.realNotifier(pod)
} }
} }
// CreatePod accepts a Pod definition and stores it in memory. // CreatePod accepts a Pod definition and stores it in memory.
func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error { func (p *mockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
log.G(ctx).Infof("receive CreatePod %q", pod.Name) log.G(ctx).Infof("receive CreatePod %q", pod.Name)
p.creates.increment() p.creates.increment()
@@ -160,7 +158,7 @@ func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
} }
// UpdatePod accepts a Pod definition and updates its reference. // UpdatePod accepts a Pod definition and updates its reference.
func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error { func (p *mockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
log.G(ctx).Infof("receive UpdatePod %q", pod.Name) log.G(ctx).Infof("receive UpdatePod %q", pod.Name)
p.updates.increment() p.updates.increment()
@@ -177,19 +175,23 @@ func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
// DeletePod deletes the specified pod out of memory. The PodController deepcopies the pod object // DeletePod deletes the specified pod out of memory. The PodController deepcopies the pod object
// for us, so we don't have to worry about mutation. // for us, so we don't have to worry about mutation.
func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
log.G(ctx).Infof("receive DeletePod %q", pod.Name) log.G(ctx).Infof("receive DeletePod %q", pod.Name)
p.attemptedDeletes.increment() p.attemptedDeletes.increment()
key, err := buildKey(pod)
if err != nil {
return err
}
if errdefs.IsNotFound(p.errorOnDelete) {
p.pods.Delete(key)
}
if p.errorOnDelete != nil { if p.errorOnDelete != nil {
return p.errorOnDelete return p.errorOnDelete
} }
p.deletes.increment() p.deletes.increment()
key, err := buildKey(pod)
if err != nil {
return err
}
if _, exists := p.pods.Load(key); !exists { if _, exists := p.pods.Load(key); !exists {
return errdefs.NotFound("pod not found") return errdefs.NotFound("pod not found")
@@ -198,7 +200,7 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
now := metav1.Now() now := metav1.Now()
pod.Status.Phase = v1.PodSucceeded pod.Status.Phase = v1.PodSucceeded
pod.Status.Reason = "MockProviderPodDeleted" pod.Status.Reason = mockProviderPodDeletedReason
for idx := range pod.Status.ContainerStatuses { for idx := range pod.Status.ContainerStatuses {
pod.Status.ContainerStatuses[idx].Ready = false pod.Status.ContainerStatuses[idx].Ready = false
@@ -213,21 +215,13 @@ func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error)
} }
p.notifier(pod) p.notifier(pod)
p.pods.Store(key, pod) p.pods.Delete(key)
if pod.DeletionGracePeriodSeconds == nil || *pod.DeletionGracePeriodSeconds == 0 {
p.pods.Delete(key)
} else {
time.AfterFunc(time.Duration(*pod.DeletionGracePeriodSeconds)*time.Second, func() {
p.pods.Delete(key)
})
}
return nil return nil
} }
// GetPod returns a pod by name that is stored in memory. // GetPod returns a pod by name that is stored in memory.
func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) { func (p *mockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
log.G(ctx).Infof("receive GetPod %q", name) log.G(ctx).Infof("receive GetPod %q", name)
key, err := buildKeyFromNames(namespace, name) key, err := buildKeyFromNames(namespace, name)
@@ -243,7 +237,7 @@ func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (po
// GetPodStatus returns the status of a pod by name that is "running". // GetPodStatus returns the status of a pod by name that is "running".
// returns nil if a pod by that name is not found. // returns nil if a pod by that name is not found.
func (p *mockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { func (p *mockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
log.G(ctx).Infof("receive GetPodStatus %q", name) log.G(ctx).Infof("receive GetPodStatus %q", name)
pod, err := p.GetPod(ctx, namespace, name) pod, err := p.GetPod(ctx, namespace, name)
@@ -255,7 +249,7 @@ func (p *mockV0Provider) GetPodStatus(ctx context.Context, namespace, name strin
} }
// GetPods returns a list of all pods known to be "running". // GetPods returns a list of all pods known to be "running".
func (p *mockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) { func (p *mockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
log.G(ctx).Info("receive GetPods") log.G(ctx).Info("receive GetPods")
var pods []*v1.Pod var pods []*v1.Pod
@@ -268,10 +262,24 @@ func (p *mockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
return pods, nil return pods, nil
} }
// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done func (p *mockProvider) setErrorOnDelete(err error) {
// within the provider. p.errorOnDelete = err
func (p *mockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) { }
p.realNotifier = notifier
func (p *mockProvider) getAttemptedDeletes() *waitableInt {
return p.attemptedDeletes
}
func (p *mockProvider) getCreates() *waitableInt {
return p.creates
}
func (p *mockProvider) getDeletes() *waitableInt {
return p.deletes
}
func (p *mockProvider) getUpdates() *waitableInt {
return p.updates
} }
func buildKeyFromNames(namespace string, name string) (string, error) { func buildKeyFromNames(namespace string, name string) (string, error) {
@@ -290,3 +298,22 @@ func buildKey(pod *v1.Pod) (string, error) {
return buildKeyFromNames(pod.ObjectMeta.Namespace, pod.ObjectMeta.Name) return buildKeyFromNames(pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
} }
type mockProviderAsync struct {
*mockProvider
}
// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
// within the provider.
func (p *mockProviderAsync) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
p.realNotifier = notifier
}
type testingProvider interface {
PodLifecycleHandler
setErrorOnDelete(error)
getAttemptedDeletes() *waitableInt
getDeletes() *waitableInt
getCreates() *waitableInt
getUpdates() *waitableInt
}

View File

@@ -244,7 +244,12 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
 	statusTimer := time.NewTimer(n.statusInterval)
 	defer statusTimer.Stop()
+	timerResetDuration := n.statusInterval
 	if n.disableLease {
+		// when resetting the timer after processing a status update, reset it to the ping interval
+		// (since it will be the ping timer as n.disableLease == true)
+		timerResetDuration = n.pingInterval
 		// hack to make sure this channel always blocks since we won't be using it
 		if !statusTimer.Stop() {
 			<-statusTimer.C
@@ -276,7 +281,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
 		if err := n.updateStatus(ctx, false); err != nil {
 			log.G(ctx).WithError(err).Error("Error handling node status update")
 		}
-		t.Reset(n.statusInterval)
+		t.Reset(timerResetDuration)
 	case <-statusTimer.C:
 		if err := n.updateStatus(ctx, false); err != nil {
 			log.G(ctx).WithError(err).Error("Error handling node status update")

View File

@@ -302,6 +302,78 @@ func TestUpdateNodeLease(t *testing.T) {
 	assert.Assert(t, cmp.DeepEqual(compare.Spec.HolderIdentity, lease.Spec.HolderIdentity))
 }
+// TestPingAfterStatusUpdate checks that Ping continues to be called with the specified interval
+// after a node status update occurs, when leases are disabled.
+//
+// Timing ratios used in this test:
+// ping interval (10 ms)
+// maximum allowed interval = 2.5 * ping interval
+// status update interval = 6 * ping interval
+//
+// The allowed maximum time is 2.5 times the ping interval because
+// the status update resets the ping interval timer, meaning
+// that there can be a full two interval durations between
+// successive calls to Ping. The extra half is to allow
+// for timing variations when using such short durations.
+//
+// Once the node controller is ready:
+// send status update after 10 * ping interval
+// end test after another 10 * ping interval
+func TestPingAfterStatusUpdate(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	c := testclient.NewSimpleClientset()
+	nodes := c.CoreV1().Nodes()
+	testP := &testNodeProviderPing{}
+	interval := 10 * time.Millisecond
+	maxAllowedInterval := time.Duration(2.5 * float64(interval.Nanoseconds()))
+	opts := []NodeControllerOpt{
+		WithNodePingInterval(interval),
+		WithNodeStatusUpdateInterval(interval * time.Duration(6)),
+	}
+	testNode := testNode(t)
+	testNodeCopy := testNode.DeepCopy()
+	node, err := NewNodeController(testP, testNode, nodes, opts...)
+	assert.NilError(t, err)
+	chErr := make(chan error, 1)
+	go func() {
+		chErr <- node.Run(ctx)
+	}()
+	timer := time.NewTimer(10 * time.Second)
+	defer timer.Stop()
+	// wait for the node to be ready
+	select {
+	case <-timer.C:
+		t.Fatal("timeout waiting for node to be ready")
+	case <-chErr:
+		t.Fatalf("node.Run returned earlier than expected: %v", err)
+	case <-node.Ready():
+	}
+	notifyTimer := time.After(interval * time.Duration(10))
+	select {
+	case <-notifyTimer:
+		testP.triggerStatusUpdate(testNodeCopy)
+	}
+	endTimer := time.After(interval * time.Duration(10))
+	select {
+	case <-endTimer:
+		break
+	}
+	assert.Assert(t, testP.maxPingInterval < maxAllowedInterval, "maximum time between node pings (%v) was greater than the maximum expected interval (%v)", testP.maxPingInterval, maxAllowedInterval)
+}
 func testNode(t *testing.T) *corev1.Node {
 	n := &corev1.Node{}
 	n.Name = strings.ToLower(t.Name())
@@ -323,6 +395,26 @@ func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
 	}
 }
+// testNodeProviderPing tracks the maximum time interval between calls to Ping
+type testNodeProviderPing struct {
+	testNodeProvider
+	lastPingTime    time.Time
+	maxPingInterval time.Duration
+}
+func (tnp *testNodeProviderPing) Ping(ctx context.Context) error {
+	now := time.Now()
+	if tnp.lastPingTime.IsZero() {
+		tnp.lastPingTime = now
+		return nil
+	}
+	if now.Sub(tnp.lastPingTime) > tnp.maxPingInterval {
+		tnp.maxPingInterval = now.Sub(tnp.lastPingTime)
+	}
+	tnp.lastPingTime = now
+	return nil
+}
 type watchGetter interface {
 	Watch(metav1.ListOptions) (watch.Interface, error)
 }


@@ -16,30 +16,20 @@ package node
import ( import (
"context" "context"
"hash/fnv"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
pkgerrors "github.com/pkg/errors" pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace" "github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
) )
const ( const (
podStatusReasonProviderFailed = "ProviderFailed" podStatusReasonProviderFailed = "ProviderFailed"
podStatusReasonNotFound = "NotFound"
podStatusMessageNotFound = "The pod status was not found and may have been deleted from the provider"
containerStatusReasonNotFound = "NotFound"
containerStatusMessageNotFound = "Container was not found and was likely deleted"
containerStatusExitCodeNotFound = -137
) )
func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context { func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
@@ -117,20 +107,6 @@ func podsEqual(pod1, pod2 *corev1.Pod) bool {
} }
// This is basically the kube runtime's hash container functionality.
// VK only operates at the Pod level so this is adapted for that
func hashPodSpec(spec corev1.PodSpec) uint64 {
hash := fnv.New32a()
printer := spew.ConfigState{
Indent: " ",
SortKeys: true,
DisableMethods: true,
SpewKeys: true,
}
printer.Fprintf(hash, "%#v", spec)
return uint64(hash.Sum32())
}
func (pc *PodController) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) { func (pc *PodController) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) {
podPhase := corev1.PodPending podPhase := corev1.PodPending
if pod.Spec.RestartPolicy == corev1.RestartPolicyNever { if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
@@ -156,136 +132,22 @@ func (pc *PodController) handleProviderError(ctx context.Context, span trace.Spa
span.SetStatus(origErr) span.SetStatus(origErr)
} }
func (pc *PodController) deletePod(ctx context.Context, namespace, name string) error { func (pc *PodController) deletePod(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "deletePod") ctx, span := trace.StartSpan(ctx, "deletePod")
defer span.End() defer span.End()
pod, err := pc.provider.GetPod(ctx, namespace, name)
if err != nil {
if errdefs.IsNotFound(err) {
// The provider is not aware of the pod, but we must still delete the Kubernetes API resource.
return pc.forceDeletePodResource(ctx, namespace, name)
}
return err
}
// NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't.
if pod == nil {
// The provider is not aware of the pod, but we must still delete the Kubernetes API resource.
return pc.forceDeletePodResource(ctx, namespace, name)
}
ctx = addPodAttributes(ctx, span, pod) ctx = addPodAttributes(ctx, span, pod)
var delErr error err := pc.provider.DeletePod(ctx, pod.DeepCopy())
if delErr = pc.provider.DeletePod(ctx, pod.DeepCopy()); delErr != nil && !errdefs.IsNotFound(delErr) { if err != nil {
span.SetStatus(delErr) span.SetStatus(err)
return delErr return err
} }
log.G(ctx).Debug("Deleted pod from provider") log.G(ctx).Debug("Deleted pod from provider")
if err := pc.forceDeletePodResource(ctx, namespace, name); err != nil {
span.SetStatus(err)
return err
}
log.G(ctx).Info("Deleted pod from Kubernetes")
return nil return nil
} }
func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace, name string) error {
ctx, span := trace.StartSpan(ctx, "forceDeletePodResource")
defer span.End()
ctx = span.WithFields(ctx, log.Fields{
"namespace": namespace,
"name": name,
})
var grace int64
if err := pc.client.Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil {
if errors.IsNotFound(err) {
log.G(ctx).Debug("Pod does not exist in Kubernetes, nothing to delete")
return nil
}
span.SetStatus(err)
return pkgerrors.Wrap(err, "Failed to delete Kubernetes pod")
}
return nil
}
// fetchPodStatusesFromProvider syncs the providers pod status with the kubernetes pod status.
func (pc *PodController) fetchPodStatusesFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
ctx, span := trace.StartSpan(ctx, "fetchPodStatusesFromProvider")
defer span.End()
// Update all the pods with the provider status.
pods, err := pc.podsLister.List(labels.Everything())
if err != nil {
err = pkgerrors.Wrap(err, "error getting pod list from kubernetes")
span.SetStatus(err)
log.G(ctx).WithError(err).Error("Error updating pod statuses")
return
}
ctx = span.WithField(ctx, "nPods", int64(len(pods)))
for _, pod := range pods {
if !shouldSkipPodStatusUpdate(pod) {
enrichedPod, err := pc.fetchPodStatusFromProvider(ctx, q, pod)
if err != nil {
log.G(ctx).WithFields(map[string]interface{}{
"name": pod.Name,
"namespace": pod.Namespace,
}).WithError(err).Error("Could not fetch pod status")
} else if enrichedPod != nil {
pc.enqueuePodStatusUpdate(ctx, q, enrichedPod)
}
}
}
}
// fetchPodStatusFromProvider returns a pod (the pod we pass in) enriched with the pod status from the provider. If the pod is not found,
// and it has been 1 minute since the pod was created, or the pod was previously running, it will be marked as failed.
// If a valid pod status cannot be generated, for example, if a pod is not found in the provider, and it has been less than 1 minute
// since pod creation, we will return nil for the pod.
func (pc *PodController) fetchPodStatusFromProvider(ctx context.Context, q workqueue.RateLimitingInterface, podFromKubernetes *corev1.Pod) (*corev1.Pod, error) {
podStatus, err := pc.provider.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
if errdefs.IsNotFound(err) || (err == nil && podStatus == nil) {
// Only change the status when the pod was already up
// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
podStatus = podFromKubernetes.Status.DeepCopy()
podStatus.Phase = corev1.PodFailed
podStatus.Reason = podStatusReasonNotFound
podStatus.Message = podStatusMessageNotFound
now := metav1.NewTime(time.Now())
for i, c := range podStatus.ContainerStatuses {
if c.State.Running == nil {
continue
}
podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
ExitCode: containerStatusExitCodeNotFound,
Reason: containerStatusReasonNotFound,
Message: containerStatusMessageNotFound,
FinishedAt: now,
StartedAt: c.State.Running.StartedAt,
ContainerID: c.ContainerID,
}
podStatus.ContainerStatuses[i].State.Running = nil
}
} else if err != nil {
return nil, nil
}
} else if err != nil {
return nil, err
}
pod := podFromKubernetes.DeepCopy()
podStatus.DeepCopyInto(&pod.Status)
return pod, nil
}
func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool { func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodSucceeded || return pod.Status.Phase == corev1.PodSucceeded ||
pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodFailed ||
@@ -308,8 +170,12 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
} }
kPod := obj.(*knownPod) kPod := obj.(*knownPod)
kPod.Lock() kPod.Lock()
podFromProvider := kPod.lastPodStatusReceivedFromProvider podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
kPod.Unlock() kPod.Unlock()
// We need to do this because other parts of the pod can be updated elsewhere. Since we're only updating
// the pod status, and we should be the sole writer of the pod status, we can blindly overwrite it. Therefore
// we copy the pod and set its ResourceVersion to 0.
podFromProvider.ResourceVersion = "0"
if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(podFromProvider); err != nil { if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(podFromProvider); err != nil {
span.SetStatus(err) span.SetStatus(err)
@@ -371,3 +237,55 @@ func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retE
return pc.updatePodStatus(ctx, pod, key) return pc.updatePodStatus(ctx, pod, key)
} }
func (pc *PodController) deletePodHandler(ctx context.Context, key string) (retErr error) {
ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem")
defer span.End()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
ctx = span.WithFields(ctx, log.Fields{
"namespace": namespace,
"name": name,
})
if err != nil {
// Log the error as a warning, but do not requeue the key as it is invalid.
log.G(ctx).Warn(pkgerrors.Wrapf(err, "invalid resource key: %q", key))
span.SetStatus(err)
return nil
}
defer func() {
if retErr == nil {
if w, ok := pc.provider.(syncWrapper); ok {
w._deletePodKey(ctx, key)
}
}
}()
// If the pod has been deleted from API server, we don't need to do anything.
k8sPod, err := pc.podsLister.Pods(namespace).Get(name)
if errors.IsNotFound(err) {
return nil
}
if err != nil {
span.SetStatus(err)
return err
}
if running(&k8sPod.Status) {
log.G(ctx).Error("Force deleting pod in running state")
}
// We don't check with the provider before doing this delete. At this point, even if an outstanding pod status update
// is still in progress, the pod object is being removed from the API server, so that update no longer matters.
err = pc.client.Pods(namespace).Delete(name, metav1.NewDeleteOptions(0))
if errors.IsNotFound(err) {
return nil
}
if err != nil {
span.SetStatus(err)
return err
}
return nil
}


@@ -19,14 +19,10 @@ import (
"testing" "testing"
"time" "time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util" testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
"gotest.tools/assert" "gotest.tools/assert"
is "gotest.tools/assert/cmp" is "gotest.tools/assert/cmp"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
@@ -34,7 +30,7 @@ import (
type TestController struct { type TestController struct {
*PodController *PodController
mock *mockProvider mock *mockProviderAsync
client *fake.Clientset client *fake.Clientset
} }
@@ -51,6 +47,7 @@ func newTestController() *TestController {
resourceManager: rm, resourceManager: rm,
recorder: testutil.FakeEventRecorder(5), recorder: testutil.FakeEventRecorder(5),
k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletionQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
done: make(chan struct{}), done: make(chan struct{}),
ready: make(chan struct{}), ready: make(chan struct{}),
podsInformer: iFactory.Core().V1().Pods(), podsInformer: iFactory.Core().V1().Pods(),
@@ -70,11 +67,11 @@ func TestPodsEqual(t *testing.T) {
p1 := &corev1.Pod{ p1 := &corev1.Pod{
Spec: corev1.PodSpec{ Spec: corev1.PodSpec{
Containers: []corev1.Container{ Containers: []corev1.Container{
corev1.Container{ {
Name: "nginx", Name: "nginx",
Image: "nginx:1.15.12-perl", Image: "nginx:1.15.12-perl",
Ports: []corev1.ContainerPort{ Ports: []corev1.ContainerPort{
corev1.ContainerPort{ {
ContainerPort: 443, ContainerPort: 443,
Protocol: "tcp", Protocol: "tcp",
}, },
@@ -172,170 +169,14 @@ func TestPodNoSpecChange(t *testing.T) {
assert.Check(t, is.Equal(svr.mock.updates.read(), 0)) assert.Check(t, is.Equal(svr.mock.updates.read(), 0))
} }
func TestPodDelete(t *testing.T) {
type testCase struct {
desc string
delErr error
}
cases := []testCase{
{desc: "no error on delete", delErr: nil},
{desc: "not found error on delete", delErr: errdefs.NotFound("not found")},
{desc: "unknown error on delete", delErr: pkgerrors.New("random error")},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
c := newTestController()
c.mock.errorOnDelete = tc.delErr
pod := &corev1.Pod{}
pod.ObjectMeta.Namespace = "default"
pod.ObjectMeta.Name = "nginx"
pod.Spec = newPodSpec()
pc := c.client.CoreV1().Pods("default")
p, err := pc.Create(pod)
assert.NilError(t, err)
ctx := context.Background()
err = c.createOrUpdatePod(ctx, p.DeepCopy()) // make sure it's actually created
assert.NilError(t, err)
assert.Check(t, is.Equal(c.mock.creates.read(), 1))
err = c.deletePod(ctx, pod.Namespace, pod.Name)
assert.Equal(t, pkgerrors.Cause(err), err)
var expectDeletes int
if tc.delErr == nil {
expectDeletes = 1
}
assert.Check(t, is.Equal(c.mock.deletes.read(), expectDeletes))
expectDeleted := tc.delErr == nil || errdefs.IsNotFound(tc.delErr)
_, err = pc.Get(pod.Name, metav1.GetOptions{})
if expectDeleted {
assert.Assert(t, errors.IsNotFound(err))
} else {
assert.NilError(t, err)
}
})
}
}
func TestFetchPodStatusFromProvider(t *testing.T) {
startedAt := metav1.NewTime(time.Now())
finishedAt := metav1.NewTime(startedAt.Add(time.Second * 10))
containerStateRunning := &corev1.ContainerStateRunning{StartedAt: startedAt}
containerStateTerminated := &corev1.ContainerStateTerminated{StartedAt: startedAt, FinishedAt: finishedAt}
containerStateWaiting := &corev1.ContainerStateWaiting{}
testCases := []struct {
desc string
running *corev1.ContainerStateRunning
terminated *corev1.ContainerStateTerminated
waiting *corev1.ContainerStateWaiting
expectedStartedAt metav1.Time
expectedFinishedAt metav1.Time
}{
{desc: "container in running state", running: containerStateRunning, expectedStartedAt: startedAt},
{desc: "container in terminated state", terminated: containerStateTerminated, expectedStartedAt: startedAt, expectedFinishedAt: finishedAt},
{desc: "container in waiting state", waiting: containerStateWaiting},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
c := newTestController()
pod := &corev1.Pod{}
pod.ObjectMeta.Namespace = "default"
pod.ObjectMeta.Name = "nginx"
pod.Status.Phase = corev1.PodRunning
containerStatus := corev1.ContainerStatus{}
if tc.running != nil {
containerStatus.State.Running = tc.running
} else if tc.terminated != nil {
containerStatus.State.Terminated = tc.terminated
} else if tc.waiting != nil {
containerStatus.State.Waiting = tc.waiting
}
pod.Status.ContainerStatuses = []corev1.ContainerStatus{containerStatus}
pc := c.client.CoreV1().Pods("default")
p, err := pc.Create(pod)
assert.NilError(t, err)
ctx := context.Background()
updatedPod, err := c.fetchPodStatusFromProvider(ctx, nil, p)
assert.NilError(t, err)
assert.Equal(t, updatedPod.Status.Phase, corev1.PodFailed)
assert.Assert(t, is.Len(updatedPod.Status.ContainerStatuses, 1))
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Running == nil)
// Test cases for running and terminated container state
if tc.running != nil || tc.terminated != nil {
// Ensure that the container is in terminated state and other states are nil
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated != nil)
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting == nil)
terminated := updatedPod.Status.ContainerStatuses[0].State.Terminated
assert.Equal(t, terminated.StartedAt, tc.expectedStartedAt)
assert.Assert(t, terminated.StartedAt.Before(&terminated.FinishedAt))
if tc.terminated != nil {
assert.Equal(t, terminated.FinishedAt, tc.expectedFinishedAt)
}
} else {
// Test case for waiting container state
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated == nil)
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting != nil)
}
})
}
}
func TestFetchPodStatusFromProviderWithExpiredPod(t *testing.T) {
c := newTestController()
pod := &corev1.Pod{}
pod.ObjectMeta.Namespace = "default"
pod.ObjectMeta.Name = "nginx"
pod.Status.Phase = corev1.PodRunning
containerStatus := corev1.ContainerStatus{}
// We should terminate containers in a pod that has not provided a pod status update for more than a minute
startedAt := time.Now().Add(-(time.Minute + time.Second))
containerStatus.State.Running = &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(startedAt)}
pod.ObjectMeta.CreationTimestamp.Time = startedAt
pod.Status.ContainerStatuses = []corev1.ContainerStatus{containerStatus}
pc := c.client.CoreV1().Pods("default")
p, err := pc.Create(pod)
assert.NilError(t, err)
ctx := context.Background()
updatedPod, err := c.fetchPodStatusFromProvider(ctx, nil, p)
assert.NilError(t, err)
assert.Equal(t, updatedPod.Status.Phase, corev1.PodFailed)
assert.Assert(t, is.Len(updatedPod.Status.ContainerStatuses, 1))
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Running == nil)
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated != nil)
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting == nil)
terminated := updatedPod.Status.ContainerStatuses[0].State.Terminated
assert.Equal(t, terminated.StartedAt, metav1.NewTime(startedAt))
assert.Assert(t, terminated.StartedAt.Before(&terminated.FinishedAt))
}
func newPodSpec() corev1.PodSpec { func newPodSpec() corev1.PodSpec {
return corev1.PodSpec{ return corev1.PodSpec{
Containers: []corev1.Container{ Containers: []corev1.Container{
corev1.Container{ {
Name: "nginx", Name: "nginx",
Image: "nginx:1.15.12", Image: "nginx:1.15.12",
Ports: []corev1.ContainerPort{ Ports: []corev1.ContainerPort{
corev1.ContainerPort{ {
ContainerPort: 443, ContainerPort: 443,
Protocol: "tcp", Protocol: "tcp",
}, },


@@ -17,10 +17,11 @@ package node
import ( import (
"context" "context"
"fmt" "fmt"
"reflect"
"strconv" "strconv"
"sync" "sync"
"time"
"github.com/google/go-cmp/cmp"
pkgerrors "github.com/pkg/errors" pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/internal/manager" "github.com/virtual-kubelet/virtual-kubelet/internal/manager"
@@ -49,7 +50,9 @@ type PodLifecycleHandler interface {
// UpdatePod takes a Kubernetes Pod and updates it within the provider. // UpdatePod takes a Kubernetes Pod and updates it within the provider.
UpdatePod(ctx context.Context, pod *corev1.Pod) error UpdatePod(ctx context.Context, pod *corev1.Pod) error
// DeletePod takes a Kubernetes Pod and deletes it from the provider. // DeletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is
// expected to call the NotifyPods callback with the pod, set to a terminal pod status in which all of its
// containers are in a terminal state. DeletePod may be called multiple times for the same pod.
DeletePod(ctx context.Context, pod *corev1.Pod) error DeletePod(ctx context.Context, pod *corev1.Pod) error
// GetPod retrieves a pod by name from the provider (can be cached). // GetPod retrieves a pod by name from the provider (can be cached).
@@ -71,9 +74,7 @@ type PodLifecycleHandler interface {
GetPods(context.Context) ([]*corev1.Pod, error) GetPods(context.Context) ([]*corev1.Pod, error)
} }
// PodNotifier notifies callers of pod changes. // PodNotifier is used as an extension to PodLifecycleHandler to support async updates of pod statuses.
// Providers should implement this interface to enable callers to be notified
// of pod status updates asynchronously.
type PodNotifier interface { type PodNotifier interface {
// NotifyPods instructs the notifier to call the passed in function when // NotifyPods instructs the notifier to call the passed in function when
// the pod status changes. It should be called when a pod's status changes. // the pod status changes. It should be called when a pod's status changes.
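As a rough illustration of the contract described in these comments (NotifyPods hands the provider a callback; DeletePod should eventually push a terminal status through it), here is a hypothetical provider fragment. The type name, its fields, and the omission of the remaining PodLifecycleHandler methods are assumptions for illustration, not code from this change.

```go
package example

import (
	"context"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// demoProvider sketches only the async parts of a provider; the rest of
// PodLifecycleHandler (CreatePod, UpdatePod, GetPod, ...) is omitted.
type demoProvider struct {
	notify func(*corev1.Pod) // callback handed to us via NotifyPods
}

// NotifyPods records the controller's callback; every pod passed to it is
// enqueued for a status update in the API server.
func (p *demoProvider) NotifyPods(ctx context.Context, cb func(*corev1.Pod)) {
	p.notify = cb
}

// DeletePod tears the pod down in the backing system (elided here) and then
// reports a terminal status, as the DeletePod comment above asks providers to do.
func (p *demoProvider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
	done := pod.DeepCopy()
	done.Status.Phase = corev1.PodSucceeded
	now := metav1.NewTime(time.Now())
	for i := range done.Status.ContainerStatuses {
		done.Status.ContainerStatuses[i].State = corev1.ContainerState{
			Terminated: &corev1.ContainerStateTerminated{
				Reason:     "Terminated",
				FinishedAt: now,
			},
		}
	}
	if p.notify != nil {
		p.notify(done)
	}
	return nil
}
```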
@@ -104,6 +105,9 @@ type PodController struct {
k8sQ workqueue.RateLimitingInterface k8sQ workqueue.RateLimitingInterface
// deletionQ is a queue on which pods are reconciled; after the grace period we check whether the pod is still in the API server
deletionQ workqueue.RateLimitingInterface
// From the time of creation, to termination the knownPods map will contain the pods key // From the time of creation, to termination the knownPods map will contain the pods key
// (derived from Kubernetes' cache library) -> a *knownPod struct. // (derived from Kubernetes' cache library) -> a *knownPod struct.
knownPods sync.Map knownPods sync.Map
@@ -130,6 +134,7 @@ type knownPod struct {
// should be immutable to avoid having to hold the lock the entire time you're working with them // should be immutable to avoid having to hold the lock the entire time you're working with them
sync.Mutex sync.Mutex
lastPodStatusReceivedFromProvider *corev1.Pod lastPodStatusReceivedFromProvider *corev1.Pod
lastPodUsed *corev1.Pod
} }
// PodControllerConfig is used to configure a new PodController. // PodControllerConfig is used to configure a new PodController.
@@ -155,6 +160,7 @@ type PodControllerConfig struct {
ServiceInformer corev1informers.ServiceInformer ServiceInformer corev1informers.ServiceInformer
} }
// NewPodController creates a new pod controller with the provided config.
func NewPodController(cfg PodControllerConfig) (*PodController, error) { func NewPodController(cfg PodControllerConfig) (*PodController, error) {
if cfg.PodClient == nil { if cfg.PodClient == nil {
return nil, errdefs.InvalidInput("missing core client") return nil, errdefs.InvalidInput("missing core client")
@@ -193,11 +199,17 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
done: make(chan struct{}), done: make(chan struct{}),
recorder: cfg.EventRecorder, recorder: cfg.EventRecorder,
k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"), k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"),
} }
return pc, nil return pc, nil
} }
type asyncProvider interface {
PodLifecycleHandler
PodNotifier
}
// Run will set up the event handlers for types we are interested in, as well // Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until the // as syncing informer caches and starting workers. It will block until the
// context is cancelled, at which point it will shutdown the work queue and // context is cancelled, at which point it will shutdown the work queue and
@@ -210,15 +222,32 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
defer func() { defer func() {
pc.k8sQ.ShutDown() pc.k8sQ.ShutDown()
pc.deletionQ.ShutDown()
pc.mu.Lock() pc.mu.Lock()
pc.err = retErr pc.err = retErr
close(pc.done) close(pc.done)
pc.mu.Unlock() pc.mu.Unlock()
}() }()
var provider asyncProvider
runProvider := func(context.Context) {}
if p, ok := pc.provider.(asyncProvider); ok {
provider = p
} else {
wrapped := &syncProviderWrapper{PodLifecycleHandler: pc.provider, l: pc.podsLister}
runProvider = wrapped.run
provider = wrapped
log.G(ctx).Debug("Wrapped non-async provider with async")
}
pc.provider = provider
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider") podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
pc.runSyncFromProvider(ctx, podStatusQueue) provider.NotifyPods(ctx, func(pod *corev1.Pod) {
pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
})
go runProvider(ctx)
defer podStatusQueue.ShutDown() defer podStatusQueue.ShutDown()
// Wait for the caches to be synced *before* starting to do work. // Wait for the caches to be synced *before* starting to do work.
@@ -241,16 +270,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
}, },
UpdateFunc: func(oldObj, newObj interface{}) { UpdateFunc: func(oldObj, newObj interface{}) {
// Create a copy of the old and new pod objects so we don't mutate the cache. // Create a copy of the old and new pod objects so we don't mutate the cache.
oldPod := oldObj.(*corev1.Pod).DeepCopy() newPod := newObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod).DeepCopy()
// We want to check if the two objects differ in anything other than their resource versions.
// Hence, we make them equal so that this change isn't picked up by reflect.DeepEqual.
newPod.ResourceVersion = oldPod.ResourceVersion
// Skip adding this pod's key to the work queue if its .metadata (except .metadata.resourceVersion) and .spec fields haven't changed.
// This guarantees that we don't attempt to sync the pod every time its .status field is updated.
if reflect.DeepEqual(oldPod.ObjectMeta, newPod.ObjectMeta) && reflect.DeepEqual(oldPod.Spec, newPod.Spec) {
return
}
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod. // At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil { if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
log.G(ctx).Error(err) log.G(ctx).Error(err)
@@ -264,6 +285,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
} else { } else {
pc.knownPods.Delete(key) pc.knownPods.Delete(key)
pc.k8sQ.AddRateLimited(key) pc.k8sQ.AddRateLimited(key)
// If this pod was in the deletion queue, forget about it
pc.deletionQ.Forget(key)
} }
}, },
}) })
@@ -295,6 +318,15 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
}() }()
} }
for id := 0; id < podSyncWorkers; id++ {
wg.Add(1)
workerID := strconv.Itoa(id)
go func() {
defer wg.Done()
pc.runDeletionReconcilationWorker(ctx, workerID, pc.deletionQ)
}()
}
close(pc.ready) close(pc.ready)
log.G(ctx).Info("started workers") log.G(ctx).Info("started workers")
@@ -302,6 +334,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
log.G(ctx).Info("shutting down workers") log.G(ctx).Info("shutting down workers")
pc.k8sQ.ShutDown() pc.k8sQ.ShutDown()
podStatusQueue.ShutDown() podStatusQueue.ShutDown()
pc.deletionQ.ShutDown()
wg.Wait() wg.Wait()
return nil return nil
@@ -353,6 +386,7 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
// Add the current key as an attribute to the current span. // Add the current key as an attribute to the current span.
ctx = span.WithField(ctx, "key", key) ctx = span.WithField(ctx, "key", key)
log.G(ctx).WithField("key", key).Debug("sync handled")
// Convert the namespace/name string into a distinct namespace and name. // Convert the namespace/name string into a distinct namespace and name.
namespace, name, err := cache.SplitMetaNamespaceKey(key) namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -372,35 +406,81 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
span.SetStatus(err) span.SetStatus(err)
return err return err
} }
// At this point we know the Pod resource doesn't exist, which most probably means it was deleted.
// Hence, we must delete it from the provider if it still exists there. pod, err = pc.provider.GetPod(ctx, namespace, name)
if err := pc.deletePod(ctx, namespace, name); err != nil { if err != nil && !errdefs.IsNotFound(err) {
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name)) err = pkgerrors.Wrapf(err, "failed to fetch pod with key %q from provider", key)
span.SetStatus(err) span.SetStatus(err)
return err return err
} }
return nil if errdefs.IsNotFound(err) || pod == nil {
return nil
}
err = pc.provider.DeletePod(ctx, pod)
if errdefs.IsNotFound(err) {
return nil
}
if err != nil {
err = pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name))
span.SetStatus(err)
}
return err
} }
// At this point we know the Pod resource has either been created or updated (which includes being marked for deletion). // At this point we know the Pod resource has either been created or updated (which includes being marked for deletion).
return pc.syncPodInProvider(ctx, pod) return pc.syncPodInProvider(ctx, pod, key)
} }
// syncPodInProvider tries and reconciles the state of a pod by comparing its Kubernetes representation and the provider's representation. // syncPodInProvider tries and reconciles the state of a pod by comparing its Kubernetes representation and the provider's representation.
func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) error { func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod, key string) (retErr error) {
ctx, span := trace.StartSpan(ctx, "syncPodInProvider") ctx, span := trace.StartSpan(ctx, "syncPodInProvider")
defer span.End() defer span.End()
// Add the pod's attributes to the current span. // Add the pod's attributes to the current span.
ctx = addPodAttributes(ctx, span, pod) ctx = addPodAttributes(ctx, span, pod)
// If the pod (i.e. its containers) is no longer in a running state, we force-delete the pod from the API server;
// more context is here: https://github.com/virtual-kubelet/virtual-kubelet/pull/760
if pod.DeletionTimestamp != nil && !running(&pod.Status) {
log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running")
pc.deletionQ.Add(key)
return nil
}
obj, ok := pc.knownPods.Load(key)
if !ok {
// That means the pod was deleted while we were working
return nil
}
kPod := obj.(*knownPod)
kPod.Lock()
if kPod.lastPodUsed != nil && podsEffectivelyEqual(kPod.lastPodUsed, pod) {
kPod.Unlock()
return nil
}
kPod.Unlock()
defer func() {
if retErr == nil {
kPod.Lock()
defer kPod.Unlock()
kPod.lastPodUsed = pod
}
}()
// Check whether the pod has been marked for deletion. // Check whether the pod has been marked for deletion.
// If it does, guarantee it is deleted in the provider and Kubernetes. // If it does, guarantee it is deleted in the provider and Kubernetes.
if pod.DeletionTimestamp != nil { if pod.DeletionTimestamp != nil {
if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil { log.G(ctx).Debug("Deleting pod in provider")
if err := pc.deletePod(ctx, pod); errdefs.IsNotFound(err) {
log.G(ctx).Debug("Pod not found in provider")
} else if err != nil {
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod)) err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod))
span.SetStatus(err) span.SetStatus(err)
return err return err
} }
pc.deletionQ.AddAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
return nil return nil
} }
@@ -419,6 +499,25 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod)
return nil return nil
} }
// runDeletionReconcilationWorker is a long-running function that will continually call the processDeletionReconcilationWorkItem
// function in order to read and process an item on the work queue that is generated by the pod informer.
func (pc *PodController) runDeletionReconcilationWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
for pc.processDeletionReconcilationWorkItem(ctx, workerID, q) {
}
}
// processDeletionReconcilationWorkItem will read a single work item off the work queue and attempt to process it by calling deletePodHandler.
func (pc *PodController) processDeletionReconcilationWorkItem(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
// We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem")
defer span.End()
// Add the ID of the current worker as an attribute to the current span.
ctx = span.WithField(ctx, "workerId", workerID)
return handleQueueItem(ctx, q, pc.deletePodHandler)
}
// deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them. // deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them.
func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int) { func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int) {
ctx, span := trace.StartSpan(ctx, "deleteDanglingPods") ctx, span := trace.StartSpan(ctx, "deleteDanglingPods")
@@ -474,7 +573,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int
// Add the pod's attributes to the current span. // Add the pod's attributes to the current span.
ctx = addPodAttributes(ctx, span, pod) ctx = addPodAttributes(ctx, span, pod)
// Actually delete the pod. // Actually delete the pod.
if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil { if err := pc.provider.DeletePod(ctx, pod.DeepCopy()); err != nil && !errdefs.IsNotFound(err) {
span.SetStatus(err) span.SetStatus(err)
log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod)) log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod))
} else { } else {
@@ -503,3 +602,31 @@ func loggablePodName(pod *corev1.Pod) string {
func loggablePodNameFromCoordinates(namespace, name string) string { func loggablePodNameFromCoordinates(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name) return fmt.Sprintf("%s/%s", namespace, name)
} }
// podsEffectivelyEqual compares two pods, ignoring the pod status and the resource version
func podsEffectivelyEqual(p1, p2 *corev1.Pod) bool {
filterForResourceVersion := func(p cmp.Path) bool {
if p.String() == "ObjectMeta.ResourceVersion" {
return true
}
if p.String() == "Status" {
return true
}
return false
}
return cmp.Equal(p1, p2, cmp.FilterPath(filterForResourceVersion, cmp.Ignore()))
}
// borrowed from https://github.com/kubernetes/kubernetes/blob/f64c631cd7aea58d2552ae2038c1225067d30dde/pkg/kubelet/kubelet_pods.go#L944-L953
// running returns true unless every container status is terminated or waiting, or the status list
// is empty.
func running(podStatus *corev1.PodStatus) bool {
statuses := podStatus.ContainerStatuses
for _, status := range statuses {
if status.State.Terminated == nil && status.State.Waiting == nil {
return true
}
}
return false
}


@@ -6,6 +6,8 @@ import (
"time" "time"
"gotest.tools/assert" "gotest.tools/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
func TestPodControllerExitOnContextCancel(t *testing.T) { func TestPodControllerExitOnContextCancel(t *testing.T) {
@@ -42,3 +44,49 @@ func TestPodControllerExitOnContextCancel(t *testing.T) {
} }
assert.NilError(t, tc.Err()) assert.NilError(t, tc.Err())
} }
func TestCompareResourceVersion(t *testing.T) {
p1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
}
p2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
},
}
assert.Assert(t, podsEffectivelyEqual(p1, p2))
}
func TestCompareStatus(t *testing.T) {
p1 := &corev1.Pod{
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
}
p2 := &corev1.Pod{
Status: corev1.PodStatus{
Phase: corev1.PodFailed,
},
}
assert.Assert(t, podsEffectivelyEqual(p1, p2))
}
func TestCompareLabels(t *testing.T) {
p1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar1",
},
},
}
p2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar2",
},
},
}
assert.Assert(t, !podsEffectivelyEqual(p1, p2))
}


@@ -16,12 +16,10 @@ package node
import ( import (
"context" "context"
"time"
pkgerrors "github.com/pkg/errors" pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace" "github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
) )
@@ -105,40 +103,3 @@ func (pc *PodController) processPodStatusUpdate(ctx context.Context, workerID st
return handleQueueItem(ctx, q, pc.podStatusHandler) return handleQueueItem(ctx, q, pc.podStatusHandler)
} }
// providerSyncLoop synchronizes pod states from the provider back to kubernetes
// Deprecated: This is only used when the provider does not support async updates
// Providers should implement async update support, even if it just means copying
// something like this in.
func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) {
const sleepTime = 5 * time.Second
t := time.NewTimer(sleepTime)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
t.Stop()
ctx, span := trace.StartSpan(ctx, "syncActualState")
pc.fetchPodStatusesFromProvider(ctx, q)
span.End()
// restart the timer
t.Reset(sleepTime)
}
}
}
func (pc *PodController) runSyncFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
if pn, ok := pc.provider.(PodNotifier); ok {
pn.NotifyPods(ctx, func(pod *corev1.Pod) {
pc.enqueuePodStatusUpdate(ctx, q, pod.DeepCopy())
})
} else {
go pc.providerSyncLoop(ctx, q)
}
}

node/sync.go (new file, 215 added lines)

@@ -0,0 +1,215 @@
package node
import (
"context"
"sync"
"time"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)
const (
podStatusReasonNotFound = "NotFound"
podStatusMessageNotFound = "The pod status was not found and may have been deleted from the provider"
containerStatusReasonNotFound = "NotFound"
containerStatusMessageNotFound = "Container was not found and was likely deleted"
containerStatusExitCodeNotFound = -137
statusTerminatedReason = "Terminated"
containerStatusTerminatedMessage = "Container was terminated. The exit code may not reflect the real exit code"
)
// syncProviderWrapper wraps a PodLifecycleHandler to give it async-like pod status notification behavior.
type syncProviderWrapper struct {
PodLifecycleHandler
notify func(*corev1.Pod)
l corev1listers.PodLister
// deletedPods makes sure we don't set the "NotFound" status
// for pods which have been requested to be deleted.
// This is needed for our loop which just grabs pod statuses every 5 seconds.
deletedPods sync.Map
}
// This is used to clean up keys for deleted pods after they have been fully deleted in the API server.
type syncWrapper interface {
_deletePodKey(context.Context, string)
}
func (p *syncProviderWrapper) NotifyPods(ctx context.Context, f func(*corev1.Pod)) {
p.notify = f
}
func (p *syncProviderWrapper) _deletePodKey(ctx context.Context, key string) {
log.G(ctx).WithField("key", key).Debug("Cleaning up pod from deletion cache")
p.deletedPods.Delete(key)
}
func (p *syncProviderWrapper) DeletePod(ctx context.Context, pod *corev1.Pod) error {
log.G(ctx).Debug("syncProviderWrappper.DeletePod")
key, err := cache.MetaNamespaceKeyFunc(pod)
if err != nil {
return err
}
p.deletedPods.Store(key, pod)
if err := p.PodLifecycleHandler.DeletePod(ctx, pod.DeepCopy()); err != nil {
log.G(ctx).WithField("key", key).WithError(err).Debug("Removed key from deleted pods cache")
// We aren't going to actually delete the pod from the provider since there is an error so delete it from our cache,
// otherwise we could end up leaking pods in our deletion cache.
// Delete will be retried by the pod controller.
p.deletedPods.Delete(key)
return err
}
if shouldSkipPodStatusUpdate(pod) {
log.G(ctx).Debug("skipping pod status update for terminated pod")
return nil
}
updated := pod.DeepCopy()
updated.Status.Phase = corev1.PodSucceeded
now := metav1.NewTime(time.Now())
for i, cs := range updated.Status.ContainerStatuses {
updated.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
Reason: statusTerminatedReason,
Message: containerStatusTerminatedMessage,
FinishedAt: now,
}
if cs.State.Running != nil {
updated.Status.ContainerStatuses[i].State.Terminated.StartedAt = cs.State.Running.StartedAt
}
}
updated.Status.Reason = statusTerminatedReason
p.notify(updated)
log.G(ctx).Debug("Notified pod terminal pod status")
return nil
}
func (p *syncProviderWrapper) run(ctx context.Context) {
interval := 5 * time.Second
timer := time.NewTimer(interval)
defer timer.Stop()
if !timer.Stop() {
<-timer.C
}
for {
log.G(ctx).Debug("Pod status update loop start")
timer.Reset(interval)
select {
case <-ctx.Done():
log.G(ctx).WithError(ctx.Err()).Debug("sync wrapper loop exiting")
return
case <-timer.C:
}
p.syncPodStatuses(ctx)
}
}
func (p *syncProviderWrapper) syncPodStatuses(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "syncProviderWrapper.syncPodStatuses")
defer span.End()
// Update all the pods with the provider status.
pods, err := p.l.List(labels.Everything())
if err != nil {
err = errors.Wrap(err, "error getting pod list from kubernetes")
span.SetStatus(err)
log.G(ctx).WithError(err).Error("Error updating pod statuses")
return
}
ctx = span.WithField(ctx, "nPods", int64(len(pods)))
for _, pod := range pods {
if shouldSkipPodStatusUpdate(pod) {
log.G(ctx).Debug("Skipping pod status update")
continue
}
if err := p.updatePodStatus(ctx, pod); err != nil {
log.G(ctx).WithFields(map[string]interface{}{
"name": pod.Name,
"namespace": pod.Namespace,
}).WithError(err).Error("Could not fetch pod status")
}
}
}
func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubernetes *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "syncProviderWrapper.updatePodStatus")
defer span.End()
ctx = addPodAttributes(ctx, span, podFromKubernetes)
var statusErr error
podStatus, err := p.PodLifecycleHandler.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
if err != nil {
if !errdefs.IsNotFound(err) {
span.SetStatus(err)
return err
}
statusErr = err
}
if podStatus != nil {
pod := podFromKubernetes.DeepCopy()
podStatus.DeepCopyInto(&pod.Status)
p.notify(pod)
return nil
}
key, err := cache.MetaNamespaceKeyFunc(podFromKubernetes)
if err != nil {
span.SetStatus(err)
return err
}
if _, exists := p.deletedPods.Load(key); exists {
log.G(ctx).Debug("pod is in known deleted state, ignoring")
return nil
}
if podFromKubernetes.Status.Phase != corev1.PodRunning && time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) <= time.Minute {
span.SetStatus(statusErr)
return statusErr
}
// Only change the status when the pod was already up.
// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
// Set the pod to Failed; this makes sure that if the underlying container implementation is gone, a new pod will be created.
podStatus = podFromKubernetes.Status.DeepCopy()
podStatus.Phase = corev1.PodFailed
podStatus.Reason = podStatusReasonNotFound
podStatus.Message = podStatusMessageNotFound
now := metav1.NewTime(time.Now())
for i, c := range podStatus.ContainerStatuses {
if c.State.Running == nil {
continue
}
podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
ExitCode: containerStatusExitCodeNotFound,
Reason: containerStatusReasonNotFound,
Message: containerStatusMessageNotFound,
FinishedAt: now,
StartedAt: c.State.Running.StartedAt,
ContainerID: c.ContainerID,
}
podStatus.ContainerStatuses[i].State.Running = nil
}
log.G(ctx).Debug("Setting pod not found on pod status")
pod := podFromKubernetes.DeepCopy()
podStatus.DeepCopyInto(&pod.Status)
p.notify(pod)
return nil
}
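For a legacy provider that only implements the sync interface, the wrapper above does the notification work; the provider's main obligation is that GetPodStatus report a not-found error via errdefs once a pod is gone, so the wrapper can eventually mark it Failed. A hypothetical sketch, where the type, its fields, and the map keying scheme are assumptions:

```go
package example

import (
	"context"
	"sync"

	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
	corev1 "k8s.io/api/core/v1"
)

// inMemoryProvider is a hypothetical sync-only provider; it does not implement
// PodNotifier, so PodController.Run wraps it in syncProviderWrapper and polls
// GetPodStatus roughly every five seconds.
type inMemoryProvider struct {
	mu   sync.Mutex
	pods map[string]*corev1.Pod // keyed by "namespace/name"; remaining methods omitted
}

func (p *inMemoryProvider) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	pod, ok := p.pods[namespace+"/"+name]
	if !ok {
		// Returning a NotFound error lets the wrapper fall through to its
		// "pod missing from provider" handling (mark Failed once the pod had
		// been Running, or a minute after creation).
		return nil, errdefs.NotFound("pod " + namespace + "/" + name + " is not known to the provider")
	}
	return pod.Status.DeepCopy(), nil
}
```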


@@ -140,7 +140,7 @@ func (l *logger) Debug(args ...interface{}) {
} }
func (l *logger) Debugf(f string, args ...interface{}) { func (l *logger) Debugf(f string, args ...interface{}) {
l.l.Debugf(f, args) l.l.Debugf(f, args...)
l.s.Annotatef(withLevel(lDebug, l.a), f, args...) l.s.Annotatef(withLevel(lDebug, l.a), f, args...)
} }
@@ -156,7 +156,7 @@ func (l *logger) Info(args ...interface{}) {
} }
func (l *logger) Infof(f string, args ...interface{}) { func (l *logger) Infof(f string, args ...interface{}) {
l.l.Infof(f, args) l.l.Infof(f, args...)
l.s.Annotatef(withLevel(lInfo, l.a), f, args...) l.s.Annotatef(withLevel(lInfo, l.a), f, args...)
} }
@@ -172,7 +172,7 @@ func (l *logger) Warn(args ...interface{}) {
} }
func (l *logger) Warnf(f string, args ...interface{}) { func (l *logger) Warnf(f string, args ...interface{}) {
l.l.Warnf(f, args) l.l.Warnf(f, args...)
l.s.Annotatef(withLevel(lWarn, l.a), f, args...) l.s.Annotatef(withLevel(lWarn, l.a), f, args...)
} }
@@ -188,7 +188,7 @@ func (l *logger) Error(args ...interface{}) {
} }
func (l *logger) Errorf(f string, args ...interface{}) { func (l *logger) Errorf(f string, args ...interface{}) {
l.l.Errorf(f, args) l.l.Errorf(f, args...)
l.s.Annotatef(withLevel(lErr, l.a), f, args...) l.s.Annotatef(withLevel(lErr, l.a), f, args...)
} }
@@ -205,7 +205,7 @@ func (l *logger) Fatal(args ...interface{}) {
func (l *logger) Fatalf(f string, args ...interface{}) { func (l *logger) Fatalf(f string, args ...interface{}) {
l.s.Annotatef(withLevel(lFatal, l.a), f, args...) l.s.Annotatef(withLevel(lFatal, l.a), f, args...)
l.l.Fatalf(f, args) l.l.Fatalf(f, args...)
} }
func (l *logger) WithError(err error) log.Logger { func (l *logger) WithError(err error) log.Logger {