From bec818bf3c2324445597a23e9980b205d071912b Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Mon, 5 Nov 2018 11:37:00 -0800
Subject: [PATCH] Do not close pod sync, use context cancel instead. (#402)

Closing the channel is racy and can lead to a panic on exit. Instead
rely on context cancellation to know if workers should exit.
---
 cmd/root.go          |  1 -
 vkubelet/pod.go      | 45 +++++++++++++++++++++++++++++++++++++--------
 vkubelet/vkubelet.go | 21 ++++++++++-----------
 3 files changed, 47 insertions(+), 20 deletions(-)

diff --git a/cmd/root.go b/cmd/root.go
index 0a0e668ae..14bcd63d7 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -95,7 +95,6 @@ This allows users to schedule kubernetes workloads on nodes that aren't running
 	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
 	go func() {
 		<-sig
-		f.Stop()
 		cancel()
 	}()
 
diff --git a/vkubelet/pod.go b/vkubelet/pod.go
index f7c26f82a..03d3cc613 100644
--- a/vkubelet/pod.go
+++ b/vkubelet/pod.go
@@ -43,7 +43,13 @@ func (s *Server) onAddPod(ctx context.Context, obj interface{}) {
 
 	if s.resourceManager.UpdatePod(pod) {
 		span.Annotate(nil, "Add pod to synchronizer channel.")
-		s.podCh <- &podNotification{pod: pod, ctx: ctx}
+		select {
+		case <-ctx.Done():
+			logger = logger.WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
+			logger.WithError(ctx.Err()).Debug("Cancel send pod event due to cancelled context")
+			return
+		case s.podCh <- &podNotification{pod: pod, ctx: ctx}:
+		}
 	}
 }
 
@@ -65,7 +71,13 @@ func (s *Server) onUpdatePod(ctx context.Context, obj interface{}) {
 
 	if s.resourceManager.UpdatePod(pod) {
 		span.Annotate(nil, "Add pod to synchronizer channel.")
-		s.podCh <- &podNotification{pod: pod, ctx: ctx}
+		select {
+		case <-ctx.Done():
+			logger = logger.WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
+			logger.WithError(ctx.Err()).Debug("Cancel send pod event due to cancelled context")
+			return
+		case s.podCh <- &podNotification{pod: pod, ctx: ctx}:
+		}
 	}
 }
 
@@ -96,7 +108,13 @@ func (s *Server) onDeletePod(ctx context.Context, obj interface{}) {
 
 	if s.resourceManager.DeletePod(pod) {
 		span.Annotate(nil, "Add pod to synchronizer channel.")
-		s.podCh <- &podNotification{pod: pod, ctx: ctx}
+		select {
+		case <-ctx.Done():
+			logger = logger.WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
+			logger.WithError(ctx.Err()).Debug("Cancel send pod event due to cancelled context")
+			return
+		case s.podCh <- &podNotification{pod: pod, ctx: ctx}:
+		}
 	}
 }
 
@@ -104,11 +122,15 @@ func (s *Server) startPodSynchronizer(ctx context.Context, id int) {
 	logger := log.G(ctx).WithField("method", "startPodSynchronizer").WithField("podSynchronizer", id)
 	logger.Debug("Start pod synchronizer")
 
-	for event := range s.podCh {
-		s.syncPod(event.ctx, event.pod)
+	for {
+		select {
+		case <-ctx.Done():
+			logger.Info("Stop pod syncronizer")
+			return
+		case event := <-s.podCh:
+			s.syncPod(event.ctx, event.pod)
+		}
 	}
-
-	logger.Info("pod channel is closed.")
 }
 
 func (s *Server) syncPod(ctx context.Context, pod *corev1.Pod) {
@@ -217,6 +239,13 @@ func (s *Server) updatePodStatuses(ctx context.Context) {
 	span.AddAttributes(trace.Int64Attribute("nPods", int64(len(pods))))
 
 	for _, pod := range pods {
+		select {
+		case <-ctx.Done():
+			span.Annotate(nil, ctx.Err().Error())
+			return
+		default:
+		}
+
 		if pod.Status.Phase == corev1.PodSucceeded ||
 			pod.Status.Phase == corev1.PodFailed ||
 			pod.Status.Reason == podStatusReasonProviderFailed {
@@ -281,7 +310,7 @@ func (s *Server) watchForPodEvent(ctx context.Context) error {
 		time.Minute,
 		cache.ResourceEventHandlerFuncs{
-			AddFunc: func(obj interface{}) {
+			AddFunc: func(obj interface{}) {
 				s.onAddPod(ctx, obj)
 			},
 			UpdateFunc: func(oldObj, newObj interface{}) {
diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go
index fa9bcaf3d..9eb906884 100644
--- a/vkubelet/vkubelet.go
+++ b/vkubelet/vkubelet.go
@@ -104,11 +104,16 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
 
 	tick := time.Tick(5 * time.Second)
 	go func() {
-		for range tick {
-			ctx, span := trace.StartSpan(ctx, "syncActualState")
-			s.updateNode(ctx)
-			s.updatePodStatuses(ctx)
-			span.End()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-tick:
+				ctx, span := trace.StartSpan(ctx, "syncActualState")
+				s.updateNode(ctx)
+				s.updatePodStatuses(ctx)
+				span.End()
+			}
 		}
 	}()
 
@@ -128,12 +133,6 @@ func (s *Server) Run(ctx context.Context) error {
 	return nil
 }
 
-// Stop shutsdown the server.
-// It does not shutdown pods assigned to the virtual node.
-func (s *Server) Stop() {
-	close(s.podCh)
-}
-
 // reconcile is the main reconciliation loop that compares differences between Kubernetes and
 // the active provider and reconciles the differences.
 func (s *Server) reconcile(ctx context.Context) {
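
For reference, a minimal, self-contained sketch of the pattern this patch switches to:
instead of close(s.podCh) (closing while another goroutine is still sending races and
a send on a closed channel panics), every sender and receiver selects on ctx.Done()
and simply returns when the context is cancelled. The names below (producer, worker,
the int channel) are illustrative only and are not taken from the virtual-kubelet sources.

    package main

    import (
    	"context"
    	"fmt"
    	"time"
    )

    // producer keeps sending work until the context is cancelled.
    // It never closes the channel, so there is no send-on-closed-channel panic.
    func producer(ctx context.Context, ch chan<- int) {
    	for i := 0; ; i++ {
    		select {
    		case <-ctx.Done():
    			return // stop sending; the channel is simply abandoned
    		case ch <- i:
    		}
    	}
    }

    // worker drains the channel until the context is cancelled.
    func worker(ctx context.Context, id int, ch <-chan int) {
    	for {
    		select {
    		case <-ctx.Done():
    			fmt.Println("worker", id, "stopping:", ctx.Err())
    			return
    		case v := <-ch:
    			fmt.Println("worker", id, "got", v)
    		}
    	}
    }

    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	ch := make(chan int)
    	go producer(ctx, ch)
    	go worker(ctx, 1, ch)
    	time.Sleep(10 * time.Millisecond)
    	cancel() // all goroutines observe ctx.Done() and exit; no close, no panic
    	time.Sleep(10 * time.Millisecond)
    }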