Compare commits

4 Commits

dependabot ... v1.2.1
| Author | SHA1 | Date |
|---|---|---|
| | e6e1dbed87 | |
| | eb9498cdde | |
| | df16317a89 | |
| | 7585e11542 | |
```diff
@@ -244,7 +244,12 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
 	statusTimer := time.NewTimer(n.statusInterval)
 	defer statusTimer.Stop()
 
+	timerResetDuration := n.statusInterval
 	if n.disableLease {
+		// when resetting the timer after processing a status update, reset it to the ping interval
+		// (since it will be the ping timer as n.disableLease == true)
+		timerResetDuration = n.pingInterval
+
 		// hack to make sure this channel always blocks since we won't be using it
 		if !statusTimer.Stop() {
 			<-statusTimer.C
@@ -276,7 +281,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
 			if err := n.updateStatus(ctx, false); err != nil {
 				log.G(ctx).WithError(err).Error("Error handling node status update")
 			}
-			t.Reset(n.statusInterval)
+			t.Reset(timerResetDuration)
 		case <-statusTimer.C:
 			if err := n.updateStatus(ctx, false); err != nil {
 				log.G(ctx).WithError(err).Error("Error handling node status update")
```
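As context for the hunks above, here is a minimal standalone sketch of the timer pattern the change relies on: the reset duration is chosen once up front, and that same value is reused every time the timer is re-armed after handling an update. The `disableLease`, `statusInterval`, and `pingInterval` names mirror the diff; the durations and the loop itself are illustrative only, not the controller's actual code.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// illustrative values only
	disableLease := true
	statusInterval := 60 * time.Millisecond
	pingInterval := 10 * time.Millisecond

	// choose the reset duration once, as the diff does
	timerResetDuration := statusInterval
	if disableLease {
		// with leases disabled the status timer doubles as the ping timer,
		// so it must be re-armed with the shorter ping interval
		timerResetDuration = pingInterval
	}

	t := time.NewTimer(timerResetDuration)
	defer t.Stop()

	for i := 0; i < 3; i++ {
		<-t.C
		fmt.Println("tick", i)
		// re-arm with the precomputed duration instead of statusInterval
		t.Reset(timerResetDuration)
	}
}
```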
```diff
@@ -302,6 +302,78 @@ func TestUpdateNodeLease(t *testing.T) {
 	assert.Assert(t, cmp.DeepEqual(compare.Spec.HolderIdentity, lease.Spec.HolderIdentity))
 }
 
+// TestPingAfterStatusUpdate checks that Ping continues to be called with the specified interval
+// after a node status update occurs, when leases are disabled.
+//
+// Timing ratios used in this test:
+// ping interval (10 ms)
+// maximum allowed interval = 2.5 * ping interval
+// status update interval = 6 * ping interval
+//
+// The allowed maximum time is 2.5 times the ping interval because
+// the status update resets the ping interval timer, meaning
+// that there can be a full two interval durations between
+// successive calls to Ping. The extra half is to allow
+// for timing variations when using such short durations.
+//
+// Once the node controller is ready:
+// send status update after 10 * ping interval
+// end test after another 10 * ping interval
+func TestPingAfterStatusUpdate(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	c := testclient.NewSimpleClientset()
+	nodes := c.CoreV1().Nodes()
+
+	testP := &testNodeProviderPing{}
+
+	interval := 10 * time.Millisecond
+	maxAllowedInterval := time.Duration(2.5 * float64(interval.Nanoseconds()))
+
+	opts := []NodeControllerOpt{
+		WithNodePingInterval(interval),
+		WithNodeStatusUpdateInterval(interval * time.Duration(6)),
+	}
+
+	testNode := testNode(t)
+	testNodeCopy := testNode.DeepCopy()
+
+	node, err := NewNodeController(testP, testNode, nodes, opts...)
+	assert.NilError(t, err)
+
+	chErr := make(chan error, 1)
+	go func() {
+		chErr <- node.Run(ctx)
+	}()
+
+	timer := time.NewTimer(10 * time.Second)
+	defer timer.Stop()
+
+	// wait for the node to be ready
+	select {
+	case <-timer.C:
+		t.Fatal("timeout waiting for node to be ready")
+	case <-chErr:
+		t.Fatalf("node.Run returned earlier than expected: %v", err)
+	case <-node.Ready():
+	}
+
+	notifyTimer := time.After(interval * time.Duration(10))
+	select {
+	case <-notifyTimer:
+		testP.triggerStatusUpdate(testNodeCopy)
+	}
+
+	endTimer := time.After(interval * time.Duration(10))
+	select {
+	case <-endTimer:
+		break
+	}
+
+	assert.Assert(t, testP.maxPingInterval < maxAllowedInterval, "maximum time between node pings (%v) was greater than the maximum expected interval (%v)", testP.maxPingInterval, maxAllowedInterval)
+}
+
 func testNode(t *testing.T) *corev1.Node {
 	n := &corev1.Node{}
 	n.Name = strings.ToLower(t.Name())
@@ -323,6 +395,26 @@ func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
 	}
 }
 
+// testNodeProviderPing tracks the maximum time interval between calls to Ping
+type testNodeProviderPing struct {
+	testNodeProvider
+	lastPingTime    time.Time
+	maxPingInterval time.Duration
+}
+
+func (tnp *testNodeProviderPing) Ping(ctx context.Context) error {
+	now := time.Now()
+	if tnp.lastPingTime.IsZero() {
+		tnp.lastPingTime = now
+		return nil
+	}
+	if now.Sub(tnp.lastPingTime) > tnp.maxPingInterval {
+		tnp.maxPingInterval = now.Sub(tnp.lastPingTime)
+	}
+	tnp.lastPingTime = now
+	return nil
+}
+
 type watchGetter interface {
 	Watch(metav1.ListOptions) (watch.Interface, error)
 }
```
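To make the "two interval durations" reasoning in the test comment concrete, the sketch below resets a timer just before it would have fired, which is roughly what a status update does to the shared timer when leases are disabled. The gap between consecutive fires then approaches twice the interval, hence the 2.5x allowance. The durations here are illustrative and unrelated to the test's values.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	interval := 50 * time.Millisecond

	t := time.NewTimer(interval)
	defer t.Stop()
	lastFire := time.Now()

	// simulate an unrelated event arriving just before the timer fires:
	// stop the timer, drain it if needed, and re-arm it for a full interval
	time.Sleep(interval - 10*time.Millisecond)
	if !t.Stop() {
		<-t.C
	}
	t.Reset(interval)

	<-t.C
	// the observed gap is close to 2 * interval, not 1 * interval
	fmt.Println("gap between fires:", time.Since(lastFire))
}
```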
node/sync.go (54 changes)
```diff
@@ -152,12 +152,14 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
 	defer span.End()
 	ctx = addPodAttributes(ctx, span, podFromKubernetes)
 
+	var statusErr error
 	podStatus, err := p.PodLifecycleHandler.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
 	if err != nil {
 		if !errdefs.IsNotFound(err) {
 			span.SetStatus(err)
 			return err
 		}
+		statusErr = err
 	}
 	if podStatus != nil {
 		pod := podFromKubernetes.DeepCopy()
@@ -168,6 +170,7 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
 
 	key, err := cache.MetaNamespaceKeyFunc(podFromKubernetes)
 	if err != nil {
+		span.SetStatus(err)
 		return err
 	}
 
@@ -176,32 +179,35 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
 		return nil
 	}
 
-	// Only change the status when the pod was already up.
-	// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
-	if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
-		// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
-		podStatus = podFromKubernetes.Status.DeepCopy()
-		podStatus.Phase = corev1.PodFailed
-		podStatus.Reason = podStatusReasonNotFound
-		podStatus.Message = podStatusMessageNotFound
-		now := metav1.NewTime(time.Now())
-		for i, c := range podStatus.ContainerStatuses {
-			if c.State.Running == nil {
-				continue
-			}
-			podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
-				ExitCode:    containerStatusExitCodeNotFound,
-				Reason:      containerStatusReasonNotFound,
-				Message:     containerStatusMessageNotFound,
-				FinishedAt:  now,
-				StartedAt:   c.State.Running.StartedAt,
-				ContainerID: c.ContainerID,
-			}
-			podStatus.ContainerStatuses[i].State.Running = nil
-		}
-		log.G(ctx).Debug("Setting pod not found on pod status")
-	}
+	if podFromKubernetes.Status.Phase != corev1.PodRunning && time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) <= time.Minute {
+		span.SetStatus(statusErr)
+		return statusErr
+	}
 
+	// Only change the status when the pod was already up.
+	// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
+	// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
+	podStatus = podFromKubernetes.Status.DeepCopy()
+	podStatus.Phase = corev1.PodFailed
+	podStatus.Reason = podStatusReasonNotFound
+	podStatus.Message = podStatusMessageNotFound
+	now := metav1.NewTime(time.Now())
+	for i, c := range podStatus.ContainerStatuses {
+		if c.State.Running == nil {
+			continue
+		}
+		podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
+			ExitCode:    containerStatusExitCodeNotFound,
+			Reason:      containerStatusReasonNotFound,
+			Message:     containerStatusMessageNotFound,
+			FinishedAt:  now,
+			StartedAt:   c.State.Running.StartedAt,
+			ContainerID: c.ContainerID,
+		}
+		podStatus.ContainerStatuses[i].State.Running = nil
+	}
+
+	log.G(ctx).Debug("Setting pod not found on pod status")
 	pod := podFromKubernetes.DeepCopy()
 	podStatus.DeepCopyInto(&pod.Status)
 	p.notify(pod)
```
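For readers unfamiliar with the container-status rewrite in the last hunk, here is a self-contained sketch of the same idea using the upstream Kubernetes API types: every container still reported as Running is flipped to Terminated and the pod phase is set to Failed. The exit code, reason, and message values below are placeholders, not the package's actual containerStatus* constants.

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// markRunningContainersTerminated mirrors the loop in the hunk above:
// running containers get a Terminated state and their Running state is cleared.
func markRunningContainersTerminated(status *corev1.PodStatus) {
	now := metav1.NewTime(time.Now())
	for i, c := range status.ContainerStatuses {
		if c.State.Running == nil {
			continue
		}
		status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
			ExitCode:    -1,                    // placeholder exit code
			Reason:      "NotFound",            // placeholder reason
			Message:     "container not found", // placeholder message
			FinishedAt:  now,
			StartedAt:   c.State.Running.StartedAt,
			ContainerID: c.ContainerID,
		}
		status.ContainerStatuses[i].State.Running = nil
	}
	status.Phase = corev1.PodFailed
}

func main() {
	status := &corev1.PodStatus{
		Phase: corev1.PodRunning,
		ContainerStatuses: []corev1.ContainerStatus{
			{
				Name:        "app",
				ContainerID: "containerd://abc123",
				State: corev1.ContainerState{
					Running: &corev1.ContainerStateRunning{StartedAt: metav1.Now()},
				},
			},
		},
	}

	markRunningContainersTerminated(status)
	fmt.Println(status.Phase, status.ContainerStatuses[0].State.Terminated.Reason)
}
```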