diff --git a/Gopkg.lock b/Gopkg.lock index 9b98da774..c5592d306 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1334,6 +1334,7 @@ "github.com/cpuguy83/strongerrors", "github.com/cpuguy83/strongerrors/status", "github.com/cpuguy83/strongerrors/status/ocstatus", + "github.com/davecgh/go-spew/spew", "github.com/google/go-cmp/cmp", "github.com/google/uuid", "github.com/gophercloud/gophercloud", diff --git a/providers/mock/mock.go b/providers/mock/mock.go index b3cf41c60..900730cd9 100644 --- a/providers/mock/mock.go +++ b/providers/mock/mock.go @@ -71,6 +71,32 @@ func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalI return &provider, nil } +// NewMockProviderMockConfig creates a new MockProvider with the given Mock Config +func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) { + + //set defaults + if config.CPU == "" { + config.CPU = defaultCPUCapacity + } + if config.Memory == "" { + config.Memory = defaultMemoryCapacity + } + if config.Pods == "" { + config.Pods = defaultPodCapacity + } + + provider := MockProvider{ + nodeName: nodeName, + operatingSystem: operatingSystem, + internalIP: internalIP, + daemonEndpointPort: daemonEndpointPort, + pods: make(map[string]*v1.Pod), + config: config, + startTime: time.Now(), + } + return &provider, nil +} + // loadConfig loads the given json configuration files. func loadConfig(providerConfig, nodeName string) (config MockConfig, err error) { diff --git a/vkubelet/pod.go b/vkubelet/pod.go index b74013f22..3338a52fc 100644 --- a/vkubelet/pod.go +++ b/vkubelet/pod.go @@ -2,9 +2,11 @@ package vkubelet import ( "context" + "hash/fnv" "time" "github.com/cpuguy83/strongerrors/status/ocstatus" + "github.com/davecgh/go-spew/spew" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" @@ -28,62 +30,89 @@ func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) con } func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorder record.EventRecorder) error { - // Check if the pod is already known by the provider. - // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't. - // Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod). - if pp, _ := s.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil { - // The pod has already been created in the provider. - // Hence, we return since pod updates are not yet supported. - log.G(ctx).Warnf("skipping update of pod %s as pod updates are not supported", pp.Name) - return nil - } ctx, span := trace.StartSpan(ctx, "createOrUpdatePod") defer span.End() addPodAttributes(ctx, span, pod) - if err := populateEnvironmentVariables(ctx, pod, s.resourceManager, recorder); err != nil { - span.SetStatus(ocstatus.FromError(err)) - return err - } - ctx = span.WithFields(ctx, log.Fields{ "pod": pod.GetName(), "namespace": pod.GetNamespace(), }) - if origErr := s.provider.CreatePod(ctx, pod); origErr != nil { - podPhase := corev1.PodPending - if pod.Spec.RestartPolicy == corev1.RestartPolicyNever { - podPhase = corev1.PodFailed - } - - pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error - pod.Status.Phase = podPhase - pod.Status.Reason = podStatusReasonProviderFailed - pod.Status.Message = origErr.Error() - - logger := log.G(ctx).WithFields(log.Fields{ - "podPhase": podPhase, - "reason": pod.Status.Reason, - }) - - _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) - if err != nil { - logger.WithError(err).Warn("Failed to update pod status") - } else { - logger.Info("Updated k8s pod status") - } - - span.SetStatus(ocstatus.FromError(origErr)) - return origErr + if err := populateEnvironmentVariables(ctx, pod, s.resourceManager, recorder); err != nil { + span.SetStatus(ocstatus.FromError(err)) + return err } - log.G(ctx).Info("Created pod in provider") - + // Check if the pod is already known by the provider. + // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't. + // Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod). + if pp, _ := s.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil { + // Pod Update Only Permits update of: + // - `spec.containers[*].image` + // - `spec.initContainers[*].image` + // - `spec.activeDeadlineSeconds` + // - `spec.tolerations` (only additions to existing tolerations) + // compare the hashes of the pod specs to see if the specs actually changed + expected := hashPodSpec(pp.Spec) + if actual := hashPodSpec(pod.Spec); actual != expected { + log.G(ctx).Debugf("Pod %s exists, updating pod in provider", pp.Name) + if origErr := s.provider.UpdatePod(ctx, pod); origErr != nil { + s.handleProviderError(ctx, span, origErr, pod) + return origErr + } + log.G(ctx).Info("Updated pod in provider") + } + } else { + if origErr := s.provider.CreatePod(ctx, pod); origErr != nil { + s.handleProviderError(ctx, span, origErr, pod) + return origErr + } + log.G(ctx).Info("Created pod in provider") + } return nil } +// This is basically the kube runtime's hash container functionality. +// VK only operates at the Pod level so this is adapted for that +func hashPodSpec(spec corev1.PodSpec) uint64 { + hash := fnv.New32a() + printer := spew.ConfigState{ + Indent: " ", + SortKeys: true, + DisableMethods: true, + SpewKeys: true, + } + printer.Fprintf(hash, "%#v", spec) + return uint64(hash.Sum32()) +} + +func (s *Server) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) { + podPhase := corev1.PodPending + if pod.Spec.RestartPolicy == corev1.RestartPolicyNever { + podPhase = corev1.PodFailed + } + + pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error + pod.Status.Phase = podPhase + pod.Status.Reason = podStatusReasonProviderFailed + pod.Status.Message = origErr.Error() + + logger := log.G(ctx).WithFields(log.Fields{ + "podPhase": podPhase, + "reason": pod.Status.Reason, + }) + + _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) + if err != nil { + logger.WithError(err).Warn("Failed to update pod status") + } else { + logger.Info("Updated k8s pod status") + } + span.SetStatus(ocstatus.FromError(origErr)) +} + func (s *Server) deletePod(ctx context.Context, namespace, name string) error { // Grab the pod as known by the provider. // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't. diff --git a/vkubelet/pod_test.go b/vkubelet/pod_test.go new file mode 100644 index 000000000..9e02c788f --- /dev/null +++ b/vkubelet/pod_test.go @@ -0,0 +1,286 @@ +package vkubelet + +import ( + "context" + "testing" + + "github.com/virtual-kubelet/virtual-kubelet/providers/mock" + testutil "github.com/virtual-kubelet/virtual-kubelet/test/util" + "gotest.tools/assert" + is "gotest.tools/assert/cmp" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes/fake" +) + +type FakeProvider struct { + *mock.MockProvider + createFn func() + updateFn func() +} + +func (f *FakeProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error { + f.createFn() + return f.MockProvider.CreatePod(ctx, pod) +} + +func (f *FakeProvider) UpdatePod(ctx context.Context, pod *corev1.Pod) error { + f.updateFn() + return f.MockProvider.CreatePod(ctx, pod) +} + +type TestServer struct { + *Server + mock *FakeProvider + client *fake.Clientset +} + +func newMockProvider(t *testing.T) (*mock.MockProvider, error) { + return mock.NewMockProviderMockConfig( + mock.MockConfig{}, + "vk123", + "linux", + "127.0.0.1", + 443, + ) +} + +func newTestServer(t *testing.T) *TestServer { + + mockProvider, err := newMockProvider(t) + assert.Check(t, is.Nil(err)) + + fk8s := fake.NewSimpleClientset() + + fakeProvider := &FakeProvider{ + MockProvider: mockProvider, + } + + rm := testutil.FakeResourceManager() + + tsvr := &TestServer{ + Server: &Server{ + namespace: "default", + nodeName: "vk123", + provider: fakeProvider, + resourceManager: rm, + k8sClient: fk8s, + }, + mock: fakeProvider, + client: fk8s, + } + return tsvr +} + +func TestPodHashingEqual(t *testing.T) { + p1 := corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12-perl", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + h1 := hashPodSpec(p1) + + p2 := corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12-perl", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + h2 := hashPodSpec(p2) + assert.Check(t, is.Equal(h1, h2)) +} + +func TestPodHashingDifferent(t *testing.T) { + p1 := corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + h1 := hashPodSpec(p1) + + p2 := corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12-perl", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + h2 := hashPodSpec(p2) + assert.Check(t, h1 != h2) +} + +func TestPodCreateNewPod(t *testing.T) { + svr := newTestServer(t) + + pod := &corev1.Pod{} + pod.ObjectMeta.Namespace = "default" + pod.ObjectMeta.Name = "nginx" + pod.Spec = corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + created := false + updated := false + // The pod doesn't exist, we should invoke the CreatePod() method of the provider + svr.mock.createFn = func() { + created = true + } + svr.mock.updateFn = func() { + updated = true + } + er := testutil.FakeEventRecorder(5) + err := svr.createOrUpdatePod(context.Background(), pod, er) + assert.Check(t, is.Nil(err)) + // createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist + assert.Check(t, created) + assert.Check(t, !updated) +} + +func TestPodUpdateExisting(t *testing.T) { + svr := newTestServer(t) + + pod := &corev1.Pod{} + pod.ObjectMeta.Namespace = "default" + pod.ObjectMeta.Name = "nginx" + pod.Spec = corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + err := svr.mock.MockProvider.CreatePod(context.Background(), pod) + assert.Check(t, is.Nil(err)) + created := false + updated := false + // The pod doesn't exist, we should invoke the CreatePod() method of the provider + svr.mock.createFn = func() { + created = true + } + svr.mock.updateFn = func() { + updated = true + } + + pod2 := &corev1.Pod{} + pod2.ObjectMeta.Namespace = "default" + pod2.ObjectMeta.Name = "nginx" + pod2.Spec = corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12-perl", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + er := testutil.FakeEventRecorder(5) + err = svr.createOrUpdatePod(context.Background(), pod2, er) + assert.Check(t, is.Nil(err)) + + // createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed + assert.Check(t, !created) + assert.Check(t, updated) +} + +func TestPodNoSpecChange(t *testing.T) { + svr := newTestServer(t) + + pod := &corev1.Pod{} + pod.ObjectMeta.Namespace = "default" + pod.ObjectMeta.Name = "nginx" + pod.Spec = corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "nginx", + Image: "nginx:1.15.12", + Ports: []corev1.ContainerPort{ + corev1.ContainerPort{ + ContainerPort: 443, + Protocol: "tcp", + }, + }, + }, + }, + } + + err := svr.mock.MockProvider.CreatePod(context.Background(), pod) + assert.Check(t, is.Nil(err)) + created := false + updated := false + // The pod doesn't exist, we should invoke the CreatePod() method of the provider + svr.mock.createFn = func() { + created = true + } + svr.mock.updateFn = func() { + updated = true + } + + er := testutil.FakeEventRecorder(5) + err = svr.createOrUpdatePod(context.Background(), pod, er) + assert.Check(t, is.Nil(err)) + + // createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change + assert.Check(t, !created) + assert.Check(t, !updated) +} diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index 87e1757c9..1ac1731f9 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -22,7 +22,7 @@ const ( type Server struct { namespace string nodeName string - k8sClient *kubernetes.Clientset + k8sClient kubernetes.Interface provider providers.Provider resourceManager *manager.ResourceManager podSyncWorkers int