Move around some packages (#658)

* Move tracing exporter registration

This doesn't belong in the library and should be configured by the
consumer of the opencensus package.

* Rename `vkubelet` package to `node`

`vkubelet` does not convey any information to the consumers of the
package.
Really it would be nice to move this package to the root of the repo,
but then you wind up with... interesting... import semantics due to the
repo name... and after thinking about it some, a subpackage is really
not so bad as long as it has a name that conveys some information.

`node` was chosen since this package deals with all the semantics of
operating a node in Kubernetes.
Brian Goff
2019-06-12 05:11:49 -07:00
committed by Pires
parent 65b32a0fef
commit a54753cb82
42 changed files with 123 additions and 121 deletions

3
node/api/doc.go Normal file

@@ -0,0 +1,3 @@
// Package api implements HTTP handlers for handling requests that the kubelet
// would normally implement, such as pod logs, exec, etc.
package api

174
node/api/exec.go Normal file

@@ -0,0 +1,174 @@
package api
import (
"context"
"io"
"net/http"
"strings"
"time"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"k8s.io/apimachinery/pkg/types"
remoteutils "k8s.io/client-go/tools/remotecommand"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
)
// ContainerExecHandlerFunc defines the handler function used for "execing" into a
// container in a pod.
type ContainerExecHandlerFunc func(ctx context.Context, namespace, podName, containerName string, cmd []string, attach AttachIO) error
// AttachIO is used to pass in streams to attach to a container process
type AttachIO interface {
Stdin() io.Reader
Stdout() io.WriteCloser
Stderr() io.WriteCloser
TTY() bool
Resize() <-chan TermSize
}
// TermSize is used to set the terminal size from attached clients.
type TermSize struct {
Width uint16
Height uint16
}
// HandleContainerExec makes an http handler func from a Provider which execs a command in a pod's container
// Note that this handler currently depends on gorilla/mux to get URL parts as variables.
// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function
func HandleContainerExec(h ContainerExecHandlerFunc) http.HandlerFunc {
if h == nil {
return NotImplemented
}
return handleError(func(w http.ResponseWriter, req *http.Request) error {
vars := mux.Vars(req)
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
supportedStreamProtocols := strings.Split(req.Header.Get("X-Stream-Protocol-Version"), ",")
q := req.URL.Query()
command := q["command"]
streamOpts, err := getExecOptions(req)
if err != nil {
return errdefs.AsInvalidInput(err)
}
idleTimeout := time.Second * 30
streamCreationTimeout := time.Second * 30
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
exec := &containerExecContext{ctx: ctx, h: h, pod: pod, namespace: namespace, container: container}
remotecommand.ServeExec(w, req, exec, "", "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
return nil
})
}
func getExecOptions(req *http.Request) (*remotecommand.Options, error) {
tty := req.FormValue(api.ExecTTYParam) == "1"
stdin := req.FormValue(api.ExecStdinParam) == "1"
stdout := req.FormValue(api.ExecStdoutParam) == "1"
stderr := req.FormValue(api.ExecStderrParam) == "1"
if tty && stderr {
return nil, errors.New("cannot exec with tty and stderr")
}
if !stdin && !stdout && !stderr {
return nil, errors.New("you must specify at least one of stdin, stdout, stderr")
}
return &remotecommand.Options{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
TTY: tty,
}, nil
}
type containerExecContext struct {
h ContainerExecHandlerFunc
eio *execIO
namespace, pod, container string
ctx context.Context
}
// ExecInContainer implements remotecommand.Executor.
// This is called by remotecommand.ServeExec.
func (c *containerExecContext) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remoteutils.TerminalSize, timeout time.Duration) error {
eio := &execIO{
tty: tty,
stdin: in,
stdout: out,
stderr: err,
}
if tty {
eio.chResize = make(chan TermSize)
}
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
if tty {
go func() {
send := func(s remoteutils.TerminalSize) bool {
select {
case eio.chResize <- TermSize{Width: s.Width, Height: s.Height}:
return false
case <-ctx.Done():
return true
}
}
for {
select {
case s := <-resize:
if send(s) {
return
}
case <-ctx.Done():
return
}
}
}()
}
return c.h(c.ctx, c.namespace, c.pod, c.container, cmd, eio)
}
type execIO struct {
tty bool
stdin io.Reader
stdout io.WriteCloser
stderr io.WriteCloser
chResize chan TermSize
}
func (e *execIO) TTY() bool {
return e.tty
}
func (e *execIO) Stdin() io.Reader {
return e.stdin
}
func (e *execIO) Stdout() io.WriteCloser {
return e.stdout
}
func (e *execIO) Stderr() io.WriteCloser {
return e.stderr
}
func (e *execIO) Resize() <-chan TermSize {
return e.chResize
}

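For illustration only (not part of this change): a consumer implements ContainerExecHandlerFunc and plugs it into PodHandlerConfig.RunInContainer (see node/api/server.go below). A minimal, hypothetical sketch that ignores cmd and echoes stdin back to stdout:

package example // hypothetical consumer of the node/api package

import (
	"context"
	"io"

	"github.com/virtual-kubelet/virtual-kubelet/node/api"
)

// echoExec is a stand-in api.ContainerExecHandlerFunc. ServeExec passes nil
// for streams the client did not request, so both ends are checked first.
func echoExec(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error {
	if attach.Stdin() == nil || attach.Stdout() == nil {
		return nil
	}
	defer attach.Stdout().Close()
	_, err := io.Copy(attach.Stdout(), attach.Stdin())
	return err
}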
70
node/api/helpers.go Normal file

@@ -0,0 +1,70 @@
package api
import (
"io"
"net/http"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log"
)
type handlerFunc func(http.ResponseWriter, *http.Request) error
func handleError(f handlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
err := f(w, req)
if err == nil {
return
}
code := httpStatusCode(err)
w.WriteHeader(code)
io.WriteString(w, err.Error())
logger := log.G(req.Context()).WithError(err).WithField("httpStatusCode", code)
if code >= 500 {
logger.Error("Internal server error on request")
} else {
logger.Debug("Error on request")
}
}
}
func flushOnWrite(w io.Writer) io.Writer {
if fw, ok := w.(writeFlusher); ok {
return &flushWriter{fw}
}
return w
}
type flushWriter struct {
w writeFlusher
}
type writeFlusher interface {
Flush() error
Write([]byte) (int, error)
}
func (fw *flushWriter) Write(p []byte) (int, error) {
n, err := fw.w.Write(p)
if n > 0 {
if err := fw.w.Flush(); err != nil {
return n, err
}
}
return n, err
}
func httpStatusCode(err error) int {
switch {
case err == nil:
return http.StatusOK
case errdefs.IsNotFound(err):
return http.StatusNotFound
case errdefs.IsInvalidInput(err):
return http.StatusBadRequest
default:
return http.StatusInternalServerError
}
}

80
node/api/logs.go Normal file

@@ -0,0 +1,80 @@
package api
import (
"context"
"io"
"net/http"
"strconv"
"time"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log"
)
// ContainerLogsHandlerFunc is used in place of backend implementations for getting container logs
type ContainerLogsHandlerFunc func(ctx context.Context, namespace, podName, containerName string, opts ContainerLogOpts) (io.ReadCloser, error)
// ContainerLogOpts are used to pass along options to be set on the container
// log stream.
type ContainerLogOpts struct {
Tail int
Since time.Duration
LimitBytes int
Timestamps bool
}
// HandleContainerLogs creates an http handler function from a provider to serve logs from a pod
func HandleContainerLogs(h ContainerLogsHandlerFunc) http.HandlerFunc {
if h == nil {
return NotImplemented
}
return handleError(func(w http.ResponseWriter, req *http.Request) error {
vars := mux.Vars(req)
if len(vars) != 3 {
return errdefs.NotFound("not found")
}
ctx := req.Context()
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
tail := 10
q := req.URL.Query()
if queryTail := q.Get("tailLines"); queryTail != "" {
t, err := strconv.Atoi(queryTail)
if err != nil {
return errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"tailLines\""))
}
tail = t
}
// TODO(@cpuguy83): support v1.PodLogOptions
// The kubelet decoding here is not straightforward, so this needs to be dissected
opts := ContainerLogOpts{
Tail: tail,
}
logs, err := h(ctx, namespace, pod, container, opts)
if err != nil {
return errors.Wrap(err, "error getting container logs?)")
}
defer logs.Close()
req.Header.Set("Transfer-Encoding", "chunked")
if _, ok := w.(writeFlusher); !ok {
log.G(ctx).Debug("http response writer does not support flushes")
}
if _, err := io.Copy(flushOnWrite(w), logs); err != nil {
return errors.Wrap(err, "error writing response to client")
}
return nil
})
}

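Similarly, the log backend is supplied by the consumer as a ContainerLogsHandlerFunc. A purely illustrative stub (not part of this diff) that serves a fixed string and ignores ContainerLogOpts:

package example // hypothetical consumer of the node/api package

import (
	"context"
	"io"
	"io/ioutil"
	"strings"

	"github.com/virtual-kubelet/virtual-kubelet/node/api"
)

// staticLogs returns a fixed log body for any pod/container; a real provider
// would stream actual logs and honor opts.Tail, opts.Since, etc.
func staticLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
	body := "hello from " + namespace + "/" + podName + "/" + containerName + "\n"
	return ioutil.NopCloser(strings.NewReader(body)), nil
}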
48
node/api/pods.go Normal file

@@ -0,0 +1,48 @@
package api
import (
"context"
"net/http"
"github.com/virtual-kubelet/virtual-kubelet/log"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
)
// PodListerFunc is used to list the pods currently running on the virtual node.
type PodListerFunc func(context.Context) ([]*v1.Pod, error)
// HandleRunningPods creates an http handler for serving the list of currently
// running pods (the kubelet's debug "/runningpods/" endpoint).
func HandleRunningPods(getPods PodListerFunc) http.HandlerFunc {
scheme := runtime.NewScheme()
v1.SchemeBuilder.AddToScheme(scheme)
codecs := serializer.NewCodecFactory(scheme)
return handleError(func(w http.ResponseWriter, req *http.Request) error {
ctx := req.Context()
ctx = log.WithLogger(ctx, log.L)
pods, err := getPods(ctx)
if err != nil {
return err
}
// Borrowed from github.com/kubernetes/kubernetes/pkg/kubelet/server/server.go
// encodePods creates an v1.PodList object from pods and returns the encoded
// PodList.
podList := new(v1.PodList)
for _, pod := range pods {
podList.Items = append(podList.Items, *pod)
}
codec := codecs.LegacyCodec(v1.SchemeGroupVersion)
data, err := runtime.Encode(codec, podList)
if err != nil {
return err
}
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(data)
if err != nil {
return err
}
return nil
})
}

121
node/api/server.go Normal file

@@ -0,0 +1,121 @@
package api
import (
"net/http"
"github.com/gorilla/mux"
"github.com/virtual-kubelet/virtual-kubelet/log"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/b3"
)
// ServeMux defines an interface used to attach routes to an existing http
// serve mux.
// It is used to enable callers creating a new server to completely manage
// their own HTTP server while allowing us to attach the required routes to
// satisfy the Kubelet HTTP interfaces.
type ServeMux interface {
Handle(path string, h http.Handler)
}
type PodHandlerConfig struct {
RunInContainer ContainerExecHandlerFunc
GetContainerLogs ContainerLogsHandlerFunc
GetPods PodListerFunc
}
// PodHandler creates an http handler for interacting with pods/containers.
func PodHandler(p PodHandlerConfig, debug bool) http.Handler {
r := mux.NewRouter()
// This matches the behaviour in the reference kubelet
r.StrictSlash(true)
if debug {
r.HandleFunc("/runningpods/", HandleRunningPods(p.GetPods)).Methods("GET")
}
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", HandleContainerLogs(p.GetContainerLogs)).Methods("GET")
r.HandleFunc("/exec/{namespace}/{pod}/{container}", HandleContainerExec(p.RunInContainer)).Methods("POST")
r.NotFoundHandler = http.HandlerFunc(NotFound)
return r
}
// PodStatsSummaryHandler creates an http handler for serving pod metrics.
//
// If the passed in handler func is nil this will create a handler which only
// serves http.StatusNotImplemented
func PodStatsSummaryHandler(f PodStatsSummaryHandlerFunc) http.Handler {
if f == nil {
return http.HandlerFunc(NotImplemented)
}
r := mux.NewRouter()
const summaryRoute = "/stats/summary"
h := HandlePodStatsSummary(f)
r.Handle(summaryRoute, ochttp.WithRouteTag(h, "PodStatsSummaryHandler")).Methods("GET")
r.Handle(summaryRoute+"/", ochttp.WithRouteTag(h, "PodStatsSummaryHandler")).Methods("GET")
r.NotFoundHandler = http.HandlerFunc(NotFound)
return r
}
// AttachPodRoutes adds the http routes for pod stuff to the passed in serve mux.
//
// Callers should take care to namespace the serve mux as they see fit, however
// these routes get called by the Kubernetes API server.
func AttachPodRoutes(p PodHandlerConfig, mux ServeMux, debug bool) {
mux.Handle("/", InstrumentHandler(PodHandler(p, debug)))
}
// PodMetricsConfig stores the handlers for pod metrics routes
// It is used by AttachPodMetrics.
//
// The main reason for this struct is so that, if it is expanded later, we do
// not need to break the package level API.
type PodMetricsConfig struct {
GetStatsSummary PodStatsSummaryHandlerFunc
}
// AttachPodMetricsRoutes adds the http routes for pod/node metrics to the passed in serve mux.
//
// Callers should take care to namespace the serve mux as they see fit, however
// these routes get called by the Kubernetes API server.
func AttachPodMetricsRoutes(p PodMetricsConfig, mux ServeMux) {
mux.Handle("/", InstrumentHandler(HandlePodStatsSummary(p.GetStatsSummary)))
}
func instrumentRequest(r *http.Request) *http.Request {
ctx := r.Context()
logger := log.G(ctx).WithFields(log.Fields{
"uri": r.RequestURI,
"vars": mux.Vars(r),
})
ctx = log.WithLogger(ctx, logger)
return r.WithContext(ctx)
}
// InstrumentHandler wraps an http.Handler and injects instrumentation into the request context.
func InstrumentHandler(h http.Handler) http.Handler {
instrumented := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
req = instrumentRequest(req)
h.ServeHTTP(w, req)
})
return &ochttp.Handler{
Handler: instrumented,
Propagation: &b3.HTTPFormat{},
}
}
// NotFound provides a handler for cases where the requested endpoint doesn't exist
func NotFound(w http.ResponseWriter, r *http.Request) {
log.G(r.Context()).Debug("404 request not found")
http.Error(w, "404 request not found", http.StatusNotFound)
}
// NotImplemented provides a handler for cases where a provider does not implement a given API
func NotImplemented(w http.ResponseWriter, r *http.Request) {
log.G(r.Context()).Debug("501 not implemented")
http.Error(w, "501 not implemented", http.StatusNotImplemented)
}

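Putting the pieces together, a consumer owns its HTTP server and attaches the kubelet routes through AttachPodRoutes. A minimal sketch, assuming the hypothetical echoExec and staticLogs handlers from the examples above live in the same package; TLS and real pod listing are elided:

package main

import (
	"context"
	"net/http"

	"github.com/virtual-kubelet/virtual-kubelet/node/api"
	v1 "k8s.io/api/core/v1"
)

func main() {
	cfg := api.PodHandlerConfig{
		RunInContainer:   echoExec,   // hypothetical exec handler from the sketch above
		GetContainerLogs: staticLogs, // hypothetical logs handler from the sketch above
		GetPods: func(ctx context.Context) ([]*v1.Pod, error) {
			return nil, nil // stub: a provider would return its running pods
		},
	}

	mux := http.NewServeMux()
	// *http.ServeMux satisfies the ServeMux interface defined in this package.
	api.AttachPodRoutes(cfg, mux, false)

	// The kubelet API is normally served over TLS on port 10250; plain HTTP
	// keeps this sketch short.
	if err := http.ListenAndServe(":10250", mux); err != nil {
		panic(err)
	}
}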
55
node/api/stats.go Normal file

@@ -0,0 +1,55 @@
package api
import (
"context"
"encoding/json"
"net/http"
"github.com/pkg/errors"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
// PodStatsSummaryHandlerFunc defines the handler for getting pod stats summaries
type PodStatsSummaryHandlerFunc func(context.Context) (*stats.Summary, error)
// HandlePodStatsSummary makes an HTTP handler for implementing the kubelet summary stats endpoint
func HandlePodStatsSummary(h PodStatsSummaryHandlerFunc) http.HandlerFunc {
if h == nil {
return NotImplemented
}
return handleError(func(w http.ResponseWriter, req *http.Request) error {
stats, err := h(req.Context())
if err != nil {
if isCancelled(err) {
return err
}
return errors.Wrap(err, "error getting status from provider")
}
b, err := json.Marshal(stats)
if err != nil {
return errors.Wrap(err, "error marshalling stats")
}
if _, err := w.Write(b); err != nil {
return errors.Wrap(err, "could not write to client")
}
return nil
})
}
func isCancelled(err error) bool {
if err == context.Canceled {
return true
}
if e, ok := err.(causal); ok {
return isCancelled(e.Cause())
}
return false
}
type causal interface {
Cause() error
error
}

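The stats backend follows the same pattern: the consumer supplies a PodStatsSummaryHandlerFunc. A hypothetical stub (illustrative only) returning an empty summary:

package example // hypothetical consumer of the node/api package

import (
	"context"

	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

// emptySummary satisfies api.PodStatsSummaryHandlerFunc; a real provider would
// fill in node- and pod-level usage before returning.
func emptySummary(ctx context.Context) (*stats.Summary, error) {
	return &stats.Summary{}, nil
}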
48
node/doc.go Normal file

@@ -0,0 +1,48 @@
/*
Package node implements the components for operating a node in Kubernetes.
This includes controllers for managing the node object, running scheduled pods,
and exporting the HTTP endpoints expected by the Kubernetes API server.
There are two primary controllers, the node runner and the pod runner.
nodeRunner, _ := node.NewNodeController(...)
// setup other things
podRunner, _ := node.NewPodController(...)
go podRunner.Run(ctx)
select {
case <-podRunner.Ready():
go nodeRunner.Run(ctx)
case <-ctx.Done():
return ctx.Err()
}
After calling Run, cancelling the passed in context will shut down the
controller.
Note this example elides error handling.
Up to this point you have an active node in Kubernetes which can have pods scheduled
to it. However, the API server expects nodes to implement API endpoints in order
to support certain features such as fetching logs or execing a new process.
The api package provides some helpers for this:
`api.AttachPodRoutes` and `api.AttachPodMetricsRoutes`.
mux := http.NewServeMux()
api.AttachPodRoutes(provider, mux)
You must configure your own HTTP server, but these helpers will add handlers at
the correct URI paths to your serve mux. You are not required to use go's
built-in `*http.ServeMux`, but it does implement the `ServeMux` interface
defined in this package which is used for these helpers.
Note: The metrics routes may need to be attached to a different HTTP server,
depending on your configuration.
For more fine-grained control over the API, see the `node/api` package which
only implements the HTTP handlers that you can use in whatever way you want.
This package uses OpenCensus to implement tracing (but no internal metrics yet),
which is propagated through the context, even to the providers.
*/
package node

462
node/env.go Executable file

@@ -0,0 +1,462 @@
package node
import (
"context"
"fmt"
"sort"
"strings"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
apivalidation "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/tools/record"
podshelper "k8s.io/kubernetes/pkg/apis/core/pods"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
fieldpath "k8s.io/kubernetes/pkg/fieldpath"
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/third_party/forked/golang/expansion"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
)
const (
// ReasonOptionalConfigMapNotFound is the reason used in events emitted when an optional configmap is not found.
ReasonOptionalConfigMapNotFound = "OptionalConfigMapNotFound"
// ReasonOptionalConfigMapKeyNotFound is the reason used in events emitted when an optional configmap key is not found.
ReasonOptionalConfigMapKeyNotFound = "OptionalConfigMapKeyNotFound"
// ReasonFailedToReadOptionalConfigMap is the reason used in events emitted when an optional configmap could not be read.
ReasonFailedToReadOptionalConfigMap = "FailedToReadOptionalConfigMap"
// ReasonOptionalSecretNotFound is the reason used in events emitted when an optional secret is not found.
ReasonOptionalSecretNotFound = "OptionalSecretNotFound"
// ReasonOptionalSecretKeyNotFound is the reason used in events emitted when an optional secret key is not found.
ReasonOptionalSecretKeyNotFound = "OptionalSecretKeyNotFound"
// ReasonFailedToReadOptionalSecret is the reason used in events emitted when an optional secret could not be read.
ReasonFailedToReadOptionalSecret = "FailedToReadOptionalSecret"
// ReasonMandatoryConfigMapNotFound is the reason used in events emitted when a mandatory configmap is not found.
ReasonMandatoryConfigMapNotFound = "MandatoryConfigMapNotFound"
// ReasonMandatoryConfigMapKeyNotFound is the reason used in events emitted when a mandatory configmap key is not found.
ReasonMandatoryConfigMapKeyNotFound = "MandatoryConfigMapKeyNotFound"
// ReasonFailedToReadMandatoryConfigMap is the reason used in events emitted when a mandatory configmap could not be read.
ReasonFailedToReadMandatoryConfigMap = "FailedToReadMandatoryConfigMap"
// ReasonMandatorySecretNotFound is the reason used in events emitted when a mandatory secret is not found.
ReasonMandatorySecretNotFound = "MandatorySecretNotFound"
// ReasonMandatorySecretKeyNotFound is the reason used in events emitted when a mandatory secret key is not found.
ReasonMandatorySecretKeyNotFound = "MandatorySecretKeyNotFound"
// ReasonFailedToReadMandatorySecret is the reason used in events emitted when a mandatory secret could not be read.
ReasonFailedToReadMandatorySecret = "FailedToReadMandatorySecret"
// ReasonInvalidEnvironmentVariableNames is the reason used in events emitted when a configmap/secret referenced in a ".spec.containers[*].envFrom" field contains invalid environment variable names.
ReasonInvalidEnvironmentVariableNames = "InvalidEnvironmentVariableNames"
)
var masterServices = sets.NewString("kubernetes")
// populateEnvironmentVariables populates the environment of each container (and init container) in the specified pod.
// TODO Make this the single exported function of a "pkg/environment" package in the future.
func populateEnvironmentVariables(ctx context.Context, pod *corev1.Pod, rm *manager.ResourceManager, recorder record.EventRecorder) error {
// Populate each init container's environment.
for idx := range pod.Spec.InitContainers {
if err := populateContainerEnvironment(ctx, pod, &pod.Spec.InitContainers[idx], rm, recorder); err != nil {
return err
}
}
// Populate each container's environment.
for idx := range pod.Spec.Containers {
if err := populateContainerEnvironment(ctx, pod, &pod.Spec.Containers[idx], rm, recorder); err != nil {
return err
}
}
return nil
}
// populateContainerEnvironment populates the environment of a single container in the specified pod.
func populateContainerEnvironment(ctx context.Context, pod *corev1.Pod, container *corev1.Container, rm *manager.ResourceManager, recorder record.EventRecorder) error {
// Create an "environment map" based on the value of the specified container's ".envFrom" field.
tmpEnv, err := makeEnvironmentMapBasedOnEnvFrom(ctx, pod, container, rm, recorder)
if err != nil {
return err
}
// Create the final "environment map" for the container using the ".env" and ".envFrom" field
// and service environment variables.
err = makeEnvironmentMap(ctx, pod, container, rm, recorder, tmpEnv)
if err != nil {
return err
}
// Empty the container's ".envFrom" field and replace its ".env" field with the final, merged environment.
// Values in "env" (sourced from ".env") will override any values with the same key defined in "envFrom" (sourced from ".envFrom").
// This is in accordance with what the Kubelet itself does.
// https://github.com/kubernetes/kubernetes/blob/v1.13.1/pkg/kubelet/kubelet_pods.go#L557-L558
container.EnvFrom = []corev1.EnvFromSource{}
res := make([]corev1.EnvVar, 0)
for key, val := range tmpEnv {
res = append(res, corev1.EnvVar{
Name: key,
Value: val,
})
}
container.Env = res
return nil
}
// getServiceEnvVarMap makes a map[string]string of env vars for services a
// pod in namespace ns should see.
// Based on getServiceEnvVarMap in kubelet_pods.go.
func getServiceEnvVarMap(rm *manager.ResourceManager, ns string, enableServiceLinks bool) (map[string]string, error) {
var (
serviceMap = make(map[string]*corev1.Service)
m = make(map[string]string)
)
services, err := rm.ListServices()
if err != nil {
return nil, err
}
// project the services in namespace ns onto the master services
for i := range services {
service := services[i]
// ignore services where ClusterIP is "None" or empty
if !v1helper.IsServiceIPSet(service) {
continue
}
serviceName := service.Name
// We always want to add environment variables for master kubernetes service
// from the default namespace, even if enableServiceLinks is false.
// We also add environment variables for other services in the same
// namespace, if enableServiceLinks is true.
if service.Namespace == metav1.NamespaceDefault && masterServices.Has(serviceName) {
if _, exists := serviceMap[serviceName]; !exists {
serviceMap[serviceName] = service
}
} else if service.Namespace == ns && enableServiceLinks {
serviceMap[serviceName] = service
}
}
mappedServices := make([]*corev1.Service, 0, len(serviceMap))
for key := range serviceMap {
mappedServices = append(mappedServices, serviceMap[key])
}
for _, e := range envvars.FromServices(mappedServices) {
m[e.Name] = e.Value
}
return m, nil
}
// makeEnvironmentMapBasedOnEnvFrom returns a map representing the resolved environment of the specified container after being populated from the entries in the ".envFrom" field.
func makeEnvironmentMapBasedOnEnvFrom(ctx context.Context, pod *corev1.Pod, container *corev1.Container, rm *manager.ResourceManager, recorder record.EventRecorder) (map[string]string, error) {
// Create a map to hold the resulting environment.
res := make(map[string]string, 0)
// Iterate over "envFrom" references in order to populate the environment.
loop:
for _, envFrom := range container.EnvFrom {
switch {
// Handle population from a configmap.
case envFrom.ConfigMapRef != nil:
ef := envFrom.ConfigMapRef
// Check whether the configmap reference is optional.
// This will control whether we fail when unable to read the configmap.
optional := ef.Optional != nil && *ef.Optional
// Try to grab the referenced configmap.
m, err := rm.GetConfigMap(ef.Name, pod.Namespace)
if err != nil {
// We couldn't fetch the configmap.
// However, if the configmap reference is optional we should not fail.
if optional {
if errors.IsNotFound(err) {
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonOptionalConfigMapNotFound, "configmap %q not found", ef.Name)
} else {
log.G(ctx).Warnf("failed to read configmap %q: %v", ef.Name, err)
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonFailedToReadOptionalConfigMap, "failed to read configmap %q", ef.Name)
}
// Continue on to the next reference.
continue loop
}
// At this point we know the configmap reference is mandatory.
// Hence, we should return a meaningful error.
if errors.IsNotFound(err) {
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonMandatoryConfigMapNotFound, "configmap %q not found", ef.Name)
return nil, fmt.Errorf("configmap %q not found", ef.Name)
}
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonFailedToReadMandatoryConfigMap, "failed to read configmap %q", ef.Name)
return nil, fmt.Errorf("failed to fetch configmap %q: %v", ef.Name, err)
}
// At this point we have successfully fetched the target configmap.
// Iterate over the keys defined in the configmap and populate the environment accordingly.
// https://github.com/kubernetes/kubernetes/blob/v1.13.1/pkg/kubelet/kubelet_pods.go#L581-L595
invalidKeys := make([]string, 0)
mKeys:
for key, val := range m.Data {
// If a prefix has been defined, prepend it to the environment variable's name.
if len(envFrom.Prefix) > 0 {
key = envFrom.Prefix + key
}
// Make sure that the resulting key is a valid environment variable name.
// If it isn't, it should be appended to the list of invalid keys and skipped.
if errMsgs := apivalidation.IsEnvVarName(key); len(errMsgs) != 0 {
invalidKeys = append(invalidKeys, key)
continue mKeys
}
// Add the key and its value to the environment.
res[key] = val
}
// Report any invalid keys.
if len(invalidKeys) > 0 {
sort.Strings(invalidKeys)
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonInvalidEnvironmentVariableNames, "keys [%s] from configmap %s/%s were skipped since they are invalid as environment variable names", strings.Join(invalidKeys, ", "), m.Namespace, m.Name)
}
// Handle population from a secret.
case envFrom.SecretRef != nil:
ef := envFrom.SecretRef
// Check whether the secret reference is optional.
// This will control whether we fail when unable to read the secret.
optional := ef.Optional != nil && *ef.Optional
// Try to grab the referenced secret.
s, err := rm.GetSecret(ef.Name, pod.Namespace)
if err != nil {
// We couldn't fetch the secret.
// However, if the secret reference is optional we should not fail.
if optional {
if errors.IsNotFound(err) {
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonOptionalSecretNotFound, "secret %q not found", ef.Name)
} else {
log.G(ctx).Warnf("failed to read secret %q: %v", ef.Name, err)
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonFailedToReadOptionalSecret, "failed to read secret %q", ef.Name)
}
// Continue on to the next reference.
continue loop
}
// At this point we know the secret reference is mandatory.
// Hence, we should return a meaningful error.
if errors.IsNotFound(err) {
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonMandatorySecretNotFound, "secret %q not found", ef.Name)
return nil, fmt.Errorf("secret %q not found", ef.Name)
}
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonFailedToReadMandatorySecret, "failed to read secret %q", ef.Name)
return nil, fmt.Errorf("failed to fetch secret %q: %v", ef.Name, err)
}
// At this point we have successfully fetched the target secret.
// Iterate over the keys defined in the secret and populate the environment accordingly.
// https://github.com/kubernetes/kubernetes/blob/v1.13.1/pkg/kubelet/kubelet_pods.go#L581-L595
invalidKeys := make([]string, 0)
sKeys:
for key, val := range s.Data {
// If a prefix has been defined, prepend it to the environment variable's name.
if len(envFrom.Prefix) > 0 {
key = envFrom.Prefix + key
}
// Make sure that the resulting key is a valid environment variable name.
// If it isn't, it should be appended to the list of invalid keys and skipped.
if errMsgs := apivalidation.IsEnvVarName(key); len(errMsgs) != 0 {
invalidKeys = append(invalidKeys, key)
continue sKeys
}
// Add the key and its value to the environment.
res[key] = string(val)
}
// Report any invalid keys.
if len(invalidKeys) > 0 {
sort.Strings(invalidKeys)
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonInvalidEnvironmentVariableNames, "keys [%s] from secret %s/%s were skipped since they are invalid as environment variable names", strings.Join(invalidKeys, ", "), s.Namespace, s.Name)
}
}
}
// Return the populated environment.
return res, nil
}
// makeEnvironmentMap returns a map representing the resolved environment of the specified container after being populated from the entries in the ".env" and ".envFrom" field.
func makeEnvironmentMap(ctx context.Context, pod *corev1.Pod, container *corev1.Container, rm *manager.ResourceManager, recorder record.EventRecorder, res map[string]string) error {
// TODO If pod.Spec.EnableServiceLinks is nil then fail as per 1.14 kubelet.
enableServiceLinks := corev1.DefaultEnableServiceLinks
if pod.Spec.EnableServiceLinks != nil {
enableServiceLinks = *pod.Spec.EnableServiceLinks
}
// Note that there is a race between the kubelet seeing the pod and the kubelet seeing the service.
// To avoid this users can: (1) wait between starting a service and starting the pods that use it; or
// (2) detect the missing service env var and exit so the pod is restarted; or (3) use DNS instead of env vars
// and keep trying to resolve the DNS name of the service (recommended).
svcEnv, err := getServiceEnvVarMap(rm, pod.Namespace, enableServiceLinks)
if err != nil {
return err
}
// If the variable's Value is set, expand the `$(var)` references to other
// variables in the .Value field; the sources of variables are the declared
// variables of the container and the service environment variables.
mappingFunc := expansion.MappingFuncFor(res, svcEnv)
// Iterate over environment variables in order to populate the map.
loop:
for _, env := range container.Env {
switch {
// Handle values that have been directly provided.
case env.Value != "":
// Expand variable references
res[env.Name] = expansion.Expand(env.Value, mappingFunc)
continue loop
// Handle population from a configmap key.
case env.ValueFrom != nil && env.ValueFrom.ConfigMapKeyRef != nil:
// The environment variable must be set from a configmap.
vf := env.ValueFrom.ConfigMapKeyRef
// Check whether the key reference is optional.
// This will control whether we fail when unable to read the requested key.
optional := vf != nil && vf.Optional != nil && *vf.Optional
// Try to grab the referenced configmap.
m, err := rm.GetConfigMap(vf.Name, pod.Namespace)
if err != nil {
// We couldn't fetch the configmap.
// However, if the key reference is optional we should not fail.
if optional {
if errors.IsNotFound(err) {
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonOptionalConfigMapNotFound, "skipping optional envvar %q: configmap %q not found", env.Name, vf.Name)
} else {
log.G(ctx).Warnf("failed to read configmap %q: %v", vf.Name, err)
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonFailedToReadOptionalConfigMap, "skipping optional envvar %q: failed to read configmap %q", env.Name, vf.Name)
}
// Continue on to the next reference.
continue loop
}
// At this point we know the key reference is mandatory.
// Hence, we should return a meaningful error.
if errors.IsNotFound(err) {
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonMandatoryConfigMapNotFound, "configmap %q not found", vf.Name)
return fmt.Errorf("configmap %q not found", vf.Name)
}
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonFailedToReadMandatoryConfigMap, "failed to read configmap %q", vf.Name)
return fmt.Errorf("failed to read configmap %q: %v", vf.Name, err)
}
// At this point we have successfully fetched the target configmap.
// We must now try to grab the requested key.
var (
keyExists bool
keyValue string
)
if keyValue, keyExists = m.Data[vf.Key]; !keyExists {
// The requested key does not exist.
// However, we should not fail if the key reference is optional.
if optional {
// Continue on to the next reference.
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonOptionalConfigMapKeyNotFound, "skipping optional envvar %q: key %q does not exist in configmap %q", env.Name, vf.Key, vf.Name)
continue loop
}
// At this point we know the key reference is mandatory.
// Hence, we should fail.
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonMandatoryConfigMapKeyNotFound, "key %q does not exist in configmap %q", vf.Key, vf.Name)
return fmt.Errorf("configmap %q doesn't contain the %q key required by pod %s", vf.Name, vf.Key, pod.Name)
}
// Populate the environment variable and continue on to the next reference.
res[env.Name] = keyValue
continue loop
// Handle population from a secret key.
case env.ValueFrom != nil && env.ValueFrom.SecretKeyRef != nil:
vf := env.ValueFrom.SecretKeyRef
// Check whether the key reference is optional.
// This will control whether we fail when unable to read the requested key.
optional := vf != nil && vf.Optional != nil && *vf.Optional
// Try to grab the referenced secret.
s, err := rm.GetSecret(vf.Name, pod.Namespace)
if err != nil {
// We couldn't fetch the secret.
// However, if the key reference is optional we should not fail.
if optional {
if errors.IsNotFound(err) {
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonOptionalSecretNotFound, "skipping optional envvar %q: secret %q not found", env.Name, vf.Name)
} else {
log.G(ctx).Warnf("failed to read secret %q: %v", vf.Name, err)
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonFailedToReadOptionalSecret, "skipping optional envvar %q: failed to read secret %q", env.Name, vf.Name)
}
// Continue on to the next reference.
continue loop
}
// At this point we know the key reference is mandatory.
// Hence, we should return a meaningful error.
if errors.IsNotFound(err) {
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonMandatorySecretNotFound, "secret %q not found", vf.Name)
return fmt.Errorf("secret %q not found", vf.Name)
}
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonFailedToReadMandatorySecret, "failed to read secret %q", vf.Name)
return fmt.Errorf("failed to read secret %q: %v", vf.Name, err)
}
// At this point we have successfully fetched the target secret.
// We must now try to grab the requested key.
var (
keyExists bool
keyValue []byte
)
if keyValue, keyExists = s.Data[vf.Key]; !keyExists {
// The requested key does not exist.
// However, we should not fail if the key reference is optional.
if optional {
// Continue on to the next reference.
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonOptionalSecretKeyNotFound, "skipping optional envvar %q: key %q does not exist in secret %q", env.Name, vf.Key, vf.Name)
continue loop
}
// At this point we know the key reference is mandatory.
// Hence, we should fail.
recorder.Eventf(pod, corev1.EventTypeWarning, ReasonMandatorySecretKeyNotFound, "key %q does not exist in secret %q", vf.Key, vf.Name)
return fmt.Errorf("secret %q doesn't contain the %q key required by pod %s", vf.Name, vf.Key, pod.Name)
}
// Populate the environment variable and continue on to the next reference.
res[env.Name] = string(keyValue)
continue loop
// Handle population from a field (downward API).
case env.ValueFrom != nil && env.ValueFrom.FieldRef != nil:
// https://github.com/virtual-kubelet/virtual-kubelet/issues/123
vf := env.ValueFrom.FieldRef
runtimeVal, err := podFieldSelectorRuntimeValue(vf, pod)
if err != nil {
return err
}
res[env.Name] = runtimeVal
continue loop
// Handle population from a resource request/limit.
case env.ValueFrom != nil && env.ValueFrom.ResourceFieldRef != nil:
// TODO Implement populating resource requests.
continue loop
}
}
// Append service env vars.
for k, v := range svcEnv {
if _, present := res[k]; !present {
res[k] = v
}
}
return nil
}
// podFieldSelectorRuntimeValue returns the runtime value of the given
// selector for a pod.
func podFieldSelectorRuntimeValue(fs *corev1.ObjectFieldSelector, pod *corev1.Pod) (string, error) {
internalFieldPath, _, err := podshelper.ConvertDownwardAPIFieldLabel(fs.APIVersion, fs.FieldPath, "")
if err != nil {
return "", err
}
switch internalFieldPath {
case "spec.nodeName":
return pod.Spec.NodeName, nil
case "spec.serviceAccountName":
return pod.Spec.ServiceAccountName, nil
}
return fieldpath.ExtractFieldPathAsString(pod, internalFieldPath)
}

1078
node/env_internal_test.go Normal file

File diff suppressed because it is too large

540
node/node.go Normal file

@@ -0,0 +1,540 @@
package node
import (
"context"
"encoding/json"
"fmt"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
coord "k8s.io/api/coordination/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
)
// NodeProvider is the interface used for registering a node and updating its
// status in Kubernetes.
//
// Note: Implementers can choose to manage a node themselves, in which case
// it is not necessary to provide an implementation of this interface.
type NodeProvider interface {
// Ping checks if the node is still active.
// This is intended to be lightweight as it will be called periodically as a
// heartbeat to keep the node marked as ready in Kubernetes.
Ping(context.Context) error
// NotifyNodeStatus is used to asynchronously monitor the node.
// The passed in callback should be called any time there is a change to the
// node's status.
// This will generally trigger a call to the Kubernetes API server to update
// the status.
//
// NotifyNodeStatus should not block callers.
NotifyNodeStatus(ctx context.Context, cb func(*corev1.Node))
}
// NewNodeController creates a new node controller.
// This does not have any side-effects on the system or kubernetes.
//
// Use the node's `Run` method to register and run the loops to update the node
// in Kubernetes.
//
// Note: If there are multiple NodeControllerOpts which apply to the same
// underlying option, the last NodeControllerOpt wins.
func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface, opts ...NodeControllerOpt) (*NodeController, error) {
n := &NodeController{p: p, n: node, nodes: nodes, chReady: make(chan struct{})}
for _, o := range opts {
if err := o(n); err != nil {
return nil, pkgerrors.Wrap(err, "error applying node option")
}
}
return n, nil
}
// NodeControllerOpt are the functional options used for configuring a node
type NodeControllerOpt func(*NodeController) error // nolint: golint
// WithNodeEnableLeaseV1Beta1 enables support for v1beta1 leases.
// If client is nil, leases will not be enabled.
// If baseLease is nil, a default base lease will be used.
//
// The lease will be updated after each successful node ping. To change the
// lease update interval, you must set the node ping interval.
// See WithNodePingInterval().
//
// This also affects the frequency of node status updates:
// - When leases are *not* enabled (or are disabled due to no support on the cluster)
// the node status is updated at every ping interval.
// - When node leases are enabled, node status updates are controlled by the
// node status update interval option.
// To set a custom node status update interval, see WithNodeStatusUpdateInterval().
func WithNodeEnableLeaseV1Beta1(client v1beta1.LeaseInterface, baseLease *coord.Lease) NodeControllerOpt {
return func(n *NodeController) error {
n.leases = client
n.lease = baseLease
return nil
}
}
// WithNodePingInterval sets the interval for checking node status
// If node leases are not supported (or not enabled), this is the frequency
// with which the node status will be updated in Kubernetes.
func WithNodePingInterval(d time.Duration) NodeControllerOpt {
return func(n *NodeController) error {
n.pingInterval = d
return nil
}
}
// WithNodeStatusUpdateInterval sets the interval for updating node status
// This is only used when leases are supported and only for updating the actual
// node status, not the node lease.
// When node leases are not enabled (or are not supported on the cluster) this
// has no effect and node status is updated on the "ping" interval.
func WithNodeStatusUpdateInterval(d time.Duration) NodeControllerOpt {
return func(n *NodeController) error {
n.statusInterval = d
return nil
}
}
// WithNodeStatusUpdateErrorHandler adds an error handler for cases where there is an error
// when updating the node status.
// This allows the caller to have some control on how errors are dealt with when
// updating a node's status.
//
// The error passed to the handler will be the error received from kubernetes
// when updating node status.
func WithNodeStatusUpdateErrorHandler(h ErrorHandler) NodeControllerOpt {
return func(n *NodeController) error {
n.nodeStatusUpdateErrorHandler = h
return nil
}
}
// ErrorHandler is a type of function used to allow callbacks for handling errors.
// It is expected that if a nil error is returned that the error is handled and
// progress can continue (or a retry is possible).
type ErrorHandler func(context.Context, error) error
// NodeController deals with creating and managing a node object in Kubernetes.
// It can register a node with Kubernetes and periodically update its status.
// NodeController manages a single node entity.
type NodeController struct { // nolint: golint
p NodeProvider
n *corev1.Node
leases v1beta1.LeaseInterface
nodes v1.NodeInterface
disableLease bool
pingInterval time.Duration
statusInterval time.Duration
lease *coord.Lease
chStatusUpdate chan *corev1.Node
nodeStatusUpdateErrorHandler ErrorHandler
chReady chan struct{}
}
// The default intervals used for lease and status updates.
const (
DefaultPingInterval = 10 * time.Second
DefaultStatusUpdateInterval = 1 * time.Minute
)
// Run registers the node in kubernetes and starts loops for updating the node
// status in Kubernetes.
//
// The node status must be updated periodically in Kubernetes to keep the node
// active. Newer versions of Kubernetes support node leases, which are
// essentially lightweight pings. Older versions of Kubernetes require updating
// the node status periodically.
//
// If Kubernetes supports node leases this will use leases with a much slower
// node status update (because some things still expect the node to be updated
// periodically), otherwise it will only use node status update with the configured
// ping interval.
func (n *NodeController) Run(ctx context.Context) error {
if n.pingInterval == time.Duration(0) {
n.pingInterval = DefaultPingInterval
}
if n.statusInterval == time.Duration(0) {
n.statusInterval = DefaultStatusUpdateInterval
}
n.chStatusUpdate = make(chan *corev1.Node)
n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) {
n.chStatusUpdate <- node
})
if err := n.ensureNode(ctx); err != nil {
return err
}
if n.leases == nil {
n.disableLease = true
return n.controlLoop(ctx)
}
n.lease = newLease(n.lease)
setLeaseAttrs(n.lease, n.n, n.pingInterval*5)
l, err := ensureLease(ctx, n.leases, n.lease)
if err != nil {
if !errors.IsNotFound(err) {
return pkgerrors.Wrap(err, "error creating node lease")
}
log.G(ctx).Info("Node leases not supported, falling back to only node status updates")
n.disableLease = true
}
n.lease = l
log.G(ctx).Debug("Created node lease")
return n.controlLoop(ctx)
}
func (n *NodeController) ensureNode(ctx context.Context) error {
err := n.updateStatus(ctx, true)
if err == nil || !errors.IsNotFound(err) {
return err
}
node, err := n.nodes.Create(n.n)
if err != nil {
return pkgerrors.Wrap(err, "error registering node with kubernetes")
}
n.n = node
return nil
}
// Ready returns a channel that gets closed when the node is fully up and
// running. Note that if there is an error on startup this channel will never
// be closed.
func (n *NodeController) Ready() <-chan struct{} {
return n.chReady
}
func (n *NodeController) controlLoop(ctx context.Context) error {
pingTimer := time.NewTimer(n.pingInterval)
defer pingTimer.Stop()
statusTimer := time.NewTimer(n.statusInterval)
defer statusTimer.Stop()
if n.disableLease {
// hack to make sure this channel always blocks since we won't be using it
if !statusTimer.Stop() {
<-statusTimer.C
}
}
close(n.chReady)
for {
select {
case <-ctx.Done():
return nil
case updated := <-n.chStatusUpdate:
var t *time.Timer
if n.disableLease {
t = pingTimer
} else {
t = statusTimer
}
log.G(ctx).Debug("Received node status update")
// We are performing a status update, so stop/reset the status update timer in
// this branch; otherwise there could be an unnecessary status update.
if !t.Stop() {
<-t.C
}
n.n.Status = updated.Status
if err := n.updateStatus(ctx, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
t.Reset(n.statusInterval)
case <-statusTimer.C:
if err := n.updateStatus(ctx, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
statusTimer.Reset(n.statusInterval)
case <-pingTimer.C:
if err := n.handlePing(ctx); err != nil {
log.G(ctx).WithError(err).Error("Error while handling node ping")
} else {
log.G(ctx).Debug("Successful node ping")
}
pingTimer.Reset(n.pingInterval)
}
}
}
func (n *NodeController) handlePing(ctx context.Context) (retErr error) {
ctx, span := trace.StartSpan(ctx, "node.handlePing")
defer span.End()
defer func() {
span.SetStatus(retErr)
}()
if err := n.p.Ping(ctx); err != nil {
return pkgerrors.Wrap(err, "error while pinging the node provider")
}
if n.disableLease {
return n.updateStatus(ctx, false)
}
return n.updateLease(ctx)
}
func (n *NodeController) updateLease(ctx context.Context) error {
l, err := UpdateNodeLease(ctx, n.leases, newLease(n.lease))
if err != nil {
return err
}
n.lease = l
return nil
}
func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) error {
updateNodeStatusHeartbeat(n.n)
node, err := UpdateNodeStatus(ctx, n.nodes, n.n)
if err != nil {
if skipErrorCb || n.nodeStatusUpdateErrorHandler == nil {
return err
}
if err := n.nodeStatusUpdateErrorHandler(ctx, err); err != nil {
return err
}
node, err = UpdateNodeStatus(ctx, n.nodes, n.n)
if err != nil {
return err
}
}
n.n = node
return nil
}
func ensureLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *coord.Lease) (*coord.Lease, error) {
l, err := leases.Create(lease)
if err != nil {
switch {
case errors.IsNotFound(err):
log.G(ctx).WithError(err).Info("Node lease not supported")
return nil, err
case errors.IsAlreadyExists(err):
if err := leases.Delete(lease.Name, nil); err != nil && !errors.IsNotFound(err) {
log.G(ctx).WithError(err).Error("could not delete old node lease")
return nil, pkgerrors.Wrap(err, "old lease exists but could not delete it")
}
l, err = leases.Create(lease)
}
}
return l, err
}
// UpdateNodeLease updates the node lease.
//
// If this function returns an error that satisfies errors.IsNotFound(err), node
// leases are likely not supported; in that case, call UpdateNodeStatus instead.
//
// If you use this function, it is up to you to synchronize this with other operations.
func UpdateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *coord.Lease) (*coord.Lease, error) {
ctx, span := trace.StartSpan(ctx, "node.UpdateNodeLease")
defer span.End()
ctx = span.WithFields(ctx, log.Fields{
"lease.name": lease.Name,
"lease.time": lease.Spec.RenewTime,
})
if lease.Spec.LeaseDurationSeconds != nil {
ctx = span.WithField(ctx, "lease.expiresSeconds", *lease.Spec.LeaseDurationSeconds)
}
l, err := leases.Update(lease)
if err != nil {
if errors.IsNotFound(err) {
log.G(ctx).Debug("lease not found")
l, err = ensureLease(ctx, leases, lease)
}
if err != nil {
span.SetStatus(err)
return nil, err
}
log.G(ctx).Debug("created new lease")
} else {
log.G(ctx).Debug("updated lease")
}
return l, nil
}
// just so we don't have to allocate this on every get request
var emptyGetOptions = metav1.GetOptions{}
// PatchNodeStatus patches node status.
// Copied from github.com/kubernetes/kubernetes/pkg/util/node
func PatchNodeStatus(nodes v1.NodeInterface, nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) (*corev1.Node, []byte, error) {
patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
if err != nil {
return nil, nil, err
}
updatedNode, err := nodes.Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
if err != nil {
return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
}
return updatedNode, patchBytes, nil
}
func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) ([]byte, error) {
oldData, err := json.Marshal(oldNode)
if err != nil {
return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err)
}
// Reset spec to make sure only patch for Status or ObjectMeta is generated.
// Note that we don't reset ObjectMeta here, because:
// 1. This aligns with Nodes().UpdateStatus().
// 2. Some component does use this to update node annotations.
newNode.Spec = oldNode.Spec
newData, err := json.Marshal(newNode)
if err != nil {
return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
if err != nil {
return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err)
}
return patchBytes, nil
}
// UpdateNodeStatus triggers an update to the node status in Kubernetes.
// It first fetches the current node details and then sets the status according
// to the passed in node object.
//
// If you use this function, it is up to you to synchronize this with other operations.
// This reduces the time to second-level precision.
func UpdateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (_ *corev1.Node, retErr error) {
ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus")
defer func() {
span.End()
span.SetStatus(retErr)
}()
var node *corev1.Node
oldNode, err := nodes.Get(n.Name, emptyGetOptions)
if err != nil {
return nil, err
}
log.G(ctx).Debug("got node from api server")
node = oldNode.DeepCopy()
node.ResourceVersion = ""
node.Status = n.Status
ctx = addNodeAttributes(ctx, span, node)
// Patch the node status to merge other changes on the node.
updated, _, err := PatchNodeStatus(nodes, types.NodeName(n.Name), oldNode, node)
if err != nil {
return nil, err
}
log.G(ctx).WithField("node.resourceVersion", updated.ResourceVersion).
WithField("node.Status.Conditions", updated.Status.Conditions).
Debug("updated node status in api server")
return updated, nil
}
func newLease(base *coord.Lease) *coord.Lease {
var lease *coord.Lease
if base == nil {
lease = &coord.Lease{}
} else {
lease = base.DeepCopy()
}
lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
return lease
}
func setLeaseAttrs(l *coord.Lease, n *corev1.Node, dur time.Duration) {
if l.Name == "" {
l.Name = n.Name
}
if l.Spec.HolderIdentity == nil {
l.Spec.HolderIdentity = &n.Name
}
if l.Spec.LeaseDurationSeconds == nil {
d := int32(dur.Seconds()) * 5
l.Spec.LeaseDurationSeconds = &d
}
}
func updateNodeStatusHeartbeat(n *corev1.Node) {
now := metav1.NewTime(time.Now())
for i := range n.Status.Conditions {
n.Status.Conditions[i].LastHeartbeatTime = now
}
}
// NaiveNodeProvider is a basic node provider that only uses the passed in context
// on `Ping` to determine if the node is healthy.
type NaiveNodeProvider struct{}
// Ping just implements the NodeProvider interface.
// It returns the error from the passed in context only.
func (NaiveNodeProvider) Ping(ctx context.Context) error {
return ctx.Err()
}
// NotifyNodeStatus implements the NodeProvider interface.
//
// This NaiveNodeProvider does not support updating node status and so this
// function is a no-op.
func (NaiveNodeProvider) NotifyNodeStatus(ctx context.Context, f func(*corev1.Node)) {
}
type taintsStringer []corev1.Taint
func (t taintsStringer) String() string {
var s string
for _, taint := range t {
if s == "" {
s = taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
} else {
s += ", " + taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
}
}
return s
}
func addNodeAttributes(ctx context.Context, span trace.Span, n *corev1.Node) context.Context {
return span.WithFields(ctx, log.Fields{
"node.UID": string(n.UID),
"node.name": n.Name,
"node.cluster": n.ClusterName,
"node.taints": taintsStringer(n.Spec.Taints),
})
}

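For reference, a minimal sketch of running the NodeController with the NaiveNodeProvider defined above. The node spec and in-cluster client configuration are placeholder assumptions, not part of this change:

package main

import (
	"context"
	"time"

	"github.com/virtual-kubelet/virtual-kubelet/node"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	cfg, err := rest.InClusterConfig() // assumes the process runs inside a cluster
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Placeholder node object; a real provider fills in capacity, taints, etc.
	n := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "example-virtual-node"}}

	nc, err := node.NewNodeController(
		node.NaiveNodeProvider{},
		n,
		client.CoreV1().Nodes(),
		node.WithNodePingInterval(10*time.Second),
		node.WithNodeStatusUpdateInterval(time.Minute),
	)
	if err != nil {
		panic(err)
	}

	// Cancelling the context shuts the controller down.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := nc.Run(ctx); err != nil {
		panic(err)
	}
}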
379
node/node_test.go Normal file

@@ -0,0 +1,379 @@
package node
import (
"context"
"strings"
"testing"
"time"
"gotest.tools/assert"
"gotest.tools/assert/cmp"
coord "k8s.io/api/coordination/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
watch "k8s.io/apimachinery/pkg/watch"
testclient "k8s.io/client-go/kubernetes/fake"
)
func TestNodeRun(t *testing.T) {
t.Run("WithoutLease", func(t *testing.T) { testNodeRun(t, false) })
t.Run("WithLease", func(t *testing.T) { testNodeRun(t, true) })
}
func testNodeRun(t *testing.T, enableLease bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := testclient.NewSimpleClientset()
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
nodes := c.CoreV1().Nodes()
leases := c.Coordination().Leases(corev1.NamespaceNodeLease)
interval := 1 * time.Millisecond
opts := []NodeControllerOpt{
WithNodePingInterval(interval),
WithNodeStatusUpdateInterval(interval),
}
if enableLease {
opts = append(opts, WithNodeEnableLeaseV1Beta1(leases, nil))
}
node, err := NewNodeController(testP, testNode(t), nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error)
defer func() {
cancel()
assert.NilError(t, <-chErr)
}()
go func() {
chErr <- node.Run(ctx)
close(chErr)
}()
nw := makeWatch(t, nodes, node.n.Name)
defer nw.Stop()
nr := nw.ResultChan()
lw := makeWatch(t, leases, node.n.Name)
defer lw.Stop()
lr := lw.ResultChan()
var (
lBefore *coord.Lease
nodeUpdates int
leaseUpdates int
iters = 50
expectAtLeast = iters / 5
)
timeout := time.After(10 * time.Second)
for i := 0; i < iters; i++ {
var l *coord.Lease
select {
case <-timeout:
t.Fatal("timed out waiting for expected events")
case <-time.After(4 * interval):
t.Errorf("timeout waiting for event")
continue
case err := <-chErr:
t.Fatal(err) // if this returns at all it is an error regardless if err is nil
case <-nr:
nodeUpdates++
continue
case le := <-lr:
l = le.Object.(*coord.Lease)
leaseUpdates++
assert.Assert(t, cmp.Equal(l.Spec.HolderIdentity != nil, true))
assert.Check(t, cmp.Equal(*l.Spec.HolderIdentity, node.n.Name))
if lBefore != nil {
assert.Check(t, before(lBefore.Spec.RenewTime.Time, l.Spec.RenewTime.Time))
}
lBefore = l
}
}
lw.Stop()
nw.Stop()
assert.Check(t, atLeast(nodeUpdates, expectAtLeast))
if enableLease {
assert.Check(t, atLeast(leaseUpdates, expectAtLeast))
} else {
assert.Check(t, cmp.Equal(leaseUpdates, 0))
}
// trigger an async node status update
n := node.n.DeepCopy()
newCondition := corev1.NodeCondition{
Type: corev1.NodeConditionType("UPDATED"),
LastTransitionTime: metav1.Now().Rfc3339Copy(),
}
n.Status.Conditions = append(n.Status.Conditions, newCondition)
nw = makeWatch(t, nodes, node.n.Name)
defer nw.Stop()
nr = nw.ResultChan()
testP.triggerStatusUpdate(n)
eCtx, eCancel := context.WithTimeout(ctx, 10*time.Second)
defer eCancel()
select {
case err := <-chErr:
t.Fatal(err) // if this returns at all it is an error regardless if err is nil
case err := <-waitForEvent(eCtx, nr, func(e watch.Event) bool {
node := e.Object.(*corev1.Node)
if len(node.Status.Conditions) == 0 {
return false
}
// Check if this is a node update we are looking for
// Since node updates happen periodically there could be some that occur
// before the status update that we are looking for happens.
c := node.Status.Conditions[len(n.Status.Conditions)-1]
if !c.LastTransitionTime.Equal(&newCondition.LastTransitionTime) {
return false
}
if c.Type != newCondition.Type {
return false
}
return true
}):
assert.NilError(t, err, "error waiting for updated node condition")
}
}
func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
c := testclient.NewSimpleClientset()
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
nodes := c.CoreV1().Nodes()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node, err := NewNodeController(testP, testNode(t), nodes,
WithNodeStatusUpdateErrorHandler(func(_ context.Context, err error) error {
cancel()
return nil
}),
)
assert.NilError(t, err)
chErr := make(chan error, 1)
go func() {
chErr <- node.Run(ctx)
}()
timer := time.NewTimer(10 * time.Second)
defer timer.Stop()
// wait for the node to be ready
select {
case <-timer.C:
t.Fatal("timeout waiting for node to be ready")
case err := <-chErr:
t.Fatalf("node.Run returned earlier than expected: %v", err)
case <-node.Ready():
}
err = nodes.Delete(node.n.Name, nil)
assert.NilError(t, err)
testP.triggerStatusUpdate(node.n.DeepCopy())
timer = time.NewTimer(10 * time.Second)
defer timer.Stop()
select {
case err := <-chErr:
assert.Equal(t, err, nil)
case <-timer.C:
t.Fatal("timeout waiting for node shutdown")
}
}
func TestEnsureLease(t *testing.T) {
c := testclient.NewSimpleClientset().Coordination().Leases(corev1.NamespaceNodeLease)
n := testNode(t)
ctx := context.Background()
lease := newLease(nil)
setLeaseAttrs(lease, n, 1*time.Second)
l1, err := ensureLease(ctx, c, lease.DeepCopy())
assert.NilError(t, err)
assert.Check(t, timeEqual(l1.Spec.RenewTime.Time, lease.Spec.RenewTime.Time))
l1.Spec.RenewTime.Time = time.Now().Add(1 * time.Second)
l2, err := ensureLease(ctx, c, l1.DeepCopy())
assert.NilError(t, err)
assert.Check(t, timeEqual(l2.Spec.RenewTime.Time, l1.Spec.RenewTime.Time))
}
func TestUpdateNodeStatus(t *testing.T) {
n := testNode(t)
n.Status.Conditions = append(n.Status.Conditions, corev1.NodeCondition{
LastHeartbeatTime: metav1.Now().Rfc3339Copy(),
})
n.Status.Phase = corev1.NodePending
nodes := testclient.NewSimpleClientset().CoreV1().Nodes()
ctx := context.Background()
updated, err := UpdateNodeStatus(ctx, nodes, n.DeepCopy())
assert.Equal(t, errors.IsNotFound(err), true, err)
_, err = nodes.Create(n)
assert.NilError(t, err)
updated, err = UpdateNodeStatus(ctx, nodes, n.DeepCopy())
assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(n.Status, updated.Status))
n.Status.Phase = corev1.NodeRunning
updated, err = UpdateNodeStatus(ctx, nodes, n.DeepCopy())
assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(n.Status, updated.Status))
err = nodes.Delete(n.Name, nil)
assert.NilError(t, err)
_, err = nodes.Get(n.Name, metav1.GetOptions{})
assert.Equal(t, errors.IsNotFound(err), true, err)
_, err = UpdateNodeStatus(ctx, nodes, updated.DeepCopy())
assert.Equal(t, errors.IsNotFound(err), true, err)
}
func TestUpdateNodeLease(t *testing.T) {
leases := testclient.NewSimpleClientset().Coordination().Leases(corev1.NamespaceNodeLease)
lease := newLease(nil)
n := testNode(t)
setLeaseAttrs(lease, n, 0)
ctx := context.Background()
l, err := UpdateNodeLease(ctx, leases, lease)
assert.NilError(t, err)
assert.Equal(t, l.Name, lease.Name)
assert.Assert(t, cmp.DeepEqual(l.Spec.HolderIdentity, lease.Spec.HolderIdentity))
compare, err := leases.Get(l.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Equal(t, l.Spec.RenewTime.Time.Unix(), compare.Spec.RenewTime.Time.Unix())
assert.Equal(t, compare.Name, lease.Name)
assert.Assert(t, cmp.DeepEqual(compare.Spec.HolderIdentity, lease.Spec.HolderIdentity))
l.Spec.RenewTime.Time = time.Now().Add(10 * time.Second)
compare, err = UpdateNodeLease(ctx, leases, l.DeepCopy())
assert.NilError(t, err)
assert.Equal(t, compare.Spec.RenewTime.Time.Unix(), l.Spec.RenewTime.Time.Unix())
assert.Equal(t, compare.Name, lease.Name)
assert.Assert(t, cmp.DeepEqual(compare.Spec.HolderIdentity, lease.Spec.HolderIdentity))
}
func testNode(t *testing.T) *corev1.Node {
n := &corev1.Node{}
n.Name = strings.ToLower(t.Name())
return n
}
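// testNodeProvider wraps a NodeProvider and records the registered status handlers
// so tests can trigger node status updates on demand.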
type testNodeProvider struct {
NodeProvider
statusHandlers []func(*corev1.Node)
}
func (p *testNodeProvider) NotifyNodeStatus(ctx context.Context, h func(*corev1.Node)) {
p.statusHandlers = append(p.statusHandlers, h)
}
func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
for _, h := range p.statusHandlers {
h(n)
}
}
type watchGetter interface {
Watch(metav1.ListOptions) (watch.Interface, error)
}
func makeWatch(t *testing.T, wc watchGetter, name string) watch.Interface {
t.Helper()
w, err := wc.Watch(metav1.ListOptions{FieldSelector: "name=" + name})
assert.NilError(t, err)
return w
}
func atLeast(x, atLeast int) cmp.Comparison {
return func() cmp.Result {
if x < atLeast {
return cmp.ResultFailureTemplate(failTemplate("<"), map[string]interface{}{"x": x, "y": atLeast})
}
return cmp.ResultSuccess
}
}
func before(x, y time.Time) cmp.Comparison {
return func() cmp.Result {
if x.Before(y) {
return cmp.ResultSuccess
}
return cmp.ResultFailureTemplate(failTemplate(">="), map[string]interface{}{"x": x, "y": y})
}
}
func timeEqual(x, y time.Time) cmp.Comparison {
return func() cmp.Result {
if x.Equal(y) {
return cmp.ResultSuccess
}
return cmp.ResultFailureTemplate(failTemplate("!="), map[string]interface{}{"x": x, "y": y})
}
}
// waitForEvent waits for the `check` function to return true.
// `check` is run each time an event is received.
// Cancelling the context will cancel the wait, with the context error sent on
// the returned channel.
func waitForEvent(ctx context.Context, chEvent <-chan watch.Event, check func(watch.Event) bool) <-chan error {
chErr := make(chan error, 1)
go func() {
for {
select {
case e := <-chEvent:
if check(e) {
chErr <- nil
return
}
case <-ctx.Done():
chErr <- ctx.Err()
return
}
}
}()
return chErr
}
func failTemplate(op string) string {
return `
{{- .Data.x}} (
{{- with callArg 0 }}{{ formatNode . }} {{end -}}
{{- printf "%T" .Data.x -}}
) ` + op + ` {{ .Data.y}} (
{{- with callArg 1 }}{{ formatNode . }} {{end -}}
{{- printf "%T" .Data.y -}}
)`
}

289
node/pod.go Normal file

@@ -0,0 +1,289 @@
package node
import (
"context"
"hash/fnv"
"time"
"github.com/davecgh/go-spew/spew"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
const (
podStatusReasonProviderFailed = "ProviderFailed"
)
func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
return span.WithFields(ctx, log.Fields{
"uid": string(pod.GetUID()),
"namespace": pod.GetNamespace(),
"name": pod.GetName(),
"phase": string(pod.Status.Phase),
"reason": pod.Status.Reason,
})
}
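// createOrUpdatePod syncs the given Kubernetes pod to the provider: the pod is created
// if the provider does not know about it, and updated only when the hash of its spec
// differs from the provider's copy.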
func (pc *PodController) createOrUpdatePod(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "createOrUpdatePod")
defer span.End()
addPodAttributes(ctx, span, pod)
ctx = span.WithFields(ctx, log.Fields{
"pod": pod.GetName(),
"namespace": pod.GetNamespace(),
})
if err := populateEnvironmentVariables(ctx, pod, pc.resourceManager, pc.recorder); err != nil {
span.SetStatus(err)
return err
}
// Check if the pod is already known by the provider.
// NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found, while others don't.
// Hence, we ignore the error and just act upon the pod if it is non-nil (meaning the provider still knows about the pod).
if pp, _ := pc.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil {
// Pod Update Only Permits update of:
// - `spec.containers[*].image`
// - `spec.initContainers[*].image`
// - `spec.activeDeadlineSeconds`
// - `spec.tolerations` (only additions to existing tolerations)
// compare the hashes of the pod specs to see if the specs actually changed
expected := hashPodSpec(pp.Spec)
if actual := hashPodSpec(pod.Spec); actual != expected {
log.G(ctx).Debugf("Pod %s exists, updating pod in provider", pp.Name)
if origErr := pc.provider.UpdatePod(ctx, pod); origErr != nil {
pc.handleProviderError(ctx, span, origErr, pod)
return origErr
}
log.G(ctx).Info("Updated pod in provider")
}
} else {
if origErr := pc.provider.CreatePod(ctx, pod); origErr != nil {
pc.handleProviderError(ctx, span, origErr, pod)
return origErr
}
log.G(ctx).Info("Created pod in provider")
}
return nil
}
// hashPodSpec hashes a pod spec. This is adapted from the kubelet runtime's
// container-hashing logic; virtual-kubelet only operates at the Pod level, so
// the entire PodSpec is hashed instead.
func hashPodSpec(spec corev1.PodSpec) uint64 {
hash := fnv.New32a()
printer := spew.ConfigState{
Indent: " ",
SortKeys: true,
DisableMethods: true,
SpewKeys: true,
}
printer.Fprintf(hash, "%#v", spec)
return uint64(hash.Sum32())
}
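// handleProviderError records a provider failure on the pod: the phase is set to Pending
// (or Failed when the restart policy is Never), the reason and message are filled in from
// the error, and the status is pushed back to the Kubernetes API.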
func (pc *PodController) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) {
podPhase := corev1.PodPending
if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
podPhase = corev1.PodFailed
}
pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
pod.Status.Phase = podPhase
pod.Status.Reason = podStatusReasonProviderFailed
pod.Status.Message = origErr.Error()
logger := log.G(ctx).WithFields(log.Fields{
"podPhase": podPhase,
"reason": pod.Status.Reason,
})
_, err := pc.client.Pods(pod.Namespace).UpdateStatus(pod)
if err != nil {
logger.WithError(err).Warn("Failed to update pod status")
} else {
logger.Info("Updated k8s pod status")
}
span.SetStatus(origErr)
}
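// deletePod deletes the pod from the provider (if the provider still knows about it)
// and then force-deletes the corresponding Pod resource from the Kubernetes API.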
func (pc *PodController) deletePod(ctx context.Context, namespace, name string) error {
// Grab the pod as known by the provider.
// NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found, while others don't.
// Hence, we ignore the error and just act upon the pod if it is non-nil (meaning the provider still knows about the pod).
pod, _ := pc.provider.GetPod(ctx, namespace, name)
if pod == nil {
// The provider is not aware of the pod, but we must still delete the Kubernetes API resource.
return pc.forceDeletePodResource(ctx, namespace, name)
}
ctx, span := trace.StartSpan(ctx, "deletePod")
defer span.End()
ctx = addPodAttributes(ctx, span, pod)
var delErr error
if delErr = pc.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) {
span.SetStatus(delErr)
return delErr
}
log.G(ctx).Debug("Deleted pod from provider")
if !errors.IsNotFound(delErr) {
if err := pc.forceDeletePodResource(ctx, namespace, name); err != nil {
span.SetStatus(err)
return err
}
log.G(ctx).Info("Deleted pod from Kubernetes")
}
return nil
}
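// forceDeletePodResource deletes the Pod API object with a zero-second grace period;
// a pod that is already gone from Kubernetes is not treated as an error.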
func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace, name string) error {
ctx, span := trace.StartSpan(ctx, "forceDeletePodResource")
defer span.End()
ctx = span.WithFields(ctx, log.Fields{
"namespace": namespace,
"name": name,
})
var grace int64
if err := pc.client.Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil {
if errors.IsNotFound(err) {
log.G(ctx).Debug("Pod does not exist in Kubernetes, nothing to delete")
return nil
}
span.SetStatus(err)
return pkgerrors.Wrap(err, "Failed to delete Kubernetes pod")
}
return nil
}
// updatePodStatuses syncs the provider's pod statuses with the Kubernetes pod statuses.
func (pc *PodController) updatePodStatuses(ctx context.Context, q workqueue.RateLimitingInterface) {
ctx, span := trace.StartSpan(ctx, "updatePodStatuses")
defer span.End()
// Update all the pods with the provider status.
pods, err := pc.podsLister.List(labels.Everything())
if err != nil {
err = pkgerrors.Wrap(err, "error getting pod list")
span.SetStatus(err)
log.G(ctx).WithError(err).Error("Error updating pod statuses")
return
}
ctx = span.WithField(ctx, "nPods", int64(len(pods)))
for _, pod := range pods {
if !shouldSkipPodStatusUpdate(pod) {
enqueuePodStatusUpdate(ctx, q, pod)
}
}
}
func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodSucceeded ||
pod.Status.Phase == corev1.PodFailed ||
pod.Status.Reason == podStatusReasonProviderFailed
}
func (pc *PodController) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {
if shouldSkipPodStatusUpdate(pod) {
return nil
}
ctx, span := trace.StartSpan(ctx, "updatePodStatus")
defer span.End()
ctx = addPodAttributes(ctx, span, pod)
status, err := pc.provider.GetPodStatus(ctx, pod.Namespace, pod.Name)
if err != nil && !errdefs.IsNotFound(err) {
span.SetStatus(err)
return pkgerrors.Wrap(err, "error retreiving pod status")
}
// Update the pod's status
if status != nil {
pod.Status = *status
} else {
// Only change the status when the pod was already up.
// Doing so only when the pod was successfully running ensures we don't run into race conditions during pod creation.
if pod.Status.Phase == corev1.PodRunning || pod.ObjectMeta.CreationTimestamp.Add(time.Minute).Before(time.Now()) {
// Set the pod to failed; this ensures that, if the underlying container implementation is gone, a new pod will be created.
pod.Status.Phase = corev1.PodFailed
pod.Status.Reason = "NotFound"
pod.Status.Message = "The pod status was not found and may have been deleted from the provider"
for i, c := range pod.Status.ContainerStatuses {
pod.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
ExitCode: -137,
Reason: "NotFound",
Message: "Container was not found and was likely deleted",
FinishedAt: metav1.NewTime(time.Now()),
StartedAt: c.State.Running.StartedAt,
ContainerID: c.ContainerID,
}
pod.Status.ContainerStatuses[i].State.Running = nil
}
}
}
if _, err := pc.client.Pods(pod.Namespace).UpdateStatus(pod); err != nil {
span.SetStatus(err)
return pkgerrors.Wrap(err, "error while updating pod status in kubernetes")
}
log.G(ctx).WithFields(log.Fields{
"new phase": string(pod.Status.Phase),
"new reason": pod.Status.Reason,
}).Debug("Updated pod status in kubernetes")
return nil
}
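// enqueuePodStatusUpdate adds the pod's namespace/name key to the status update work queue.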
func enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key")
} else {
q.AddRateLimited(key)
}
}
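// podStatusHandler is the work queue handler for pod status updates: it resolves the
// namespace/name key via the lister and syncs the pod's status from the provider back
// to Kubernetes.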
func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) {
ctx, span := trace.StartSpan(ctx, "podStatusHandler")
defer span.End()
ctx = span.WithField(ctx, "key", key)
log.G(ctx).Debug("processing pod status update")
defer func() {
span.SetStatus(retErr)
if retErr != nil {
log.G(ctx).WithError(retErr).Error("Error processing pod status update")
}
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return pkgerrors.Wrap(err, "error spliting cache key")
}
pod, err := pc.podsLister.Pods(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
log.G(ctx).WithError(err).Debug("Skipping pod status update for pod missing in Kubernetes")
return nil
}
return pkgerrors.Wrap(err, "error looking up pod")
}
return pc.updatePodStatus(ctx, pod)
}

281
node/pod_test.go Normal file

@@ -0,0 +1,281 @@
package node
import (
"context"
"path"
"testing"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/fake"
)
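// mockProvider is a minimal in-memory PodLifecycleHandler used by the tests.
// It stores pods in a map keyed by namespace/name and counts create, update,
// and delete calls.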
type mockProvider struct {
pods map[string]*corev1.Pod
creates int
updates int
deletes int
}
func (m *mockProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
m.pods[path.Join(pod.GetNamespace(), pod.GetName())] = pod
m.creates++
return nil
}
func (m *mockProvider) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
m.pods[path.Join(pod.GetNamespace(), pod.GetName())] = pod
m.updates++
return nil
}
func (m *mockProvider) GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) {
p := m.pods[path.Join(namespace, name)]
if p == nil {
return nil, errdefs.NotFound("not found")
}
return p, nil
}
func (m *mockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) {
p := m.pods[path.Join(namespace, name)]
if p == nil {
return nil, errdefs.NotFound("not found")
}
return &p.Status, nil
}
func (m *mockProvider) DeletePod(ctx context.Context, p *corev1.Pod) error {
delete(m.pods, path.Join(p.GetNamespace(), p.GetName()))
m.deletes++
return nil
}
func (m *mockProvider) GetPods(_ context.Context) ([]*corev1.Pod, error) {
ls := make([]*corev1.Pod, 0, len(m.pods))
for _, p := range m.pods {
ls = append(ls, p)
}
return ls, nil
}
type TestController struct {
*PodController
mock *mockProvider
client *fake.Clientset
}
func newMockProvider() *mockProvider {
return &mockProvider{pods: make(map[string]*corev1.Pod)}
}
func newTestController() *TestController {
fk8s := fake.NewSimpleClientset()
rm := testutil.FakeResourceManager()
p := newMockProvider()
return &TestController{
PodController: &PodController{
client: fk8s.CoreV1(),
provider: p,
resourceManager: rm,
recorder: testutil.FakeEventRecorder(5),
},
mock: p,
client: fk8s,
}
}
func TestPodHashingEqual(t *testing.T) {
p1 := corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
Name: "nginx",
Image: "nginx:1.15.12-perl",
Ports: []corev1.ContainerPort{
corev1.ContainerPort{
ContainerPort: 443,
Protocol: "tcp",
},
},
},
},
}
h1 := hashPodSpec(p1)
p2 := corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
Name: "nginx",
Image: "nginx:1.15.12-perl",
Ports: []corev1.ContainerPort{
corev1.ContainerPort{
ContainerPort: 443,
Protocol: "tcp",
},
},
},
},
}
h2 := hashPodSpec(p2)
assert.Check(t, is.Equal(h1, h2))
}
func TestPodHashingDifferent(t *testing.T) {
p1 := corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
Name: "nginx",
Image: "nginx:1.15.12",
Ports: []corev1.ContainerPort{
corev1.ContainerPort{
ContainerPort: 443,
Protocol: "tcp",
},
},
},
},
}
h1 := hashPodSpec(p1)
p2 := corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
Name: "nginx",
Image: "nginx:1.15.12-perl",
Ports: []corev1.ContainerPort{
corev1.ContainerPort{
ContainerPort: 443,
Protocol: "tcp",
},
},
},
},
}
h2 := hashPodSpec(p2)
assert.Check(t, h1 != h2)
}
func TestPodCreateNewPod(t *testing.T) {
svr := newTestController()
pod := &corev1.Pod{}
pod.ObjectMeta.Namespace = "default"
pod.ObjectMeta.Name = "nginx"
pod.Spec = corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
Name: "nginx",
Image: "nginx:1.15.12",
Ports: []corev1.ContainerPort{
corev1.ContainerPort{
ContainerPort: 443,
Protocol: "tcp",
},
},
},
},
}
err := svr.createOrUpdatePod(context.Background(), pod)
assert.Check(t, is.Nil(err))
// createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist
assert.Check(t, is.Equal(svr.mock.creates, 1))
assert.Check(t, is.Equal(svr.mock.updates, 0))
}
func TestPodUpdateExisting(t *testing.T) {
svr := newTestController()
pod := &corev1.Pod{}
pod.ObjectMeta.Namespace = "default"
pod.ObjectMeta.Name = "nginx"
pod.Spec = corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
Name: "nginx",
Image: "nginx:1.15.12",
Ports: []corev1.ContainerPort{
corev1.ContainerPort{
ContainerPort: 443,
Protocol: "tcp",
},
},
},
},
}
err := svr.provider.CreatePod(context.Background(), pod)
assert.Check(t, is.Nil(err))
assert.Check(t, is.Equal(svr.mock.creates, 1))
assert.Check(t, is.Equal(svr.mock.updates, 0))
pod2 := &corev1.Pod{}
pod2.ObjectMeta.Namespace = "default"
pod2.ObjectMeta.Name = "nginx"
pod2.Spec = corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
Name: "nginx",
Image: "nginx:1.15.12-perl",
Ports: []corev1.ContainerPort{
corev1.ContainerPort{
ContainerPort: 443,
Protocol: "tcp",
},
},
},
},
}
err = svr.createOrUpdatePod(context.Background(), pod2)
assert.Check(t, is.Nil(err))
// createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed
assert.Check(t, is.Equal(svr.mock.creates, 1))
assert.Check(t, is.Equal(svr.mock.updates, 1))
}
func TestPodNoSpecChange(t *testing.T) {
svr := newTestController()
pod := &corev1.Pod{}
pod.ObjectMeta.Namespace = "default"
pod.ObjectMeta.Name = "nginx"
pod.Spec = corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
Name: "nginx",
Image: "nginx:1.15.12",
Ports: []corev1.ContainerPort{
corev1.ContainerPort{
ContainerPort: 443,
Protocol: "tcp",
},
},
},
},
}
err := svr.mock.CreatePod(context.Background(), pod)
assert.Check(t, is.Nil(err))
assert.Check(t, is.Equal(svr.mock.creates, 1))
assert.Check(t, is.Equal(svr.mock.updates, 0))
err = svr.createOrUpdatePod(context.Background(), pod)
assert.Check(t, is.Nil(err))
// createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change
assert.Check(t, is.Equal(svr.mock.creates, 1))
assert.Check(t, is.Equal(svr.mock.updates, 0))
}

396
node/podcontroller.go Normal file

@@ -0,0 +1,396 @@
// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package node
import (
"context"
"fmt"
"reflect"
"strconv"
"sync"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
// PodLifecycleHandler defines the interface used by the PodController to react
// to new and changed pods scheduled to the node that is being managed.
//
// Errors produced by these methods should implement an interface from
// github.com/virtual-kubelet/virtual-kubelet/errdefs package in order for the
// core logic to be able to understand the type of failure.
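//
// For a minimal in-memory example implementation, see the mockProvider type in
// pod_test.go in this package.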
type PodLifecycleHandler interface {
// CreatePod takes a Kubernetes Pod and deploys it within the provider.
CreatePod(ctx context.Context, pod *corev1.Pod) error
// UpdatePod takes a Kubernetes Pod and updates it within the provider.
UpdatePod(ctx context.Context, pod *corev1.Pod) error
// DeletePod takes a Kubernetes Pod and deletes it from the provider.
DeletePod(ctx context.Context, pod *corev1.Pod) error
// GetPod retrieves a pod by name from the provider (can be cached).
GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error)
// GetPodStatus retrieves the status of a pod by name from the provider.
GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error)
// GetPods retrieves a list of all pods running on the provider (can be cached).
GetPods(context.Context) ([]*corev1.Pod, error)
}
// PodNotifier notifies callers of pod changes.
// Providers should implement this interface to enable callers to be notified
// of pod status updates asynchronously.
type PodNotifier interface {
// NotifyPods instructs the notifier to call the passed in function when
// the pod status changes.
//
// NotifyPods should not block callers.
NotifyPods(context.Context, func(*corev1.Pod))
}
// PodController is the controller implementation for Pod resources.
type PodController struct {
provider PodLifecycleHandler
// podsInformer is an informer for Pod resources.
podsInformer corev1informers.PodInformer
// podsLister is able to list/get Pod resources from a shared informer's store.
podsLister corev1listers.PodLister
// recorder is an event recorder for recording Event resources to the Kubernetes API.
recorder record.EventRecorder
// ready is a channel which will be closed once the pod controller is fully up and running.
// this channel will never be closed if there is an error on startup.
ready chan struct{}
client corev1client.PodsGetter
resourceManager *manager.ResourceManager // TODO: can we eliminate this?
}
// PodControllerConfig is used to configure a new PodController.
type PodControllerConfig struct {
// PodClient is used to perform actions on the k8s API, such as updating pod status
// This field is required
PodClient corev1client.PodsGetter
// PodInformer is used as a local cache for pods
// This should be configured to only look at pods scheduled to the node which the controller will be managing
PodInformer corev1informers.PodInformer
EventRecorder record.EventRecorder
Provider PodLifecycleHandler
// TODO: get rid of this
ResourceManager *manager.ResourceManager
}
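// NewPodController creates a new pod controller with the provided config.
// PodClient, EventRecorder, and PodInformer are required and are validated here.
//
// A hypothetical wiring might look like the following sketch, where client,
// podInformer, recorder, and provider are placeholders supplied by the caller:
//
//	pc, err := NewPodController(PodControllerConfig{
//		PodClient:     client.CoreV1(),
//		PodInformer:   podInformer,
//		EventRecorder: recorder,
//		Provider:      provider,
//	})
//	if err != nil {
//		// handle setup error
//	}
//	go pc.Run(ctx, 10) // run with 10 pod sync workers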
func NewPodController(cfg PodControllerConfig) (*PodController, error) {
if cfg.PodClient == nil {
return nil, errdefs.InvalidInput("must set core client")
}
if cfg.EventRecorder == nil {
return nil, errdefs.InvalidInput("must set event recorder")
}
if cfg.PodInformer == nil {
return nil, errdefs.InvalidInput("must set informer")
}
return &PodController{
client: cfg.PodClient,
podsInformer: cfg.PodInformer,
podsLister: cfg.PodInformer.Lister(),
provider: cfg.Provider,
resourceManager: cfg.ResourceManager,
ready: make(chan struct{}),
recorder: cfg.EventRecorder,
}, nil
}
// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
// It will block until the context is cancelled, at which point it will shut down the work queue and wait for workers to finish processing their current work items.
func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
k8sQ := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes")
defer k8sQ.ShutDown()
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
pc.runSyncFromProvider(ctx, podStatusQueue)
defer podStatusQueue.ShutDown()
// Set up event handlers for when Pod resources change.
pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(pod interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
log.L.Error(err)
} else {
k8sQ.AddRateLimited(key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
// Create a copy of the old and new pod objects so we don't mutate the cache.
oldPod := oldObj.(*corev1.Pod).DeepCopy()
newPod := newObj.(*corev1.Pod).DeepCopy()
// We want to check if the two objects differ in anything other than their resource versions.
// Hence, we make them equal so that this change isn't picked up by reflect.DeepEqual.
newPod.ResourceVersion = oldPod.ResourceVersion
// Skip adding this pod's key to the work queue if its .metadata (except .metadata.resourceVersion) and .spec fields haven't changed.
// This guarantees that we don't attempt to sync the pod every time its .status field is updated.
if reflect.DeepEqual(oldPod.ObjectMeta, newPod.ObjectMeta) && reflect.DeepEqual(oldPod.Spec, newPod.Spec) {
return
}
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
log.L.Error(err)
} else {
k8sQ.AddRateLimited(key)
}
},
DeleteFunc: func(pod interface{}) {
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
log.L.Error(err)
} else {
k8sQ.AddRateLimited(key)
}
},
})
// Wait for the caches to be synced *before* starting workers.
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
return pkgerrors.New("failed to wait for caches to sync")
}
log.G(ctx).Info("Pod cache in-sync")
// Perform a reconciliation step that deletes any dangling pods from the provider.
// This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
// If for any reason the provider fails to delete a dangling pod, it will stay in the provider and deletion won't be retried.
pc.deleteDanglingPods(ctx, podSyncWorkers)
log.G(ctx).Info("starting workers")
for id := 0; id < podSyncWorkers; id++ {
go wait.Until(func() {
// Use the worker's "index" as its ID so we can use it for tracing.
pc.runWorker(ctx, strconv.Itoa(id), k8sQ)
}, time.Second, ctx.Done())
}
close(pc.ready)
log.G(ctx).Info("started workers")
<-ctx.Done()
log.G(ctx).Info("shutting down workers")
return nil
}
// Ready returns a channel which gets closed once the PodController is ready to handle scheduled pods.
// This channel will never close if there is an error on startup.
// The status of this channel after shutdown is indeterminate.
func (pc *PodController) Ready() <-chan struct{} {
return pc.ready
}
// runWorker is a long-running function that will continually call the processNextWorkItem function in order to read and process an item on the work queue.
func (pc *PodController) runWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
for pc.processNextWorkItem(ctx, workerID, q) {
}
}
// processNextWorkItem will read a single work item off the work queue and attempt to process it by calling the syncHandler.
func (pc *PodController) processNextWorkItem(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
// We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
ctx, span := trace.StartSpan(ctx, "processNextWorkItem")
defer span.End()
// Add the ID of the current worker as an attribute to the current span.
ctx = span.WithField(ctx, "workerId", workerId)
return handleQueueItem(ctx, q, pc.syncHandler)
}
// syncHandler compares the actual state with the desired, and attempts to converge the two.
func (pc *PodController) syncHandler(ctx context.Context, key string) error {
ctx, span := trace.StartSpan(ctx, "syncHandler")
defer span.End()
// Add the current key as an attribute to the current span.
ctx = span.WithField(ctx, "key", key)
// Convert the namespace/name string into a distinct namespace and name.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
// Log the error as a warning, but do not requeue the key as it is invalid.
log.G(ctx).Warn(pkgerrors.Wrapf(err, "invalid resource key: %q", key))
return nil
}
// Get the Pod resource with this namespace/name.
pod, err := pc.podsLister.Pods(namespace).Get(name)
if err != nil {
if !errors.IsNotFound(err) {
// We've failed to fetch the pod from the lister, but the error is not a 404.
// Hence, we add the key back to the work queue so we can retry processing it later.
err := pkgerrors.Wrapf(err, "failed to fetch pod with key %q from lister", key)
span.SetStatus(err)
return err
}
// At this point we know the Pod resource doesn't exist, which most probably means it was deleted.
// Hence, we must delete it from the provider if it still exists there.
if err := pc.deletePod(ctx, namespace, name); err != nil {
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name))
span.SetStatus(err)
return err
}
return nil
}
// At this point we know the Pod resource has either been created or updated (which includes being marked for deletion).
return pc.syncPodInProvider(ctx, pod)
}
// syncPodInProvider tries to reconcile the state of a pod by comparing its Kubernetes representation with the provider's representation.
func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "syncPodInProvider")
defer span.End()
// Add the pod's attributes to the current span.
ctx = addPodAttributes(ctx, span, pod)
// Check whether the pod has been marked for deletion.
// If it has, guarantee it is deleted in the provider and Kubernetes.
if pod.DeletionTimestamp != nil {
if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod))
span.SetStatus(err)
return err
}
return nil
}
// Ignore the pod if it is in the "Failed" or "Succeeded" state.
if pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded {
log.G(ctx).Warnf("skipping sync of pod %q in %q phase", loggablePodName(pod), pod.Status.Phase)
return nil
}
// Create or update the pod in the provider.
if err := pc.createOrUpdatePod(ctx, pod); err != nil {
err := pkgerrors.Wrapf(err, "failed to sync pod %q in the provider", loggablePodName(pod))
span.SetStatus(err)
return err
}
return nil
}
// deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them.
func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int) {
ctx, span := trace.StartSpan(ctx, "deleteDanglingPods")
defer span.End()
// Grab the list of pods known to the provider.
pps, err := pc.provider.GetPods(ctx)
if err != nil {
err := pkgerrors.Wrap(err, "failed to fetch the list of pods from the provider")
span.SetStatus(err)
log.G(ctx).Error(err)
return
}
// Create a slice to hold the pods we will be deleting from the provider.
ptd := make([]*corev1.Pod, 0)
// Iterate over the pods known to the provider, marking for deletion those that don't exist in Kubernetes.
for _, pp := range pps {
if _, err := pc.podsLister.Pods(pp.Namespace).Get(pp.Name); err != nil {
if errors.IsNotFound(err) {
// The current pod does not exist in Kubernetes, so we mark it for deletion.
ptd = append(ptd, pp)
continue
}
// For some reason we couldn't fetch the pod from the lister, so we log the error and give up.
err := pkgerrors.Wrap(err, "failed to fetch pod from the lister")
span.SetStatus(err)
log.G(ctx).Error(err)
return
}
}
// We delete each pod in its own goroutine, allowing a maximum of "threadiness" concurrent deletions.
semaphore := make(chan struct{}, threadiness)
var wg sync.WaitGroup
wg.Add(len(ptd))
// Iterate over the slice of pods to be deleted and delete them in the provider.
for _, pod := range ptd {
go func(ctx context.Context, pod *corev1.Pod) {
defer wg.Done()
ctx, span := trace.StartSpan(ctx, "deleteDanglingPod")
defer span.End()
semaphore <- struct{}{}
defer func() {
<-semaphore
}()
// Add the pod's attributes to the current span.
ctx = addPodAttributes(ctx, span, pod)
// Actually delete the pod.
if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
span.SetStatus(err)
log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod))
} else {
log.G(ctx).Infof("deleted leaked pod %q in provider", loggablePodName(pod))
}
}(ctx, pod)
}
// Wait for all pods to be deleted.
wg.Wait()
return
}
// loggablePodName returns the "namespace/name" key for the specified pod.
// If the key cannot be computed, "(unknown)" is returned.
// This method is meant to be used for logging purposes only.
func loggablePodName(pod *corev1.Pod) string {
k, err := cache.MetaNamespaceKeyFunc(pod)
if err != nil {
return "(unknown)"
}
return k
}
// loggablePodNameFromCoordinates returns the "namespace/name" key for the pod identified by the specified namespace and name (coordinates).
func loggablePodNameFromCoordinates(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}

140
node/queue.go Normal file

@@ -0,0 +1,140 @@
package node
import (
"context"
"strconv"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
)
const (
// maxRetries is the number of times we try to process a given key before permanently forgetting it.
maxRetries = 20
)
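// queueHandler is the signature of the function used to process a single work queue key.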
type queueHandler func(ctx context.Context, key string) error
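// handleQueueItem pops a single item off the queue and processes it with the given handler.
// Transient failures are requeued with rate limiting until maxRetries is exceeded, at which
// point the key is forgotten.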
func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, handler queueHandler) bool {
ctx, span := trace.StartSpan(ctx, "handleQueueItem")
defer span.End()
obj, shutdown := q.Get()
if shutdown {
return false
}
log.G(ctx).Debug("Got queue object")
err := func(obj interface{}) error {
defer log.G(ctx).Debug("Processed queue item")
// We call Done here so the work queue knows we have finished processing this item.
// We also must remember to call Forget if we do not want this work item being re-queued.
// For example, we do not call Forget if a transient error occurs.
// Instead, the item is put back on the work queue and attempted again after a back-off period.
defer q.Done(obj)
var key string
var ok bool
// We expect strings to come off the work queue.
// These are of the form namespace/name.
// We do this as the delayed nature of the work queue means the items in the informer cache may actually be more up to date than when the item was initially put onto the work queue.
if key, ok = obj.(string); !ok {
// As the item in the work queue is actually invalid, we call Forget here; otherwise we'd go into a loop of attempting to process a work item that is invalid.
q.Forget(obj)
log.G(ctx).Warnf("expected string in work queue item but got %#v", obj)
return nil
}
// Add the current key as an attribute to the current span.
ctx = span.WithField(ctx, "key", key)
// Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced.
if err := handler(ctx, key); err != nil {
if q.NumRequeues(key) < maxRetries {
// Put the item back on the work queue to handle any transient errors.
log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", key)
q.AddRateLimited(key)
return nil
}
// We've exceeded the maximum retries, so we must forget the key.
q.Forget(key)
return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key)
}
// Finally, if no error occurs we Forget this item so it does not get queued again until another change happens.
q.Forget(obj)
return nil
}(obj)
if err != nil {
// We've actually hit an error, so we set the span's status based on the error.
span.SetStatus(err)
log.G(ctx).Error(err)
return true
}
return true
}
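// runProviderSyncWorkers starts numWorkers goroutines that drain the pod status update queue.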
func (pc *PodController) runProviderSyncWorkers(ctx context.Context, q workqueue.RateLimitingInterface, numWorkers int) {
for i := 0; i < numWorkers; i++ {
go func(index int) {
workerID := strconv.Itoa(index)
pc.runProviderSyncWorker(ctx, workerID, q)
}(i)
}
}
func (pc *PodController) runProviderSyncWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
for pc.processPodStatusUpdate(ctx, workerID, q) {
}
}
func (pc *PodController) processPodStatusUpdate(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
ctx, span := trace.StartSpan(ctx, "processPodStatusUpdate")
defer span.End()
// Add the ID of the current worker as an attribute to the current span.
ctx = span.WithField(ctx, "workerID", workerID)
return handleQueueItem(ctx, q, pc.podStatusHandler)
}
// providerSyncLoop synchronizes pod states from the provider back to Kubernetes.
// Deprecated: This is only used when the provider does not support async updates
// Providers should implement async update support, even if it just means copying
// something like this in.
func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) {
const sleepTime = 5 * time.Second
t := time.NewTimer(sleepTime)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
t.Stop()
ctx, span := trace.StartSpan(ctx, "syncActualState")
pc.updatePodStatuses(ctx, q)
span.End()
// restart the timer
t.Reset(sleepTime)
}
}
}
func (pc *PodController) runSyncFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
if pn, ok := pc.provider.(PodNotifier); ok {
pn.NotifyPods(ctx, func(pod *corev1.Pod) {
enqueuePodStatusUpdate(ctx, q, pod)
})
} else {
go pc.providerSyncLoop(ctx, q)
}
}