package vkubelet

import (
	"context"
	"time"

	"github.com/cpuguy83/strongerrors/status/ocstatus"
	pkgerrors "github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/log"
	"github.com/virtual-kubelet/virtual-kubelet/providers"
	"github.com/virtual-kubelet/virtual-kubelet/trace"
	coord "k8s.io/api/coordination/v1beta1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

// NewNode creates a new node.
// This does not have any side effects on the system or Kubernetes.
//
// Use the node's `Run` method to register and run the loops to update the node
// in Kubernetes.
func NewNode(p providers.NodeProvider, node *corev1.Node, leases v1beta1.LeaseInterface, nodes v1.NodeInterface, opts ...NodeOpt) (*Node, error) {
	n := &Node{p: p, n: node, leases: leases, nodes: nodes}
	for _, o := range opts {
		if err := o(n); err != nil {
			return nil, pkgerrors.Wrap(err, "error applying node option")
		}
	}
	return n, nil
}

// NodeOpt is a functional option used for configuring a node.
type NodeOpt func(*Node) error

// WithNodeDisableLease forces node leases to be disabled and the node to be
// updated using only node status.
// Note that this forces status updates to occur at the ping interval frequency.
func WithNodeDisableLease(v bool) NodeOpt {
	return func(n *Node) error {
		n.disableLease = v
		return nil
	}
}

// WithNodePingInterval sets the interval for checking node status.
func WithNodePingInterval(d time.Duration) NodeOpt {
	return func(n *Node) error {
		n.pingInterval = d
		return nil
	}
}

// WithNodeStatusUpdateInterval sets the interval for updating node status.
// This is only used when leases are supported, and only for updating the
// actual node status, not the node lease.
func WithNodeStatusUpdateInterval(d time.Duration) NodeOpt {
	return func(n *Node) error {
		n.statusInterval = d
		return nil
	}
}

// WithNodeLease sets the base node lease to use.
// Any renew time already set on the lease will be ignored.
func WithNodeLease(l *coord.Lease) NodeOpt {
	return func(n *Node) error {
		n.lease = l
		return nil
	}
}

// Node deals with creating and managing a node object in Kubernetes.
// It can register a node with Kubernetes and periodically update its status.
type Node struct {
	p      providers.NodeProvider
	n      *corev1.Node
	leases v1beta1.LeaseInterface
	nodes  v1.NodeInterface

	disableLease   bool
	pingInterval   time.Duration
	statusInterval time.Duration
	lease          *coord.Lease
	chStatusUpdate chan *corev1.Node
}

// The default intervals used for lease and status updates.
const (
	DefaultPingInterval         = 5 * time.Second
	DefaultStatusUpdateInterval = 1 * time.Minute
)
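// newExampleNode is a minimal usage sketch added for illustration; it is not
// part of the original file. It shows how NewNode and the functional options
// fit together. The provider and the lease/node clients are assumed to be
// supplied by the caller (e.g. from a configured client-go clientset), and
// the node name "my-virtual-node" is a placeholder.
func newExampleNode(p providers.NodeProvider, leases v1beta1.LeaseInterface, nodes v1.NodeInterface) (*Node, error) {
	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "my-virtual-node"},
	}
	return NewNode(
		p,
		node,
		leases,
		nodes,
		// Override the defaults; omit these to get DefaultPingInterval and
		// DefaultStatusUpdateInterval.
		WithNodePingInterval(10*time.Second),
		WithNodeStatusUpdateInterval(2*time.Minute),
	)
}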
// Run registers the node in Kubernetes and starts loops for updating the node
// status in Kubernetes.
//
// The node status must be updated periodically in Kubernetes to keep the node
// active. Newer versions of Kubernetes support node leases, which are
// essentially lightweight pings. Older versions of Kubernetes require updating
// the node status periodically.
//
// If Kubernetes supports node leases this will use leases with a much slower
// node status update (because some things still expect the node status to be
// updated periodically), otherwise it will only use node status updates with
// the configured ping interval.
func (n *Node) Run(ctx context.Context) error {
	if n.pingInterval == time.Duration(0) {
		n.pingInterval = DefaultPingInterval
	}
	if n.statusInterval == time.Duration(0) {
		n.statusInterval = DefaultStatusUpdateInterval
	}

	if err := n.updateStatus(ctx); err != nil {
		return pkgerrors.Wrap(err, "error registering node with kubernetes")
	}
	log.G(ctx).Info("Created node")

	n.chStatusUpdate = make(chan *corev1.Node)
	n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) {
		n.chStatusUpdate <- node
	})

	if !n.disableLease {
		n.lease = newLease(n.lease)
		setLeaseAttrs(n.lease, n.n, n.pingInterval*5)

		l, err := ensureLease(ctx, n.leases, n.lease)
		if err != nil {
			if !errors.IsNotFound(err) {
				return pkgerrors.Wrap(err, "error creating node lease")
			}
			// The API server does not support leases; fall back to
			// status-only updates.
			n.disableLease = true
		} else {
			log.G(ctx).Debug("Created node lease")
			n.lease = l
		}
	}

	if n.disableLease {
		log.G(ctx).Info("Node leases not supported, falling back to only node status updates")
	}

	n.controlLoop(ctx)
	return nil
}

func (n *Node) controlLoop(ctx context.Context) {
	pingTimer := time.NewTimer(n.pingInterval)
	defer pingTimer.Stop()

	statusTimer := time.NewTimer(n.statusInterval)
	defer statusTimer.Stop()
	if n.disableLease {
		// Hack to make sure this channel always blocks since we won't be using it.
		if !statusTimer.Stop() {
			<-statusTimer.C
		}
	}

	for {
		select {
		case <-ctx.Done():
			return
		case updated := <-n.chStatusUpdate:
			var t *time.Timer
			var interval time.Duration
			if n.disableLease {
				// With leases disabled, status updates ride on the ping timer.
				t = pingTimer
				interval = n.pingInterval
			} else {
				t = statusTimer
				interval = n.statusInterval
			}

			log.G(ctx).Debug("Received node status update")
			// Performing a status update, so stop/reset the timer in this
			// branch, otherwise there could be an unnecessary status update.
			if !t.Stop() {
				<-t.C
			}

			n.n.Status = updated.Status
			if err := n.updateStatus(ctx); err != nil {
				log.G(ctx).WithError(err).Error("Error handling node status update")
			}
			t.Reset(interval)
		case <-statusTimer.C:
			if err := n.updateStatus(ctx); err != nil {
				log.G(ctx).WithError(err).Error("Error handling node status update")
			}
			statusTimer.Reset(n.statusInterval)
		case <-pingTimer.C:
			if err := n.handlePing(ctx); err != nil {
				log.G(ctx).WithError(err).Error("Error while handling node ping")
			} else {
				log.G(ctx).Debug("Successful node ping")
			}
			pingTimer.Reset(n.pingInterval)
		}
	}
}

func (n *Node) handlePing(ctx context.Context) (retErr error) {
	ctx, span := trace.StartSpan(ctx, "node.handlePing")
	defer span.End()
	defer func() {
		span.SetStatus(ocstatus.FromError(retErr))
	}()

	if err := n.p.Ping(ctx); err != nil {
		return pkgerrors.Wrap(err, "error while pinging node provider")
	}

	if n.disableLease {
		return n.updateStatus(ctx)
	}

	return n.updateLease(ctx)
}

func (n *Node) updateLease(ctx context.Context) error {
	l, err := UpdateNodeLease(ctx, n.leases, newLease(n.lease))
	if err != nil {
		return err
	}

	n.lease = l
	return nil
}

func (n *Node) updateStatus(ctx context.Context) error {
	updateNodeStatusHeartbeat(n.n)

	node, err := UpdateNodeStatus(ctx, n.nodes, n.n)
	if err != nil {
		return err
	}

	n.n = node
	return nil
}

func ensureLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *coord.Lease) (*coord.Lease, error) {
	l, err := leases.Create(lease)
	if err != nil {
		switch {
		case errors.IsNotFound(err):
			log.G(ctx).WithError(err).Info("Node lease not supported")
			return nil, err
		case errors.IsAlreadyExists(err):
			// An old lease exists; delete it and create a fresh one.
			if err := leases.Delete(lease.Name, nil); err != nil && !errors.IsNotFound(err) {
				log.G(ctx).WithError(err).Error("could not delete old node lease")
				return nil, pkgerrors.Wrap(err, "old lease exists but could not delete it")
			}
			l, err = leases.Create(lease)
		}
	}

	return l, err
}
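// runNodeExample is an illustrative sketch added for this write-up; it is not
// part of the original file. It shows the intended lifecycle: Run blocks
// until the passed context is cancelled, so callers typically start it in a
// goroutine and cancel the context to stop the ping/status loops. The stopCh
// shutdown signal here is a hypothetical channel supplied by the caller.
func runNodeExample(n *Node, stopCh <-chan struct{}) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errCh := make(chan error, 1)
	go func() {
		// Run registers the node, then loops updating leases/status until
		// the context is cancelled.
		errCh <- n.Run(ctx)
	}()

	select {
	case err := <-errCh:
		// Run returned early; registration likely failed.
		return err
	case <-stopCh:
		cancel()
		return <-errCh
	}
}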
// UpdateNodeLease updates the node lease.
//
// If this function returns an error for which errors.IsNotFound(err) is true,
// node leases are likely not supported; in that case, call UpdateNodeStatus
// instead.
//
// If you use this function, it is up to you to synchronize this with other operations.
func UpdateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *coord.Lease) (*coord.Lease, error) {
	ctx, span := trace.StartSpan(ctx, "node.UpdateNodeLease")
	defer span.End()

	ctx = span.WithFields(ctx, log.Fields{
		"lease.name": lease.Name,
		"lease.time": lease.Spec.RenewTime,
	})

	if lease.Spec.LeaseDurationSeconds != nil {
		ctx = span.WithField(ctx, "lease.expiresSeconds", *lease.Spec.LeaseDurationSeconds)
	}

	l, err := leases.Update(lease)
	if err != nil {
		if errors.IsNotFound(err) {
			log.G(ctx).Debug("lease not found")
			l, err = ensureLease(ctx, leases, lease)
		}
		if err != nil {
			span.SetStatus(ocstatus.FromError(err))
			return nil, err
		}
		log.G(ctx).Debug("created new lease")
	} else {
		log.G(ctx).Debug("updated lease")
	}

	return l, nil
}

// Allocated once just so we don't have to allocate it on every get request.
var emptyGetOptions = metav1.GetOptions{}

// UpdateNodeStatus triggers an update to the node status in Kubernetes.
// It first fetches the current node details and then sets the status according
// to the passed in node object.
//
// If you use this function, it is up to you to synchronize this with other operations.
func UpdateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (*corev1.Node, error) {
	ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus")
	defer span.End()

	node, err := nodes.Get(n.Name, emptyGetOptions)
	if err != nil {
		if !errors.IsNotFound(err) {
			span.SetStatus(ocstatus.FromError(err))
			return nil, err
		}

		log.G(ctx).Debug("node not found")
		node, err = nodes.Create(n.DeepCopy())
		if err != nil {
			return nil, err
		}
		log.G(ctx).Debug("created new node")
		return node, nil
	}

	log.G(ctx).Debug("got node from api server")
	node.ResourceVersion = ""
	node.Status = n.Status

	ctx = addNodeAttributes(ctx, span, node)
	updated, err := nodes.UpdateStatus(node)
	if err != nil {
		return nil, err
	}

	log.G(ctx).WithField("resourceVersion", updated.ResourceVersion).Debug("updated node status in api server")
	return updated, nil
}

func newLease(base *coord.Lease) *coord.Lease {
	var lease *coord.Lease
	if base == nil {
		lease = &coord.Lease{}
	} else {
		lease = base.DeepCopy()
	}

	lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
	return lease
}

func setLeaseAttrs(l *coord.Lease, n *corev1.Node, dur time.Duration) {
	if l.Name == "" {
		l.Name = n.Name
	}
	if l.Spec.HolderIdentity == nil {
		l.Spec.HolderIdentity = &n.Name
	}
	if l.Spec.LeaseDurationSeconds == nil {
		d := int32(dur.Seconds()) * 5
		l.Spec.LeaseDurationSeconds = &d
	}
}

func updateNodeStatusHeartbeat(n *corev1.Node) {
	now := metav1.NewTime(time.Now())
	for i := range n.Status.Conditions {
		n.Status.Conditions[i].LastHeartbeatTime = now
	}
}

// NaiveNodeProvider is a basic node provider that only uses the passed in context
// on `Ping` to determine if the node is healthy.
type NaiveNodeProvider struct{}

// Ping implements the NodeProvider interface.
// It only returns the error, if any, from the passed in context.
func (NaiveNodeProvider) Ping(ctx context.Context) error {
	return ctx.Err()
}
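// updateLeaseOrStatus is a sketch added for illustration; it is not part of
// the original file. It shows the fallback pattern the UpdateNodeLease doc
// comment describes: try the lease first and, if the API server reports
// leases as unsupported (NotFound), fall back to a full node status update.
// As with the functions it wraps, the caller is responsible for serializing
// this with other updates to the same node.
func updateLeaseOrStatus(ctx context.Context, leases v1beta1.LeaseInterface, nodes v1.NodeInterface, lease *coord.Lease, node *corev1.Node) error {
	if _, err := UpdateNodeLease(ctx, leases, newLease(lease)); err != nil {
		if !errors.IsNotFound(err) {
			return err
		}
		// Leases are unsupported here: fall back to a node status update.
		_, err = UpdateNodeStatus(ctx, nodes, node)
		return err
	}
	return nil
}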
// NotifyNodeStatus implements the NodeProvider interface.
//
// This NaiveNodeProvider does not support updating node status, so this
// function is a no-op.
func (NaiveNodeProvider) NotifyNodeStatus(ctx context.Context, f func(*corev1.Node)) {
}

type taintsStringer []corev1.Taint

func (t taintsStringer) String() string {
	var s string
	for _, taint := range t {
		if s == "" {
			s = taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
		} else {
			s += ", " + taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
		}
	}
	return s
}

func addNodeAttributes(ctx context.Context, span trace.Span, n *corev1.Node) context.Context {
	return span.WithFields(ctx, log.Fields{
		"node.UID":     string(n.UID),
		"node.name":    n.Name,
		"node.cluster": n.ClusterName,
		"node.taints":  taintsStringer(n.Spec.Taints),
	})
}
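// notifyingNodeProvider is an illustrative sketch added for this write-up; it
// is not part of the original file. Unlike NaiveNodeProvider, it pushes node
// status changes to the callback registered via NotifyNodeStatus; Node.Run
// receives these on chStatusUpdate and applies them via updateStatus. It is
// assumed here that Ping and NotifyNodeStatus are the only methods the
// providers.NodeProvider interface requires, matching what NaiveNodeProvider
// implements above.
type notifyingNodeProvider struct {
	// updates carries status changes produced elsewhere in the provider.
	updates chan *corev1.Node
}

// Ping reports the node as healthy unless the context is done.
func (p *notifyingNodeProvider) Ping(ctx context.Context) error {
	return ctx.Err()
}

// NotifyNodeStatus forwards queued status updates to the callback until the
// context is cancelled.
func (p *notifyingNodeProvider) NotifyNodeStatus(ctx context.Context, f func(*corev1.Node)) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case n := <-p.updates:
				f(n)
			}
		}
	}()
}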