Merge pull request #654 from cpuguy83/node_defaults

Update node defaults
This commit is contained in:
Brian Goff
2019-06-10 11:32:14 -07:00
committed by GitHub
3 changed files with 56 additions and 41 deletions

View File

@@ -35,6 +35,7 @@ import (
kubeinformers "k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
@@ -140,13 +141,17 @@ func runRootCommand(ctx context.Context, c Opts) error {
"watchedNamespace": c.KubeNamespace, "watchedNamespace": c.KubeNamespace,
})) }))
var leaseClient v1beta1.LeaseInterface
if c.EnableNodeLease {
leaseClient = client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
}
pNode := NodeFromProvider(ctx, c.NodeName, taint, p) pNode := NodeFromProvider(ctx, c.NodeName, taint, p)
node, err := vkubelet.NewNode( node, err := vkubelet.NewNode(
vkubelet.NaiveNodeProvider{}, vkubelet.NaiveNodeProvider{},
pNode, pNode,
client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease),
client.CoreV1().Nodes(), client.CoreV1().Nodes(),
vkubelet.WithNodeDisableLease(!c.EnableNodeLease), vkubelet.WithNodeEnableLeaseV1Beta1(leaseClient, nil),
vkubelet.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error { vkubelet.WithNodeStatusUpdateErrorHandler(func(ctx context.Context, err error) error {
if !k8serrors.IsNotFound(err) { if !k8serrors.IsNotFound(err) {
return err return err

View File

@@ -45,8 +45,11 @@ type NodeProvider interface {
// //
// Use the node's `Run` method to register and run the loops to update the node // Use the node's `Run` method to register and run the loops to update the node
// in Kubernetes. // in Kubernetes.
func NewNode(p NodeProvider, node *corev1.Node, leases v1beta1.LeaseInterface, nodes v1.NodeInterface, opts ...NodeOpt) (*Node, error) { //
n := &Node{p: p, n: node, leases: leases, nodes: nodes, chReady: make(chan struct{})} // Note: When if there are multiple NodeOpts which apply against the same
// underlying options, the last NodeOpt will win.
func NewNode(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface, opts ...NodeOpt) (*Node, error) {
n := &Node{p: p, n: node, nodes: nodes, chReady: make(chan struct{})}
for _, o := range opts { for _, o := range opts {
if err := o(n); err != nil { if err := o(n); err != nil {
return nil, pkgerrors.Wrap(err, "error applying node option") return nil, pkgerrors.Wrap(err, "error applying node option")
@@ -58,17 +61,31 @@ func NewNode(p NodeProvider, node *corev1.Node, leases v1beta1.LeaseInterface, n
// NodeOpt are the functional options used for configuring a node // NodeOpt are the functional options used for configuring a node
type NodeOpt func(*Node) error type NodeOpt func(*Node) error
// WithNodeDisableLease forces node leases to be disabled and to only update // WithNodeEnableLeaseV1Beta1 enables support for v1beta1 leases.
// using node status // If client is nil, leases will not be enabled.
// Note that this will force status updates to occur on the ping interval frequency // If baseLease is nil, a default base lease will be used.
func WithNodeDisableLease(v bool) NodeOpt { //
// The lease will be updated after each successful node ping. To change the
// lease update interval, you must set the node ping interval.
// See WithNodePingInterval().
//
// This also affects the frequency of node status updates:
// - When leases are *not* enabled (or are disabled due to no support on the cluster)
// the node status is updated at every ping interval.
// - When node leases are enabled, node status updates are colled by the
// node status update interval option.
// To set a custom node status update interval, see WithNodeStatusUpdateInterval().
func WithNodeEnableLeaseV1Beta1(client v1beta1.LeaseInterface, baseLease *coord.Lease) NodeOpt {
return func(n *Node) error { return func(n *Node) error {
n.disableLease = v n.leases = client
n.lease = baseLease
return nil return nil
} }
} }
// WithNodePingInterval sets the inteval for checking node status // WithNodePingInterval sets the interval for checking node status
// If node leases are not supported (or not enabled), this is the frequency
// with which the node status will be updated in Kubernetes.
func WithNodePingInterval(d time.Duration) NodeOpt { func WithNodePingInterval(d time.Duration) NodeOpt {
return func(n *Node) error { return func(n *Node) error {
n.pingInterval = d n.pingInterval = d
@@ -79,6 +96,8 @@ func WithNodePingInterval(d time.Duration) NodeOpt {
// WithNodeStatusUpdateInterval sets the interval for updating node status // WithNodeStatusUpdateInterval sets the interval for updating node status
// This is only used when leases are supported and only for updating the actual // This is only used when leases are supported and only for updating the actual
// node status, not the node lease. // node status, not the node lease.
// When node leases are not enabled (or are not supported on the cluster) this
// has no affect and node status is updated on the "ping" interval.
func WithNodeStatusUpdateInterval(d time.Duration) NodeOpt { func WithNodeStatusUpdateInterval(d time.Duration) NodeOpt {
return func(n *Node) error { return func(n *Node) error {
n.statusInterval = d n.statusInterval = d
@@ -105,15 +124,6 @@ func WithNodeStatusUpdateErrorHandler(h ErrorHandler) NodeOpt {
// progress can continue (or a retry is possible). // progress can continue (or a retry is possible).
type ErrorHandler func(context.Context, error) error type ErrorHandler func(context.Context, error) error
// WithNodeLease sets the base node lease to use.
// If a lease time is set, it will be ignored.
func WithNodeLease(l *coord.Lease) NodeOpt {
return func(n *Node) error {
n.lease = l
return nil
}
}
// Node deals with creating and managing a node object in Kubernetes. // Node deals with creating and managing a node object in Kubernetes.
// It can register a node with Kubernetes and periodically update its status. // It can register a node with Kubernetes and periodically update its status.
type Node struct { type Node struct {
@@ -136,7 +146,7 @@ type Node struct {
// The default intervals used for lease and status updates. // The default intervals used for lease and status updates.
const ( const (
DefaultPingInterval = 5 * time.Second DefaultPingInterval = 10 * time.Second
DefaultStatusUpdateInterval = 1 * time.Minute DefaultStatusUpdateInterval = 1 * time.Minute
) )
@@ -169,27 +179,25 @@ func (n *Node) Run(ctx context.Context) error {
return err return err
} }
if !n.disableLease { if n.leases == nil {
n.lease = newLease(n.lease) n.disableLease = true
setLeaseAttrs(n.lease, n.n, n.pingInterval*5) return n.controlLoop(ctx)
}
l, err := ensureLease(ctx, n.leases, n.lease) n.lease = newLease(n.lease)
if err != nil { setLeaseAttrs(n.lease, n.n, n.pingInterval*5)
if errors.IsNotFound(err) {
n.disableLease = true l, err := ensureLease(ctx, n.leases, n.lease)
} else { if err != nil {
return pkgerrors.Wrap(err, "error creating node lease") if !errors.IsNotFound(err) {
} return pkgerrors.Wrap(err, "error creating node lease")
} }
log.G(ctx).Debug("Created node lease")
n.lease = l
}
if n.disableLease {
log.G(ctx).Info("Node leases not supported, falling back to only node status updates") log.G(ctx).Info("Node leases not supported, falling back to only node status updates")
n.disableLease = true
} }
n.lease = l
log.G(ctx).Debug("Created node lease")
return n.controlLoop(ctx) return n.controlLoop(ctx)
} }

View File

@@ -33,11 +33,14 @@ func testNodeRun(t *testing.T, enableLease bool) {
leases := c.Coordination().Leases(corev1.NamespaceNodeLease) leases := c.Coordination().Leases(corev1.NamespaceNodeLease)
interval := 1 * time.Millisecond interval := 1 * time.Millisecond
node, err := NewNode(testP, testNode(t), leases, nodes, opts := []NodeOpt{
WithNodePingInterval(interval), WithNodePingInterval(interval),
WithNodeStatusUpdateInterval(interval), WithNodeStatusUpdateInterval(interval),
WithNodeDisableLease(!enableLease), }
) if enableLease {
opts = append(opts, WithNodeEnableLeaseV1Beta1(leases, nil))
}
node, err := NewNode(testP, testNode(t), nodes, opts...)
assert.NilError(t, err) assert.NilError(t, err)
chErr := make(chan error) chErr := make(chan error)
@@ -157,12 +160,11 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
node, err := NewNode(testP, testNode(t), nil, nodes, node, err := NewNode(testP, testNode(t), nodes,
WithNodeStatusUpdateErrorHandler(func(_ context.Context, err error) error { WithNodeStatusUpdateErrorHandler(func(_ context.Context, err error) error {
cancel() cancel()
return nil return nil
}), }),
WithNodeDisableLease(true),
) )
assert.NilError(t, err) assert.NilError(t, err)