Compare commits

..

3 Commits

Author SHA1 Message Date
Brian Goff
50f360d798 Merge pull request #795 from cpuguy83/cherry_picks_1.1.1
Cherry picks 1.1.1
2019-12-02 11:27:21 -08:00
Thomas Hartland
64873c0816 Add test for node ping interval
(cherry picked from commit 3783a39b26)
2019-11-15 14:33:02 -08:00
Thomas Hartland
c660940a7b After handling status update, reset update timer with correct duration
If the ping timer is being used, it should be reset with the ping update
interval. If the status update interval is used then Ping stops being
called for long enough to cause kubernetes to mark the node as NotReady.

(cherry picked from commit c258614d8f)
2019-11-15 14:32:56 -08:00
18 changed files with 701 additions and 789 deletions

View File

@@ -11,7 +11,3 @@ linters:
- golint
- goconst
- goimports
- unused
- varcheck
- deadcode
- misspell

View File

@@ -1,12 +1,3 @@
## Virtual Kubelet adopters
* Microsoft Azure
* AWS
* Alibaba
* VMWare
* Netflix
* Hashi Corp
Since end-users are specific per provider within VK we have many end-user customers that we don't have permission to list publically. Please contact ribhatia@microsoft.com for more informtation.
Are you currently using Virtual Kubelet in production? Please let us know by adding your company name and a description of your use case to this document!
Are you currently using Virtual Kubelet in production? Please let us know by adding your company name and a description of your use case to this document!

View File

@@ -99,6 +99,10 @@ You can find detailed instructions on how to set it up and how to test it in the
The Azure connector can use a configuration file specified by the `--provider-config` flag.
The config file is in TOML format, and an example lives in `providers/azure/example.toml`.
#### More Details
See the [ACI Readme](https://github.com/virtual-kubelet/alibabacloud-eci/blob/master/eci.toml)
### AWS Fargate Provider
[AWS Fargate](https://aws.amazon.com/fargate/) is a technology that allows you to run containers
@@ -110,7 +114,7 @@ IP addresses to connect to the internet, private IP addresses to connect to your
security groups, IAM roles, CloudWatch Logs and many other AWS services. Pods on Fargate can
co-exist with pods on regular worker nodes in the same Kubernetes cluster.
Easy instructions and a sample configuration file is available in the [AWS Fargate provider documentation](https://github.com/virtual-kubelet/aws-fargate). Please note that this provider is not currently supported.
Easy instructions and a sample configuration file is available in the [AWS Fargate provider documentation](https://github.com/virtual-kubelet/aws-fargate/blob/master/README.md).
### HashiCorp Nomad Provider
@@ -276,10 +280,8 @@ Enable the ServiceNodeExclusion flag, by modifying the Controller Manager manife
Virtual Kubelet follows the [CNCF Code of Conduct](https://github.com/cncf/foundation/blob/master/code-of-conduct.md).
Sign the [CNCF CLA](https://github.com/kubernetes/community/blob/master/CLA.md) to be able to make Pull Requests to this repo.
Bi-weekly Virtual Kubelet Architecture meetings are held at 11am PST every other Wednesday in this [zoom meeting room](https://zoom.us/j/245165908). Check out the calendar [here](https://calendar.google.com/calendar?cid=bjRtbGMxYWNtNXR0NXQ1a2hqZmRkNTRncGNAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ).
Bi-weekly Virtual Kubelet Architecture meetings are held at 11am PST in this [zoom meeting room](https://zoom.us/j/245165908). Check out the calendar [here](https://calendar.google.com/calendar?cid=bjRtbGMxYWNtNXR0NXQ1a2hqZmRkNTRncGNAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ).
Our google drive with design specifications and meeting notes are [here](https://drive.google.com/drive/folders/19Ndu11WBCCBDowo9CrrGUHoIfd2L8Ueg?usp=sharing).
We also have a community slack channel named virtual-kubelet in the Kubernetes slack.

View File

@@ -29,7 +29,7 @@ var (
)
// TracingExporterInitFunc is the function that is called to initialize an exporter.
// This is used when registering an exporter and called when a user specified they want to use the exporter.
// This is used when registering an exporter and called when a user specifed they want to use the exporter.
type TracingExporterInitFunc func(TracingExporterOptions) (trace.Exporter, error)
// RegisterTracingExporter registers a tracing exporter.

View File

@@ -41,8 +41,8 @@ var (
)
*/
// MockProvider implements the virtual-kubelet provider interface and stores pods in memory.
type MockProvider struct { // nolint:golint
// MockV0Provider implements the virtual-kubelet provider interface and stores pods in memory.
type MockV0Provider struct { //nolint:golint
nodeName string
operatingSystem string
internalIP string
@@ -53,6 +53,11 @@ type MockProvider struct { // nolint:golint
notifier func(*v1.Pod)
}
// MockProvider is like MockV0Provider, but implements the PodNotifier interface
type MockProvider struct { //nolint:golint
*MockV0Provider
}
// MockConfig contains a mock virtual-kubelet's configurable parameters.
type MockConfig struct { //nolint:golint
CPU string `json:"cpu,omitempty"`
@@ -61,7 +66,7 @@ type MockConfig struct { //nolint:golint
}
// NewMockProviderMockConfig creates a new MockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface
func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
func NewMockV0ProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
//set defaults
if config.CPU == "" {
config.CPU = defaultCPUCapacity
@@ -72,7 +77,7 @@ func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem stri
if config.Pods == "" {
config.Pods = defaultPodCapacity
}
provider := MockProvider{
provider := MockV0Provider{
nodeName: nodeName,
operatingSystem: operatingSystem,
internalIP: internalIP,
@@ -80,11 +85,32 @@ func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem stri
pods: make(map[string]*v1.Pod),
config: config,
startTime: time.Now(),
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
// it will be set, and then we'll call a real underlying implementation.
// This makes it easier in the sense we don't need to wrap each method.
notifier: func(*v1.Pod) {},
}
return &provider, nil
}
// NewMockV0Provider creates a new MockV0Provider
func NewMockV0Provider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockV0Provider, error) {
config, err := loadConfig(providerConfig, nodeName)
if err != nil {
return nil, err
}
return NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
}
// NewMockProviderMockConfig creates a new MockProvider with the given config
func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
p, err := NewMockV0ProviderMockConfig(config, nodeName, operatingSystem, internalIP, daemonEndpointPort)
return &MockProvider{MockV0Provider: p}, err
}
// NewMockProvider creates a new MockProvider, which implements the PodNotifier interface
func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
config, err := loadConfig(providerConfig, nodeName)
@@ -132,7 +158,7 @@ func loadConfig(providerConfig, nodeName string) (config MockConfig, err error)
}
// CreatePod accepts a Pod definition and stores it in memory.
func (p *MockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
func (p *MockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
ctx, span := trace.StartSpan(ctx, "CreatePod")
defer span.End()
@@ -189,7 +215,7 @@ func (p *MockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
}
// UpdatePod accepts a Pod definition and updates its reference.
func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
func (p *MockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
ctx, span := trace.StartSpan(ctx, "UpdatePod")
defer span.End()
@@ -210,7 +236,7 @@ func (p *MockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
}
// DeletePod deletes the specified pod out of memory.
func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
func (p *MockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
ctx, span := trace.StartSpan(ctx, "DeletePod")
defer span.End()
@@ -251,7 +277,7 @@ func (p *MockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
}
// GetPod returns a pod by name that is stored in memory.
func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
func (p *MockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
ctx, span := trace.StartSpan(ctx, "GetPod")
defer func() {
span.SetStatus(err)
@@ -275,7 +301,7 @@ func (p *MockProvider) GetPod(ctx context.Context, namespace, name string) (pod
}
// GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
func (p *MockV0Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
ctx, span := trace.StartSpan(ctx, "GetContainerLogs")
defer span.End()
@@ -288,14 +314,14 @@ func (p *MockProvider) GetContainerLogs(ctx context.Context, namespace, podName,
// RunInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *MockProvider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
func (p *MockV0Provider) RunInContainer(ctx context.Context, namespace, name, container string, cmd []string, attach api.AttachIO) error {
log.G(context.TODO()).Infof("receive ExecInContainer %q", container)
return nil
}
// GetPodStatus returns the status of a pod by name that is "running".
// returns nil if a pod by that name is not found.
func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
func (p *MockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
ctx, span := trace.StartSpan(ctx, "GetPodStatus")
defer span.End()
@@ -313,7 +339,7 @@ func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string)
}
// GetPods returns a list of all pods known to be "running".
func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
func (p *MockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
ctx, span := trace.StartSpan(ctx, "GetPods")
defer span.End()
@@ -328,7 +354,7 @@ func (p *MockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
return pods, nil
}
func (p *MockProvider) ConfigureNode(ctx context.Context, n *v1.Node) {
func (p *MockV0Provider) ConfigureNode(ctx context.Context, n *v1.Node) {
ctx, span := trace.StartSpan(ctx, "mock.ConfigureNode") //nolint:ineffassign
defer span.End()
@@ -347,7 +373,7 @@ func (p *MockProvider) ConfigureNode(ctx context.Context, n *v1.Node) {
}
// Capacity returns a resource list containing the capacity limits.
func (p *MockProvider) capacity() v1.ResourceList {
func (p *MockV0Provider) capacity() v1.ResourceList {
return v1.ResourceList{
"cpu": resource.MustParse(p.config.CPU),
"memory": resource.MustParse(p.config.Memory),
@@ -357,7 +383,7 @@ func (p *MockProvider) capacity() v1.ResourceList {
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
// within Kubernetes.
func (p *MockProvider) nodeConditions() []v1.NodeCondition {
func (p *MockV0Provider) nodeConditions() []v1.NodeCondition {
// TODO: Make this configurable
return []v1.NodeCondition{
{
@@ -406,7 +432,7 @@ func (p *MockProvider) nodeConditions() []v1.NodeCondition {
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *MockProvider) nodeAddresses() []v1.NodeAddress {
func (p *MockV0Provider) nodeAddresses() []v1.NodeAddress {
return []v1.NodeAddress{
{
Type: "InternalIP",
@@ -417,7 +443,7 @@ func (p *MockProvider) nodeAddresses() []v1.NodeAddress {
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *MockProvider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
func (p *MockV0Provider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
return v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{
Port: p.daemonEndpointPort,
@@ -426,7 +452,7 @@ func (p *MockProvider) nodeDaemonEndpoints() v1.NodeDaemonEndpoints {
}
// GetStatsSummary returns dummy stats for all pods known by this provider.
func (p *MockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
func (p *MockV0Provider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
var span trace.Span
ctx, span = trace.StartSpan(ctx, "GetStatsSummary") //nolint: ineffassign
defer span.End()

View File

@@ -15,4 +15,14 @@ func registerMock(s *provider.Store) {
cfg.DaemonPort,
)
})
s.Register("mockV0", func(cfg provider.InitConfig) (provider.Provider, error) { //nolint:errcheck
return mock.NewMockProvider(
cfg.ConfigPath,
cfg.NodeName,
cfg.OperatingSystem,
cfg.InternalIP,
cfg.DaemonPort,
)
})
}

1
go.mod
View File

@@ -5,6 +5,7 @@ go 1.12
require (
contrib.go.opencensus.io/exporter/jaeger v0.1.0
contrib.go.opencensus.io/exporter/ocagent v0.4.12
github.com/davecgh/go-spew v1.1.1
github.com/docker/spdystream v0.0.0-20170912183627-bc6354cbbc29 // indirect
github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f // indirect
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2 // indirect

View File

@@ -109,6 +109,7 @@ func getExecOptions(req *http.Request) (*remotecommand.Options, error) {
type containerExecContext struct {
h ContainerExecHandlerFunc
eio *execIO
namespace, pod, container string
ctx context.Context
}

View File

@@ -663,7 +663,7 @@ func TestEnvFromConfigMapAndSecretWithInvalidKeys(t *testing.T) {
}
// TestEnvOverridesEnvFrom populates the environment of a container from a configmap, and from another configmap's key with a "conflicting" key.
// Then, it checks that the value of the "conflicting" key has been correctly overridden.
// Then, it checks that the value of the "conflicting" key has been correctly overriden.
func TestEnvOverridesEnvFrom(t *testing.T) {
rm := testutil.FakeResourceManager(configMap3)
er := testutil.FakeEventRecorder(defaultEventRecorderBufferSize)
@@ -672,7 +672,7 @@ func TestEnvOverridesEnvFrom(t *testing.T) {
override := "__override__"
// Create a pod object having a single container.
// The container's environment is to be populated from a configmap, and later overridden with a value provided directly.
// The container's environment is to be populated from a configmap, and later overriden with a value provided directly.
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,

View File

@@ -94,7 +94,8 @@ func TestPodLifecycle(t *testing.T) {
ctx = log.WithLogger(ctx, log.L)
// isPodDeletedPermanentlyFunc is a condition func that waits until the pod is _deleted_
// isPodDeletedPermanentlyFunc is a condition func that waits until the pod is _deleted_, which is the VK's
// action when the pod is deleted from the provider
isPodDeletedPermanentlyFunc := func(ctx context.Context, watcher watch.Interface) error {
_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) {
log.G(ctx).WithField("event", ev).Info("got event")
@@ -106,121 +107,132 @@ func TestPodLifecycle(t *testing.T) {
return watchErr
}
// isPodDeletedGracefullyFunc is a condition func that waits until the pod is in a terminal state, which is the VK's
// action when the pod is deleted from the provider.
isPodDeletedGracefullyFunc := func(ctx context.Context, watcher watch.Interface) error {
_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher, func(ev watch.Event) (bool, error) {
log.G(ctx).WithField("event", ev).Info("got event")
pod := ev.Object.(*corev1.Pod)
return (pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded) && (pod.Status.Reason == mockProviderPodDeletedReason || pod.Status.Reason == statusTerminatedReason), nil
// createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting
// it gracefully
t.Run("createStartDeleteScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
assert.NilError(t, wireUpSystem(ctx, newMockProvider(), func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
}))
})
return watchErr
}
envs := map[string]func(*testing.T) testingProvider{
"Async": func(t *testing.T) testingProvider {
return newMockProvider()
},
"Sync": func(t *testing.T) testingProvider {
if testing.Short() {
t.Skip()
}
return newSyncMockProvider()
},
}
for env, h := range envs {
t.Run(env, func(t *testing.T) {
// createStartDeleteScenario tests the basic flow of creating a pod, waiting for it to start, and deleting
// it gracefully.
t.Run("createStartDeleteScenario", func(t *testing.T) {
assert.NilError(t, wireUpSystem(ctx, h(t), func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, isPodDeletedGracefullyFunc, true)
}))
})
// createStartDeleteScenarioWithDeletionErrorNotFound tests the flow if the pod was not found in the provider
// for some reason.
t.Run("createStartDeleteScenarioWithDeletionErrorNotFound", func(t *testing.T) {
mp := h(t)
mp.setErrorOnDelete(errdefs.NotFound("not found"))
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc, false)
}))
})
// createStartDeleteScenarioWithDeletionRandomError tests the flow if the pod was unable to be deleted in the
// provider.
t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) {
mp := h(t)
deletionFunc := func(ctx context.Context, watcher watch.Interface) error {
return mp.getAttemptedDeletes().until(ctx, func(v int) bool { return v >= 2 })
}
mp.setErrorOnDelete(errors.New("random error"))
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false)
pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
assert.NilError(t, err)
assert.Assert(t, is.Len(pods.Items, 1))
assert.Assert(t, pods.Items[0].DeletionTimestamp != nil)
}))
})
// danglingPodScenario tests if a pod is created in the provider prior to the pod controller starting,
// and ensures the pod controller deletes the pod prior to continuing.
t.Run("danglingPodScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := h(t)
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testDanglingPodScenario(ctx, t, s, mp)
}))
})
})
// testDanglingPodScenarioWithDeletionTimestamp tests if a pod is created in the provider and on api server it had
// deletiontimestamp set. It ensures deletion occurs.
t.Run("testDanglingPodScenarioWithDeletionTimestamp", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := h(t)
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testDanglingPodScenarioWithDeletionTimestamp(ctx, t, s, mp)
}))
})
})
// failedPodScenario ensures that the VK ignores failed pods that were failed prior to the pod controller starting up.
t.Run("failedPodScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := h(t)
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testFailedPodScenario(ctx, t, s)
}))
})
})
// succeededPodScenario ensures that the VK ignores succeeded pods that were succeeded prior to the pod controller starting up.
t.Run("succeededPodScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := h(t)
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testSucceededPodScenario(ctx, t, s)
}))
})
})
// updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated
// to the provider.
t.Run("updatePodWhileRunningScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := h(t)
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testUpdatePodWhileRunningScenario(ctx, t, s, mp)
}))
})
})
if testing.Short() {
return
}
t.Run("mockV0Provider", func(t *testing.T) {
assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
}))
})
}
})
// createStartDeleteScenarioWithDeletionErrorNotFound tests the flow if the pod was not found in the provider
// for some reason
t.Run("createStartDeleteScenarioWithDeletionErrorNotFound", func(t *testing.T) {
mp := newMockProvider()
mp.errorOnDelete = errdefs.NotFound("not found")
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, isPodDeletedPermanentlyFunc)
}))
})
// createStartDeleteScenarioWithDeletionRandomError tests the flow if the pod was unable to be deleted in the
// provider
t.Run("createStartDeleteScenarioWithDeletionRandomError", func(t *testing.T) {
mp := newMockProvider()
deletionFunc := func(ctx context.Context, watcher watch.Interface) error {
return mp.attemptedDeletes.until(ctx, func(v int) bool { return v >= 2 })
}
mp.errorOnDelete = errors.New("random error")
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, deletionFunc)
pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
assert.NilError(t, err)
assert.Assert(t, is.Len(pods.Items, 1))
assert.Assert(t, pods.Items[0].DeletionTimestamp != nil)
}))
})
// danglingPodScenario tests if a pod is created in the provider prior to the pod controller starting,
// and ensures the pod controller deletes the pod prior to continuing.
t.Run("danglingPodScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := newMockProvider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testDanglingPodScenario(ctx, t, s, mp.mockV0Provider)
}))
})
if testing.Short() {
return
}
t.Run("mockV0Provider", func(t *testing.T) {
mp := newMockV0Provider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testDanglingPodScenario(ctx, t, s, mp)
}))
})
})
// failedPodScenario ensures that the VK ignores failed pods that were failed prior to the PC starting up
t.Run("failedPodScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := newMockProvider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testFailedPodScenario(ctx, t, s)
}))
})
if testing.Short() {
return
}
t.Run("mockV0Provider", func(t *testing.T) {
assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
testFailedPodScenario(ctx, t, s)
}))
})
})
// succeededPodScenario ensures that the VK ignores succeeded pods that were succeeded prior to the PC starting up.
t.Run("succeededPodScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := newMockProvider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testSucceededPodScenario(ctx, t, s)
}))
})
if testing.Short() {
return
}
t.Run("mockV0Provider", func(t *testing.T) {
assert.NilError(t, wireUpSystem(ctx, newMockV0Provider(), func(ctx context.Context, s *system) {
testSucceededPodScenario(ctx, t, s)
}))
})
})
// updatePodWhileRunningScenario updates a pod while the VK is running to ensure the update is propagated
// to the provider
t.Run("updatePodWhileRunningScenario", func(t *testing.T) {
t.Run("mockProvider", func(t *testing.T) {
mp := newMockProvider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testUpdatePodWhileRunningScenario(ctx, t, s, mp)
}))
})
})
// podStatusMissingWhileRunningScenario waits for the pod to go into the running state, with a V0 style provider,
// and then makes the pod disappear!
t.Run("podStatusMissingWhileRunningScenario", func(t *testing.T) {
mp := newMockV0Provider()
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testPodStatusMissingWhileRunningScenario(ctx, t, s, mp)
}))
})
}
type testFunction func(ctx context.Context, s *system)
@@ -346,7 +358,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
assert.DeepEqual(t, p1, p2)
}
func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m testingProvider) {
func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m *mockV0Provider) {
t.Parallel()
pod := newPod()
@@ -355,54 +367,12 @@ func testDanglingPodScenario(ctx context.Context, t *testing.T, s *system, m tes
// Start the pod controller
assert.NilError(t, s.start(ctx))
assert.Assert(t, is.Equal(m.getDeletes().read(), 1))
assert.Assert(t, is.Equal(m.deletes.read(), 1))
}
func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testing.T, s *system, m testingProvider) {
t.Parallel()
func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error) {
pod := newPod()
listOptions := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.ObjectMeta.Name).String(),
}
// Setup a watch (prior to pod creation, and pod controller startup)
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
assert.NilError(t, m.CreatePod(ctx, pod))
// Create the Pod
podCopyWithDeletionTimestamp := pod.DeepCopy()
var deletionGracePeriod int64 = 10
podCopyWithDeletionTimestamp.DeletionGracePeriodSeconds = &deletionGracePeriod
deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
podCopyWithDeletionTimestamp.DeletionTimestamp = &deletionTimestamp
_, e := s.client.CoreV1().Pods(testNamespace).Create(podCopyWithDeletionTimestamp)
assert.NilError(t, e)
// Start the pod controller
assert.NilError(t, s.start(ctx))
watchErrCh := make(chan error)
go func() {
_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
func(ev watch.Event) (bool, error) {
return ev.Type == watch.Deleted, nil
})
watchErrCh <- watchErr
}()
select {
case <-ctx.Done():
t.Fatalf("Context ended early: %s", ctx.Err().Error())
case err = <-watchErrCh:
assert.NilError(t, err)
}
}
func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system, waitFunction func(ctx context.Context, watch watch.Interface) error, waitForPermanentDeletion bool) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -470,18 +440,6 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
assert.NilError(t, err)
}
// Setup a watch to look for the pod eventually going away completely
watcher2, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher2.Stop()
waitDeleteCh := make(chan error)
go func() {
_, watchDeleteErr := watchutils.UntilWithoutRetry(ctx, watcher2, func(ev watch.Event) (bool, error) {
return ev.Type == watch.Deleted, nil
})
waitDeleteCh <- watchDeleteErr
}()
// Setup a watch prior to pod deletion
watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
@@ -496,7 +454,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(p.Name, metav1.GetOptions{})
assert.NilError(t, err)
// 2. Set the pod's deletion timestamp, version, and so on
var deletionGracePeriod int64 = 10
var deletionGracePeriod int64 = 30
currentPod.DeletionGracePeriodSeconds = &deletionGracePeriod
deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
currentPod.DeletionTimestamp = &deletionTimestamp
@@ -510,18 +468,9 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
case err = <-watchErrCh:
assert.NilError(t, err)
}
if waitForPermanentDeletion {
select {
case <-ctx.Done():
t.Fatalf("Context ended early: %s", ctx.Err().Error())
case err = <-waitDeleteCh:
assert.NilError(t, err)
}
}
}
func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m testingProvider) {
func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockProvider) {
t.Parallel()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -578,7 +527,88 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
log.G(ctx).WithField("pod", p).Info("Updating pod")
_, err = s.client.CoreV1().Pods(p.Namespace).Update(p)
assert.NilError(t, err)
assert.NilError(t, m.getUpdates().until(ctx, func(v int) bool { return v > 0 }))
assert.NilError(t, m.updates.until(ctx, func(v int) bool { return v > 0 }))
}
func testPodStatusMissingWhileRunningScenario(ctx context.Context, t *testing.T, s *system, m *mockV0Provider) {
t.Parallel()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
p := newPod()
key, err := buildKey(p)
assert.NilError(t, err)
listOptions := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", p.ObjectMeta.Name).String(),
}
watchErrCh := make(chan error)
// Create a Pod
_, e := s.client.CoreV1().Pods(testNamespace).Create(p)
assert.NilError(t, e)
// Setup a watch to check if the pod is in running
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
go func() {
newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
// Wait for the pod to be started
func(ev watch.Event) (bool, error) {
pod := ev.Object.(*corev1.Pod)
return pod.Status.Phase == corev1.PodRunning, nil
})
// This deepcopy is required to please the race detector
p = newPod.Object.(*corev1.Pod).DeepCopy()
watchErrCh <- watchErr
}()
// Start the pod controller
assert.NilError(t, s.start(ctx))
// Wait for pod to be in running
select {
case <-ctx.Done():
t.Fatalf("Context ended early: %s", ctx.Err().Error())
case <-s.pc.Done():
assert.NilError(t, s.pc.Err())
t.Fatal("Pod controller exited prematurely without error")
case err = <-watchErrCh:
assert.NilError(t, err)
}
// Setup a watch to check if the pod is in failed due to provider issues
watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
go func() {
newPod, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
// Wait for the pod to be failed
func(ev watch.Event) (bool, error) {
pod := ev.Object.(*corev1.Pod)
return pod.Status.Phase == corev1.PodFailed, nil
})
// This deepcopy is required to please the race detector
p = newPod.Object.(*corev1.Pod).DeepCopy()
watchErrCh <- watchErr
}()
// delete the pod from the mock provider
m.pods.Delete(key)
select {
case <-ctx.Done():
t.Fatalf("Context ended early: %s", ctx.Err().Error())
case <-s.pc.Done():
assert.NilError(t, s.pc.Err())
t.Fatal("Pod controller exited prematurely without error")
case err = <-watchErrCh:
assert.NilError(t, err)
}
assert.Equal(t, p.Status.Reason, podStatusReasonNotFound)
}
func BenchmarkCreatePods(b *testing.B) {
@@ -621,7 +651,6 @@ func randomizeName(pod *corev1.Pod) {
}
func newPod(podmodifiers ...podModifier) *corev1.Pod {
var terminationGracePeriodSeconds int64 = 30
pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
@@ -634,14 +663,7 @@ func newPod(podmodifiers ...podModifier) *corev1.Pod {
ResourceVersion: "100",
},
Spec: corev1.PodSpec{
NodeName: testNodeName,
TerminationGracePeriodSeconds: &terminationGracePeriodSeconds,
Containers: []corev1.Container{
{
Name: "my-container",
Image: "nginx",
},
},
NodeName: testNodeName,
},
Status: corev1.PodStatus{
Phase: corev1.PodPending,

View File

@@ -12,12 +12,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
mockProviderPodDeletedReason = "MockProviderPodDeleted"
)
var (
_ PodLifecycleHandler = (*mockProvider)(nil)
_ PodLifecycleHandler = (*mockV0Provider)(nil)
_ PodNotifier = (*mockProvider)(nil)
)
type waitableInt struct {
@@ -65,7 +62,7 @@ func (w *waitableInt) increment() {
w.cond.Broadcast()
}
type mockProvider struct {
type mockV0Provider struct {
creates *waitableInt
updates *waitableInt
deletes *waitableInt
@@ -78,35 +75,40 @@ type mockProvider struct {
realNotifier func(*v1.Pod)
}
// newMockProvider creates a new mockProvider.
func newMockProvider() *mockProviderAsync {
provider := newSyncMockProvider()
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
// it will be set, and then we'll call a real underlying implementation.
// This makes it easier in the sense we don't need to wrap each method.
return &mockProviderAsync{provider}
type mockProvider struct {
*mockV0Provider
}
func newSyncMockProvider() *mockProvider {
provider := mockProvider{
// NewMockProviderMockConfig creates a new mockV0Provider. Mock legacy provider does not implement the new asynchronous podnotifier interface
func newMockV0Provider() *mockV0Provider {
provider := mockV0Provider{
startTime: time.Now(),
creates: newWaitableInt(),
updates: newWaitableInt(),
deletes: newWaitableInt(),
attemptedDeletes: newWaitableInt(),
}
// By default notifier is set to a function which is a no-op. In the event we've implemented the PodNotifier interface,
// it will be set, and then we'll call a real underlying implementation.
// This makes it easier in the sense we don't need to wrap each method.
return &provider
}
// NewMockProviderMockConfig creates a new MockProvider with the given config
func newMockProvider() *mockProvider {
return &mockProvider{mockV0Provider: newMockV0Provider()}
}
// notifier calls the callback that we got from the pod controller to notify it of updates (if it is set)
func (p *mockProvider) notifier(pod *v1.Pod) {
func (p *mockV0Provider) notifier(pod *v1.Pod) {
if p.realNotifier != nil {
p.realNotifier(pod)
}
}
// CreatePod accepts a Pod definition and stores it in memory.
func (p *mockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
func (p *mockV0Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
log.G(ctx).Infof("receive CreatePod %q", pod.Name)
p.creates.increment()
@@ -158,7 +160,7 @@ func (p *mockProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
}
// UpdatePod accepts a Pod definition and updates its reference.
func (p *mockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
func (p *mockV0Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
log.G(ctx).Infof("receive UpdatePod %q", pod.Name)
p.updates.increment()
@@ -175,23 +177,19 @@ func (p *mockProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
// DeletePod deletes the specified pod out of memory. The PodController deepcopies the pod object
// for us, so we don't have to worry about mutation.
func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
func (p *mockV0Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
log.G(ctx).Infof("receive DeletePod %q", pod.Name)
p.attemptedDeletes.increment()
key, err := buildKey(pod)
if err != nil {
return err
}
if errdefs.IsNotFound(p.errorOnDelete) {
p.pods.Delete(key)
}
if p.errorOnDelete != nil {
return p.errorOnDelete
}
p.deletes.increment()
key, err := buildKey(pod)
if err != nil {
return err
}
if _, exists := p.pods.Load(key); !exists {
return errdefs.NotFound("pod not found")
@@ -200,7 +198,7 @@ func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
now := metav1.Now()
pod.Status.Phase = v1.PodSucceeded
pod.Status.Reason = mockProviderPodDeletedReason
pod.Status.Reason = "MockProviderPodDeleted"
for idx := range pod.Status.ContainerStatuses {
pod.Status.ContainerStatuses[idx].Ready = false
@@ -215,13 +213,21 @@ func (p *mockProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
}
p.notifier(pod)
p.pods.Delete(key)
p.pods.Store(key, pod)
if pod.DeletionGracePeriodSeconds == nil || *pod.DeletionGracePeriodSeconds == 0 {
p.pods.Delete(key)
} else {
time.AfterFunc(time.Duration(*pod.DeletionGracePeriodSeconds)*time.Second, func() {
p.pods.Delete(key)
})
}
return nil
}
// GetPod returns a pod by name that is stored in memory.
func (p *mockProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
func (p *mockV0Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
log.G(ctx).Infof("receive GetPod %q", name)
key, err := buildKeyFromNames(namespace, name)
@@ -237,7 +243,7 @@ func (p *mockProvider) GetPod(ctx context.Context, namespace, name string) (pod
// GetPodStatus returns the status of a pod by name that is "running".
// returns nil if a pod by that name is not found.
func (p *mockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
func (p *mockV0Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
log.G(ctx).Infof("receive GetPodStatus %q", name)
pod, err := p.GetPod(ctx, namespace, name)
@@ -249,7 +255,7 @@ func (p *mockProvider) GetPodStatus(ctx context.Context, namespace, name string)
}
// GetPods returns a list of all pods known to be "running".
func (p *mockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
func (p *mockV0Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
log.G(ctx).Info("receive GetPods")
var pods []*v1.Pod
@@ -262,24 +268,10 @@ func (p *mockProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
return pods, nil
}
func (p *mockProvider) setErrorOnDelete(err error) {
p.errorOnDelete = err
}
func (p *mockProvider) getAttemptedDeletes() *waitableInt {
return p.attemptedDeletes
}
func (p *mockProvider) getCreates() *waitableInt {
return p.creates
}
func (p *mockProvider) getDeletes() *waitableInt {
return p.deletes
}
func (p *mockProvider) getUpdates() *waitableInt {
return p.updates
// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
// within the provider.
func (p *mockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
p.realNotifier = notifier
}
func buildKeyFromNames(namespace string, name string) (string, error) {
@@ -298,22 +290,3 @@ func buildKey(pod *v1.Pod) (string, error) {
return buildKeyFromNames(pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
}
type mockProviderAsync struct {
*mockProvider
}
// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
// within the provider.
func (p *mockProviderAsync) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
p.realNotifier = notifier
}
type testingProvider interface {
PodLifecycleHandler
setErrorOnDelete(error)
getAttemptedDeletes() *waitableInt
getDeletes() *waitableInt
getCreates() *waitableInt
getUpdates() *waitableInt
}

View File

@@ -16,20 +16,30 @@ package node
import (
"context"
"hash/fnv"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
const (
podStatusReasonProviderFailed = "ProviderFailed"
podStatusReasonProviderFailed = "ProviderFailed"
podStatusReasonNotFound = "NotFound"
podStatusMessageNotFound = "The pod status was not found and may have been deleted from the provider"
containerStatusReasonNotFound = "NotFound"
containerStatusMessageNotFound = "Container was not found and was likely deleted"
containerStatusExitCodeNotFound = -137
)
func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
@@ -107,6 +117,20 @@ func podsEqual(pod1, pod2 *corev1.Pod) bool {
}
// This is basically the kube runtime's hash container functionality.
// VK only operates at the Pod level so this is adapted for that
func hashPodSpec(spec corev1.PodSpec) uint64 {
hash := fnv.New32a()
printer := spew.ConfigState{
Indent: " ",
SortKeys: true,
DisableMethods: true,
SpewKeys: true,
}
printer.Fprintf(hash, "%#v", spec)
return uint64(hash.Sum32())
}
func (pc *PodController) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) {
podPhase := corev1.PodPending
if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
@@ -132,22 +156,136 @@ func (pc *PodController) handleProviderError(ctx context.Context, span trace.Spa
span.SetStatus(origErr)
}
func (pc *PodController) deletePod(ctx context.Context, pod *corev1.Pod) error {
func (pc *PodController) deletePod(ctx context.Context, namespace, name string) error {
ctx, span := trace.StartSpan(ctx, "deletePod")
defer span.End()
pod, err := pc.provider.GetPod(ctx, namespace, name)
if err != nil {
if errdefs.IsNotFound(err) {
// The provider is not aware of the pod, but we must still delete the Kubernetes API resource.
return pc.forceDeletePodResource(ctx, namespace, name)
}
return err
}
// NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't.
if pod == nil {
// The provider is not aware of the pod, but we must still delete the Kubernetes API resource.
return pc.forceDeletePodResource(ctx, namespace, name)
}
ctx = addPodAttributes(ctx, span, pod)
err := pc.provider.DeletePod(ctx, pod.DeepCopy())
if err != nil {
span.SetStatus(err)
return err
var delErr error
if delErr = pc.provider.DeletePod(ctx, pod.DeepCopy()); delErr != nil && !errdefs.IsNotFound(delErr) {
span.SetStatus(delErr)
return delErr
}
log.G(ctx).Debug("Deleted pod from provider")
if err := pc.forceDeletePodResource(ctx, namespace, name); err != nil {
span.SetStatus(err)
return err
}
log.G(ctx).Info("Deleted pod from Kubernetes")
return nil
}
func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace, name string) error {
ctx, span := trace.StartSpan(ctx, "forceDeletePodResource")
defer span.End()
ctx = span.WithFields(ctx, log.Fields{
"namespace": namespace,
"name": name,
})
var grace int64
if err := pc.client.Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil {
if errors.IsNotFound(err) {
log.G(ctx).Debug("Pod does not exist in Kubernetes, nothing to delete")
return nil
}
span.SetStatus(err)
return pkgerrors.Wrap(err, "Failed to delete Kubernetes pod")
}
return nil
}
// fetchPodStatusesFromProvider syncs the providers pod status with the kubernetes pod status.
func (pc *PodController) fetchPodStatusesFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
ctx, span := trace.StartSpan(ctx, "fetchPodStatusesFromProvider")
defer span.End()
// Update all the pods with the provider status.
pods, err := pc.podsLister.List(labels.Everything())
if err != nil {
err = pkgerrors.Wrap(err, "error getting pod list from kubernetes")
span.SetStatus(err)
log.G(ctx).WithError(err).Error("Error updating pod statuses")
return
}
ctx = span.WithField(ctx, "nPods", int64(len(pods)))
for _, pod := range pods {
if !shouldSkipPodStatusUpdate(pod) {
enrichedPod, err := pc.fetchPodStatusFromProvider(ctx, q, pod)
if err != nil {
log.G(ctx).WithFields(map[string]interface{}{
"name": pod.Name,
"namespace": pod.Namespace,
}).WithError(err).Error("Could not fetch pod status")
} else if enrichedPod != nil {
pc.enqueuePodStatusUpdate(ctx, q, enrichedPod)
}
}
}
}
// fetchPodStatusFromProvider returns a pod (the pod we pass in) enriched with the pod status from the provider. If the pod is not found,
// and it has been 1 minute since the pod was created, or the pod was previously running, it will be marked as failed.
// If a valid pod status cannot be generated, for example, if a pod is not found in the provider, and it has been less than 1 minute
// since pod creation, we will return nil for the pod.
func (pc *PodController) fetchPodStatusFromProvider(ctx context.Context, q workqueue.RateLimitingInterface, podFromKubernetes *corev1.Pod) (*corev1.Pod, error) {
podStatus, err := pc.provider.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
if errdefs.IsNotFound(err) || (err == nil && podStatus == nil) {
// Only change the status when the pod was already up
// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
podStatus = podFromKubernetes.Status.DeepCopy()
podStatus.Phase = corev1.PodFailed
podStatus.Reason = podStatusReasonNotFound
podStatus.Message = podStatusMessageNotFound
now := metav1.NewTime(time.Now())
for i, c := range podStatus.ContainerStatuses {
if c.State.Running == nil {
continue
}
podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
ExitCode: containerStatusExitCodeNotFound,
Reason: containerStatusReasonNotFound,
Message: containerStatusMessageNotFound,
FinishedAt: now,
StartedAt: c.State.Running.StartedAt,
ContainerID: c.ContainerID,
}
podStatus.ContainerStatuses[i].State.Running = nil
}
} else if err != nil {
return nil, nil
}
} else if err != nil {
return nil, err
}
pod := podFromKubernetes.DeepCopy()
podStatus.DeepCopyInto(&pod.Status)
return pod, nil
}
func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodSucceeded ||
pod.Status.Phase == corev1.PodFailed ||
@@ -170,12 +308,8 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
}
kPod := obj.(*knownPod)
kPod.Lock()
podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
podFromProvider := kPod.lastPodStatusReceivedFromProvider
kPod.Unlock()
// We need to do this because the other parts of the pod can be updated elsewhere. Since we're only updating
// the pod status, and we should be the sole writers of the pod status, we can blind overwrite it. Therefore
// we need to copy the pod and set ResourceVersion to 0.
podFromProvider.ResourceVersion = "0"
if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(podFromProvider); err != nil {
span.SetStatus(err)
@@ -237,55 +371,3 @@ func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retE
return pc.updatePodStatus(ctx, pod, key)
}
func (pc *PodController) deletePodHandler(ctx context.Context, key string) (retErr error) {
ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem")
defer span.End()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
ctx = span.WithFields(ctx, log.Fields{
"namespace": namespace,
"name": name,
})
if err != nil {
// Log the error as a warning, but do not requeue the key as it is invalid.
log.G(ctx).Warn(pkgerrors.Wrapf(err, "invalid resource key: %q", key))
span.SetStatus(err)
return nil
}
defer func() {
if retErr == nil {
if w, ok := pc.provider.(syncWrapper); ok {
w._deletePodKey(ctx, key)
}
}
}()
// If the pod has been deleted from API server, we don't need to do anything.
k8sPod, err := pc.podsLister.Pods(namespace).Get(name)
if errors.IsNotFound(err) {
return nil
}
if err != nil {
span.SetStatus(err)
return err
}
if running(&k8sPod.Status) {
log.G(ctx).Error("Force deleting pod in running state")
}
// We don't check with the provider before doing this delete. At this point, even if an outstanding pod status update
// was in progress,
err = pc.client.Pods(namespace).Delete(name, metav1.NewDeleteOptions(0))
if errors.IsNotFound(err) {
return nil
}
if err != nil {
span.SetStatus(err)
return err
}
return nil
}

View File

@@ -19,10 +19,14 @@ import (
"testing"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"
@@ -30,7 +34,7 @@ import (
type TestController struct {
*PodController
mock *mockProviderAsync
mock *mockProvider
client *fake.Clientset
}
@@ -47,7 +51,6 @@ func newTestController() *TestController {
resourceManager: rm,
recorder: testutil.FakeEventRecorder(5),
k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletionQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
done: make(chan struct{}),
ready: make(chan struct{}),
podsInformer: iFactory.Core().V1().Pods(),
@@ -67,11 +70,11 @@ func TestPodsEqual(t *testing.T) {
p1 := &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
corev1.Container{
Name: "nginx",
Image: "nginx:1.15.12-perl",
Ports: []corev1.ContainerPort{
{
corev1.ContainerPort{
ContainerPort: 443,
Protocol: "tcp",
},
@@ -169,14 +172,170 @@ func TestPodNoSpecChange(t *testing.T) {
assert.Check(t, is.Equal(svr.mock.updates.read(), 0))
}
func TestPodDelete(t *testing.T) {
type testCase struct {
desc string
delErr error
}
cases := []testCase{
{desc: "no error on delete", delErr: nil},
{desc: "not found error on delete", delErr: errdefs.NotFound("not found")},
{desc: "unknown error on delete", delErr: pkgerrors.New("random error")},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
c := newTestController()
c.mock.errorOnDelete = tc.delErr
pod := &corev1.Pod{}
pod.ObjectMeta.Namespace = "default"
pod.ObjectMeta.Name = "nginx"
pod.Spec = newPodSpec()
pc := c.client.CoreV1().Pods("default")
p, err := pc.Create(pod)
assert.NilError(t, err)
ctx := context.Background()
err = c.createOrUpdatePod(ctx, p.DeepCopy()) // make sure it's actually created
assert.NilError(t, err)
assert.Check(t, is.Equal(c.mock.creates.read(), 1))
err = c.deletePod(ctx, pod.Namespace, pod.Name)
assert.Equal(t, pkgerrors.Cause(err), err)
var expectDeletes int
if tc.delErr == nil {
expectDeletes = 1
}
assert.Check(t, is.Equal(c.mock.deletes.read(), expectDeletes))
expectDeleted := tc.delErr == nil || errdefs.IsNotFound(tc.delErr)
_, err = pc.Get(pod.Name, metav1.GetOptions{})
if expectDeleted {
assert.Assert(t, errors.IsNotFound(err))
} else {
assert.NilError(t, err)
}
})
}
}
func TestFetchPodStatusFromProvider(t *testing.T) {
startedAt := metav1.NewTime(time.Now())
finishedAt := metav1.NewTime(startedAt.Add(time.Second * 10))
containerStateRunning := &corev1.ContainerStateRunning{StartedAt: startedAt}
containerStateTerminated := &corev1.ContainerStateTerminated{StartedAt: startedAt, FinishedAt: finishedAt}
containerStateWaiting := &corev1.ContainerStateWaiting{}
testCases := []struct {
desc string
running *corev1.ContainerStateRunning
terminated *corev1.ContainerStateTerminated
waiting *corev1.ContainerStateWaiting
expectedStartedAt metav1.Time
expectedFinishedAt metav1.Time
}{
{desc: "container in running state", running: containerStateRunning, expectedStartedAt: startedAt},
{desc: "container in terminated state", terminated: containerStateTerminated, expectedStartedAt: startedAt, expectedFinishedAt: finishedAt},
{desc: "container in waiting state", waiting: containerStateWaiting},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
c := newTestController()
pod := &corev1.Pod{}
pod.ObjectMeta.Namespace = "default"
pod.ObjectMeta.Name = "nginx"
pod.Status.Phase = corev1.PodRunning
containerStatus := corev1.ContainerStatus{}
if tc.running != nil {
containerStatus.State.Running = tc.running
} else if tc.terminated != nil {
containerStatus.State.Terminated = tc.terminated
} else if tc.waiting != nil {
containerStatus.State.Waiting = tc.waiting
}
pod.Status.ContainerStatuses = []corev1.ContainerStatus{containerStatus}
pc := c.client.CoreV1().Pods("default")
p, err := pc.Create(pod)
assert.NilError(t, err)
ctx := context.Background()
updatedPod, err := c.fetchPodStatusFromProvider(ctx, nil, p)
assert.NilError(t, err)
assert.Equal(t, updatedPod.Status.Phase, corev1.PodFailed)
assert.Assert(t, is.Len(updatedPod.Status.ContainerStatuses, 1))
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Running == nil)
// Test cases for running and terminated container state
if tc.running != nil || tc.terminated != nil {
// Ensure that the container is in terminated state and other states are nil
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated != nil)
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting == nil)
terminated := updatedPod.Status.ContainerStatuses[0].State.Terminated
assert.Equal(t, terminated.StartedAt, tc.expectedStartedAt)
assert.Assert(t, terminated.StartedAt.Before(&terminated.FinishedAt))
if tc.terminated != nil {
assert.Equal(t, terminated.FinishedAt, tc.expectedFinishedAt)
}
} else {
// Test case for waiting container state
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated == nil)
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting != nil)
}
})
}
}
func TestFetchPodStatusFromProviderWithExpiredPod(t *testing.T) {
c := newTestController()
pod := &corev1.Pod{}
pod.ObjectMeta.Namespace = "default"
pod.ObjectMeta.Name = "nginx"
pod.Status.Phase = corev1.PodRunning
containerStatus := corev1.ContainerStatus{}
// We should terminate containters in a pod that has not provided pod status update for more than a minute
startedAt := time.Now().Add(-(time.Minute + time.Second))
containerStatus.State.Running = &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(startedAt)}
pod.ObjectMeta.CreationTimestamp.Time = startedAt
pod.Status.ContainerStatuses = []corev1.ContainerStatus{containerStatus}
pc := c.client.CoreV1().Pods("default")
p, err := pc.Create(pod)
assert.NilError(t, err)
ctx := context.Background()
updatedPod, err := c.fetchPodStatusFromProvider(ctx, nil, p)
assert.NilError(t, err)
assert.Equal(t, updatedPod.Status.Phase, corev1.PodFailed)
assert.Assert(t, is.Len(updatedPod.Status.ContainerStatuses, 1))
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Running == nil)
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Terminated != nil)
assert.Assert(t, updatedPod.Status.ContainerStatuses[0].State.Waiting == nil)
terminated := updatedPod.Status.ContainerStatuses[0].State.Terminated
assert.Equal(t, terminated.StartedAt, metav1.NewTime(startedAt))
assert.Assert(t, terminated.StartedAt.Before(&terminated.FinishedAt))
}
func newPodSpec() corev1.PodSpec {
return corev1.PodSpec{
Containers: []corev1.Container{
{
corev1.Container{
Name: "nginx",
Image: "nginx:1.15.12",
Ports: []corev1.ContainerPort{
{
corev1.ContainerPort{
ContainerPort: 443,
Protocol: "tcp",
},

View File

@@ -17,11 +17,10 @@ package node
import (
"context"
"fmt"
"reflect"
"strconv"
"sync"
"time"
"github.com/google/go-cmp/cmp"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
@@ -50,9 +49,7 @@ type PodLifecycleHandler interface {
// UpdatePod takes a Kubernetes Pod and updates it within the provider.
UpdatePod(ctx context.Context, pod *corev1.Pod) error
// DeletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is
// expected to call the NotifyPods callback with a terminal pod status where all the containers are in a terminal
// state, as well as the pod. DeletePod may be called multiple times for the same pod.
// DeletePod takes a Kubernetes Pod and deletes it from the provider.
DeletePod(ctx context.Context, pod *corev1.Pod) error
// GetPod retrieves a pod by name from the provider (can be cached).
@@ -74,7 +71,9 @@ type PodLifecycleHandler interface {
GetPods(context.Context) ([]*corev1.Pod, error)
}
// PodNotifier is used as an extension to PodLifecycleHandler to support async updates of pod statuses.
// PodNotifier notifies callers of pod changes.
// Providers should implement this interface to enable callers to be notified
// of pod status updates asynchronously.
type PodNotifier interface {
// NotifyPods instructs the notifier to call the passed in function when
// the pod status changes. It should be called when a pod's status changes.
@@ -105,9 +104,6 @@ type PodController struct {
k8sQ workqueue.RateLimitingInterface
// deletionQ is a queue on which pods are reconciled, and we check if pods are in API server after grace period
deletionQ workqueue.RateLimitingInterface
// From the time of creation, to termination the knownPods map will contain the pods key
// (derived from Kubernetes' cache library) -> a *knownPod struct.
knownPods sync.Map
@@ -134,7 +130,6 @@ type knownPod struct {
// should be immutable to avoid having to hold the lock the entire time you're working with them
sync.Mutex
lastPodStatusReceivedFromProvider *corev1.Pod
lastPodUsed *corev1.Pod
}
// PodControllerConfig is used to configure a new PodController.
@@ -160,7 +155,6 @@ type PodControllerConfig struct {
ServiceInformer corev1informers.ServiceInformer
}
// NewPodController creates a new pod controller with the provided config.
func NewPodController(cfg PodControllerConfig) (*PodController, error) {
if cfg.PodClient == nil {
return nil, errdefs.InvalidInput("missing core client")
@@ -199,17 +193,11 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
done: make(chan struct{}),
recorder: cfg.EventRecorder,
k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"),
}
return pc, nil
}
type asyncProvider interface {
PodLifecycleHandler
PodNotifier
}
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until the
// context is cancelled, at which point it will shutdown the work queue and
@@ -222,32 +210,15 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
defer func() {
pc.k8sQ.ShutDown()
pc.deletionQ.ShutDown()
pc.mu.Lock()
pc.err = retErr
close(pc.done)
pc.mu.Unlock()
}()
var provider asyncProvider
runProvider := func(context.Context) {}
if p, ok := pc.provider.(asyncProvider); ok {
provider = p
} else {
wrapped := &syncProviderWrapper{PodLifecycleHandler: pc.provider, l: pc.podsLister}
runProvider = wrapped.run
provider = wrapped
log.G(ctx).Debug("Wrapped non-async provider with async")
}
pc.provider = provider
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
provider.NotifyPods(ctx, func(pod *corev1.Pod) {
pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
})
go runProvider(ctx)
pc.runSyncFromProvider(ctx, podStatusQueue)
defer podStatusQueue.ShutDown()
// Wait for the caches to be synced *before* starting to do work.
@@ -270,8 +241,16 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
},
UpdateFunc: func(oldObj, newObj interface{}) {
// Create a copy of the old and new pod objects so we don't mutate the cache.
newPod := newObj.(*corev1.Pod)
oldPod := oldObj.(*corev1.Pod).DeepCopy()
newPod := newObj.(*corev1.Pod).DeepCopy()
// We want to check if the two objects differ in anything other than their resource versions.
// Hence, we make them equal so that this change isn't picked up by reflect.DeepEqual.
newPod.ResourceVersion = oldPod.ResourceVersion
// Skip adding this pod's key to the work queue if its .metadata (except .metadata.resourceVersion) and .spec fields haven't changed.
// This guarantees that we don't attempt to sync the pod every time its .status field is updated.
if reflect.DeepEqual(oldPod.ObjectMeta, newPod.ObjectMeta) && reflect.DeepEqual(oldPod.Spec, newPod.Spec) {
return
}
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
log.G(ctx).Error(err)
@@ -285,8 +264,6 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
} else {
pc.knownPods.Delete(key)
pc.k8sQ.AddRateLimited(key)
// If this pod was in the deletion queue, forget about it
pc.deletionQ.Forget(key)
}
},
})
@@ -318,15 +295,6 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
}()
}
for id := 0; id < podSyncWorkers; id++ {
wg.Add(1)
workerID := strconv.Itoa(id)
go func() {
defer wg.Done()
pc.runDeletionReconcilationWorker(ctx, workerID, pc.deletionQ)
}()
}
close(pc.ready)
log.G(ctx).Info("started workers")
@@ -334,7 +302,6 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
log.G(ctx).Info("shutting down workers")
pc.k8sQ.ShutDown()
podStatusQueue.ShutDown()
pc.deletionQ.ShutDown()
wg.Wait()
return nil
@@ -386,7 +353,6 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
// Add the current key as an attribute to the current span.
ctx = span.WithField(ctx, "key", key)
log.G(ctx).WithField("key", key).Debug("sync handled")
// Convert the namespace/name string into a distinct namespace and name.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -406,81 +372,35 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
span.SetStatus(err)
return err
}
pod, err = pc.provider.GetPod(ctx, namespace, name)
if err != nil && !errdefs.IsNotFound(err) {
err = pkgerrors.Wrapf(err, "failed to fetch pod with key %q from provider", key)
// At this point we know the Pod resource doesn't exist, which most probably means it was deleted.
// Hence, we must delete it from the provider if it still exists there.
if err := pc.deletePod(ctx, namespace, name); err != nil {
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name))
span.SetStatus(err)
return err
}
if errdefs.IsNotFound(err) || pod == nil {
return nil
}
err = pc.provider.DeletePod(ctx, pod)
if errdefs.IsNotFound(err) {
return nil
}
if err != nil {
err = pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name))
span.SetStatus(err)
}
return err
return nil
}
// At this point we know the Pod resource has either been created or updated (which includes being marked for deletion).
return pc.syncPodInProvider(ctx, pod, key)
return pc.syncPodInProvider(ctx, pod)
}
// syncPodInProvider tries and reconciles the state of a pod by comparing its Kubernetes representation and the provider's representation.
func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod, key string) (retErr error) {
func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "syncPodInProvider")
defer span.End()
// Add the pod's attributes to the current span.
ctx = addPodAttributes(ctx, span, pod)
// If the pod('s containers) is no longer in a running state then we force-delete the pod from API server
// more context is here: https://github.com/virtual-kubelet/virtual-kubelet/pull/760
if pod.DeletionTimestamp != nil && !running(&pod.Status) {
log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running")
pc.deletionQ.Add(key)
return nil
}
obj, ok := pc.knownPods.Load(key)
if !ok {
// That means the pod was deleted while we were working
return nil
}
kPod := obj.(*knownPod)
kPod.Lock()
if kPod.lastPodUsed != nil && podsEffectivelyEqual(kPod.lastPodUsed, pod) {
kPod.Unlock()
return nil
}
kPod.Unlock()
defer func() {
if retErr == nil {
kPod.Lock()
defer kPod.Unlock()
kPod.lastPodUsed = pod
}
}()
// Check whether the pod has been marked for deletion.
// If it does, guarantee it is deleted in the provider and Kubernetes.
if pod.DeletionTimestamp != nil {
log.G(ctx).Debug("Deleting pod in provider")
if err := pc.deletePod(ctx, pod); errdefs.IsNotFound(err) {
log.G(ctx).Debug("Pod not found in provider")
} else if err != nil {
if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod))
span.SetStatus(err)
return err
}
pc.deletionQ.AddAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
return nil
}
@@ -499,25 +419,6 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod,
return nil
}
// runDeletionReconcilationWorker is a long-running function that will continually call the processDeletionReconcilationWorkItem
// function in order to read and process an item on the work queue that is generated by the pod informer.
func (pc *PodController) runDeletionReconcilationWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
for pc.processDeletionReconcilationWorkItem(ctx, workerID, q) {
}
}
// processDeletionReconcilationWorkItem will read a single work item off the work queue and attempt to process it,by calling the deletionReconcilation.
func (pc *PodController) processDeletionReconcilationWorkItem(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
// We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem")
defer span.End()
// Add the ID of the current worker as an attribute to the current span.
ctx = span.WithField(ctx, "workerId", workerID)
return handleQueueItem(ctx, q, pc.deletePodHandler)
}
// deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them.
func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int) {
ctx, span := trace.StartSpan(ctx, "deleteDanglingPods")
@@ -573,7 +474,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int
// Add the pod's attributes to the current span.
ctx = addPodAttributes(ctx, span, pod)
// Actually delete the pod.
if err := pc.provider.DeletePod(ctx, pod.DeepCopy()); err != nil && !errdefs.IsNotFound(err) {
if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
span.SetStatus(err)
log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod))
} else {
@@ -602,31 +503,3 @@ func loggablePodName(pod *corev1.Pod) string {
func loggablePodNameFromCoordinates(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}
// podsEffectivelyEqual compares two pods, and ignores the pod status, and the resource version
func podsEffectivelyEqual(p1, p2 *corev1.Pod) bool {
filterForResourceVersion := func(p cmp.Path) bool {
if p.String() == "ObjectMeta.ResourceVersion" {
return true
}
if p.String() == "Status" {
return true
}
return false
}
return cmp.Equal(p1, p2, cmp.FilterPath(filterForResourceVersion, cmp.Ignore()))
}
// borrowed from https://github.com/kubernetes/kubernetes/blob/f64c631cd7aea58d2552ae2038c1225067d30dde/pkg/kubelet/kubelet_pods.go#L944-L953
// running returns true, unless if every status is terminated or waiting, or the status list
// is empty.
func running(podStatus *corev1.PodStatus) bool {
statuses := podStatus.ContainerStatuses
for _, status := range statuses {
if status.State.Terminated == nil && status.State.Waiting == nil {
return true
}
}
return false
}

View File

@@ -6,8 +6,6 @@ import (
"time"
"gotest.tools/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestPodControllerExitOnContextCancel(t *testing.T) {
@@ -44,49 +42,3 @@ func TestPodControllerExitOnContextCancel(t *testing.T) {
}
assert.NilError(t, tc.Err())
}
func TestCompareResourceVersion(t *testing.T) {
p1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
}
p2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
},
}
assert.Assert(t, podsEffectivelyEqual(p1, p2))
}
func TestCompareStatus(t *testing.T) {
p1 := &corev1.Pod{
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
}
p2 := &corev1.Pod{
Status: corev1.PodStatus{
Phase: corev1.PodFailed,
},
}
assert.Assert(t, podsEffectivelyEqual(p1, p2))
}
func TestCompareLabels(t *testing.T) {
p1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar1",
},
},
}
p2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar2",
},
},
}
assert.Assert(t, !podsEffectivelyEqual(p1, p2))
}

View File

@@ -16,10 +16,12 @@ package node
import (
"context"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
)
@@ -103,3 +105,40 @@ func (pc *PodController) processPodStatusUpdate(ctx context.Context, workerID st
return handleQueueItem(ctx, q, pc.podStatusHandler)
}
// providerSyncLoop synchronizes pod states from the provider back to kubernetes
// Deprecated: This is only used when the provider does not support async updates
// Providers should implement async update support, even if it just means copying
// something like this in.
func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) {
const sleepTime = 5 * time.Second
t := time.NewTimer(sleepTime)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
t.Stop()
ctx, span := trace.StartSpan(ctx, "syncActualState")
pc.fetchPodStatusesFromProvider(ctx, q)
span.End()
// restart the timer
t.Reset(sleepTime)
}
}
}
func (pc *PodController) runSyncFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
if pn, ok := pc.provider.(PodNotifier); ok {
pn.NotifyPods(ctx, func(pod *corev1.Pod) {
pc.enqueuePodStatusUpdate(ctx, q, pod.DeepCopy())
})
} else {
go pc.providerSyncLoop(ctx, q)
}
}

View File

@@ -1,215 +0,0 @@
package node
import (
"context"
"sync"
"time"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)
const (
podStatusReasonNotFound = "NotFound"
podStatusMessageNotFound = "The pod status was not found and may have been deleted from the provider"
containerStatusReasonNotFound = "NotFound"
containerStatusMessageNotFound = "Container was not found and was likely deleted"
containerStatusExitCodeNotFound = -137
statusTerminatedReason = "Terminated"
containerStatusTerminatedMessage = "Container was terminated. The exit code may not reflect the real exit code"
)
// syncProviderWrapper wraps a PodLifecycleHandler to give it async-like pod status notification behavior.
type syncProviderWrapper struct {
PodLifecycleHandler
notify func(*corev1.Pod)
l corev1listers.PodLister
// deletedPods makes sure we don't set the "NotFound" status
// for pods which have been requested to be deleted.
// This is needed for our loop which just grabs pod statuses every 5 seconds.
deletedPods sync.Map
}
// This is used to clean up keys for deleted pods after they have been fully deleted in the API server.
type syncWrapper interface {
_deletePodKey(context.Context, string)
}
func (p *syncProviderWrapper) NotifyPods(ctx context.Context, f func(*corev1.Pod)) {
p.notify = f
}
func (p *syncProviderWrapper) _deletePodKey(ctx context.Context, key string) {
log.G(ctx).WithField("key", key).Debug("Cleaning up pod from deletion cache")
p.deletedPods.Delete(key)
}
func (p *syncProviderWrapper) DeletePod(ctx context.Context, pod *corev1.Pod) error {
log.G(ctx).Debug("syncProviderWrappper.DeletePod")
key, err := cache.MetaNamespaceKeyFunc(pod)
if err != nil {
return err
}
p.deletedPods.Store(key, pod)
if err := p.PodLifecycleHandler.DeletePod(ctx, pod.DeepCopy()); err != nil {
log.G(ctx).WithField("key", key).WithError(err).Debug("Removed key from deleted pods cache")
// We aren't going to actually delete the pod from the provider since there is an error so delete it from our cache,
// otherwise we could end up leaking pods in our deletion cache.
// Delete will be retried by the pod controller.
p.deletedPods.Delete(key)
return err
}
if shouldSkipPodStatusUpdate(pod) {
log.G(ctx).Debug("skipping pod status update for terminated pod")
return nil
}
updated := pod.DeepCopy()
updated.Status.Phase = corev1.PodSucceeded
now := metav1.NewTime(time.Now())
for i, cs := range updated.Status.ContainerStatuses {
updated.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
Reason: statusTerminatedReason,
Message: containerStatusTerminatedMessage,
FinishedAt: now,
}
if cs.State.Running != nil {
updated.Status.ContainerStatuses[i].State.Terminated.StartedAt = cs.State.Running.StartedAt
}
}
updated.Status.Reason = statusTerminatedReason
p.notify(updated)
log.G(ctx).Debug("Notified pod terminal pod status")
return nil
}
func (p *syncProviderWrapper) run(ctx context.Context) {
interval := 5 * time.Second
timer := time.NewTimer(interval)
defer timer.Stop()
if !timer.Stop() {
<-timer.C
}
for {
log.G(ctx).Debug("Pod status update loop start")
timer.Reset(interval)
select {
case <-ctx.Done():
log.G(ctx).WithError(ctx.Err()).Debug("sync wrapper loop exiting")
return
case <-timer.C:
}
p.syncPodStatuses(ctx)
}
}
func (p *syncProviderWrapper) syncPodStatuses(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "syncProviderWrapper.syncPodStatuses")
defer span.End()
// Update all the pods with the provider status.
pods, err := p.l.List(labels.Everything())
if err != nil {
err = errors.Wrap(err, "error getting pod list from kubernetes")
span.SetStatus(err)
log.G(ctx).WithError(err).Error("Error updating pod statuses")
return
}
ctx = span.WithField(ctx, "nPods", int64(len(pods)))
for _, pod := range pods {
if shouldSkipPodStatusUpdate(pod) {
log.G(ctx).Debug("Skipping pod status update")
continue
}
if err := p.updatePodStatus(ctx, pod); err != nil {
log.G(ctx).WithFields(map[string]interface{}{
"name": pod.Name,
"namespace": pod.Namespace,
}).WithError(err).Error("Could not fetch pod status")
}
}
}
func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubernetes *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "syncProviderWrapper.updatePodStatus")
defer span.End()
ctx = addPodAttributes(ctx, span, podFromKubernetes)
var statusErr error
podStatus, err := p.PodLifecycleHandler.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
if err != nil {
if !errdefs.IsNotFound(err) {
span.SetStatus(err)
return err
}
statusErr = err
}
if podStatus != nil {
pod := podFromKubernetes.DeepCopy()
podStatus.DeepCopyInto(&pod.Status)
p.notify(pod)
return nil
}
key, err := cache.MetaNamespaceKeyFunc(podFromKubernetes)
if err != nil {
span.SetStatus(err)
return err
}
if _, exists := p.deletedPods.Load(key); exists {
log.G(ctx).Debug("pod is in known deleted state, ignoring")
return nil
}
if podFromKubernetes.Status.Phase != corev1.PodRunning && time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) <= time.Minute {
span.SetStatus(statusErr)
return statusErr
}
// Only change the status when the pod was already up.
// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
podStatus = podFromKubernetes.Status.DeepCopy()
podStatus.Phase = corev1.PodFailed
podStatus.Reason = podStatusReasonNotFound
podStatus.Message = podStatusMessageNotFound
now := metav1.NewTime(time.Now())
for i, c := range podStatus.ContainerStatuses {
if c.State.Running == nil {
continue
}
podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
ExitCode: containerStatusExitCodeNotFound,
Reason: containerStatusReasonNotFound,
Message: containerStatusMessageNotFound,
FinishedAt: now,
StartedAt: c.State.Running.StartedAt,
ContainerID: c.ContainerID,
}
podStatus.ContainerStatuses[i].State.Running = nil
}
log.G(ctx).Debug("Setting pod not found on pod status")
pod := podFromKubernetes.DeepCopy()
podStatus.DeepCopyInto(&pod.Status)
p.notify(pod)
return nil
}

View File

@@ -140,7 +140,7 @@ func (l *logger) Debug(args ...interface{}) {
}
func (l *logger) Debugf(f string, args ...interface{}) {
l.l.Debugf(f, args...)
l.l.Debugf(f, args)
l.s.Annotatef(withLevel(lDebug, l.a), f, args...)
}
@@ -156,7 +156,7 @@ func (l *logger) Info(args ...interface{}) {
}
func (l *logger) Infof(f string, args ...interface{}) {
l.l.Infof(f, args...)
l.l.Infof(f, args)
l.s.Annotatef(withLevel(lInfo, l.a), f, args...)
}
@@ -172,7 +172,7 @@ func (l *logger) Warn(args ...interface{}) {
}
func (l *logger) Warnf(f string, args ...interface{}) {
l.l.Warnf(f, args...)
l.l.Warnf(f, args)
l.s.Annotatef(withLevel(lWarn, l.a), f, args...)
}
@@ -188,7 +188,7 @@ func (l *logger) Error(args ...interface{}) {
}
func (l *logger) Errorf(f string, args ...interface{}) {
l.l.Errorf(f, args...)
l.l.Errorf(f, args)
l.s.Annotatef(withLevel(lErr, l.a), f, args...)
}
@@ -205,7 +205,7 @@ func (l *logger) Fatal(args ...interface{}) {
func (l *logger) Fatalf(f string, args ...interface{}) {
l.s.Annotatef(withLevel(lFatal, l.a), f, args...)
l.l.Fatalf(f, args...)
l.l.Fatalf(f, args)
}
func (l *logger) WithError(err error) log.Logger {