Light up UpdatePod (#613)
* Light up UpdatePod

This PR updates the vkubelet/pod.go createOrUpdatePod(..) method to actually handle updates. It gets the pod from the provider as before, but now, if the pod exists, the method compares a hash of the existing pod's spec against a hash of the new pod's spec. If they differ, it calls UpdatePod(..). It also makes a small change to the Server struct, swapping kubernetes.Clientset for kubernetes.Interface to better facilitate testing with the fake Clientset.

Co-Authored-By: Brian Goff <cpuguy83@gmail.com>
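In sketch form, the decision logic this commit introduces looks like the following. This is a minimal standalone illustration, not the committed code: Pod, Provider, and hashSpec here are simplified stand-ins for corev1.Pod, providers.Provider, and the hashPodSpec function that appear in the diff below.

package sketch

import "context"

// Pod and Provider are reduced stand-ins for corev1.Pod and providers.Provider.
type Pod struct{ Namespace, Name, Spec string }

type Provider interface {
	GetPod(ctx context.Context, namespace, name string) (*Pod, error)
	CreatePod(ctx context.Context, pod *Pod) error
	UpdatePod(ctx context.Context, pod *Pod) error
}

// hashSpec stands in for hashPodSpec below; identity is enough for the sketch.
func hashSpec(spec string) string { return spec }

// createOrUpdate mirrors the decision the new createOrUpdatePod makes.
func createOrUpdate(ctx context.Context, p Provider, pod *Pod) error {
	// GetPod errors are ignored on purpose: some providers return an error
	// for a missing pod and some don't, so only a non-nil pod counts.
	if existing, _ := p.GetPod(ctx, pod.Namespace, pod.Name); existing != nil {
		if hashSpec(existing.Spec) != hashSpec(pod.Spec) {
			return p.UpdatePod(ctx, pod) // spec changed: push the update
		}
		return nil // spec unchanged: nothing to do
	}
	return p.CreatePod(ctx, pod) // pod unknown to the provider: create it
}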
committed by Brian Goff
parent a3f933d998 · commit 87e72bf4df
Gopkg.lock (generated, 1 change)
@@ -1334,6 +1334,7 @@
     "github.com/cpuguy83/strongerrors",
     "github.com/cpuguy83/strongerrors/status",
     "github.com/cpuguy83/strongerrors/status/ocstatus",
     "github.com/davecgh/go-spew/spew",
+    "github.com/google/go-cmp/cmp",
     "github.com/google/uuid",
     "github.com/gophercloud/gophercloud",
@@ -71,6 +71,32 @@ func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalI
 	return &provider, nil
 }
 
+// NewMockProviderMockConfig creates a new MockProvider with the given Mock Config
+func NewMockProviderMockConfig(config MockConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
+	// set defaults
+	if config.CPU == "" {
+		config.CPU = defaultCPUCapacity
+	}
+	if config.Memory == "" {
+		config.Memory = defaultMemoryCapacity
+	}
+	if config.Pods == "" {
+		config.Pods = defaultPodCapacity
+	}
+
+	provider := MockProvider{
+		nodeName:           nodeName,
+		operatingSystem:    operatingSystem,
+		internalIP:         internalIP,
+		daemonEndpointPort: daemonEndpointPort,
+		pods:               make(map[string]*v1.Pod),
+		config:             config,
+		startTime:          time.Now(),
+	}
+	return &provider, nil
+}
+
 // loadConfig loads the given json configuration files.
 func loadConfig(providerConfig, nodeName string) (config MockConfig, err error) {
vkubelet/pod.go (111 changes)
@@ -2,9 +2,11 @@ package vkubelet
 
 import (
 	"context"
+	"hash/fnv"
 	"time"
 
 	"github.com/cpuguy83/strongerrors/status/ocstatus"
+	"github.com/davecgh/go-spew/spew"
 	pkgerrors "github.com/pkg/errors"
 	"github.com/virtual-kubelet/virtual-kubelet/log"
 	"github.com/virtual-kubelet/virtual-kubelet/trace"
@@ -28,62 +30,89 @@ func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) con
 }
 
 func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorder record.EventRecorder) error {
-	// Check if the pod is already known by the provider.
-	// NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't.
-	// Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod).
-	if pp, _ := s.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil {
-		// The pod has already been created in the provider.
-		// Hence, we return since pod updates are not yet supported.
-		log.G(ctx).Warnf("skipping update of pod %s as pod updates are not supported", pp.Name)
-		return nil
-	}
-
 	ctx, span := trace.StartSpan(ctx, "createOrUpdatePod")
 	defer span.End()
 	addPodAttributes(ctx, span, pod)
 
 	if err := populateEnvironmentVariables(ctx, pod, s.resourceManager, recorder); err != nil {
 		span.SetStatus(ocstatus.FromError(err))
 		return err
 	}
 
 	ctx = span.WithFields(ctx, log.Fields{
 		"pod":       pod.GetName(),
 		"namespace": pod.GetNamespace(),
 	})
 
-	if origErr := s.provider.CreatePod(ctx, pod); origErr != nil {
-		podPhase := corev1.PodPending
-		if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
-			podPhase = corev1.PodFailed
-		}
-
-		pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
-		pod.Status.Phase = podPhase
-		pod.Status.Reason = podStatusReasonProviderFailed
-		pod.Status.Message = origErr.Error()
-
-		logger := log.G(ctx).WithFields(log.Fields{
-			"podPhase": podPhase,
-			"reason":   pod.Status.Reason,
-		})
-
-		_, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
-		if err != nil {
-			logger.WithError(err).Warn("Failed to update pod status")
-		} else {
-			logger.Info("Updated k8s pod status")
-		}
-
-		span.SetStatus(ocstatus.FromError(origErr))
-		return origErr
-	}
-
-	log.G(ctx).Info("Created pod in provider")
-
+	// Check if the pod is already known by the provider.
+	// NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't.
+	// Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod).
+	if pp, _ := s.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil {
+		// Pod Update Only Permits update of:
+		// - `spec.containers[*].image`
+		// - `spec.initContainers[*].image`
+		// - `spec.activeDeadlineSeconds`
+		// - `spec.tolerations` (only additions to existing tolerations)
+		// compare the hashes of the pod specs to see if the specs actually changed
+		expected := hashPodSpec(pp.Spec)
+		if actual := hashPodSpec(pod.Spec); actual != expected {
+			log.G(ctx).Debugf("Pod %s exists, updating pod in provider", pp.Name)
+			if origErr := s.provider.UpdatePod(ctx, pod); origErr != nil {
+				s.handleProviderError(ctx, span, origErr, pod)
+				return origErr
+			}
+			log.G(ctx).Info("Updated pod in provider")
+		}
+	} else {
+		if origErr := s.provider.CreatePod(ctx, pod); origErr != nil {
+			s.handleProviderError(ctx, span, origErr, pod)
+			return origErr
+		}
+		log.G(ctx).Info("Created pod in provider")
+	}
 	return nil
 }
 
+// This is basically the kube runtime's hash container functionality.
+// VK only operates at the Pod level so this is adapted for that
+func hashPodSpec(spec corev1.PodSpec) uint64 {
+	hash := fnv.New32a()
+	printer := spew.ConfigState{
+		Indent:         " ",
+		SortKeys:       true,
+		DisableMethods: true,
+		SpewKeys:       true,
+	}
+	printer.Fprintf(hash, "%#v", spec)
+	return uint64(hash.Sum32())
+}
+
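A note on the hashing approach: spew with SortKeys and SpewKeys produces a deterministic dump of the spec, which is fed into an FNV-32a hash, the same idea the kubelet uses to detect container changes. A minimal standalone illustration of why this works as a change detector (the toySpec type and image values are made up for the example):

package main

import (
	"fmt"
	"hash/fnv"

	"github.com/davecgh/go-spew/spew"
)

// toySpec is a stand-in for corev1.PodSpec, for illustration only.
type toySpec struct{ Image string }

// hashOf mirrors the hashPodSpec recipe above: deterministic spew dump
// written into an FNV-32a hash.
func hashOf(v interface{}) uint32 {
	h := fnv.New32a()
	printer := spew.ConfigState{Indent: " ", SortKeys: true, DisableMethods: true, SpewKeys: true}
	printer.Fprintf(h, "%#v", v)
	return h.Sum32()
}

func main() {
	fmt.Println(hashOf(toySpec{"nginx:1.15.12"}) == hashOf(toySpec{"nginx:1.15.12"}))      // true: same spec, same hash
	fmt.Println(hashOf(toySpec{"nginx:1.15.12"}) == hashOf(toySpec{"nginx:1.15.12-perl"})) // false: image change alters the hash
}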
+func (s *Server) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) {
+	podPhase := corev1.PodPending
+	if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
+		podPhase = corev1.PodFailed
+	}
+
+	pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
+	pod.Status.Phase = podPhase
+	pod.Status.Reason = podStatusReasonProviderFailed
+	pod.Status.Message = origErr.Error()
+
+	logger := log.G(ctx).WithFields(log.Fields{
+		"podPhase": podPhase,
+		"reason":   pod.Status.Reason,
+	})
+
+	_, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
+	if err != nil {
+		logger.WithError(err).Warn("Failed to update pod status")
+	} else {
+		logger.Info("Updated k8s pod status")
+	}
+	span.SetStatus(ocstatus.FromError(origErr))
+}
+
 func (s *Server) deletePod(ctx context.Context, namespace, name string) error {
 	// Grab the pod as known by the provider.
 	// NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't.
||||
286
vkubelet/pod_test.go
Normal file
286
vkubelet/pod_test.go
Normal file
@@ -0,0 +1,286 @@
package vkubelet

import (
	"context"
	"testing"

	"github.com/virtual-kubelet/virtual-kubelet/providers/mock"
	testutil "github.com/virtual-kubelet/virtual-kubelet/test/util"
	"gotest.tools/assert"
	is "gotest.tools/assert/cmp"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/fake"
)

type FakeProvider struct {
	*mock.MockProvider
	createFn func()
	updateFn func()
}

func (f *FakeProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
	f.createFn()
	return f.MockProvider.CreatePod(ctx, pod)
}

func (f *FakeProvider) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
	f.updateFn()
	return f.MockProvider.CreatePod(ctx, pod)
}

type TestServer struct {
	*Server
	mock   *FakeProvider
	client *fake.Clientset
}

func newMockProvider(t *testing.T) (*mock.MockProvider, error) {
	return mock.NewMockProviderMockConfig(
		mock.MockConfig{},
		"vk123",
		"linux",
		"127.0.0.1",
		443,
	)
}

func newTestServer(t *testing.T) *TestServer {
	mockProvider, err := newMockProvider(t)
	assert.Check(t, is.Nil(err))

	fk8s := fake.NewSimpleClientset()

	fakeProvider := &FakeProvider{
		MockProvider: mockProvider,
	}

	rm := testutil.FakeResourceManager()

	tsvr := &TestServer{
		Server: &Server{
			namespace:       "default",
			nodeName:        "vk123",
			provider:        fakeProvider,
			resourceManager: rm,
			k8sClient:       fk8s,
		},
		mock:   fakeProvider,
		client: fk8s,
	}
	return tsvr
}

func TestPodHashingEqual(t *testing.T) {
	p1 := corev1.PodSpec{
		Containers: []corev1.Container{
			corev1.Container{
				Name:  "nginx",
				Image: "nginx:1.15.12-perl",
				Ports: []corev1.ContainerPort{
					corev1.ContainerPort{
						ContainerPort: 443,
						Protocol:      "tcp",
					},
				},
			},
		},
	}

	h1 := hashPodSpec(p1)

	p2 := corev1.PodSpec{
		Containers: []corev1.Container{
			corev1.Container{
				Name:  "nginx",
				Image: "nginx:1.15.12-perl",
				Ports: []corev1.ContainerPort{
					corev1.ContainerPort{
						ContainerPort: 443,
						Protocol:      "tcp",
					},
				},
			},
		},
	}

	h2 := hashPodSpec(p2)
	assert.Check(t, is.Equal(h1, h2))
}

func TestPodHashingDifferent(t *testing.T) {
	p1 := corev1.PodSpec{
		Containers: []corev1.Container{
			corev1.Container{
				Name:  "nginx",
				Image: "nginx:1.15.12",
				Ports: []corev1.ContainerPort{
					corev1.ContainerPort{
						ContainerPort: 443,
						Protocol:      "tcp",
					},
				},
			},
		},
	}

	h1 := hashPodSpec(p1)

	p2 := corev1.PodSpec{
		Containers: []corev1.Container{
			corev1.Container{
				Name:  "nginx",
				Image: "nginx:1.15.12-perl",
				Ports: []corev1.ContainerPort{
					corev1.ContainerPort{
						ContainerPort: 443,
						Protocol:      "tcp",
					},
				},
			},
		},
	}

	h2 := hashPodSpec(p2)
	assert.Check(t, h1 != h2)
}

func TestPodCreateNewPod(t *testing.T) {
	svr := newTestServer(t)

	pod := &corev1.Pod{}
	pod.ObjectMeta.Namespace = "default"
	pod.ObjectMeta.Name = "nginx"
	pod.Spec = corev1.PodSpec{
		Containers: []corev1.Container{
			corev1.Container{
				Name:  "nginx",
				Image: "nginx:1.15.12",
				Ports: []corev1.ContainerPort{
					corev1.ContainerPort{
						ContainerPort: 443,
						Protocol:      "tcp",
					},
				},
			},
		},
	}

	created := false
	updated := false
	// The pod doesn't exist, we should invoke the CreatePod() method of the provider
	svr.mock.createFn = func() {
		created = true
	}
	svr.mock.updateFn = func() {
		updated = true
	}
	er := testutil.FakeEventRecorder(5)
	err := svr.createOrUpdatePod(context.Background(), pod, er)
	assert.Check(t, is.Nil(err))
	// createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist
	assert.Check(t, created)
	assert.Check(t, !updated)
}

func TestPodUpdateExisting(t *testing.T) {
	svr := newTestServer(t)

	pod := &corev1.Pod{}
	pod.ObjectMeta.Namespace = "default"
	pod.ObjectMeta.Name = "nginx"
	pod.Spec = corev1.PodSpec{
		Containers: []corev1.Container{
			corev1.Container{
				Name:  "nginx",
				Image: "nginx:1.15.12",
				Ports: []corev1.ContainerPort{
					corev1.ContainerPort{
						ContainerPort: 443,
						Protocol:      "tcp",
					},
				},
			},
		},
	}

	err := svr.mock.MockProvider.CreatePod(context.Background(), pod)
	assert.Check(t, is.Nil(err))
	created := false
	updated := false
	svr.mock.createFn = func() {
		created = true
	}
	svr.mock.updateFn = func() {
		updated = true
	}

	pod2 := &corev1.Pod{}
	pod2.ObjectMeta.Namespace = "default"
	pod2.ObjectMeta.Name = "nginx"
	pod2.Spec = corev1.PodSpec{
		Containers: []corev1.Container{
			corev1.Container{
				Name:  "nginx",
				Image: "nginx:1.15.12-perl",
				Ports: []corev1.ContainerPort{
					corev1.ContainerPort{
						ContainerPort: 443,
						Protocol:      "tcp",
					},
				},
			},
		},
	}

	er := testutil.FakeEventRecorder(5)
	err = svr.createOrUpdatePod(context.Background(), pod2, er)
	assert.Check(t, is.Nil(err))

	// createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed
	assert.Check(t, !created)
	assert.Check(t, updated)
}

func TestPodNoSpecChange(t *testing.T) {
	svr := newTestServer(t)

	pod := &corev1.Pod{}
	pod.ObjectMeta.Namespace = "default"
	pod.ObjectMeta.Name = "nginx"
	pod.Spec = corev1.PodSpec{
		Containers: []corev1.Container{
			corev1.Container{
				Name:  "nginx",
				Image: "nginx:1.15.12",
				Ports: []corev1.ContainerPort{
					corev1.ContainerPort{
						ContainerPort: 443,
						Protocol:      "tcp",
					},
				},
			},
		},
	}

	err := svr.mock.MockProvider.CreatePod(context.Background(), pod)
	assert.Check(t, is.Nil(err))
	created := false
	updated := false
	svr.mock.createFn = func() {
		created = true
	}
	svr.mock.updateFn = func() {
		updated = true
	}

	er := testutil.FakeEventRecorder(5)
	err = svr.createOrUpdatePod(context.Background(), pod, er)
	assert.Check(t, is.Nil(err))

	// createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change
	assert.Check(t, !created)
	assert.Check(t, !updated)
}
@@ -22,7 +22,7 @@ const (
 type Server struct {
 	namespace       string
 	nodeName        string
-	k8sClient       *kubernetes.Clientset
+	k8sClient       kubernetes.Interface
 	provider        providers.Provider
 	resourceManager *manager.ResourceManager
 	podSyncWorkers  int
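The kubernetes.Interface swap above is what makes the new tests possible: client-go's fake.Clientset satisfies kubernetes.Interface, so tests can inject it where a concrete *kubernetes.Clientset used to be required. A minimal sketch, using the pre-context client-go signatures this diff targets (newer client-go adds a context argument to these calls):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// fake.NewSimpleClientset returns a *fake.Clientset, which implements
	// kubernetes.Interface just like the real *kubernetes.Clientset does.
	var client kubernetes.Interface = fake.NewSimpleClientset()

	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"}}
	if _, err := client.CoreV1().Pods("default").Create(pod); err != nil {
		fmt.Println("create failed:", err)
		return
	}

	got, err := client.CoreV1().Pods("default").Get("nginx", metav1.GetOptions{})
	if err != nil {
		fmt.Println("get failed:", err)
		return
	}
	fmt.Println(got.Name) // nginx
}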