diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index 98c6a7815..5be2b6e82 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -192,47 +192,62 @@ func (s *Server) registerNode() error { // Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster. // Run will block until Stop is called or a SIGINT or SIGTERM signal is received. func (s *Server) Run() error { + shouldStop := false + sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) go func() { <-sig + shouldStop = true s.Stop() }() - opts := metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("spec.nodeName", s.nodeName).String(), - } - - pods, err := s.k8sClient.CoreV1().Pods(s.namespace).List(opts) - if err != nil { - log.Fatal(err) - } - s.resourceManager.SetPods(pods) - s.reconcile() - - opts.ResourceVersion = pods.ResourceVersion - s.podWatcher, err = s.k8sClient.CoreV1().Pods(s.namespace).Watch(opts) - if err != nil { - log.Fatal(err) - } - for { - select { - case ev, ok := <-s.podWatcher.ResultChan(): - if !ok { - return nil - } - - switch ev.Type { - case watch.Added: - s.resourceManager.AddPod(ev.Object.(*corev1.Pod)) - case watch.Modified: - s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod)) - case watch.Deleted: - s.resourceManager.DeletePod(ev.Object.(*corev1.Pod)) - } - s.reconcile() + opts := metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("spec.nodeName", s.nodeName).String(), } + + pods, err := s.k8sClient.CoreV1().Pods(s.namespace).List(opts) + if err != nil { + log.Fatal("Failed to list pods", err) + } + s.resourceManager.SetPods(pods) + s.reconcile() + + opts.ResourceVersion = pods.ResourceVersion + s.podWatcher, err = s.k8sClient.CoreV1().Pods(s.namespace).Watch(opts) + if err != nil { + log.Fatal("Failed to watch pods", err) + } + + loop: + for { + select { + case ev, ok := <-s.podWatcher.ResultChan(): + if !ok { + if shouldStop { + log.Println("Pod watcher is stopped.") + return nil + } + + log.Println("Pod watcher connection is closed unexpectedly.") + break loop + } + + log.Println("Pod watcher event is received:", ev.Type) + switch ev.Type { + case watch.Added: + s.resourceManager.AddPod(ev.Object.(*corev1.Pod)) + case watch.Modified: + s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod)) + case watch.Deleted: + s.resourceManager.DeletePod(ev.Object.(*corev1.Pod)) + } + s.reconcile() + } + } + + time.Sleep(5 * time.Second) } }