Compare commits

...

6 Commits

Author SHA1 Message Date
Brian Goff
4e399f93b1 Merge pull request #868 from cpuguy83/1.2_fix_missing_stats_route
Add GetStatsSummary to PodHandlerConfig
2020-07-30 14:02:41 -07:00
Vilmos Nebehaj
c621acc8d2 Add GetStatsSummary to PodHandlerConfig
If both the metrics routes and the pod routes are attached to the same
mux with the pattern "/", it will panic. Instead, add the stats handler
function to PodHandlerConfig and set up the route if it is not nil.

(cherry picked from commit 56b248c854)
Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2020-07-30 10:25:34 -07:00
Brian Goff
e6e1dbed87 Merge pull request #794 from cpuguy83/cherry_picks_1.2.1
Cherry picks 1.2.1
2019-11-15 15:19:11 -08:00
Thomas Hartland
eb9498cdde Add test for node ping interval
(cherry picked from commit 3783a39b26)
2019-11-15 14:31:04 -08:00
Thomas Hartland
df16317a89 After handling status update, reset update timer with correct duration
If the ping timer is being used, it should be reset with the ping update
interval. If the status update interval is used then Ping stops being
called for long enough to cause kubernetes to mark the node as NotReady.

(cherry picked from commit c258614d8f)
2019-11-15 14:30:54 -08:00
Brian Goff
7585e11542 [Sync Provider] Fix panic on not found pod status
(cherry picked from commit 6e33b0f084)
2019-11-15 14:30:22 -08:00
4 changed files with 135 additions and 25 deletions

View File

@@ -36,6 +36,7 @@ type PodHandlerConfig struct {
RunInContainer ContainerExecHandlerFunc RunInContainer ContainerExecHandlerFunc
GetContainerLogs ContainerLogsHandlerFunc GetContainerLogs ContainerLogsHandlerFunc
GetPods PodListerFunc GetPods PodListerFunc
GetStatsSummary PodStatsSummaryHandlerFunc
} }
// PodHandler creates an http handler for interacting with pods/containers. // PodHandler creates an http handler for interacting with pods/containers.
@@ -49,6 +50,12 @@ func PodHandler(p PodHandlerConfig, debug bool) http.Handler {
} }
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", HandleContainerLogs(p.GetContainerLogs)).Methods("GET") r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", HandleContainerLogs(p.GetContainerLogs)).Methods("GET")
r.HandleFunc("/exec/{namespace}/{pod}/{container}", HandleContainerExec(p.RunInContainer)).Methods("POST") r.HandleFunc("/exec/{namespace}/{pod}/{container}", HandleContainerExec(p.RunInContainer)).Methods("POST")
if p.GetStatsSummary != nil {
f := HandlePodStatsSummary(p.GetStatsSummary)
r.HandleFunc("/stats/summary", f).Methods("GET")
r.HandleFunc("/stats/summary/", f).Methods("GET")
}
r.NotFoundHandler = http.HandlerFunc(NotFound) r.NotFoundHandler = http.HandlerFunc(NotFound)
return r return r
} }

View File

@@ -244,7 +244,12 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
statusTimer := time.NewTimer(n.statusInterval) statusTimer := time.NewTimer(n.statusInterval)
defer statusTimer.Stop() defer statusTimer.Stop()
timerResetDuration := n.statusInterval
if n.disableLease { if n.disableLease {
// when resetting the timer after processing a status update, reset it to the ping interval
// (since it will be the ping timer as n.disableLease == true)
timerResetDuration = n.pingInterval
// hack to make sure this channel always blocks since we won't be using it // hack to make sure this channel always blocks since we won't be using it
if !statusTimer.Stop() { if !statusTimer.Stop() {
<-statusTimer.C <-statusTimer.C
@@ -276,7 +281,7 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
if err := n.updateStatus(ctx, false); err != nil { if err := n.updateStatus(ctx, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update") log.G(ctx).WithError(err).Error("Error handling node status update")
} }
t.Reset(n.statusInterval) t.Reset(timerResetDuration)
case <-statusTimer.C: case <-statusTimer.C:
if err := n.updateStatus(ctx, false); err != nil { if err := n.updateStatus(ctx, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update") log.G(ctx).WithError(err).Error("Error handling node status update")

View File

@@ -302,6 +302,78 @@ func TestUpdateNodeLease(t *testing.T) {
assert.Assert(t, cmp.DeepEqual(compare.Spec.HolderIdentity, lease.Spec.HolderIdentity)) assert.Assert(t, cmp.DeepEqual(compare.Spec.HolderIdentity, lease.Spec.HolderIdentity))
} }
// TestPingAfterStatusUpdate checks that Ping continues to be called with the specified interval
// after a node status update occurs, when leases are disabled.
//
// Timing ratios used in this test:
// ping interval (10 ms)
// maximum allowed interval = 2.5 * ping interval
// status update interval = 6 * ping interval
//
// The allowed maximum time is 2.5 times the ping interval because
// the status update resets the ping interval timer, meaning
// that there can be a full two interval durations between
// successive calls to Ping. The extra half is to allow
// for timing variations when using such short durations.
//
// Once the node controller is ready:
// send status update after 10 * ping interval
// end test after another 10 * ping interval
func TestPingAfterStatusUpdate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := testclient.NewSimpleClientset()
nodes := c.CoreV1().Nodes()
testP := &testNodeProviderPing{}
interval := 10 * time.Millisecond
maxAllowedInterval := time.Duration(2.5 * float64(interval.Nanoseconds()))
opts := []NodeControllerOpt{
WithNodePingInterval(interval),
WithNodeStatusUpdateInterval(interval * time.Duration(6)),
}
testNode := testNode(t)
testNodeCopy := testNode.DeepCopy()
node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error, 1)
go func() {
chErr <- node.Run(ctx)
}()
timer := time.NewTimer(10 * time.Second)
defer timer.Stop()
// wait for the node to be ready
select {
case <-timer.C:
t.Fatal("timeout waiting for node to be ready")
case <-chErr:
t.Fatalf("node.Run returned earlier than expected: %v", err)
case <-node.Ready():
}
notifyTimer := time.After(interval * time.Duration(10))
select {
case <-notifyTimer:
testP.triggerStatusUpdate(testNodeCopy)
}
endTimer := time.After(interval * time.Duration(10))
select {
case <-endTimer:
break
}
assert.Assert(t, testP.maxPingInterval < maxAllowedInterval, "maximum time between node pings (%v) was greater than the maximum expected interval (%v)", testP.maxPingInterval, maxAllowedInterval)
}
func testNode(t *testing.T) *corev1.Node { func testNode(t *testing.T) *corev1.Node {
n := &corev1.Node{} n := &corev1.Node{}
n.Name = strings.ToLower(t.Name()) n.Name = strings.ToLower(t.Name())
@@ -323,6 +395,26 @@ func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
} }
} }
// testNodeProviderPing tracks the maximum time interval between calls to Ping
type testNodeProviderPing struct {
testNodeProvider
lastPingTime time.Time
maxPingInterval time.Duration
}
func (tnp *testNodeProviderPing) Ping(ctx context.Context) error {
now := time.Now()
if tnp.lastPingTime.IsZero() {
tnp.lastPingTime = now
return nil
}
if now.Sub(tnp.lastPingTime) > tnp.maxPingInterval {
tnp.maxPingInterval = now.Sub(tnp.lastPingTime)
}
tnp.lastPingTime = now
return nil
}
type watchGetter interface { type watchGetter interface {
Watch(metav1.ListOptions) (watch.Interface, error) Watch(metav1.ListOptions) (watch.Interface, error)
} }

View File

@@ -152,12 +152,14 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
defer span.End() defer span.End()
ctx = addPodAttributes(ctx, span, podFromKubernetes) ctx = addPodAttributes(ctx, span, podFromKubernetes)
var statusErr error
podStatus, err := p.PodLifecycleHandler.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name) podStatus, err := p.PodLifecycleHandler.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
if err != nil { if err != nil {
if !errdefs.IsNotFound(err) { if !errdefs.IsNotFound(err) {
span.SetStatus(err) span.SetStatus(err)
return err return err
} }
statusErr = err
} }
if podStatus != nil { if podStatus != nil {
pod := podFromKubernetes.DeepCopy() pod := podFromKubernetes.DeepCopy()
@@ -168,6 +170,7 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
key, err := cache.MetaNamespaceKeyFunc(podFromKubernetes) key, err := cache.MetaNamespaceKeyFunc(podFromKubernetes)
if err != nil { if err != nil {
span.SetStatus(err)
return err return err
} }
@@ -176,32 +179,35 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
return nil return nil
} }
// Only change the status when the pod was already up. if podFromKubernetes.Status.Phase != corev1.PodRunning && time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) <= time.Minute {
// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation. span.SetStatus(statusErr)
if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute { return statusErr
// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
podStatus = podFromKubernetes.Status.DeepCopy()
podStatus.Phase = corev1.PodFailed
podStatus.Reason = podStatusReasonNotFound
podStatus.Message = podStatusMessageNotFound
now := metav1.NewTime(time.Now())
for i, c := range podStatus.ContainerStatuses {
if c.State.Running == nil {
continue
}
podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
ExitCode: containerStatusExitCodeNotFound,
Reason: containerStatusReasonNotFound,
Message: containerStatusMessageNotFound,
FinishedAt: now,
StartedAt: c.State.Running.StartedAt,
ContainerID: c.ContainerID,
}
podStatus.ContainerStatuses[i].State.Running = nil
}
log.G(ctx).Debug("Setting pod not found on pod status")
} }
// Only change the status when the pod was already up.
// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
podStatus = podFromKubernetes.Status.DeepCopy()
podStatus.Phase = corev1.PodFailed
podStatus.Reason = podStatusReasonNotFound
podStatus.Message = podStatusMessageNotFound
now := metav1.NewTime(time.Now())
for i, c := range podStatus.ContainerStatuses {
if c.State.Running == nil {
continue
}
podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
ExitCode: containerStatusExitCodeNotFound,
Reason: containerStatusReasonNotFound,
Message: containerStatusMessageNotFound,
FinishedAt: now,
StartedAt: c.State.Running.StartedAt,
ContainerID: c.ContainerID,
}
podStatus.ContainerStatuses[i].State.Running = nil
}
log.G(ctx).Debug("Setting pod not found on pod status")
pod := podFromKubernetes.DeepCopy() pod := podFromKubernetes.DeepCopy()
podStatus.DeepCopyInto(&pod.Status) podStatus.DeepCopyInto(&pod.Status)
p.notify(pod) p.notify(pod)