diff --git a/vkubelet/pod.go b/vkubelet/pod.go index a507fa833..b7b7d9627 100644 --- a/vkubelet/pod.go +++ b/vkubelet/pod.go @@ -3,6 +3,7 @@ package vkubelet import ( "context" "fmt" + "sync" "time" "github.com/cpuguy83/strongerrors/status/ocstatus" @@ -138,19 +139,31 @@ func (s *Server) updatePodStatuses(ctx context.Context) { pods := s.resourceManager.GetPods() span.AddAttributes(trace.Int64Attribute("nPods", int64(len(pods)))) - for _, pod := range pods { - select { - case <-ctx.Done(): - span.Annotate(nil, ctx.Err().Error()) - return - default: - } + sema := make(chan struct{}, s.podSyncWorkers) + var wg sync.WaitGroup + wg.Add(len(pods)) - if err := s.updatePodStatus(ctx, pod); err != nil { - logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()).WithField("status", pod.Status.Phase).WithField("reason", pod.Status.Reason) - logger.Error(err) - } + for _, pod := range pods { + go func(pod *corev1.Pod) { + defer wg.Done() + + select { + case <-ctx.Done(): + span.SetStatus(ocstatus.FromError(ctx.Err())) + return + case sema <- struct{}{}: + } + defer func() { <-sema }() + + if err := s.updatePodStatus(ctx, pod); err != nil { + logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace()).WithField("status", pod.Status.Phase).WithField("reason", pod.Status.Reason) + logger.Error(err) + } + + }(pod) } + + wg.Wait() } func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {