Compare commits

...

4 Commits
0.01 ... v1.2.1

Author SHA1 Message Date
Brian Goff
e6e1dbed87 Merge pull request #794 from cpuguy83/cherry_picks_1.2.1
Cherry picks 1.2.1
2019-11-15 15:19:11 -08:00
Thomas Hartland
eb9498cdde Add test for node ping interval
(cherry picked from commit 3783a39b26)
2019-11-15 14:31:04 -08:00
Thomas Hartland
df16317a89 After handling status update, reset update timer with correct duration
If the ping timer is being used, it should be reset with the ping update
interval. If the status update interval is used then Ping stops being
called for long enough to cause kubernetes to mark the node as NotReady.

(cherry picked from commit c258614d8f)
2019-11-15 14:30:54 -08:00
Brian Goff
7585e11542 [Sync Provider] Fix panic on not found pod status
(cherry picked from commit 6e33b0f084)
2019-11-15 14:30:22 -08:00
3 changed files with 128 additions and 25 deletions

View File

@@ -244,7 +244,12 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
statusTimer := time.NewTimer(n.statusInterval)
defer statusTimer.Stop()
timerResetDuration := n.statusInterval
if n.disableLease {
// when resetting the timer after processing a status update, reset it to the ping interval
// (since it will be the ping timer as n.disableLease == true)
timerResetDuration = n.pingInterval
// hack to make sure this channel always blocks since we won't be using it
if !statusTimer.Stop() {
<-statusTimer.C
@@ -276,7 +281,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
if err := n.updateStatus(ctx, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
t.Reset(n.statusInterval)
t.Reset(timerResetDuration)
case <-statusTimer.C:
if err := n.updateStatus(ctx, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")

View File

@@ -302,6 +302,78 @@ func TestUpdateNodeLease(t *testing.T) {
assert.Assert(t, cmp.DeepEqual(compare.Spec.HolderIdentity, lease.Spec.HolderIdentity))
}
// TestPingAfterStatusUpdate checks that Ping continues to be called with the specified interval
// after a node status update occurs, when leases are disabled.
//
// Timing ratios used in this test:
// ping interval (10 ms)
// maximum allowed interval = 2.5 * ping interval
// status update interval = 6 * ping interval
//
// The allowed maximum time is 2.5 times the ping interval because
// the status update resets the ping interval timer, meaning
// that there can be a full two interval durations between
// successive calls to Ping. The extra half is to allow
// for timing variations when using such short durations.
//
// Once the node controller is ready:
// send status update after 10 * ping interval
// end test after another 10 * ping interval
func TestPingAfterStatusUpdate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := testclient.NewSimpleClientset()
nodes := c.CoreV1().Nodes()
testP := &testNodeProviderPing{}
interval := 10 * time.Millisecond
maxAllowedInterval := time.Duration(2.5 * float64(interval.Nanoseconds()))
opts := []NodeControllerOpt{
WithNodePingInterval(interval),
WithNodeStatusUpdateInterval(interval * time.Duration(6)),
}
testNode := testNode(t)
testNodeCopy := testNode.DeepCopy()
node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error, 1)
go func() {
chErr <- node.Run(ctx)
}()
timer := time.NewTimer(10 * time.Second)
defer timer.Stop()
// wait for the node to be ready
select {
case <-timer.C:
t.Fatal("timeout waiting for node to be ready")
case <-chErr:
t.Fatalf("node.Run returned earlier than expected: %v", err)
case <-node.Ready():
}
notifyTimer := time.After(interval * time.Duration(10))
select {
case <-notifyTimer:
testP.triggerStatusUpdate(testNodeCopy)
}
endTimer := time.After(interval * time.Duration(10))
select {
case <-endTimer:
break
}
assert.Assert(t, testP.maxPingInterval < maxAllowedInterval, "maximum time between node pings (%v) was greater than the maximum expected interval (%v)", testP.maxPingInterval, maxAllowedInterval)
}
func testNode(t *testing.T) *corev1.Node {
n := &corev1.Node{}
n.Name = strings.ToLower(t.Name())
@@ -323,6 +395,26 @@ func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
}
}
// testNodeProviderPing tracks the maximum time interval between calls to Ping
type testNodeProviderPing struct {
testNodeProvider
lastPingTime time.Time
maxPingInterval time.Duration
}
func (tnp *testNodeProviderPing) Ping(ctx context.Context) error {
now := time.Now()
if tnp.lastPingTime.IsZero() {
tnp.lastPingTime = now
return nil
}
if now.Sub(tnp.lastPingTime) > tnp.maxPingInterval {
tnp.maxPingInterval = now.Sub(tnp.lastPingTime)
}
tnp.lastPingTime = now
return nil
}
type watchGetter interface {
Watch(metav1.ListOptions) (watch.Interface, error)
}

View File

@@ -152,12 +152,14 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
defer span.End()
ctx = addPodAttributes(ctx, span, podFromKubernetes)
var statusErr error
podStatus, err := p.PodLifecycleHandler.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
if err != nil {
if !errdefs.IsNotFound(err) {
span.SetStatus(err)
return err
}
statusErr = err
}
if podStatus != nil {
pod := podFromKubernetes.DeepCopy()
@@ -168,6 +170,7 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
key, err := cache.MetaNamespaceKeyFunc(podFromKubernetes)
if err != nil {
span.SetStatus(err)
return err
}
@@ -176,9 +179,13 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
return nil
}
if podFromKubernetes.Status.Phase != corev1.PodRunning && time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) <= time.Minute {
span.SetStatus(statusErr)
return statusErr
}
// Only change the status when the pod was already up.
// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
podStatus = podFromKubernetes.Status.DeepCopy()
podStatus.Phase = corev1.PodFailed
@@ -199,9 +206,8 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
}
podStatus.ContainerStatuses[i].State.Running = nil
}
log.G(ctx).Debug("Setting pod not found on pod status")
}
log.G(ctx).Debug("Setting pod not found on pod status")
pod := podFromKubernetes.DeepCopy()
podStatus.DeepCopyInto(&pod.Status)
p.notify(pod)