From 87e72bf4df8b4df47d81b63bbe42d49891f420a8 Mon Sep 17 00:00:00 2001 From: Jeremy Rickard Date: Fri, 17 May 2019 12:14:29 -0600 Subject: [PATCH] Light up UpdatePod (#613) * Light up UpdatePod This PR updates the vkublet/pod.go createOrUpdate(..) method to actually handle updates. It gets the pod from the provider as before, but now if it exists the method checks the hash of the spec against the spec of the new pod. If they've changed, it calls UpdatePod(..). Also makes a small change to the Server struct to swap from kuberentes.Clientset to kubernetes.Interface to better facilitate testing with fake ClientSet. Co-Authored-By: Brian Goff --- Gopkg.lock | 1 + providers/mock/mock.go | 26 ++++ vkubelet/pod.go | 111 ++++++++++------ vkubelet/pod_test.go | 286 +++++++++++++++++++++++++++++++++++++++++ vkubelet/vkubelet.go | 2 +- 5 files changed, 384 insertions(+), 42 deletions(-) create mode 100644 vkubelet/pod_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 9b98da774..c5592d306 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1334,6 +1334,7 @@ "github.com/cpuguy83/strongerrors", "github.com/cpuguy83/strongerrors/status", "github.com/cpuguy83/strongerrors/status/ocstatus", + "github.com/davecgh/go-spew/spew", "github.com/google/go-cmp/cmp", "github.com/google/uuid", "github.com/gophercloud/gophercloud", diff --git a/providers/mock/mock.go b/providers/mock/mock.go index b3cf41c60..900730cd9 100644 --- a/providers/mock/mock.go +++ b/providers/mock/mock.go @@ -71,6 +71,32 @@ func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalI return &provider, nil } +// NewMockProviderMockConfig creates a new MockProvider with the given Mock Config +func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) { + + //set defaults + if config.CPU == "" { + config.CPU = defaultCPUCapacity + } + if config.Memory == "" { + config.Memory = defaultMemoryCapacity + } + if config.Pods == "" { + config.Pods = defaultPodCapacity + } + + provider := MockProvider{ + nodeName: nodeName, + operatingSystem: operatingSystem, + internalIP: internalIP, + daemonEndpointPort: daemonEndpointPort, + pods: make(map[string]*v1.Pod), + config: config, + startTime: time.Now(), + } + return &provider, nil +} + // loadConfig loads the given json configuration files. func loadConfig(providerConfig, nodeName string) (config MockConfig, err error) { diff --git a/vkubelet/pod.go b/vkubelet/pod.go index b74013f22..3338a52fc 100644 --- a/vkubelet/pod.go +++ b/vkubelet/pod.go @@ -2,9 +2,11 @@ package vkubelet import ( "context" + "hash/fnv" "time" "github.com/cpuguy83/strongerrors/status/ocstatus" + "github.com/davecgh/go-spew/spew" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" @@ -28,62 +30,89 @@ func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) con } func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorder record.EventRecorder) error { - // Check if the pod is already known by the provider. - // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't. - // Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod). - if pp, _ := s.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil { - // The pod has already been created in the provider. - // Hence, we return since pod updates are not yet supported. - log.G(ctx).Warnf("skipping update of pod %s as pod updates are not supported", pp.Name) - return nil - } ctx, span := trace.StartSpan(ctx, "createOrUpdatePod") defer span.End() addPodAttributes(ctx, span, pod) - if err := populateEnvironmentVariables(ctx, pod, s.resourceManager, recorder); err != nil { - span.SetStatus(ocstatus.FromError(err)) - return err - } - ctx = span.WithFields(ctx, log.Fields{ "pod": pod.GetName(), "namespace": pod.GetNamespace(), }) - if origErr := s.provider.CreatePod(ctx, pod); origErr != nil { - podPhase := corev1.PodPending - if pod.Spec.RestartPolicy == corev1.RestartPolicyNever { - podPhase = corev1.PodFailed - } - - pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error - pod.Status.Phase = podPhase - pod.Status.Reason = podStatusReasonProviderFailed - pod.Status.Message = origErr.Error() - - logger := log.G(ctx).WithFields(log.Fields{ - "podPhase": podPhase, - "reason": pod.Status.Reason, - }) - - _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) - if err != nil { - logger.WithError(err).Warn("Failed to update pod status") - } else { - logger.Info("Updated k8s pod status") - } - - span.SetStatus(ocstatus.FromError(origErr)) - return origErr + if err := populateEnvironmentVariables(ctx, pod, s.resourceManager, recorder); err != nil { + span.SetStatus(ocstatus.FromError(err)) + return err } - log.G(ctx).Info("Created pod in provider") - + // Check if the pod is already known by the provider. + // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't. + // Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod). + if pp, _ := s.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil { + // Pod Update Only Permits update of: + // - `spec.containers[*].image` + // - `spec.initContainers[*].image` + // - `spec.activeDeadlineSeconds` + // - `spec.tolerations` (only additions to existing tolerations) + // compare the hashes of the pod specs to see if the specs actually changed + expected := hashPodSpec(pp.Spec) + if actual := hashPodSpec(pod.Spec); actual != expected { + log.G(ctx).Debugf("Pod %s exists, updating pod in provider", pp.Name) + if origErr := s.provider.UpdatePod(ctx, pod); origErr != nil { + s.handleProviderError(ctx, span, origErr, pod) + return origErr + } + log.G(ctx).Info("Updated pod in provider") + } + } else { + if origErr := s.provider.CreatePod(ctx, pod); origErr != nil { + s.handleProviderError(ctx, span, origErr, pod) + return origErr + } + log.G(ctx).Info("Created pod in provider") + } return nil } +// This is basically the kube runtime's hash container functionality. +// VK only operates at the Pod level so this is adapted for that +func hashPodSpec(spec corev1.PodSpec) uint64 { + hash := fnv.New32a() + printer := spew.ConfigState{ + Indent: " ", + SortKeys: true, + DisableMethods: true, + SpewKeys: true, + } + printer.Fprintf(hash, "%#v", spec) + return uint64(hash.Sum32()) +} + +func (s *Server) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) { + podPhase := corev1.PodPending + if pod.Spec.RestartPolicy == corev1.RestartPolicyNever { + podPhase = corev1.PodFailed + } + + pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error + pod.Status.Phase = podPhase + pod.Status.Reason = podStatusReasonProviderFailed + pod.Status.Message = origErr.Error() + + logger := log.G(ctx).WithFields(log.Fields{ + "podPhase": podPhase, + "reason": pod.Status.Reason, + }) + + _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) + if err != nil { + logger.WithError(err).Warn("Failed to update pod status") + } else { + logger.Info("Updated k8s pod status") + } + span.SetStatus(ocstatus.FromError(origErr)) +} + func (s *Server) deletePod(ctx context.Context, namespace, name string) error { // Grab the pod as known by the provider. // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't. diff --git a/vkubelet/pod_test.go b/vkubelet/pod_test.go new file mode 100644 index 000000000..9e02c788f --- /dev/null +++ b/vkubelet/pod_test.go @@ -0,0 +1,286 @@ +package vkubelet + +import ( + "context" + "testing" + + "github.com/virtual-kubelet/virtual-kubelet/providers/mock" + testutil "github.com/virtual-kubelet/virtual-kubelet/test/util" + "gotest.tools/assert" + is "gotest.tools/assert/cmp" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes/fake" +) + +type FakeProvider struct { + *mock.MockProvider + createFn func() + updateFn func() +} + +func (f *FakeProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error { + f.createFn() + return f.MockProvider.CreatePod(ctx, pod) +} + +func (f *FakeProvider) UpdatePod(ctx context.Context, pod *corev1.Pod) error { + f.updateFn() + return f.MockProvider.CreatePod(ctx, pod) +} + +type TestServer struct { + *Server + mock *FakeProvider + client *fake.Clientset +} + +func newMockProvider(t *testing.T) (*mock.MockProvider, error) { + return mock.NewMockProviderMockConfig( + mock.MockConfig{}, + "vk123", + "linux", + "127.0.0.1", + 443, + ) +} + +func newTestServer(t *testing.T) *TestServer { + + mockProvider, err := newMockProvider(t) + assert.Check(t, is.Nil(err)) + + fk8s := fake.NewSimpleClientset() + + fakeProvider := &FakeProvider{ + MockProvider: mockProvider, + } + + rm := testutil.FakeResourceManager() + + tsvr := &TestServer{ + Server: &Server{ + namespace: "default", + nodeName: "vk123", + provider: fakeProvider, + resourceManager: rm, + k8sClient: fk8s, + }, + mock: fakeProvider, + client: fk8s, + } + return tsvr +} + +func TestPodHashingEqual(t *testing.T) { + p1 := corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12-perl", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + h1 := hashPodSpec(p1) + + p2 := corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12-perl", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + h2 := hashPodSpec(p2) + assert.Check(t, is.Equal(h1, h2)) +} + +func TestPodHashingDifferent(t *testing.T) { + p1 := corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + h1 := hashPodSpec(p1) + + p2 := corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12-perl", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + h2 := hashPodSpec(p2) + assert.Check(t, h1 != h2) +} + +func TestPodCreateNewPod(t *testing.T) { + svr := newTestServer(t) + + pod := &corev1.Pod{} + pod.ObjectMeta.Namespace = "default" + pod.ObjectMeta.Name = "nginx" + pod.Spec = corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + created := false + updated := false + // The pod doesn't exist, we should invoke the CreatePod() method of the provider + svr.mock.createFn = func() { + created = true + } + svr.mock.updateFn = func() { + updated = true + } + er := testutil.FakeEventRecorder(5) + err := svr.createOrUpdatePod(context.Background(), pod, er) + assert.Check(t, is.Nil(err)) + // createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist + assert.Check(t, created) + assert.Check(t, !updated) +} + +func TestPodUpdateExisting(t *testing.T) { + svr := newTestServer(t) + + pod := &corev1.Pod{} + pod.ObjectMeta.Namespace = "default" + pod.ObjectMeta.Name = "nginx" + pod.Spec = corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + err := svr.mock.MockProvider.CreatePod(context.Background(), pod) + assert.Check(t, is.Nil(err)) + created := false + updated := false + // The pod doesn't exist, we should invoke the CreatePod() method of the provider + svr.mock.createFn = func() { + created = true + } + svr.mock.updateFn = func() { + updated = true + } + + pod2 := &corev1.Pod{} + pod2.ObjectMeta.Namespace = "default" + pod2.ObjectMeta.Name = "nginx" + pod2.Spec = corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12-perl", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + er := testutil.FakeEventRecorder(5) + err = svr.createOrUpdatePod(context.Background(), pod2, er) + assert.Check(t, is.Nil(err)) + + // createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed + assert.Check(t, !created) + assert.Check(t, updated) +} + +func TestPodNoSpecChange(t *testing.T) { + svr := newTestServer(t) + + pod := &corev1.Pod{} + pod.ObjectMeta.Namespace = "default" + pod.ObjectMeta.Name = "nginx" + pod.Spec = corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + err := svr.mock.MockProvider.CreatePod(context.Background(), pod) + assert.Check(t, is.Nil(err)) + created := false + updated := false + // The pod doesn't exist, we should invoke the CreatePod() method of the provider + svr.mock.createFn = func() { + created = true + } + svr.mock.updateFn = func() { + updated = true + } + + er := testutil.FakeEventRecorder(5) + err = svr.createOrUpdatePod(context.Background(), pod, er) + assert.Check(t, is.Nil(err)) + + // createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change + assert.Check(t, !created) + assert.Check(t, !updated) +} diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index 87e1757c9..1ac1731f9 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -22,7 +22,7 @@ const ( type Server struct { namespace string nodeName string - k8sClient *kubernetes.Clientset + k8sClient kubernetes.Interface provider providers.Provider resourceManager *manager.ResourceManager podSyncWorkers int