Add an extra loop when the pod watcher is closed unexpectedly. (#226)
This commit is contained in:
@@ -192,47 +192,62 @@ func (s *Server) registerNode() error {
|
|||||||
// Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster.
|
// Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster.
|
||||||
// Run will block until Stop is called or a SIGINT or SIGTERM signal is received.
|
// Run will block until Stop is called or a SIGINT or SIGTERM signal is received.
|
||||||
func (s *Server) Run() error {
|
func (s *Server) Run() error {
|
||||||
|
shouldStop := false
|
||||||
|
|
||||||
sig := make(chan os.Signal, 1)
|
sig := make(chan os.Signal, 1)
|
||||||
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
|
||||||
go func() {
|
go func() {
|
||||||
<-sig
|
<-sig
|
||||||
|
shouldStop = true
|
||||||
s.Stop()
|
s.Stop()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
opts := metav1.ListOptions{
|
|
||||||
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", s.nodeName).String(),
|
|
||||||
}
|
|
||||||
|
|
||||||
pods, err := s.k8sClient.CoreV1().Pods(s.namespace).List(opts)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
s.resourceManager.SetPods(pods)
|
|
||||||
s.reconcile()
|
|
||||||
|
|
||||||
opts.ResourceVersion = pods.ResourceVersion
|
|
||||||
s.podWatcher, err = s.k8sClient.CoreV1().Pods(s.namespace).Watch(opts)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
opts := metav1.ListOptions{
|
||||||
case ev, ok := <-s.podWatcher.ResultChan():
|
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", s.nodeName).String(),
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
switch ev.Type {
|
|
||||||
case watch.Added:
|
|
||||||
s.resourceManager.AddPod(ev.Object.(*corev1.Pod))
|
|
||||||
case watch.Modified:
|
|
||||||
s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
|
|
||||||
case watch.Deleted:
|
|
||||||
s.resourceManager.DeletePod(ev.Object.(*corev1.Pod))
|
|
||||||
}
|
|
||||||
s.reconcile()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pods, err := s.k8sClient.CoreV1().Pods(s.namespace).List(opts)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to list pods", err)
|
||||||
|
}
|
||||||
|
s.resourceManager.SetPods(pods)
|
||||||
|
s.reconcile()
|
||||||
|
|
||||||
|
opts.ResourceVersion = pods.ResourceVersion
|
||||||
|
s.podWatcher, err = s.k8sClient.CoreV1().Pods(s.namespace).Watch(opts)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to watch pods", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case ev, ok := <-s.podWatcher.ResultChan():
|
||||||
|
if !ok {
|
||||||
|
if shouldStop {
|
||||||
|
log.Println("Pod watcher is stopped.")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println("Pod watcher connection is closed unexpectedly.")
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println("Pod watcher event is received:", ev.Type)
|
||||||
|
switch ev.Type {
|
||||||
|
case watch.Added:
|
||||||
|
s.resourceManager.AddPod(ev.Object.(*corev1.Pod))
|
||||||
|
case watch.Modified:
|
||||||
|
s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
|
||||||
|
case watch.Deleted:
|
||||||
|
s.resourceManager.DeletePod(ev.Object.(*corev1.Pod))
|
||||||
|
}
|
||||||
|
s.reconcile()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user