diff --git a/node/node.go b/node/node.go
index 59ce9d674..702be8f21 100644
--- a/node/node.go
+++ b/node/node.go
@@ -244,7 +244,12 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
 	statusTimer := time.NewTimer(n.statusInterval)
 	defer statusTimer.Stop()
 
+	timerResetDuration := n.statusInterval
 	if n.disableLease {
+		// when resetting the timer after processing a status update, reset it to the ping interval
+		// (since it will be the ping timer as n.disableLease == true)
+		timerResetDuration = n.pingInterval
+
 		// hack to make sure this channel always blocks since we won't be using it
 		if !statusTimer.Stop() {
 			<-statusTimer.C
@@ -276,7 +281,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
 			if err := n.updateStatus(ctx, false); err != nil {
 				log.G(ctx).WithError(err).Error("Error handling node status update")
 			}
-			t.Reset(n.statusInterval)
+			t.Reset(timerResetDuration)
 		case <-statusTimer.C:
 			if err := n.updateStatus(ctx, false); err != nil {
 				log.G(ctx).WithError(err).Error("Error handling node status update")
diff --git a/node/node_test.go b/node/node_test.go
index cb4c3f13c..f1195b90d 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -302,6 +302,78 @@ func TestUpdateNodeLease(t *testing.T) {
 	assert.Assert(t, cmp.DeepEqual(compare.Spec.HolderIdentity, lease.Spec.HolderIdentity))
 }
 
+// TestPingAfterStatusUpdate checks that Ping continues to be called with the specified interval
+// after a node status update occurs, when leases are disabled.
+//
+// Timing ratios used in this test:
+// ping interval (10 ms)
+// maximum allowed interval = 2.5 * ping interval
+// status update interval = 6 * ping interval
+//
+// The allowed maximum time is 2.5 times the ping interval because
+// the status update resets the ping interval timer, meaning
+// that there can be a full two interval durations between
+// successive calls to Ping. The extra half is to allow
+// for timing variations when using such short durations.
+//
+// Once the node controller is ready:
+// send status update after 10 * ping interval
+// end test after another 10 * ping interval
+func TestPingAfterStatusUpdate(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	c := testclient.NewSimpleClientset()
+	nodes := c.CoreV1().Nodes()
+
+	testP := &testNodeProviderPing{}
+
+	interval := 10 * time.Millisecond
+	maxAllowedInterval := time.Duration(2.5 * float64(interval.Nanoseconds()))
+
+	opts := []NodeControllerOpt{
+		WithNodePingInterval(interval),
+		WithNodeStatusUpdateInterval(interval * time.Duration(6)),
+	}
+
+	testNode := testNode(t)
+	testNodeCopy := testNode.DeepCopy()
+
+	node, err := NewNodeController(testP, testNode, nodes, opts...)
+	assert.NilError(t, err)
+
+	chErr := make(chan error, 1)
+	go func() {
+		chErr <- node.Run(ctx)
+	}()
+
+	timer := time.NewTimer(10 * time.Second)
+	defer timer.Stop()
+
+	// wait for the node to be ready
+	select {
+	case <-timer.C:
+		t.Fatal("timeout waiting for node to be ready")
+	case err := <-chErr:
+		t.Fatalf("node.Run returned earlier than expected: %v", err)
+	case <-node.Ready():
+	}
+
+	notifyTimer := time.After(interval * time.Duration(10))
+	select {
+	case <-notifyTimer:
+		testP.triggerStatusUpdate(testNodeCopy)
+	}
+
+	endTimer := time.After(interval * time.Duration(10))
+	select {
+	case <-endTimer:
+		break
+	}
+
+	assert.Assert(t, testP.maxPingInterval < maxAllowedInterval, "maximum time between node pings (%v) was greater than the maximum expected interval (%v)", testP.maxPingInterval, maxAllowedInterval)
+}
+
 func testNode(t *testing.T) *corev1.Node {
 	n := &corev1.Node{}
 	n.Name = strings.ToLower(t.Name())
@@ -323,6 +395,26 @@ func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
 	}
 }
 
+// testNodeProviderPing tracks the maximum time interval between calls to Ping
+type testNodeProviderPing struct {
+	testNodeProvider
+	lastPingTime    time.Time
+	maxPingInterval time.Duration
+}
+
+func (tnp *testNodeProviderPing) Ping(ctx context.Context) error {
+	now := time.Now()
+	if tnp.lastPingTime.IsZero() {
+		tnp.lastPingTime = now
+		return nil
+	}
+	if now.Sub(tnp.lastPingTime) > tnp.maxPingInterval {
+		tnp.maxPingInterval = now.Sub(tnp.lastPingTime)
+	}
+	tnp.lastPingTime = now
+	return nil
+}
+
 type watchGetter interface {
 	Watch(metav1.ListOptions) (watch.Interface, error)
 }
diff --git a/node/sync.go b/node/sync.go
index eca751a25..62d7ebbfe 100644
--- a/node/sync.go
+++ b/node/sync.go
@@ -152,12 +152,14 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
 	defer span.End()
 	ctx = addPodAttributes(ctx, span, podFromKubernetes)
 
+	var statusErr error
 	podStatus, err := p.PodLifecycleHandler.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
 	if err != nil {
 		if !errdefs.IsNotFound(err) {
 			span.SetStatus(err)
 			return err
 		}
+		statusErr = err
 	}
 	if podStatus != nil {
 		pod := podFromKubernetes.DeepCopy()
@@ -168,6 +170,7 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
 
 	key, err := cache.MetaNamespaceKeyFunc(podFromKubernetes)
 	if err != nil {
+		span.SetStatus(err)
 		return err
 	}
 
@@ -176,32 +179,35 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
 	if !p.l.remove(key) {
 		log.G(ctx).Debug("Pod not found in deletion queue, not setting to not found")
 		return nil
 	}
-	// Only change the status when the pod was already up.
-	// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
-	if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
-		// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
-		podStatus = podFromKubernetes.Status.DeepCopy()
-		podStatus.Phase = corev1.PodFailed
-		podStatus.Reason = podStatusReasonNotFound
-		podStatus.Message = podStatusMessageNotFound
-		now := metav1.NewTime(time.Now())
-		for i, c := range podStatus.ContainerStatuses {
-			if c.State.Running == nil {
-				continue
-			}
-			podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
-				ExitCode:    containerStatusExitCodeNotFound,
-				Reason:      containerStatusReasonNotFound,
-				Message:     containerStatusMessageNotFound,
-				FinishedAt:  now,
-				StartedAt:   c.State.Running.StartedAt,
-				ContainerID: c.ContainerID,
-			}
-			podStatus.ContainerStatuses[i].State.Running = nil
-		}
-		log.G(ctx).Debug("Setting pod not found on pod status")
+	if podFromKubernetes.Status.Phase != corev1.PodRunning && time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) <= time.Minute {
+		span.SetStatus(statusErr)
+		return statusErr
 	}
+
+	// Only change the status when the pod was already up.
+	// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
+	// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
+	podStatus = podFromKubernetes.Status.DeepCopy()
+	podStatus.Phase = corev1.PodFailed
+	podStatus.Reason = podStatusReasonNotFound
+	podStatus.Message = podStatusMessageNotFound
+	now := metav1.NewTime(time.Now())
+	for i, c := range podStatus.ContainerStatuses {
+		if c.State.Running == nil {
+			continue
+		}
+		podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
+			ExitCode:    containerStatusExitCodeNotFound,
+			Reason:      containerStatusReasonNotFound,
+			Message:     containerStatusMessageNotFound,
+			FinishedAt:  now,
+			StartedAt:   c.State.Running.StartedAt,
+			ContainerID: c.ContainerID,
+		}
+		podStatus.ContainerStatuses[i].State.Running = nil
+	}
+
+	log.G(ctx).Debug("Setting pod not found on pod status")
 	pod := podFromKubernetes.DeepCopy()
 	podStatus.DeepCopyInto(&pod.Status)
 	p.notify(pod)
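
Note (illustrative only, not part of the patch): the controlLoop change above relies on two standard time.Timer idioms: stopping and draining the unused status timer so its channel never fires when leases are disabled, and resetting the remaining timer with the ping interval after each wakeup. The self-contained sketch below shows the same pattern in isolation; the names and intervals (pingTimer, statusTimer, resetDuration) are invented for the example and are not the patched code.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical stand-ins for n.pingInterval / n.statusInterval.
	pingInterval := 10 * time.Millisecond
	statusInterval := 60 * time.Millisecond
	disableLease := true

	pingTimer := time.NewTimer(pingInterval)
	defer pingTimer.Stop()
	statusTimer := time.NewTimer(statusInterval)
	defer statusTimer.Stop()

	resetDuration := statusInterval
	if disableLease {
		// Status updates ride on the ping timer, so resets use the ping interval.
		resetDuration = pingInterval
		// Stop-and-drain: a stopped, drained timer's channel blocks forever,
		// which makes its select case inert.
		if !statusTimer.Stop() {
			<-statusTimer.C
		}
	}

	for i := 0; i < 3; i++ {
		select {
		case <-pingTimer.C:
			fmt.Println("tick", i)
			pingTimer.Reset(resetDuration)
		case <-statusTimer.C:
			fmt.Println("status timer fired", i)
			statusTimer.Reset(resetDuration)
		}
	}
}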