[VK] Use Cache controller and Make create/delete pod Concurrently (#373)

* Add k8s.io/client-go/tools/cache package

* Add cache controller

* Add pod creator and terminator

* Pod Synchronizer

* Clean up

* Add back reconcile

* Remove unnecessary space in log

* Incorprate feedbacks

* dep ensure

* Fix the syntax error

* Fix the merge errors

* Minor Refactor

* Set status

* Pass context together with the pod to the pod channel

* Change to use flag to specify the number of pod sync workers

* Remove the unused const

* Use Stable PROD Region WestUS in Test

EastUS2EUAP is not reliable
This commit is contained in:
Robbie Zhang
2018-10-16 17:20:02 -07:00
committed by GitHub
parent b082eced13
commit 4a7b74ed42
40 changed files with 7173 additions and 89 deletions

View File

@@ -3,6 +3,7 @@ package vkubelet
import (
"context"
"fmt"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
@@ -10,7 +11,10 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
func addPodAttributes(span *trace.Span, pod *corev1.Pod) {
@@ -21,6 +25,108 @@ func addPodAttributes(span *trace.Span, pod *corev1.Pod) {
)
}
func (s *Server) onAddPod(ctx context.Context, obj interface{}) {
ctx, span := trace.StartSpan(ctx, "onAddPod")
defer span.End()
logger := log.G(ctx).WithField("method", "onAddPod")
pod := obj.(*corev1.Pod)
if pod == nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: fmt.Sprintf("Unexpected object from event: %v", obj)})
logger.Errorf("obj is not a valid pod: %v", obj)
return
}
addPodAttributes(span, pod)
logger.Debugf("Receive added pod '%s/%s' ", pod.GetNamespace(), pod.GetName())
if s.resourceManager.UpdatePod(pod) {
span.Annotate(nil, "Add pod to synchronizer channel.")
s.podCh <- &podNotification{pod: pod, ctx: ctx}
}
}
func (s *Server) onUpdatePod(ctx context.Context, obj interface{}) {
ctx, span := trace.StartSpan(ctx, "onUpdatePod")
defer span.End()
logger := log.G(ctx).WithField("method", "onUpdatePod")
pod := obj.(*corev1.Pod)
if pod == nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: fmt.Sprintf("Unexpected object from event: %v", obj)})
logger.Errorf("obj is not a valid pod: %v", obj)
return
}
addPodAttributes(span, pod)
logger.Debugf("Receive updated pod '%s/%s'", pod.GetNamespace(), pod.GetName())
if s.resourceManager.UpdatePod(pod) {
span.Annotate(nil, "Add pod to synchronizer channel.")
s.podCh <- &podNotification{pod: pod, ctx: ctx}
}
}
func (s *Server) onDeletePod(ctx context.Context, obj interface{}) {
ctx, span := trace.StartSpan(ctx, "onDeletePod")
defer span.End()
logger := log.G(ctx).WithField("method", "onDeletePod")
pod := obj.(*corev1.Pod)
if pod == nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: fmt.Sprintf("Unexpected object from event: %v", obj)})
logger.Errorf("obj is not a valid pod: %v", obj)
return
}
addPodAttributes(span, pod)
logger.Debugf("Receive deleted pod '%s/%s'", pod.GetNamespace(), pod.GetName())
if s.resourceManager.DeletePod(pod) {
span.Annotate(nil, "Add pod to synchronizer channel.")
s.podCh <- &podNotification{pod: pod, ctx: ctx}
}
}
func (s *Server) startPodSynchronizer(ctx context.Context, id int) {
logger := log.G(ctx).WithField("method", "startPodSynchronizer").WithField("podSynchronizer", id)
logger.Debug("Start pod synchronizer")
for event := range s.podCh {
s.syncPod(event.ctx, event.pod)
}
logger.Info("pod channel is closed.")
}
func (s *Server) syncPod(ctx context.Context, pod *corev1.Pod) {
ctx, span := trace.StartSpan(ctx, "syncPod")
defer span.End()
logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
addPodAttributes(span, pod)
if pod.DeletionTimestamp != nil {
span.Annotate(nil, "Delete pod")
logger.Debugf("Deleting pod")
if err := s.deletePod(ctx, pod); err != nil {
logger.WithError(err).Error("Failed to delete pod")
}
} else {
span.Annotate(nil, "Create pod")
logger.Debugf("Creating pod")
if err := s.createPod(ctx, pod); err != nil {
logger.WithError(err).Errorf("Failed to create pod")
}
}
}
func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "createPod")
defer span.End()
@@ -31,7 +137,7 @@ func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
return err
}
logger := log.G(ctx).WithField("pod", pod.Name)
logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
if origErr := s.provider.CreatePod(ctx, pod); origErr != nil {
podPhase := corev1.PodPending
@@ -41,7 +147,7 @@ func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
pod.Status.Phase = podPhase
pod.Status.Reason = PodStatusReason_ProviderFailed
pod.Status.Reason = podStatusReasonProviderFailed
pod.Status.Message = origErr.Error()
_, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
@@ -73,10 +179,10 @@ func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error {
}
span.Annotate(nil, "Deleted pod from provider")
logger := log.G(ctx).WithField("pod", pod.Name).WithField("namespace", pod.Namespace)
logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
if !errors.IsNotFound(delErr) {
var grace int64
if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) {
if err := s.k8sClient.CoreV1().Pods(pod.GetNamespace()).Delete(pod.GetName(), &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) {
if errors.IsNotFound(err) {
span.Annotate(nil, "Pod does not exist in k8s, nothing to delete")
return nil
@@ -107,13 +213,13 @@ func (s *Server) updatePodStatuses(ctx context.Context) {
for _, pod := range pods {
if pod.Status.Phase == corev1.PodSucceeded ||
pod.Status.Phase == corev1.PodFailed ||
pod.Status.Reason == PodStatusReason_ProviderFailed {
pod.Status.Reason == podStatusReasonProviderFailed {
continue
}
status, err := s.provider.GetPodStatus(ctx, pod.Namespace, pod.Name)
if err != nil {
log.G(ctx).WithField("pod", pod.Name).WithField("namespace", pod.Namespace).Error("Error retrieving pod status")
log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()).Error("Error retrieving pod status")
return
}
@@ -128,39 +234,65 @@ func (s *Server) updatePodStatuses(ctx context.Context) {
// watchForPodEvent waits for pod changes from kubernetes and updates the details accordingly in the local state.
// This returns after a single pod event.
func (s *Server) watchForPodEvent(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case ev, ok := <-s.podWatcher.ResultChan():
if !ok {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return pkgerrors.New("pod watcher connection is closed unexpectedly")
}
ctx, span := trace.StartSpan(ctx, "updateLocalPod")
defer span.End()
span.AddAttributes(trace.StringAttribute("PodEventType", string(ev.Type)))
log.G(ctx).WithField("type", ev.Type).Debug("Pod watcher event is received")
reconcile := false
switch ev.Type {
case watch.Added:
reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
case watch.Modified:
reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
case watch.Deleted:
reconcile = s.resourceManager.DeletePod(ev.Object.(*corev1.Pod))
}
if reconcile {
span.Annotate(nil, "reconciling")
s.reconcile(ctx)
}
}
opts := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", s.nodeName).String(),
}
pods, err := s.k8sClient.CoreV1().Pods(s.namespace).List(opts)
if err != nil {
return pkgerrors.Wrap(err, "error getting pod list")
}
s.resourceManager.SetPods(pods)
s.reconcile(ctx)
opts.ResourceVersion = pods.ResourceVersion
var controller cache.Controller
_, controller = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if controller != nil {
opts.ResourceVersion = controller.LastSyncResourceVersion()
}
return s.k8sClient.Core().Pods(s.namespace).List(opts)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if controller != nil {
opts.ResourceVersion = controller.LastSyncResourceVersion()
}
return s.k8sClient.Core().Pods(s.namespace).Watch(opts)
},
},
&corev1.Pod{},
time.Minute,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
s.onAddPod(ctx, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
s.onUpdatePod(ctx, newObj)
},
DeleteFunc: func(obj interface{}) {
s.onDeletePod(ctx, obj)
},
},
)
for i := 0; i < s.podSyncWorkers; i++ {
go s.startPodSynchronizer(ctx, i)
}
log.G(ctx).Info("Start to run pod cache controller.")
controller.Run(ctx.Done())
return ctx.Err()
}