Add async provider pod status updates (#493)

This adds a new interface that a provider can implement which enables
async notifications of pod status changes rather than the existing loop
which goes through every pod in k8s and checks the status in the
provider.
In practice this should be significantly more efficient since we are not
constantly listing all pods and then looking up the status in the
provider.

For providers that do not support this interface, the old method is
still used to sync state from the provider.

This commit does not update any of the providers to support this
interface.
This commit is contained in:
Brian Goff
2019-04-01 09:07:26 -07:00
committed by GitHub
parent 5afb24809d
commit 1942522cf6
5 changed files with 183 additions and 99 deletions

View File

@@ -61,3 +61,14 @@ type Provider interface {
type PodMetricsProvider interface {
GetStatsSummary(context.Context) (*stats.Summary, error)
}
// PodNotifier notifies callers of pod changes.
// Providers should implement this interface to enable callers to be notified
// of pod status updates asyncronously.
type PodNotifier interface {
// NotifyPods instructs the notifier to call the passed in function when
// the pod status changes.
//
// NotifyPods should not block callers.
NotifyPods(context.Context, func(*v1.Pod))
}

View File

@@ -2,7 +2,6 @@ package vkubelet
import (
"context"
"sync"
"time"
"github.com/cpuguy83/strongerrors/status/ocstatus"
@@ -12,7 +11,10 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
@@ -136,57 +138,42 @@ func (s *Server) forceDeletePodResource(ctx context.Context, namespace, name str
}
// updatePodStatuses syncs the providers pod status with the kubernetes pod status.
func (s *Server) updatePodStatuses(ctx context.Context) {
func (s *Server) updatePodStatuses(ctx context.Context, q workqueue.RateLimitingInterface) {
ctx, span := trace.StartSpan(ctx, "updatePodStatuses")
defer span.End()
// Update all the pods with the provider status.
pods := s.resourceManager.GetPods()
pods, err := s.podInformer.Lister().List(labels.Everything())
if err != nil {
err = pkgerrors.Wrap(err, "error getting pod list")
span.SetStatus(ocstatus.FromError(err))
log.G(ctx).WithError(err).Error("Error updating pod statuses")
return
}
ctx = span.WithField(ctx, "nPods", int64(len(pods)))
sema := make(chan struct{}, s.podSyncWorkers)
var wg sync.WaitGroup
wg.Add(len(pods))
for _, pod := range pods {
go func(pod *corev1.Pod) {
defer wg.Done()
select {
case <-ctx.Done():
span.SetStatus(ocstatus.FromError(ctx.Err()))
return
case sema <- struct{}{}:
if !shouldSkipPodStatusUpdate(pod) {
s.enqueuePodStatusUpdate(ctx, q, pod)
}
defer func() { <-sema }()
if err := s.updatePodStatus(ctx, pod); err != nil {
log.G(ctx).WithFields(log.Fields{
"pod": pod.GetName(),
"namespace": pod.GetNamespace(),
"status": pod.Status.Phase,
"reason": pod.Status.Reason,
}).Error(err)
}
}
}(pod)
}
wg.Wait()
func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodSucceeded ||
pod.Status.Phase == corev1.PodFailed ||
pod.Status.Reason == podStatusReasonProviderFailed
}
func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {
if shouldSkipPodStatusUpdate(pod) {
return nil
}
ctx, span := trace.StartSpan(ctx, "updatePodStatus")
defer span.End()
ctx = addPodAttributes(ctx, span, pod)
if pod.Status.Phase == corev1.PodSucceeded ||
pod.Status.Phase == corev1.PodFailed ||
pod.Status.Reason == podStatusReasonProviderFailed {
return nil
}
status, err := s.provider.GetPodStatus(ctx, pod.Namespace, pod.Name)
if err != nil {
span.SetStatus(ocstatus.FromError(err))
@@ -230,3 +217,33 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {
return nil
}
func (s *Server) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key")
} else {
q.AddRateLimited(key)
}
}
func (s *Server) podStatusHandler(ctx context.Context, key string) (retErr error) {
ctx, span := trace.StartSpan(ctx, "podStatusHandler")
defer span.End()
defer func() {
span.SetStatus(ocstatus.FromError(retErr))
}()
ctx = span.WithField(ctx, "key", key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return pkgerrors.Wrap(err, "error spliting cache key")
}
pod, err := s.podInformer.Lister().Pods(namespace).Get(name)
if err != nil {
return pkgerrors.Wrap(err, "error looking up pod")
}
return s.updatePodStatus(ctx, pod)
}

View File

@@ -39,11 +39,6 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/log"
)
const (
// maxRetries is the number of times we try to process a given key before permanently forgetting it.
maxRetries = 20
)
// PodController is the controller implementation for Pod resources.
type PodController struct {
// server is the instance to which this controller belongs.
@@ -158,11 +153,6 @@ func (pc *PodController) runWorker(ctx context.Context, workerId string) {
// processNextWorkItem will read a single work item off the work queue and attempt to process it,by calling the syncHandler.
func (pc *PodController) processNextWorkItem(ctx context.Context, workerId string) bool {
obj, shutdown := pc.workqueue.Get()
if shutdown {
return false
}
// We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
ctx, span := trace.StartSpan(ctx, "processNextWorkItem")
@@ -170,52 +160,7 @@ func (pc *PodController) processNextWorkItem(ctx context.Context, workerId strin
// Add the ID of the current worker as an attribute to the current span.
ctx = span.WithField(ctx, "workerId", workerId)
// We wrap this block in a func so we can defer pc.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the work queue knows we have finished processing this item.
// We also must remember to call Forget if we do not want this work item being re-queued.
// For example, we do not call Forget if a transient error occurs.
// Instead, the item is put back on the work queue and attempted again after a back-off period.
defer pc.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the work queue.
// These are of the form namespace/name.
// We do this as the delayed nature of the work queue means the items in the informer cache may actually be more up to date that when the item was initially put onto the workqueue.
if key, ok = obj.(string); !ok {
// As the item in the work queue is actually invalid, we call Forget here else we'd go into a loop of attempting to process a work item that is invalid.
pc.workqueue.Forget(obj)
log.G(ctx).Warnf("expected string in work queue but got %#v", obj)
return nil
}
// Add the current key as an attribute to the current span.
ctx = span.WithField(ctx, "key", key)
// Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced.
if err := pc.syncHandler(ctx, key); err != nil {
if pc.workqueue.NumRequeues(key) < maxRetries {
// Put the item back on the work queue to handle any transient errors.
log.G(ctx).Warnf("requeuing %q due to failed sync: %v", key, err)
pc.workqueue.AddRateLimited(key)
return nil
}
// We've exceeded the maximum retries, so we must forget the key.
pc.workqueue.Forget(key)
return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key)
}
// Finally, if no error occurs we Forget this item so it does not get queued again until another change happens.
pc.workqueue.Forget(obj)
return nil
}(obj)
if err != nil {
// We've actually hit an error, so we set the span's status based on the error.
span.SetStatus(ocstatus.FromError(err))
log.G(ctx).Error(err)
return true
}
return true
return handleQueueItem(ctx, pc.workqueue, pc.syncHandler)
}
// syncHandler compares the actual state with the desired, and attempts to converge the two.

73
vkubelet/queue.go Normal file
View File

@@ -0,0 +1,73 @@
package vkubelet
import (
"context"
"github.com/cpuguy83/strongerrors/status/ocstatus"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"k8s.io/client-go/util/workqueue"
)
const (
// maxRetries is the number of times we try to process a given key before permanently forgetting it.
maxRetries = 20
)
type queueHandler func(ctx context.Context, key string) error
func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, handler queueHandler) bool {
ctx, span := trace.StartSpan(ctx, "handleQueueItem")
defer span.End()
obj, shutdown := q.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
// We call Done here so the work queue knows we have finished processing this item.
// We also must remember to call Forget if we do not want this work item being re-queued.
// For example, we do not call Forget if a transient error occurs.
// Instead, the item is put back on the work queue and attempted again after a back-off period.
defer q.Done(obj)
var key string
var ok bool
// We expect strings to come off the work queue.
// These are of the form namespace/name.
// We do this as the delayed nature of the work queue means the items in the informer cache may actually be more up to date that when the item was initially put onto the workqueue.
if key, ok = obj.(string); !ok {
// As the item in the work queue is actually invalid, we call Forget here else we'd go into a loop of attempting to process a work item that is invalid.
q.Forget(obj)
log.G(ctx).Warnf("expected string in work queue item but got %#v", obj)
return nil
}
// Add the current key as an attribute to the current span.
ctx = span.WithField(ctx, "key", key)
// Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced.
if err := handler(ctx, key); err != nil {
if q.NumRequeues(key) < maxRetries {
// Put the item back on the work queue to handle any transient errors.
log.G(ctx).Warnf("requeuing %q due to failed sync: %v", key, err)
q.AddRateLimited(key)
return nil
}
// We've exceeded the maximum retries, so we must forget the key.
q.Forget(key)
return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key)
}
// Finally, if no error occurs we Forget this item so it does not get queued again until another change happens.
q.Forget(obj)
return nil
}(obj)
if err != nil {
// We've actually hit an error, so we set the span's status based on the error.
span.SetStatus(ocstatus.FromError(err))
log.G(ctx).Error(err)
return true
}
return true
}

View File

@@ -2,14 +2,16 @@ package vkubelet
import (
"context"
"strconv"
"time"
"go.opencensus.io/trace"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
)
const (
@@ -61,13 +63,25 @@ func New(cfg Config) *Server {
// info to the Kubernetes API Server, such as logs, metrics, exec, etc.
// See `AttachPodRoutes` and `AttachMetricsRoutes` to set these up.
func (s *Server) Run(ctx context.Context) error {
go s.providerSyncLoop(ctx)
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "podStatusUpdate")
go s.runProviderSyncWorkers(ctx, q)
if pn, ok := s.provider.(providers.PodNotifier); ok {
pn.NotifyPods(ctx, func(pod *corev1.Pod) {
s.enqueuePodStatusUpdate(ctx, q, pod)
})
} else {
go s.providerSyncLoop(ctx, q)
}
return NewPodController(s).Run(ctx, s.podSyncWorkers)
}
// providerSyncLoop syncronizes pod states from the provider back to kubernetes
func (s *Server) providerSyncLoop(ctx context.Context) {
// Deprecated: This is only used when the provider does not support async updates
// Providers should implement async update support, even if it just means copying
// something like this in.
func (s *Server) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) {
const sleepTime = 5 * time.Second
t := time.NewTimer(sleepTime)
@@ -81,7 +95,7 @@ func (s *Server) providerSyncLoop(ctx context.Context) {
t.Stop()
ctx, span := trace.StartSpan(ctx, "syncActualState")
s.updatePodStatuses(ctx)
s.updatePodStatuses(ctx, q)
span.End()
// restart the timer
@@ -89,3 +103,27 @@ func (s *Server) providerSyncLoop(ctx context.Context) {
}
}
}
func (s *Server) runProviderSyncWorkers(ctx context.Context, q workqueue.RateLimitingInterface) {
for i := 0; i < s.podSyncWorkers; i++ {
go func(index int) {
workerID := strconv.Itoa(index)
s.runProviderSyncWorker(ctx, workerID, q)
}(i)
}
}
func (s *Server) runProviderSyncWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
for s.processPodStatusUpdate(ctx, workerID, q) {
}
}
func (s *Server) processPodStatusUpdate(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
ctx, span := trace.StartSpan(ctx, "processPodStatusUpdate")
defer span.End()
// Add the ID of the current worker as an attribute to the current span.
ctx = span.WithField(ctx, "workerID", workerID)
return handleQueueItem(ctx, q, s.podStatusHandler)
}