Merge pull request #734 from sargun/do-not-change-pods
Do not mutate pods, nor hand off pod references to provider
This commit is contained in:
2
go.mod
2
go.mod
@@ -12,7 +12,7 @@ require (
|
||||
github.com/evanphx/json-patch v4.1.0+incompatible // indirect
|
||||
github.com/gogo/protobuf v1.2.1 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect
|
||||
github.com/google/go-cmp v0.3.0
|
||||
github.com/google/go-cmp v0.3.1
|
||||
github.com/google/gofuzz v1.0.0 // indirect
|
||||
github.com/googleapis/gnostic v0.1.0 // indirect
|
||||
github.com/gorilla/mux v1.7.0
|
||||
|
||||
2
go.sum
2
go.sum
@@ -159,6 +159,8 @@ github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
|
||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
|
||||
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
|
||||
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
|
||||
|
||||
50
node/pod.go
50
node/pod.go
@@ -20,6 +20,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
pkgerrors "github.com/pkg/errors"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/log"
|
||||
@@ -57,32 +58,31 @@ func (pc *PodController) createOrUpdatePod(ctx context.Context, pod *corev1.Pod)
|
||||
"namespace": pod.GetNamespace(),
|
||||
})
|
||||
|
||||
// We do this so we don't mutate the pod from the informer cache
|
||||
pod = pod.DeepCopy()
|
||||
if err := populateEnvironmentVariables(ctx, pod, pc.resourceManager, pc.recorder); err != nil {
|
||||
span.SetStatus(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// We have to use a different pod that we pass to the provider than the one that gets used in handleProviderError
|
||||
// because the provider may manipulate the pod in a separate goroutine while we were doing work
|
||||
podForProvider := pod.DeepCopy()
|
||||
|
||||
// Check if the pod is already known by the provider.
|
||||
// NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't.
|
||||
// Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod).
|
||||
if pp, _ := pc.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil {
|
||||
// Pod Update Only Permits update of:
|
||||
// - `spec.containers[*].image`
|
||||
// - `spec.initContainers[*].image`
|
||||
// - `spec.activeDeadlineSeconds`
|
||||
// - `spec.tolerations` (only additions to existing tolerations)
|
||||
// compare the hashes of the pod specs to see if the specs actually changed
|
||||
expected := hashPodSpec(pp.Spec)
|
||||
if actual := hashPodSpec(pod.Spec); actual != expected {
|
||||
log.G(ctx).Debugf("Pod %s exists, updating pod in provider", pp.Name)
|
||||
if origErr := pc.provider.UpdatePod(ctx, pod); origErr != nil {
|
||||
if podFromProvider, _ := pc.provider.GetPod(ctx, pod.Namespace, pod.Name); podFromProvider != nil {
|
||||
if !podsEqual(podFromProvider, podForProvider) {
|
||||
log.G(ctx).Debugf("Pod %s exists, updating pod in provider", podFromProvider.Name)
|
||||
if origErr := pc.provider.UpdatePod(ctx, podForProvider); origErr != nil {
|
||||
pc.handleProviderError(ctx, span, origErr, pod)
|
||||
return origErr
|
||||
}
|
||||
log.G(ctx).Info("Updated pod in provider")
|
||||
}
|
||||
} else {
|
||||
if origErr := pc.provider.CreatePod(ctx, pod); origErr != nil {
|
||||
if origErr := pc.provider.CreatePod(ctx, podForProvider); origErr != nil {
|
||||
pc.handleProviderError(ctx, span, origErr, pod)
|
||||
return origErr
|
||||
}
|
||||
@@ -91,6 +91,27 @@ func (pc *PodController) createOrUpdatePod(ctx context.Context, pod *corev1.Pod)
|
||||
return nil
|
||||
}
|
||||
|
||||
// podsEqual checks if two pods are equal according to the fields we know that are allowed
|
||||
// to be modified after startup time.
|
||||
func podsEqual(pod1, pod2 *corev1.Pod) bool {
|
||||
// Pod Update Only Permits update of:
|
||||
// - `spec.containers[*].image`
|
||||
// - `spec.initContainers[*].image`
|
||||
// - `spec.activeDeadlineSeconds`
|
||||
// - `spec.tolerations` (only additions to existing tolerations)
|
||||
// - `objectmeta.labels`
|
||||
// - `objectmeta.annotations`
|
||||
// compare the values of the pods to see if the values actually changed
|
||||
|
||||
return cmp.Equal(pod1.Spec.Containers, pod2.Spec.Containers) &&
|
||||
cmp.Equal(pod1.Spec.InitContainers, pod2.Spec.InitContainers) &&
|
||||
cmp.Equal(pod1.Spec.ActiveDeadlineSeconds, pod2.Spec.ActiveDeadlineSeconds) &&
|
||||
cmp.Equal(pod1.Spec.Tolerations, pod2.Spec.Tolerations) &&
|
||||
cmp.Equal(pod1.ObjectMeta.Labels, pod2.Labels) &&
|
||||
cmp.Equal(pod1.ObjectMeta.Annotations, pod2.Annotations)
|
||||
|
||||
}
|
||||
|
||||
// This is basically the kube runtime's hash container functionality.
|
||||
// VK only operates at the Pod level so this is adapted for that
|
||||
func hashPodSpec(spec corev1.PodSpec) uint64 {
|
||||
@@ -152,7 +173,7 @@ func (pc *PodController) deletePod(ctx context.Context, namespace, name string)
|
||||
ctx = addPodAttributes(ctx, span, pod)
|
||||
|
||||
var delErr error
|
||||
if delErr = pc.provider.DeletePod(ctx, pod); delErr != nil && !errdefs.IsNotFound(delErr) {
|
||||
if delErr = pc.provider.DeletePod(ctx, pod.DeepCopy()); delErr != nil && !errdefs.IsNotFound(delErr) {
|
||||
span.SetStatus(delErr)
|
||||
return delErr
|
||||
}
|
||||
@@ -231,6 +252,9 @@ func (pc *PodController) updatePodStatus(ctx context.Context, pod *corev1.Pod) e
|
||||
return pkgerrors.Wrap(err, "error retrieving pod status")
|
||||
}
|
||||
|
||||
// Do not modify the pod that we got from the cache
|
||||
pod = pod.DeepCopy()
|
||||
|
||||
// Update the pod's status
|
||||
if status != nil {
|
||||
pod.Status = *status
|
||||
|
||||
130
node/pod_test.go
130
node/pod_test.go
@@ -53,78 +53,104 @@ func newTestController() *TestController {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPodHashingEqual(t *testing.T) {
|
||||
p1 := corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
corev1.Container{
|
||||
Name: "nginx",
|
||||
Image: "nginx:1.15.12-perl",
|
||||
Ports: []corev1.ContainerPort{
|
||||
corev1.ContainerPort{
|
||||
ContainerPort: 443,
|
||||
Protocol: "tcp",
|
||||
func TestPodsEqual(t *testing.T) {
|
||||
p1 := &corev1.Pod{
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
corev1.Container{
|
||||
Name: "nginx",
|
||||
Image: "nginx:1.15.12-perl",
|
||||
Ports: []corev1.ContainerPort{
|
||||
corev1.ContainerPort{
|
||||
ContainerPort: 443,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
h1 := hashPodSpec(p1)
|
||||
|
||||
p2 := corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
corev1.Container{
|
||||
Name: "nginx",
|
||||
Image: "nginx:1.15.12-perl",
|
||||
Ports: []corev1.ContainerPort{
|
||||
corev1.ContainerPort{
|
||||
ContainerPort: 443,
|
||||
Protocol: "tcp",
|
||||
p2 := &corev1.Pod{
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
corev1.Container{
|
||||
Name: "nginx",
|
||||
Image: "nginx:1.15.12-perl",
|
||||
Ports: []corev1.ContainerPort{
|
||||
corev1.ContainerPort{
|
||||
ContainerPort: 443,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
h2 := hashPodSpec(p2)
|
||||
assert.Check(t, is.Equal(h1, h2))
|
||||
assert.Assert(t, podsEqual(p1, p2))
|
||||
}
|
||||
|
||||
func TestPodHashingDifferent(t *testing.T) {
|
||||
p1 := corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
corev1.Container{
|
||||
Name: "nginx",
|
||||
Image: "nginx:1.15.12",
|
||||
Ports: []corev1.ContainerPort{
|
||||
corev1.ContainerPort{
|
||||
ContainerPort: 443,
|
||||
Protocol: "tcp",
|
||||
func TestPodsDifferent(t *testing.T) {
|
||||
p1 := &corev1.Pod{
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
corev1.Container{
|
||||
Name: "nginx",
|
||||
Image: "nginx:1.15.12",
|
||||
Ports: []corev1.ContainerPort{
|
||||
corev1.ContainerPort{
|
||||
ContainerPort: 443,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
h1 := hashPodSpec(p1)
|
||||
|
||||
p2 := corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
corev1.Container{
|
||||
Name: "nginx",
|
||||
Image: "nginx:1.15.12-perl",
|
||||
Ports: []corev1.ContainerPort{
|
||||
corev1.ContainerPort{
|
||||
ContainerPort: 443,
|
||||
Protocol: "tcp",
|
||||
p2 := &corev1.Pod{
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
corev1.Container{
|
||||
Name: "nginx",
|
||||
Image: "nginx:1.15.12-perl",
|
||||
Ports: []corev1.ContainerPort{
|
||||
corev1.ContainerPort{
|
||||
ContainerPort: 443,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
h2 := hashPodSpec(p2)
|
||||
assert.Check(t, h1 != h2)
|
||||
assert.Assert(t, !podsEqual(p1, p2))
|
||||
}
|
||||
|
||||
func TestPodsDifferentIgnoreValue(t *testing.T) {
|
||||
p1 := &corev1.Pod{
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
corev1.Container{
|
||||
Name: "nginx",
|
||||
Image: "nginx:1.15.12",
|
||||
Ports: []corev1.ContainerPort{
|
||||
corev1.ContainerPort{
|
||||
ContainerPort: 443,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
p2 := p1.DeepCopy()
|
||||
p2.Status.Phase = corev1.PodFailed
|
||||
|
||||
assert.Assert(t, podsEqual(p1, p2))
|
||||
}
|
||||
|
||||
func TestPodCreateNewPod(t *testing.T) {
|
||||
@@ -148,7 +174,7 @@ func TestPodCreateNewPod(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
err := svr.createOrUpdatePod(context.Background(), pod)
|
||||
err := svr.createOrUpdatePod(context.Background(), pod.DeepCopy())
|
||||
|
||||
assert.Check(t, is.Nil(err))
|
||||
// createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist
|
||||
@@ -177,7 +203,7 @@ func TestPodUpdateExisting(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
err := svr.provider.CreatePod(context.Background(), pod)
|
||||
err := svr.createOrUpdatePod(context.Background(), pod.DeepCopy())
|
||||
assert.Check(t, is.Nil(err))
|
||||
assert.Check(t, is.Equal(svr.mock.creates.read(), 1))
|
||||
assert.Check(t, is.Equal(svr.mock.updates.read(), 0))
|
||||
@@ -200,7 +226,7 @@ func TestPodUpdateExisting(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
err = svr.createOrUpdatePod(context.Background(), pod2)
|
||||
err = svr.createOrUpdatePod(context.Background(), pod2.DeepCopy())
|
||||
assert.Check(t, is.Nil(err))
|
||||
|
||||
// createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed
|
||||
@@ -229,12 +255,12 @@ func TestPodNoSpecChange(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
err := svr.mock.CreatePod(context.Background(), pod)
|
||||
err := svr.createOrUpdatePod(context.Background(), pod.DeepCopy())
|
||||
assert.Check(t, is.Nil(err))
|
||||
assert.Check(t, is.Equal(svr.mock.creates.read(), 1))
|
||||
assert.Check(t, is.Equal(svr.mock.updates.read(), 0))
|
||||
|
||||
err = svr.createOrUpdatePod(context.Background(), pod)
|
||||
err = svr.createOrUpdatePod(context.Background(), pod.DeepCopy())
|
||||
assert.Check(t, is.Nil(err))
|
||||
|
||||
// createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change
|
||||
@@ -277,7 +303,7 @@ func TestPodDelete(t *testing.T) {
|
||||
assert.NilError(t, err)
|
||||
|
||||
ctx := context.Background()
|
||||
err = c.createOrUpdatePod(ctx, p) // make sure it's actually created
|
||||
err = c.createOrUpdatePod(ctx, p.DeepCopy()) // make sure it's actually created
|
||||
assert.NilError(t, err)
|
||||
assert.Check(t, is.Equal(c.mock.creates.read(), 1))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user