Compare commits


6 Commits

Author SHA1 Message Date
Brian Goff
4e399f93b1 Merge pull request #868 from cpuguy83/1.2_fix_missing_stats_route
Add GetStatsSummary to PodHandlerConfig
2020-07-30 14:02:41 -07:00
Vilmos Nebehaj
c621acc8d2 Add GetStatsSummary to PodHandlerConfig
If both the metrics routes and the pod routes are attached to the same
mux with the pattern "/", it will panic. Instead, add the stats handler
function to PodHandlerConfig and set up the route if it is not nil.

(cherry picked from commit 56b248c854)
Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2020-07-30 10:25:34 -07:00
Brian Goff
e6e1dbed87 Merge pull request #794 from cpuguy83/cherry_picks_1.2.1
Cherry picks 1.2.1
2019-11-15 15:19:11 -08:00
Thomas Hartland
eb9498cdde Add test for node ping interval
(cherry picked from commit 3783a39b26)
2019-11-15 14:31:04 -08:00
Thomas Hartland
df16317a89 After handling status update, reset update timer with correct duration
If the ping timer is being used, it should be reset with the ping update
interval. If the status update interval is used then Ping stops being
called for long enough to cause kubernetes to mark the node as NotReady.

(cherry picked from commit c258614d8f)
2019-11-15 14:30:54 -08:00
Brian Goff
7585e11542 [Sync Provider] Fix panic on not found pod status
(cherry picked from commit 6e33b0f084)
2019-11-15 14:30:22 -08:00
34 changed files with 455 additions and 1715 deletions
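
The "Add GetStatsSummary to PodHandlerConfig" commit above explains the fix: instead of mounting the stats routes on the same mux under "/" (which panics on the duplicate pattern), the stats handler travels inside PodHandlerConfig and the /stats/summary route is registered only when it is non-nil. A minimal sketch of that wiring, assuming the api import path shown below and a caller-supplied PodHandlerConfig; this is illustrative, not the project's exact setup code.

package sketch

import (
	"net/http"

	// Assumed import path for the virtual-kubelet API handlers package.
	"github.com/virtual-kubelet/virtual-kubelet/node/api"
)

// newPodServer attaches the pod routes (and, when cfg.GetStatsSummary != nil,
// the /stats/summary route) to a single mux, so nothing registers "/" twice.
func newPodServer(cfg api.PodHandlerConfig, addr string) *http.Server {
	mux := http.NewServeMux()
	api.AttachPodRoutes(cfg, mux, true)
	return &http.Server{Addr: addr, Handler: mux}
}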

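The "After handling status update, reset update timer with correct duration" commit describes a subtle timer bug: when the ping timer does double duty (no node lease), it must be re-armed with the ping interval, or Ping goes quiet until the much longer status interval elapses and Kubernetes marks the node NotReady. A minimal sketch of the reset rule with illustrative names, not the controller's actual code:

package sketch

import "time"

// resetAfterStatusUpdate re-arms the timer that drives the next ping/status cycle.
// If the shared timer is the ping timer, it must be reset with pingInterval;
// resetting it with statusInterval is what starved Ping in the bug above.
func resetAfterStatusUpdate(t *time.Timer, usingPingTimer bool, pingInterval, statusInterval time.Duration) {
	if usingPingTimer {
		t.Reset(pingInterval)
		return
	}
	t.Reset(statusInterval)
}
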
View File

@@ -3,7 +3,7 @@ jobs:
validate:
resource_class: xlarge
docker:
- image: circleci/golang:1.13
- image: circleci/golang:1.12
environment:
GO111MODULE: "on"
GOPROXY: https://proxy.golang.org
@@ -33,7 +33,7 @@ jobs:
test:
resource_class: xlarge
docker:
- image: circleci/golang:1.13
- image: circleci/golang:1.12
environment:
GO111MODULE: "on"
working_directory: /go/src/github.com/virtual-kubelet/virtual-kubelet
@@ -61,7 +61,7 @@ jobs:
CHANGE_MINIKUBE_NONE_USER: true
GOPATH: /home/circleci/go
KUBECONFIG: /home/circleci/.kube/config
KUBERNETES_VERSION: v1.17.6
KUBERNETES_VERSION: v1.15.2
MINIKUBE_HOME: /home/circleci
MINIKUBE_VERSION: v1.2.0
MINIKUBE_WANTUPDATENOTIFICATION: false
@@ -117,7 +117,7 @@ jobs:
command: |
mkdir $HOME/.go
export PATH=$HOME/.go/bin:${PATH}
curl -fsSL -o "/tmp/go.tar.gz" "https://dl.google.com/go/go1.13.12.linux-amd64.tar.gz"
curl -fsSL -o "/tmp/go.tar.gz" "https://dl.google.com/go/go1.12.6.linux-amd64.tar.gz"
tar -C $HOME/.go --strip-components=1 -xzf "/tmp/go.tar.gz"
go version
make e2e

View File

@@ -6,9 +6,6 @@
* VMWare
* Netflix
* HashiCorp
* Admiralty
* Elotl
* Tencent Games
Since end-users are specific to each provider within VK, we have many end-user customers that we don't have permission to list publicly. Please contact ribhatia@microsoft.com for more information.

View File

@@ -1,4 +1,4 @@
FROM golang:1.13 as builder
FROM golang:1.12 as builder
ENV PATH /go/bin:/usr/local/go/bin:$PATH
ENV GOPATH /go
COPY . /go/src/github.com/virtual-kubelet/virtual-kubelet

View File

@@ -13,7 +13,7 @@ include Makefile.e2e
# Also, we will want to lock our tool versions using go mod:
# https://github.com/golang/go/wiki/Modules#how-can-i-track-tool-dependencies-for-a-module
gobin_tool ?= $(shell which gobin || echo $(GOPATH)/bin/gobin)
goimports := golang.org/x/tools/cmd/goimports@release-branch.go1.13
goimports := golang.org/x/tools/cmd/goimports@release-branch.go1.12
gocovmerge := github.com/wadey/gocovmerge@b5bfa59ec0adc420475f97f89b58045c721d761c
goreleaser := github.com/goreleaser/goreleaser@v0.82.2
gox := github.com/mitchellh/gox@v1.0.1
@@ -119,6 +119,8 @@ format: goimports
$Q find . -iname \*.go | grep -v \
-e "^$$" $(addprefix -e ,$(IGNORED_PACKAGES)) | xargs $(gobin_tool) -run $(goimports) -w
##### =====> Internals <===== #####
.PHONY: setup
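
The Makefile comment above points at the Go wiki pattern for locking tool versions via go mod. A commonly used sketch of that pattern (the file name tools.go is conventional; the project itself still resolves tools through gobin, so this is illustrative only):

//go:build tools
// +build tools

// Package tools blank-imports the commands this repo builds with, so `go mod`
// records and pins their versions; the build tag keeps the file out of normal builds.
package tools

import (
	_ "golang.org/x/tools/cmd/goimports"
)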

View File

@@ -2,7 +2,7 @@
Virtual Kubelet is an open source [Kubernetes kubelet](https://kubernetes.io/docs/reference/generated/kubelet/)
implementation that masquerades as a kubelet for the purposes of connecting Kubernetes to other APIs.
This allows the nodes to be backed by other services like ACI, AWS Fargate, [IoT Edge](https://github.com/Azure/iot-edge-virtual-kubelet-provider), [Tensile Kube](https://github.com/virtual-kubelet/tensile-kube) etc. The primary scenario for VK is enabling the extension of the Kubernetes API into serverless container platforms like ACI and Fargate, though we are open to others. However, it should be noted that VK is explicitly not intended to be an alternative to Kubernetes federation.
This allows the nodes to be backed by other services like ACI, AWS Fargate, [IoT Edge](https://github.com/Azure/iot-edge-virtual-kubelet-provider) etc. The primary scenario for VK is enabling the extension of the Kubernetes API into serverless container platforms like ACI and Fargate, though we are open to others. However, it should be noted that VK is explicitly not intended to be an alternative to Kubernetes federation.
Virtual Kubelet features a pluggable architecture and direct use of Kubernetes primitives, making it much easier to build on.
@@ -16,15 +16,12 @@ The best description is "Kubernetes API on top, programmable back."
* [How It Works](#how-it-works)
* [Usage](#usage)
* [Providers](#providers)
+ [Admiralty Multi-Cluster Scheduler](#admiralty-multi-cluster-scheduler)
+ [Alibaba Cloud ECI Provider](#alibaba-cloud-eci-provider)
+ [Azure Container Instances Provider](#azure-container-instances-provider)
+ [Azure Batch GPU Provider](https://github.com/virtual-kubelet/azure-batch/blob/master/README.md)
+ [AWS Fargate Provider](#aws-fargate-provider)
+ [Elotl Kip](#elotl-kip)
+ [HashiCorp Nomad](#hashicorp-nomad-provider)
+ [OpenStack Zun](#openstack-zun-provider)
+ [Tensile Kube Provider](#tensile-kube-provider)
+ [Adding a New Provider via the Provider Interface](#adding-a-new-provider-via-the-provider-interface)
* [Testing](#testing)
+ [Unit tests](#unit-tests)
@@ -76,9 +73,6 @@ Providers must provide the following functionality to be considered a supported
2. Conforms to the current API provided by Virtual Kubelet.
3. Does not have access to the Kubernetes API Server and has a well-defined callback mechanism for getting data like secrets or configmaps.
### Admiralty Multi-Cluster Scheduler
Admiralty Multi-Cluster Scheduler mutates annotated pods into "proxy pods" scheduled on a virtual-kubelet node and creates corresponding "delegate pods" in remote clusters (actually running the containers). A feedback loop updates the statuses and annotations of the proxy pods to reflect the statuses and annotations of the delegate pods. You can find more details in the [Admiralty Multi-Cluster Scheduler documentation](https://github.com/admiraltyio/multicluster-scheduler).
### Alibaba Cloud ECI Provider
@@ -118,12 +112,6 @@ co-exist with pods on regular worker nodes in the same Kubernetes cluster.
Easy instructions and a sample configuration file are available in the [AWS Fargate provider documentation](https://github.com/virtual-kubelet/aws-fargate). Please note that this provider is not currently supported.
### Elotl Kip
[Kip](https://github.com/elotl/kip) is a provider that runs pods in cloud instances, allowing a Kubernetes cluster to transparently scale workloads into a cloud. When a pod is scheduled onto the virtual node, Kip starts a right-sized cloud instance for the pod's workload and dispatches the pod onto the instance. When the pod is finished running, the cloud instance is terminated.
When workloads run on Kip, your cluster size naturally scales with the cluster workload, pods are strongly isolated from each other and the user is freed from managing worker nodes and strategically packing pods onto nodes.
### HashiCorp Nomad Provider
HashiCorp [Nomad](https://nomadproject.io) provider for Virtual Kubelet connects your Kubernetes cluster
@@ -149,11 +137,6 @@ and bind-mount Cinder volumes into a path inside a pod's container.
For detailed instructions, follow the guide [here](https://github.com/virtual-kubelet/openstack-zun/blob/master/README.md).
### Tensile Kube Provider
[Tensile kube](https://github.com/virtual-kubelet/tensile-kube/blob/master/README.md) is contributed by [tencent games](https://game.qq.com). It is a provider for Virtual Kubelet that connects your Kubernetes cluster with other Kubernetes clusters, effectively extending Kubernetes without limit. With this provider, pods scheduled on the virtual node registered in Kubernetes run as jobs on other Kubernetes clusters' nodes.
### Adding a New Provider via the Provider Interface
Providers consume this project as a library which implements the core logic of
@@ -293,7 +276,7 @@ Enable the ServiceNodeExclusion flag, by modifying the Controller Manager manife
Virtual Kubelet follows the [CNCF Code of Conduct](https://github.com/cncf/foundation/blob/master/code-of-conduct.md).
Sign the [CNCF CLA](https://github.com/kubernetes/community/blob/master/CLA.md) to be able to make Pull Requests to this repo.
Monthly Virtual Kubelet Office Hours are held at 10am PST on the last Thursday of every month in this [zoom meeting room](https://zoom.us/j/94701509915). Check out the calendar [here](https://calendar.google.com/calendar?cid=bjRtbGMxYWNtNXR0NXQ1a2hqZmRkNTRncGNAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ).
Bi-weekly Virtual Kubelet Architecture meetings are held at 11am PST every other Wednesday in this [zoom meeting room](https://zoom.us/j/245165908). Check out the calendar [here](https://calendar.google.com/calendar?cid=bjRtbGMxYWNtNXR0NXQ1a2hqZmRkNTRncGNAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ).
Our google drive with design specifications and meeting notes is [here](https://drive.google.com/drive/folders/19Ndu11WBCCBDowo9CrrGUHoIfd2L8Ueg?usp=sharing).

View File

@@ -81,11 +81,6 @@ func installFlags(flags *pflag.FlagSet, c *Opts) {
flags.DurationVar(&c.InformerResyncPeriod, "full-resync-period", c.InformerResyncPeriod, "how often to perform a full resync of pods between kubernetes and the provider")
flags.DurationVar(&c.StartupTimeout, "startup-timeout", c.StartupTimeout, "How long to wait for the virtual-kubelet to start")
flags.DurationVar(&c.StreamIdleTimeout, "stream-idle-timeout", c.StreamIdleTimeout,
"stream-idle-timeout is the maximum time a streaming connection can be idle before the connection is"+
" automatically closed, default 30s.")
flags.DurationVar(&c.StreamCreationTimeout, "stream-creation-timeout", c.StreamCreationTimeout,
"stream-creation-timeout is the maximum time for streaming connection, default 30s.")
flagset := flag.NewFlagSet("klog", flag.PanicOnError)
klog.InitFlags(flagset)

View File

@@ -22,7 +22,6 @@ import (
"net"
"net/http"
"os"
"time"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
@@ -58,7 +57,7 @@ func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) {
}, nil
}
func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerConfig, getPodsFromKubernetes api.PodListerFunc) (_ func(), retErr error) {
func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerConfig) (_ func(), retErr error) {
var closers []io.Closer
cancel := func() {
for _, c := range closers {
@@ -89,14 +88,10 @@ func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerCon
mux := http.NewServeMux()
podRoutes := api.PodHandlerConfig{
RunInContainer: p.RunInContainer,
GetContainerLogs: p.GetContainerLogs,
GetPodsFromKubernetes: getPodsFromKubernetes,
GetPods: p.GetPods,
StreamIdleTimeout: cfg.StreamIdleTimeout,
StreamCreationTimeout: cfg.StreamCreationTimeout,
RunInContainer: p.RunInContainer,
GetContainerLogs: p.GetContainerLogs,
GetPods: p.GetPods,
}
api.AttachPodRoutes(podRoutes, mux, true)
s := &http.Server{
@@ -147,12 +142,10 @@ func serveHTTP(ctx context.Context, s *http.Server, l net.Listener, name string)
}
type apiServerConfig struct {
CertPath string
KeyPath string
Addr string
MetricsAddr string
StreamIdleTimeout time.Duration
StreamCreationTimeout time.Duration
CertPath string
KeyPath string
Addr string
MetricsAddr string
}
func getAPIConfig(c Opts) (*apiServerConfig, error) {
@@ -163,8 +156,6 @@ func getAPIConfig(c Opts) (*apiServerConfig, error) {
config.Addr = fmt.Sprintf(":%d", c.ListenPort)
config.MetricsAddr = c.MetricsAddr
config.StreamIdleTimeout = c.StreamIdleTimeout
config.StreamCreationTimeout = c.StreamCreationTimeout
return &config, nil
}

View File

@@ -36,10 +36,8 @@ const (
DefaultKubeNamespace = corev1.NamespaceAll
DefaultKubeClusterDomain = "cluster.local"
DefaultTaintEffect = string(corev1.TaintEffectNoSchedule)
DefaultTaintKey = "virtual-kubelet.io/provider"
DefaultStreamIdleTimeout = 30 * time.Second
DefaultStreamCreationTimeout = 30 * time.Second
DefaultTaintEffect = string(corev1.TaintEffectNoSchedule)
DefaultTaintKey = "virtual-kubelet.io/provider"
)
// Opts stores all the options for configuring the root virtual-kubelet command.
@@ -86,11 +84,6 @@ type Opts struct {
// Startup Timeout is how long to wait for the kubelet to start
StartupTimeout time.Duration
// StreamIdleTimeout is the maximum time a streaming connection
// can be idle before the connection is automatically closed.
StreamIdleTimeout time.Duration
// StreamCreationTimeout is the maximum time for streaming connection
StreamCreationTimeout time.Duration
Version string
}
@@ -159,13 +152,5 @@ func SetDefaultOpts(c *Opts) error {
}
}
if c.StreamIdleTimeout == 0 {
c.StreamIdleTimeout = DefaultStreamIdleTimeout
}
if c.StreamCreationTimeout == 0 {
c.StreamCreationTimeout = DefaultStreamCreationTimeout
}
return nil
}

View File

@@ -161,7 +161,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
log.G(ctx).Debug("node not found")
newNode := pNode.DeepCopy()
newNode.ResourceVersion = ""
_, err = client.CoreV1().Nodes().Create(ctx, newNode, metav1.CreateOptions{})
_, err = client.CoreV1().Nodes().Create(newNode)
if err != nil {
return err
}
@@ -193,9 +193,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
go podInformerFactory.Start(ctx.Done())
go scmInformerFactory.Start(ctx.Done())
cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig, func(context.Context) ([]*corev1.Pod, error) {
return rm.GetPods(), nil
})
cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig)
if err != nil {
return err
}
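
One side of the hunks above calls the typed clients with a context and an options struct (e.g. Nodes().Create(ctx, newNode, metav1.CreateOptions{})); that is the context-aware API introduced around client-go v0.18, which the k8s.io 1.15-era side of this compare does not have. A small sketch of the newer form, assuming a configured clientset:

package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// createNode registers a node object using the context-aware client-go signature.
func createNode(ctx context.Context, client kubernetes.Interface, n *corev1.Node) (*corev1.Node, error) {
	return client.CoreV1().Nodes().Create(ctx, n, metav1.CreateOptions{})
}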

go.mod (81 changed lines)
View File

@@ -1,6 +1,6 @@
module github.com/virtual-kubelet/virtual-kubelet
go 1.13
go 1.12
require (
contrib.go.opencensus.io/exporter/jaeger v0.1.0
@@ -8,64 +8,79 @@ require (
github.com/docker/spdystream v0.0.0-20170912183627-bc6354cbbc29 // indirect
github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f // indirect
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2 // indirect
github.com/evanphx/json-patch v4.1.0+incompatible // indirect
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect
github.com/google/go-cmp v0.3.1
github.com/google/gofuzz v1.0.0 // indirect
github.com/googleapis/gnostic v0.1.0 // indirect
github.com/gorilla/mux v1.7.0
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/json-iterator/go v1.1.6 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.0.0
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829
github.com/sirupsen/logrus v1.4.1
github.com/spf13/cobra v0.0.2
github.com/spf13/pflag v1.0.3
github.com/stretchr/testify v1.3.0 // indirect
go.opencensus.io v0.21.0
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c // indirect
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 // indirect
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
google.golang.org/genproto v0.0.0-20190404172233-64821d5d2107 // indirect
google.golang.org/grpc v1.20.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
gotest.tools v2.2.0+incompatible
k8s.io/api v0.18.4
k8s.io/apimachinery v0.18.4
k8s.io/client-go v0.18.4
k8s.io/klog v1.0.0
k8s.io/kubernetes v1.18.4
k8s.io/api v0.0.0
k8s.io/apimachinery v0.0.0
k8s.io/client-go v10.0.0+incompatible
k8s.io/klog v0.3.1
k8s.io/kube-openapi v0.0.0-20190510232812-a01b7d5d6c22 // indirect
k8s.io/kubernetes v1.15.2
)
replace k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.18.4
replace k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.0.0-20190805144654-3d5bf3a310c1
replace k8s.io/cloud-provider => k8s.io/cloud-provider v0.18.4
replace k8s.io/cloud-provider => k8s.io/cloud-provider v0.0.0-20190805144409-8484242760e7
replace k8s.io/cli-runtime => k8s.io/cli-runtime v0.18.4
replace k8s.io/cli-runtime => k8s.io/cli-runtime v0.0.0-20190805143448-a07e59fb081d
replace k8s.io/apiserver => k8s.io/apiserver v0.18.4
replace k8s.io/apiserver => k8s.io/apiserver v0.0.0-20190805142138-368b2058237c
replace k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.18.4
replace k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.0.0-20190805144531-3985229e1802
replace k8s.io/cri-api => k8s.io/cri-api v0.18.4
replace k8s.io/cri-api => k8s.io/cri-api v0.0.0-20190531030430-6117653b35f1
replace k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.18.4
replace k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.0.0-20190805142416-fd821fbbb94e
replace k8s.io/kubelet => k8s.io/kubelet v0.18.4
replace k8s.io/kubelet => k8s.io/kubelet v0.0.0-20190805143852-517ff267f8d1
replace k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.18.4
replace k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.0.0-20190805144128-269742da31dd
replace k8s.io/apimachinery => k8s.io/apimachinery v0.18.4
replace k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
replace k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.18.4
replace k8s.io/api => k8s.io/api v0.0.0-20190805141119-fdd30b57c827
replace k8s.io/kube-proxy => k8s.io/kube-proxy v0.18.4
replace k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.0.0-20190805144246-c01ee70854a1
replace k8s.io/component-base => k8s.io/component-base v0.18.4
replace k8s.io/kube-proxy => k8s.io/kube-proxy v0.0.0-20190805143734-7f1675b90353
replace k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.18.4
replace k8s.io/component-base => k8s.io/component-base v0.0.0-20190805141645-3a5e5ac800ae
replace k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.18.4
replace k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.0.0-20190805144012-2a1ed1f3d8a4
replace k8s.io/metrics => k8s.io/metrics v0.18.4
replace k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.0.0-20190805143126-cdb999c96590
replace k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.18.4
replace k8s.io/metrics => k8s.io/metrics v0.0.0-20190805143318-16b07057415d
replace k8s.io/code-generator => k8s.io/code-generator v0.18.4
replace k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.0.0-20190805142637-3b65bc4bb24f
replace k8s.io/client-go => k8s.io/client-go v0.18.4
replace k8s.io/code-generator => k8s.io/code-generator v0.0.0-20190612205613-18da4a14b22b
replace k8s.io/kubectl => k8s.io/kubectl v0.18.4
replace k8s.io/api => k8s.io/api v0.18.4
replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190805141520-2fe0317bcee0

go.sum (685 changed lines)

File diff suppressed because it is too large.

View File

@@ -5,5 +5,4 @@ import (
// This is a dep that `go mod tidy` keeps removing, because it's a transitive dep that's pulled in via a test
// See: https://github.com/golang/go/issues/29702
_ "github.com/prometheus/client_golang/prometheus"
_ "golang.org/x/sys/unix"
)

View File

@@ -16,24 +16,23 @@ import (
// WaitUntilNodeCondition establishes a watch on the vk node.
// Then, it waits for the specified condition function to be verified.
func (f *Framework) WaitUntilNodeCondition(fn watch.ConditionFunc) error {
// Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached.
ctx, cancel := context.WithTimeout(context.Background(), f.WatchTimeout)
defer cancel()
// Create a field selector that matches the specified Pod resource.
fs := fields.OneTermEqualSelector("metadata.name", f.NodeName).String()
// Create a ListWatch so we can receive events for the matched Pod resource.
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fs
return f.KubeClient.CoreV1().Nodes().List(ctx, options)
return f.KubeClient.CoreV1().Nodes().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) {
options.FieldSelector = fs
return f.KubeClient.CoreV1().Nodes().Watch(ctx, options)
return f.KubeClient.CoreV1().Nodes().Watch(options)
},
}
// Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached.
ctx, cancel := context.WithTimeout(context.Background(), f.WatchTimeout)
defer cancel()
last, err := watch.UntilWithSync(ctx, lw, &corev1.Node{}, nil, fn)
if err != nil {
return err
@@ -45,17 +44,17 @@ func (f *Framework) WaitUntilNodeCondition(fn watch.ConditionFunc) error {
}
// DeleteNode deletes the vk node used by the framework
func (f *Framework) DeleteNode(ctx context.Context) error {
func (f *Framework) DeleteNode() error {
var gracePeriod int64
propagation := metav1.DeletePropagationBackground
opts := metav1.DeleteOptions{
PropagationPolicy: &propagation,
GracePeriodSeconds: &gracePeriod,
}
return f.KubeClient.CoreV1().Nodes().Delete(ctx, f.NodeName, opts)
return f.KubeClient.CoreV1().Nodes().Delete(f.NodeName, &opts)
}
// GetNode gets the vk node used by the framework
func (f *Framework) GetNode(ctx context.Context) (*corev1.Node, error) {
return f.KubeClient.CoreV1().Nodes().Get(ctx, f.NodeName, metav1.GetOptions{})
func (f *Framework) GetNode() (*corev1.Node, error) {
return f.KubeClient.CoreV1().Nodes().Get(f.NodeName, metav1.GetOptions{})
}
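
WaitUntilNodeCondition above takes a watch condition function and runs it against node events via UntilWithSync. An illustrative condition that could be passed to it as f.WaitUntilNodeCondition(nodeReady); the Ready check is my assumption, not part of this diff:

package sketch

import (
	corev1 "k8s.io/api/core/v1"
	watchapi "k8s.io/apimachinery/pkg/watch"
)

// nodeReady returns true once the watched node reports the Ready condition as True.
func nodeReady(ev watchapi.Event) (bool, error) {
	node, ok := ev.Object.(*corev1.Node)
	if !ok {
		return false, nil
	}
	for _, c := range node.Status.Conditions {
		if c.Type == corev1.NodeReady && c.Status == corev1.ConditionTrue {
			return true, nil
		}
	}
	return false, nil
}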

View File

@@ -47,21 +47,21 @@ func (f *Framework) CreateDummyPodObjectWithPrefix(testName string, prefix strin
}
// CreatePod creates the specified pod in the Kubernetes API.
func (f *Framework) CreatePod(ctx context.Context, pod *corev1.Pod) (*corev1.Pod, error) {
return f.KubeClient.CoreV1().Pods(f.Namespace).Create(ctx, pod, metav1.CreateOptions{})
func (f *Framework) CreatePod(pod *corev1.Pod) (*corev1.Pod, error) {
return f.KubeClient.CoreV1().Pods(f.Namespace).Create(pod)
}
// DeletePod deletes the pod with the specified name and namespace in the Kubernetes API using the default grace period.
func (f *Framework) DeletePod(ctx context.Context, namespace, name string) error {
return f.KubeClient.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{})
func (f *Framework) DeletePod(namespace, name string) error {
return f.KubeClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{})
}
// DeletePodImmediately forcibly deletes the pod with the specified name and namespace in the Kubernetes API.
// This is equivalent to running "kubectl delete --force --grace-period 0 --namespace <namespace> pod <name>".
func (f *Framework) DeletePodImmediately(ctx context.Context, namespace, name string) error {
func (f *Framework) DeletePodImmediately(namespace, name string) error {
grace := int64(0)
propagation := metav1.DeletePropagationBackground
return f.KubeClient.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{
return f.KubeClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{
GracePeriodSeconds: &grace,
PropagationPolicy: &propagation,
})
@@ -70,22 +70,22 @@ func (f *Framework) DeletePodImmediately(ctx context.Context, namespace, name st
// WaitUntilPodCondition establishes a watch on the pod with the specified name and namespace.
// Then, it waits for the specified condition function to be verified.
func (f *Framework) WaitUntilPodCondition(namespace, name string, fn watch.ConditionFunc) (*corev1.Pod, error) {
// Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached.
ctx, cfn := context.WithTimeout(context.Background(), f.WatchTimeout)
defer cfn()
// Create a field selector that matches the specified Pod resource.
fs := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.namespace==%s,metadata.name==%s", namespace, name))
// Create a ListWatch so we can receive events for the matched Pod resource.
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fs.String()
return f.KubeClient.CoreV1().Pods(namespace).List(ctx, options)
return f.KubeClient.CoreV1().Pods(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) {
options.FieldSelector = fs.String()
return f.KubeClient.CoreV1().Pods(namespace).Watch(ctx, options)
return f.KubeClient.CoreV1().Pods(namespace).Watch(options)
},
}
// Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached.
ctx, cfn := context.WithTimeout(context.Background(), f.WatchTimeout)
defer cfn()
last, err := watch.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, fn)
if err != nil {
return nil, err
@@ -129,24 +129,22 @@ func (f *Framework) WaitUntilPodInPhase(namespace, name string, phases ...corev1
// WaitUntilPodEventWithReason establishes a watch on events involving the specified pod.
// Then, it waits for an event with the specified reason to be created/updated.
func (f *Framework) WaitUntilPodEventWithReason(pod *corev1.Pod, reason string) error {
// Watch for updates to the Event resource until fn is satisfied, or until the timeout is reached.
ctx, cfn := context.WithTimeout(context.Background(), f.WatchTimeout)
defer cfn()
// Create a field selector that matches Event resources involving the specified pod.
fs := fields.ParseSelectorOrDie(fmt.Sprintf("involvedObject.kind==Pod,involvedObject.uid==%s", pod.UID))
// Create a ListWatch so we can receive events for the matched Event resource.
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fs.String()
return f.KubeClient.CoreV1().Events(pod.Namespace).List(ctx, options)
return f.KubeClient.CoreV1().Events(pod.Namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) {
options.FieldSelector = fs.String()
return f.KubeClient.CoreV1().Events(pod.Namespace).Watch(ctx, options)
return f.KubeClient.CoreV1().Events(pod.Namespace).Watch(options)
},
}
// Watch for updates to the Event resource until fn is satisfied, or until the timeout is reached.
ctx, cfn := context.WithTimeout(context.Background(), f.WatchTimeout)
defer cfn()
last, err := watch.UntilWithSync(ctx, lw, &corev1.Event{}, nil, func(event watchapi.Event) (b bool, e error) {
switch event.Type {
case watchapi.Error:
@@ -166,8 +164,8 @@ func (f *Framework) WaitUntilPodEventWithReason(pod *corev1.Pod, reason string)
return nil
}
// GetRunningPodsFromProvider gets the running pods from the provider of the virtual kubelet
func (f *Framework) GetRunningPodsFromProvider(ctx context.Context) (*corev1.PodList, error) {
// GetRunningPods gets the running pods from the provider of the virtual kubelet
func (f *Framework) GetRunningPods() (*corev1.PodList, error) {
result := &corev1.PodList{}
err := f.KubeClient.CoreV1().
@@ -177,24 +175,7 @@ func (f *Framework) GetRunningPodsFromProvider(ctx context.Context) (*corev1.Pod
Name(f.NodeName).
SubResource("proxy").
Suffix("runningpods/").
Do(ctx).
Into(result)
return result, err
}
// GetRunningPodsFromKubernetes gets the pods assigned to the node, as reported by the virtual kubelet's /pods endpoint
func (f *Framework) GetRunningPodsFromKubernetes(ctx context.Context) (*corev1.PodList, error) {
result := &corev1.PodList{}
err := f.KubeClient.CoreV1().
RESTClient().
Get().
Resource("nodes").
Name(f.NodeName).
SubResource("proxy").
Suffix("pods").
Do(ctx).
Do().
Into(result)
return result, err
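
The two helpers above hit different kubelet endpoints through the node proxy subresource: "runningpods/" is the debug endpoint reporting what the provider is actually running, while "pods" reports the pods the node has been assigned. A hedged sketch of that request shape, assuming client-go v0.18+ (context-aware Do):

package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
)

// podsViaNodeProxy fetches a PodList from one of the kubelet endpoints exposed
// through the node's proxy subresource; suffix is "runningpods/" or "pods".
func podsViaNodeProxy(ctx context.Context, client kubernetes.Interface, nodeName, suffix string) (*corev1.PodList, error) {
	result := &corev1.PodList{}
	err := client.CoreV1().
		RESTClient().
		Get().
		Resource("nodes").
		Name(nodeName).
		SubResource("proxy").
		Suffix(suffix).
		Do(ctx).
		Into(result)
	return result, err
}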

View File

@@ -1,7 +1,6 @@
package framework
import (
"context"
"encoding/json"
"strconv"
@@ -10,7 +9,7 @@ import (
)
// GetStatsSummary queries the /stats/summary endpoint of the virtual-kubelet and returns the Summary object obtained as a response.
func (f *Framework) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
func (f *Framework) GetStatsSummary() (*stats.Summary, error) {
// Query the /stats/summary endpoint.
b, err := f.KubeClient.CoreV1().
RESTClient().
@@ -19,7 +18,7 @@ func (f *Framework) GetStatsSummary(ctx context.Context) (*stats.Summary, error)
Resource("pods").
SubResource("proxy").
Name(net.JoinSchemeNamePort("http", f.NodeName, strconv.Itoa(10255))).
Suffix("/stats/summary").DoRaw(ctx)
Suffix("/stats/summary").DoRaw()
if err != nil {
return nil, err
}

View File

@@ -23,12 +23,6 @@ var (
nodeName string
)
// go1.13 compatibility cf. https://github.com/golang/go/issues/31859
var _ = func() bool {
testing.Init()
return true
}()
func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "path to the kubeconfig file to use when running the test suite outside a kubernetes cluster")
flag.StringVar(&namespace, "namespace", defaultNamespace, "the name of the kubernetes namespace to use for running the test suite (i.e. where to create pods)")

View File

@@ -52,53 +52,10 @@ type TermSize struct {
// HandleContainerExec makes an http handler func from a Provider which execs a command in a pod's container
// Note that this handler currently depends on gorilla/mux to get URL parts as variables.
// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function
// ContainerExecHandlerConfig is used to pass options to the container exec handler.
type ContainerExecHandlerConfig struct {
// StreamIdleTimeout is the maximum time a streaming connection
// can be idle before the connection is automatically closed.
StreamIdleTimeout time.Duration
// StreamCreationTimeout is the maximum time for streaming connection
StreamCreationTimeout time.Duration
}
// ContainerExecHandlerOption configures a ContainerExecHandlerConfig
// It is used as functional options passed to `HandleContainerExec`
type ContainerExecHandlerOption func(*ContainerExecHandlerConfig)
// WithExecStreamIdleTimeout sets the idle timeout for a container exec stream
func WithExecStreamIdleTimeout(dur time.Duration) ContainerExecHandlerOption {
return func(cfg *ContainerExecHandlerConfig) {
cfg.StreamIdleTimeout = dur
}
}
// WithExecStreamCreationTimeout sets the stream creation timeout for a container exec stream
func WithExecStreamCreationTimeout(dur time.Duration) ContainerExecHandlerOption {
return func(cfg *ContainerExecHandlerConfig) {
cfg.StreamCreationTimeout = dur
}
}
// HandleContainerExec makes an http handler func from a Provider which execs a command in a pod's container
// Note that this handler currently depends on gorilla/mux to get URL parts as variables.
// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function
func HandleContainerExec(h ContainerExecHandlerFunc, opts ...ContainerExecHandlerOption) http.HandlerFunc {
func HandleContainerExec(h ContainerExecHandlerFunc) http.HandlerFunc {
if h == nil {
return NotImplemented
}
var cfg ContainerExecHandlerConfig
for _, o := range opts {
o(&cfg)
}
if cfg.StreamIdleTimeout == 0 {
cfg.StreamIdleTimeout = 30 * time.Second
}
if cfg.StreamCreationTimeout == 0 {
cfg.StreamCreationTimeout = 30 * time.Second
}
return handleError(func(w http.ResponseWriter, req *http.Request) error {
vars := mux.Vars(req)
@@ -116,24 +73,14 @@ func HandleContainerExec(h ContainerExecHandlerFunc, opts ...ContainerExecHandle
return errdefs.AsInvalidInput(err)
}
// TODO: Why aren't we using req.Context() here?
idleTimeout := time.Second * 30
streamCreationTimeout := time.Second * 30
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
exec := &containerExecContext{ctx: ctx, h: h, pod: pod, namespace: namespace, container: container}
remotecommand.ServeExec(
w,
req,
exec,
"",
"",
container,
command,
streamOpts,
cfg.StreamIdleTimeout,
cfg.StreamCreationTimeout,
supportedStreamProtocols,
)
remotecommand.ServeExec(w, req, exec, "", "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
return nil
})
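
The functional-options form of HandleContainerExec shown in this hunk lets callers override the 30s stream defaults without changing the handler signature. A usage sketch mirroring the route registration elsewhere in this compare (the api import path is an assumption):

package sketch

import (
	"time"

	"github.com/gorilla/mux"

	// Assumed import path for the virtual-kubelet API handlers package.
	"github.com/virtual-kubelet/virtual-kubelet/node/api"
)

// registerExec wires the exec route with explicit stream timeouts.
func registerExec(r *mux.Router, exec api.ContainerExecHandlerFunc) {
	r.HandleFunc(
		"/exec/{namespace}/{pod}/{container}",
		api.HandleContainerExec(
			exec,
			api.WithExecStreamIdleTimeout(time.Minute),
			api.WithExecStreamCreationTimeout(30*time.Second),
		),
	).Methods("POST", "GET")
}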

View File

@@ -56,14 +56,16 @@ type flushWriter struct {
}
type writeFlusher interface {
Flush()
Flush() error
Write([]byte) (int, error)
}
func (fw *flushWriter) Write(p []byte) (int, error) {
n, err := fw.w.Write(p)
if n > 0 {
fw.w.Flush()
if err := fw.w.Flush(); err != nil {
return n, err
}
}
return n, err
}
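
The two shapes of Flush in this hunk map onto two standard-library types: bufio.Writer exposes Flush() error, while http.Flusher exposes a bare Flush(). A small illustrative adapter showing why the error-returning interface is the more general one (the names here are mine, not the project's):

package sketch

import (
	"bufio"
	"io"
	"net/http"
)

// writeFlusher is the error-returning form: flush failures can surface to the caller.
type writeFlusher interface {
	Write([]byte) (int, error)
	Flush() error
}

// responseFlusher adapts an http.ResponseWriter (via http.Flusher) to writeFlusher.
type responseFlusher struct {
	w interface {
		io.Writer
		http.Flusher
	}
}

func (r responseFlusher) Write(p []byte) (int, error) { return r.w.Write(p) }
func (r responseFlusher) Flush() error                { r.w.Flush(); return nil }

// bufio.Writer satisfies writeFlusher as-is.
var _ writeFlusher = (*bufio.Writer)(nil)
var _ writeFlusher = responseFlusher{}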

View File

@@ -18,7 +18,6 @@ import (
"context"
"io"
"net/http"
"net/url"
"strconv"
"time"
@@ -34,71 +33,10 @@ type ContainerLogsHandlerFunc func(ctx context.Context, namespace, podName, cont
// ContainerLogOpts are used to pass along options to be set on the container
// log stream.
type ContainerLogOpts struct {
Tail int
LimitBytes int
Timestamps bool
Follow bool
Previous bool
SinceSeconds int
SinceTime time.Time
}
func parseLogOptions(q url.Values) (opts ContainerLogOpts, err error) {
if tailLines := q.Get("tailLines"); tailLines != "" {
opts.Tail, err = strconv.Atoi(tailLines)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"tailLines\""))
}
if opts.Tail < 0 {
return opts, errdefs.InvalidInputf("\"tailLines\" is %d", opts.Tail)
}
}
if follow := q.Get("follow"); follow != "" {
opts.Follow, err = strconv.ParseBool(follow)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"follow\""))
}
}
if limitBytes := q.Get("limitBytes"); limitBytes != "" {
opts.LimitBytes, err = strconv.Atoi(limitBytes)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"limitBytes\""))
}
if opts.LimitBytes < 1 {
return opts, errdefs.InvalidInputf("\"limitBytes\" is %d", opts.LimitBytes)
}
}
if previous := q.Get("previous"); previous != "" {
opts.Previous, err = strconv.ParseBool(previous)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"previous\""))
}
}
if sinceSeconds := q.Get("sinceSeconds"); sinceSeconds != "" {
opts.SinceSeconds, err = strconv.Atoi(sinceSeconds)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"sinceSeconds\""))
}
if opts.SinceSeconds < 1 {
return opts, errdefs.InvalidInputf("\"sinceSeconds\" is %d", opts.SinceSeconds)
}
}
if sinceTime := q.Get("sinceTime"); sinceTime != "" {
opts.SinceTime, err = time.Parse(time.RFC3339, sinceTime)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"sinceTime\""))
}
if opts.SinceSeconds > 0 {
return opts, errdefs.InvalidInput("both \"sinceSeconds\" and \"sinceTime\" are set")
}
}
if timestamps := q.Get("timestamps"); timestamps != "" {
opts.Timestamps, err = strconv.ParseBool(timestamps)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"timestamps\""))
}
}
return opts, nil
Tail int
Since time.Duration
LimitBytes int
Timestamps bool
}
// HandleContainerLogs creates an http handler function from a provider to serve logs from a pod
@@ -117,11 +55,22 @@ func HandleContainerLogs(h ContainerLogsHandlerFunc) http.HandlerFunc {
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
tail := 10
q := req.URL.Query()
query := req.URL.Query()
opts, err := parseLogOptions(query)
if err != nil {
return err
if queryTail := q.Get("tailLines"); queryTail != "" {
t, err := strconv.Atoi(queryTail)
if err != nil {
return errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"tailLines\""))
}
tail = t
}
// TODO(@cpuguy83): support v1.PodLogOptions
// The kubelet decoding here is not straightforward, so this needs to be dissected
opts := ContainerLogOpts{
Tail: tail,
}
logs, err := h(ctx, namespace, pod, container, opts)

View File

@@ -1,99 +0,0 @@
package api
import (
"fmt"
"net/url"
"testing"
"time"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
)
//func parseLogOptions(q url.Values) (opts ContainerLogOpts, err error)
func TestParseLogOptions(t *testing.T) {
//tailLines
//follow
//limitBytes
//previous
//sinceSeconds
//sinceTime
//timestamps
sinceTime, _ := time.Parse(time.RFC3339, "2020-03-20T21:07:34Z")
fmt.Printf("%+v\n", sinceTime)
testCases := []struct {
Values url.Values
Failure bool
Result ContainerLogOpts
}{
{
Values: url.Values{},
Failure: false,
Result: ContainerLogOpts{},
},
{
Values: url.Values{
"follow": {"true"},
"limitBytes": {"123"},
"previous": {"true"},
"sinceSeconds": {"10"},
"tailLines": {"99"},
"timestamps": {"true"},
},
Failure: false,
Result: ContainerLogOpts{
Follow: true,
LimitBytes: 123,
Previous: true,
SinceSeconds: 10,
Tail: 99,
Timestamps: true,
},
},
{
Values: url.Values{
"sinceSeconds": {"10"},
"sinceTime": {"2020-03-20T21:07:34Z"},
},
Failure: true,
},
{
Values: url.Values{
"sinceTime": {"2020-03-20T21:07:34Z"},
},
Failure: false,
Result: ContainerLogOpts{
SinceTime: sinceTime,
},
},
{
Values: url.Values{
"tailLines": {"-1"},
},
Failure: true,
},
{
Values: url.Values{
"limitBytes": {"0"},
},
Failure: true,
},
{
Values: url.Values{
"sinceSeconds": {"-10"},
},
Failure: true,
},
}
// follow=true&limitBytes=1&previous=true&sinceSeconds=1&sinceTime=2020-03-20T21%3A07%3A34Z&tailLines=1&timestamps=true
for i, tc := range testCases {
msg := fmt.Sprintf("test case #%d %+v failed", i+1, tc)
result, err := parseLogOptions(tc.Values)
if tc.Failure {
assert.Check(t, is.ErrorContains(err, ""), msg)
} else {
assert.NilError(t, err, msg)
assert.Check(t, is.Equal(result, tc.Result), msg)
}
}
}

View File

@@ -27,10 +27,6 @@ import (
type PodListerFunc func(context.Context) ([]*v1.Pod, error)
func HandleRunningPods(getPods PodListerFunc) http.HandlerFunc {
if getPods == nil {
return NotImplemented
}
scheme := runtime.NewScheme()
v1.SchemeBuilder.AddToScheme(scheme) //nolint:errcheck
codecs := serializer.NewCodecFactory(scheme)

View File

@@ -16,7 +16,6 @@ package api
import (
"net/http"
"time"
"github.com/gorilla/mux"
"github.com/virtual-kubelet/virtual-kubelet/log"
@@ -36,13 +35,8 @@ type ServeMux interface {
type PodHandlerConfig struct {
RunInContainer ContainerExecHandlerFunc
GetContainerLogs ContainerLogsHandlerFunc
// GetPods is meant to enumerate the pods that the provider knows about
GetPods PodListerFunc
// GetPodsFromKubernetes is meant to enumerate the pods that the node is meant to be running
GetPodsFromKubernetes PodListerFunc
GetStatsSummary PodStatsSummaryHandlerFunc
StreamIdleTimeout time.Duration
StreamCreationTimeout time.Duration
GetPods PodListerFunc
GetStatsSummary PodStatsSummaryHandlerFunc
}
// PodHandler creates an http handler for interacting with pods/containers.
@@ -54,18 +48,8 @@ func PodHandler(p PodHandlerConfig, debug bool) http.Handler {
if debug {
r.HandleFunc("/runningpods/", HandleRunningPods(p.GetPods)).Methods("GET")
}
r.HandleFunc("/pods", HandleRunningPods(p.GetPodsFromKubernetes)).Methods("GET")
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", HandleContainerLogs(p.GetContainerLogs)).Methods("GET")
r.HandleFunc(
"/exec/{namespace}/{pod}/{container}",
HandleContainerExec(
p.RunInContainer,
WithExecStreamCreationTimeout(p.StreamCreationTimeout),
WithExecStreamIdleTimeout(p.StreamIdleTimeout),
),
).Methods("POST", "GET")
r.HandleFunc("/exec/{namespace}/{pod}/{container}", HandleContainerExec(p.RunInContainer)).Methods("POST")
if p.GetStatsSummary != nil {
f := HandlePodStatsSummary(p.GetStatsSummary)
r.HandleFunc("/stats/summary", f).Methods("GET")
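
The two listers documented in this hunk answer different questions: GetPods is the provider's own view of what it is running, while GetPodsFromKubernetes is the cluster's view of what the node should be running. A hedged sketch of how they can be wired, mirroring the setupHTTPServer closure earlier in this compare; the api import path and the lister interface are assumptions:

package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"

	// Assumed import path for the virtual-kubelet API handlers package.
	"github.com/virtual-kubelet/virtual-kubelet/node/api"
)

// kubePodLister stands in for whatever tracks the pods Kubernetes assigned to the
// node (e.g. a resource manager fed by the pod informer).
type kubePodLister interface {
	GetPods() []*corev1.Pod
}

// podRoutesConfig fills both listers on PodHandlerConfig.
func podRoutesConfig(providerPods api.PodListerFunc, rm kubePodLister) api.PodHandlerConfig {
	return api.PodHandlerConfig{
		GetPods: providerPods,
		GetPodsFromKubernetes: func(context.Context) ([]*corev1.Pod, error) {
			return rm.GetPods(), nil
		},
	}
}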

View File

@@ -160,7 +160,7 @@ func TestPodLifecycle(t *testing.T) {
mp.setErrorOnDelete(errors.New("random error"))
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false)
pods, err := s.client.CoreV1().Pods(testNamespace).List(ctx, metav1.ListOptions{})
pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
assert.NilError(t, err)
assert.Assert(t, is.Len(pods.Items, 1))
assert.Assert(t, pods.Items[0].DeletionTimestamp != nil)
@@ -329,7 +329,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
p1 := newPod()
p1.Status.Phase = state
// Create the Pod
_, e := s.client.CoreV1().Pods(testNamespace).Create(ctx, p1, metav1.CreateOptions{})
_, e := s.client.CoreV1().Pods(testNamespace).Create(p1)
assert.NilError(t, e)
// Start the pod controller
@@ -339,7 +339,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
time.Sleep(10 * time.Millisecond)
}
p2, err := s.client.CoreV1().Pods(testNamespace).Get(ctx, p1.Name, metav1.GetOptions{})
p2, err := s.client.CoreV1().Pods(testNamespace).Get(p1.Name, metav1.GetOptions{})
assert.NilError(t, err)
// Make sure the pods have not changed
@@ -367,7 +367,7 @@ func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testin
FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.ObjectMeta.Name).String(),
}
// Setup a watch (prior to pod creation, and pod controller startup)
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
@@ -379,7 +379,7 @@ func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testin
podCopyWithDeletionTimestamp.DeletionGracePeriodSeconds = &deletionGracePeriod
deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
podCopyWithDeletionTimestamp.DeletionTimestamp = &deletionTimestamp
_, e := s.client.CoreV1().Pods(testNamespace).Create(ctx, podCopyWithDeletionTimestamp, metav1.CreateOptions{})
_, e := s.client.CoreV1().Pods(testNamespace).Create(podCopyWithDeletionTimestamp)
assert.NilError(t, e)
// Start the pod controller
@@ -415,7 +415,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
watchErrCh := make(chan error)
// Setup a watch (prior to pod creation, and pod controller startup)
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
// This ensures that the pod is created.
@@ -432,7 +432,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
}()
// Create the Pod
_, e := s.client.CoreV1().Pods(testNamespace).Create(ctx, p, metav1.CreateOptions{})
_, e := s.client.CoreV1().Pods(testNamespace).Create(p)
assert.NilError(t, e)
log.G(ctx).Debug("Created pod")
@@ -446,7 +446,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
}
// Setup a watch to check if the pod is in running
watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
go func() {
@@ -471,7 +471,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
}
// Setup a watch to look for the pod eventually going away completely
watcher2, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
watcher2, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher2.Stop()
waitDeleteCh := make(chan error)
@@ -483,7 +483,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
}()
// Setup a watch prior to pod deletion
watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
go func() {
@@ -493,7 +493,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
// Delete the pod via deletiontimestamp
// 1. Get the pod
currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(ctx, p.Name, metav1.GetOptions{})
currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(p.Name, metav1.GetOptions{})
assert.NilError(t, err)
// 2. Set the pod's deletion timestamp, version, and so on
var deletionGracePeriod int64 = 10
@@ -501,7 +501,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
currentPod.DeletionTimestamp = &deletionTimestamp
// 3. Update (overwrite) the pod
_, err = s.client.CoreV1().Pods(testNamespace).Update(ctx, currentPod, metav1.UpdateOptions{})
_, err = s.client.CoreV1().Pods(testNamespace).Update(currentPod)
assert.NilError(t, err)
select {
@@ -535,11 +535,11 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
watchErrCh := make(chan error)
// Create a Pod
_, e := s.client.CoreV1().Pods(testNamespace).Create(ctx, p, metav1.CreateOptions{})
_, e := s.client.CoreV1().Pods(testNamespace).Create(p)
assert.NilError(t, e)
// Setup a watch to check if the pod is in running
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err)
defer watcher.Stop()
go func() {
@@ -576,7 +576,7 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
p.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
log.G(ctx).WithField("pod", p).Info("Updating pod")
_, err = s.client.CoreV1().Pods(p.Namespace).Update(ctx, p, metav1.UpdateOptions{})
_, err = s.client.CoreV1().Pods(p.Namespace).Update(p)
assert.NilError(t, err)
assert.NilError(t, m.getUpdates().until(ctx, func(v int) bool { return v > 0 }))
}
@@ -603,7 +603,7 @@ func benchmarkCreatePods(ctx context.Context, b *testing.B, s *system) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
pod := newPod(randomizeUID, randomizeName)
_, err := s.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
_, err := s.client.CoreV1().Pods(pod.Namespace).Create(pod)
assert.NilError(b, err)
assert.NilError(b, ctx.Err())
}

View File

@@ -17,6 +17,7 @@ package node
import (
"context"
"encoding/json"
"fmt"
"time"
pkgerrors "github.com/pkg/errors"
@@ -30,14 +31,6 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
)
const (
// Annotation with the JSON-serialized last applied node conditions. Based on kubectl apply. Used to calculate
// the three-way patch
virtualKubeletLastNodeAppliedNodeStatus = "virtual-kubelet.io/last-applied-node-status"
virtualKubeletLastNodeAppliedObjectMeta = "virtual-kubelet.io/last-applied-object-meta"
)
// NodeProvider is the interface used for registering a node and updating its
@@ -192,7 +185,7 @@ func (n *NodeController) Run(ctx context.Context) error {
n.statusInterval = DefaultStatusUpdateInterval
}
n.chStatusUpdate = make(chan *corev1.Node, 1)
n.chStatusUpdate = make(chan *corev1.Node)
n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) {
n.chStatusUpdate <- node
})
@@ -223,19 +216,13 @@ func (n *NodeController) Run(ctx context.Context) error {
return n.controlLoop(ctx)
}
func (n *NodeController) ensureNode(ctx context.Context) (err error) {
ctx, span := trace.StartSpan(ctx, "node.ensureNode")
defer span.End()
defer func() {
span.SetStatus(err)
}()
err = n.updateStatus(ctx, true)
func (n *NodeController) ensureNode(ctx context.Context) error {
err := n.updateStatus(ctx, true)
if err == nil || !errors.IsNotFound(err) {
return err
}
node, err := n.nodes.Create(ctx, n.n, metav1.CreateOptions{})
node, err := n.nodes.Create(n.n)
if err != nil {
return pkgerrors.Wrap(err, "error registering node with kubernetes")
}
@@ -271,13 +258,10 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
close(n.chReady)
loop := func() bool {
ctx, span := trace.StartSpan(ctx, "node.controlLoop.loop")
defer span.End()
for {
select {
case <-ctx.Done():
return true
return nil
case updated := <-n.chStatusUpdate:
var t *time.Timer
if n.disableLease {
@@ -294,13 +278,6 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
}
n.n.Status = updated.Status
n.n.ObjectMeta = metav1.ObjectMeta{
Annotations: updated.Annotations,
Labels: updated.Labels,
Name: n.n.ObjectMeta.Name,
Namespace: n.n.ObjectMeta.Namespace,
UID: n.n.ObjectMeta.UID,
}
if err := n.updateStatus(ctx, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
@@ -318,13 +295,6 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
}
pingTimer.Reset(n.pingInterval)
}
return false
}
for {
shouldTerminate := loop()
if shouldTerminate {
return nil
}
}
}
@@ -356,13 +326,7 @@ func (n *NodeController) updateLease(ctx context.Context) error {
return nil
}
func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) (err error) {
ctx, span := trace.StartSpan(ctx, "node.updateStatus")
defer span.End()
defer func() {
span.SetStatus(err)
}()
func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) error {
updateNodeStatusHeartbeat(n.n)
node, err := updateNodeStatus(ctx, n.nodes, n.n)
@@ -385,18 +349,18 @@ func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) (er
}
func ensureLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *coord.Lease) (*coord.Lease, error) {
l, err := leases.Create(ctx, lease, metav1.CreateOptions{})
l, err := leases.Create(lease)
if err != nil {
switch {
case errors.IsNotFound(err):
log.G(ctx).WithError(err).Info("Node lease not supported")
return nil, err
case errors.IsAlreadyExists(err):
if err := leases.Delete(ctx, lease.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
if err := leases.Delete(lease.Name, nil); err != nil && !errors.IsNotFound(err) {
log.G(ctx).WithError(err).Error("could not delete old node lease")
return nil, pkgerrors.Wrap(err, "old lease exists but could not delete it")
}
l, err = leases.Create(ctx, lease, metav1.CreateOptions{})
l, err = leases.Create(lease)
}
}
@@ -421,7 +385,7 @@ func updateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *
ctx = span.WithField(ctx, "lease.expiresSeconds", *lease.Spec.LeaseDurationSeconds)
}
l, err := leases.Update(ctx, lease, metav1.UpdateOptions{})
l, err := leases.Update(lease)
if err != nil {
if errors.IsNotFound(err) {
log.G(ctx).Debug("lease not found")
@@ -442,117 +406,42 @@ func updateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *
// just so we don't have to allocate this on every get request
var emptyGetOptions = metav1.GetOptions{}
func prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode *corev1.Node) ([]byte, error) {
// We use these two values to calculate a patch. We use a three-way patch in order to avoid
// causing state regression server side. For example, let's consider the scenario:
/*
UML Source:
@startuml
participant VK
participant K8s
participant ExternalUpdater
note right of VK: Updates internal node conditions to [A, B]
VK->K8s: Patch Upsert [A, B]
note left of K8s: Node conditions are [A, B]
ExternalUpdater->K8s: Patch Upsert [C]
note left of K8s: Node Conditions are [A, B, C]
note right of VK: Updates internal node conditions to [A]
VK->K8s: Patch: delete B, upsert A\nThis is where things go wrong,\nbecause the patch is written to replace all node conditions\nit overwrites (drops) [C]
note left of K8s: Node Conditions are [A]\nNode condition C from ExternalUpdater is no longer present
@enduml
,--. ,---. ,---------------.
|VK| |K8s| |ExternalUpdater|
`+-' `-+-' `-------+-------'
| ,------------------------------------------!. | |
| |Updates internal node conditions to [A, B]|_\ | |
| `--------------------------------------------' | |
| Patch Upsert [A, B] | |
| -----------------------------------------------------------> |
| | |
| ,--------------------------!. | |
| |Node conditions are [A, B]|_\| |
| `----------------------------'| |
| | Patch Upsert [C] |
| | <-------------------
| | |
| ,-----------------------------!. | |
| |Node Conditions are [A, B, C]|_\| |
| `-------------------------------'| |
| ,---------------------------------------!. | |
| |Updates internal node conditions to [A]|_\ | |
| `-----------------------------------------' | |
| Patch: delete B, upsert A | |
| This is where things go wrong, | |
| because the patch is written to replace all node conditions| |
| it overwrites (drops) [C] | |
| -----------------------------------------------------------> |
| | |
,----------------------------------------------------------!. | |
|Node Conditions are [A] |_\| |
|Node condition C from ExternalUpdater is no longer present || |
`------------------------------------------------------------'+-. ,-------+-------.
|VK| |K8s| |ExternalUpdater|
`--' `---' `---------------'
*/
// In order to calculate that last patch to delete B, and upsert C, we need to know that C was added by
// "someone else". So, we keep track of our last applied value, and our current value. We then generate
// our patch based on the diff of these and *not* server side state.
oldVKStatus, ok1 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus]
oldVKObjectMeta, ok2 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta]
oldNode := corev1.Node{}
// Check if there were no labels, which means someone else probably created the node, or this is an upgrade. Either way, we will consider
// ourselves as never having written the node object before, so oldNode will be left empty. We will overwrite values if
// our new node conditions / status / objectmeta have them
if ok1 && ok2 {
err := json.Unmarshal([]byte(oldVKObjectMeta), &oldNode.ObjectMeta)
if err != nil {
return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node object metadata (key: %q): %q", virtualKubeletLastNodeAppliedObjectMeta, oldVKObjectMeta)
}
err = json.Unmarshal([]byte(oldVKStatus), &oldNode.Status)
if err != nil {
return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node status (key: %q): %q", virtualKubeletLastNodeAppliedNodeStatus, oldVKStatus)
}
// patchNodeStatus patches node status.
// Copied from github.com/kubernetes/kubernetes/pkg/util/node
func patchNodeStatus(nodes v1.NodeInterface, nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) (*corev1.Node, []byte, error) {
patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
if err != nil {
return nil, nil, err
}
// newNode is the representation of the node the provider "wants"
newNode := corev1.Node{}
newNode.ObjectMeta = simplestObjectMetadata(&apiServerNode.ObjectMeta, &nodeFromProvider.ObjectMeta)
nodeFromProvider.Status.DeepCopyInto(&newNode.Status)
updatedNode, err := nodes.Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
if err != nil {
return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
}
return updatedNode, patchBytes, nil
}
// virtualKubeletLastNodeAppliedObjectMeta must always be set before virtualKubeletLastNodeAppliedNodeStatus,
// otherwise we capture virtualKubeletLastNodeAppliedNodeStatus in virtualKubeletLastNodeAppliedObjectMeta,
// which is wrong
virtualKubeletLastNodeAppliedObjectMetaBytes, err := json.Marshal(newNode.ObjectMeta)
func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) ([]byte, error) {
oldData, err := json.Marshal(oldNode)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal object meta from provider")
return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err)
}
newNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta] = string(virtualKubeletLastNodeAppliedObjectMetaBytes)
virtualKubeletLastNodeAppliedNodeStatusBytes, err := json.Marshal(newNode.Status)
// Reset spec to make sure only patch for Status or ObjectMeta is generated.
// Note that we don't reset ObjectMeta here, because:
// 1. This aligns with Nodes().UpdateStatus().
// 2. Some component does use this to update node annotations.
newNode.Spec = oldNode.Spec
newData, err := json.Marshal(newNode)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal node status from provider")
return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err)
}
newNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus] = string(virtualKubeletLastNodeAppliedNodeStatusBytes)
// Generate three way patch from oldNode -> newNode, without deleting the changes in api server
// Should we use the Kubernetes serialization / deserialization libraries here?
oldNodeBytes, err := json.Marshal(oldNode)
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal old node bytes")
return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err)
}
newNodeBytes, err := json.Marshal(newNode)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal new node bytes")
}
apiServerNodeBytes, err := json.Marshal(apiServerNode)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal api server node")
}
schema, err := strategicpatch.NewPatchMetaFromStruct(&corev1.Node{})
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot get patch schema from node")
}
return strategicpatch.CreateThreeWayMergePatch(oldNodeBytes, newNodeBytes, apiServerNodeBytes, schema, true)
return patchBytes, nil
}
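
The hunks above interleave the two approaches being compared: one side records the node status and object metadata VK last applied in node annotations and builds a three-way strategic merge patch against the API server's copy, while the other computes a plain two-way merge patch from the freshly fetched node. Below is a minimal, self-contained sketch of just the record/replay part of the annotation approach; the annotation key and helper names are illustrative, not the project's API.

```go
package main

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// lastAppliedStatusKey is an illustrative annotation key, not the one used by virtual-kubelet.
const lastAppliedStatusKey = "example.org/last-applied-node-status"

// recordLastApplied stores the status we are about to write into an annotation so that a later
// reconciliation can reconstruct "what we wrote last time" without trusting the server copy.
func recordLastApplied(n *corev1.Node) error {
	b, err := json.Marshal(n.Status)
	if err != nil {
		return err
	}
	if n.Annotations == nil {
		n.Annotations = map[string]string{}
	}
	n.Annotations[lastAppliedStatusKey] = string(b)
	return nil
}

// lastApplied rebuilds the previously written status; a missing annotation means we have never
// written this node before (fresh install or upgrade), so the "old" state is left empty.
func lastApplied(n *corev1.Node) (corev1.NodeStatus, error) {
	var st corev1.NodeStatus
	raw, ok := n.Annotations[lastAppliedStatusKey]
	if !ok {
		return st, nil
	}
	err := json.Unmarshal([]byte(raw), &st)
	return st, err
}

func main() {
	n := &corev1.Node{}
	n.Status.Conditions = []corev1.NodeCondition{{Type: corev1.NodeReady, Status: corev1.ConditionTrue}}
	if err := recordLastApplied(n); err != nil {
		panic(err)
	}
	old, err := lastApplied(n)
	if err != nil {
		panic(err)
	}
	fmt.Printf("last applied conditions: %d\n", len(old.Conditions))
}
```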
// updateNodeStatus triggers an update to the node status in Kubernetes.
@@ -561,44 +450,37 @@ func prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode *cor
//
// If you use this function, it is up to you to synchronize this with other operations.
// This reduces the time to second-level precision.
func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, nodeFromProvider *corev1.Node) (_ *corev1.Node, retErr error) {
func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (_ *corev1.Node, retErr error) {
ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus")
defer func() {
span.End()
span.SetStatus(retErr)
}()
var updatedNode *corev1.Node
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
apiServerNode, err := nodes.Get(ctx, nodeFromProvider.Name, emptyGetOptions)
if err != nil {
return err
}
ctx = addNodeAttributes(ctx, span, apiServerNode)
log.G(ctx).Debug("got node from api server")
patchBytes, err := prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode)
if err != nil {
return pkgerrors.Wrap(err, "Cannot generate patch")
}
log.G(ctx).WithError(err).WithField("patch", string(patchBytes)).Debug("Generated three way patch")
updatedNode, err = nodes.Patch(ctx, nodeFromProvider.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
// We cannot wrap this error because the kubernetes error module doesn't understand wrapping
log.G(ctx).WithField("patch", string(patchBytes)).WithError(err).Warn("Failed to patch node status")
return err
}
return nil
})
var node *corev1.Node
oldNode, err := nodes.Get(n.Name, emptyGetOptions)
if err != nil {
return nil, err
}
log.G(ctx).WithField("node.resourceVersion", updatedNode.ResourceVersion).
WithField("node.Status.Conditions", updatedNode.Status.Conditions).
log.G(ctx).Debug("got node from api server")
node = oldNode.DeepCopy()
node.ResourceVersion = ""
node.Status = n.Status
ctx = addNodeAttributes(ctx, span, node)
// Patch the node status to merge other changes on the node.
updated, _, err := patchNodeStatus(nodes, types.NodeName(n.Name), oldNode, node)
if err != nil {
return nil, err
}
log.G(ctx).WithField("node.resourceVersion", updated.ResourceVersion).
WithField("node.Status.Conditions", updated.Status.Conditions).
Debug("updated node status in api server")
return updatedNode, nil
return updated, nil
}
func newLease(base *coord.Lease) *coord.Lease {
@@ -673,31 +555,3 @@ func addNodeAttributes(ctx context.Context, span trace.Span, n *corev1.Node) con
"node.taints": taintsStringer(n.Spec.Taints),
})
}
// simplestObjectMetadata provides the simplest object metadata that matches the previous object: name, namespace, and UID.
// It copies labels and annotations from the second object if defined, excluding the virtual-kubelet last-applied
// annotations used for patch bookkeeping.
func simplestObjectMetadata(baseObjectMeta, objectMetaWithLabelsAndAnnotations *metav1.ObjectMeta) metav1.ObjectMeta {
ret := metav1.ObjectMeta{
Namespace: baseObjectMeta.Namespace,
Name: baseObjectMeta.Name,
UID: baseObjectMeta.UID,
Annotations: make(map[string]string),
}
if objectMetaWithLabelsAndAnnotations != nil {
if objectMetaWithLabelsAndAnnotations.Labels != nil {
ret.Labels = objectMetaWithLabelsAndAnnotations.Labels
} else {
ret.Labels = make(map[string]string)
}
if objectMetaWithLabelsAndAnnotations.Annotations != nil {
// We want to copy over all annotations except the special embedded ones.
for key := range objectMetaWithLabelsAndAnnotations.Annotations {
if key == virtualKubeletLastNodeAppliedNodeStatus || key == virtualKubeletLastNodeAppliedObjectMeta {
continue
}
ret.Annotations[key] = objectMetaWithLabelsAndAnnotations.Annotations[key]
}
}
}
return ret
}

View File

@@ -22,14 +22,12 @@ import (
"gotest.tools/assert"
"gotest.tools/assert/cmp"
is "gotest.tools/assert/cmp"
coord "k8s.io/api/coordination/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
watch "k8s.io/apimachinery/pkg/watch"
testclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/retry"
)
func TestNodeRun(t *testing.T) {
@@ -74,11 +72,11 @@ func testNodeRun(t *testing.T, enableLease bool) {
close(chErr)
}()
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
nw := makeWatch(t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr := nw.ResultChan()
lw := makeWatch(ctx, t, leases, testNodeCopy.Name)
lw := makeWatch(t, leases, testNodeCopy.Name)
defer lw.Stop()
lr := lw.ResultChan()
@@ -132,7 +130,7 @@ func testNodeRun(t *testing.T, enableLease bool) {
}
// trigger an async node status update
n, err := nodes.Get(ctx, testNode.Name, metav1.GetOptions{})
n, err := nodes.Get(testNode.Name, metav1.GetOptions{})
assert.NilError(t, err)
newCondition := corev1.NodeCondition{
Type: corev1.NodeConditionType("UPDATED"),
@@ -140,7 +138,7 @@ func testNodeRun(t *testing.T, enableLease bool) {
}
n.Status.Conditions = append(n.Status.Conditions, newCondition)
nw = makeWatch(ctx, t, nodes, testNodeCopy.Name)
nw = makeWatch(t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr = nw.ResultChan()
@@ -207,7 +205,7 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
case <-node.Ready():
}
err = nodes.Delete(ctx, node.n.Name, metav1.DeleteOptions{})
err = nodes.Delete(node.n.Name, nil)
assert.NilError(t, err)
testP.triggerStatusUpdate(node.n.DeepCopy())
@@ -253,7 +251,7 @@ func TestUpdateNodeStatus(t *testing.T) {
updated, err := updateNodeStatus(ctx, nodes, n.DeepCopy())
assert.Equal(t, errors.IsNotFound(err), true, err)
_, err = nodes.Create(ctx, n, metav1.CreateOptions{})
_, err = nodes.Create(n)
assert.NilError(t, err)
updated, err = updateNodeStatus(ctx, nodes, n.DeepCopy())
@@ -267,10 +265,10 @@ func TestUpdateNodeStatus(t *testing.T) {
assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(n.Status, updated.Status))
err = nodes.Delete(ctx, n.Name, metav1.DeleteOptions{})
err = nodes.Delete(n.Name, nil)
assert.NilError(t, err)
_, err = nodes.Get(ctx, n.Name, metav1.GetOptions{})
_, err = nodes.Get(n.Name, metav1.GetOptions{})
assert.Equal(t, errors.IsNotFound(err), true, err)
_, err = updateNodeStatus(ctx, nodes, updated.DeepCopy())
@@ -289,7 +287,7 @@ func TestUpdateNodeLease(t *testing.T) {
assert.Equal(t, l.Name, lease.Name)
assert.Assert(t, cmp.DeepEqual(l.Spec.HolderIdentity, lease.Spec.HolderIdentity))
compare, err := leases.Get(ctx, l.Name, emptyGetOptions)
compare, err := leases.Get(l.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Equal(t, l.Spec.RenewTime.Time.Unix(), compare.Spec.RenewTime.Time.Unix())
assert.Equal(t, compare.Name, lease.Name)
@@ -376,252 +374,6 @@ func TestPingAfterStatusUpdate(t *testing.T) {
assert.Assert(t, testP.maxPingInterval < maxAllowedInterval, "maximum time between node pings (%v) was greater than the maximum expected interval (%v)", testP.maxPingInterval, maxAllowedInterval)
}
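
TestPingAfterStatusUpdate asserts that the maximum observed interval between pings stays below a bound even while status updates are flowing. The sketch below is an illustrative select loop with that property, not the controller's actual implementation: the names are stand-ins, and the key detail is that the timer is always re-armed with the ping interval, never a longer one.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop pings on pingInterval and handles status updates as they arrive. The property the
// test above checks is that the timer is re-armed with pingInterval after either branch,
// so pings keep flowing even while status updates are being processed.
func runLoop(ctx context.Context, pingInterval time.Duration, statusUpdates <-chan string, ping func()) {
	timer := time.NewTimer(pingInterval)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case s := <-statusUpdates:
			fmt.Println("handled status update:", s)
		case <-timer.C:
			ping()
		}
		// Drain (if needed) and re-arm with the ping interval, never a longer one.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(pingInterval)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	updates := make(chan string, 1)
	updates <- "node-status"
	runLoop(ctx, 10*time.Millisecond, updates, func() { fmt.Println("ping") })
}
```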
// Are annotations that were created before the VK existed preserved?
func TestBeforeAnnotationsPreserved(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := testclient.NewSimpleClientset()
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
nodes := c.CoreV1().Nodes()
interval := 10 * time.Millisecond
opts := []NodeControllerOpt{
WithNodePingInterval(interval),
}
testNode := testNode(t)
testNodeCreateCopy := testNode.DeepCopy()
testNodeCreateCopy.Annotations = map[string]string{
"beforeAnnotation": "value",
}
_, err := nodes.Create(ctx, testNodeCreateCopy, metav1.CreateOptions{})
assert.NilError(t, err)
// We have to refer to testNodeCopy during the course of the test. testNode is modified by the node controller
// so it will trigger the race detector.
testNodeCopy := testNode.DeepCopy()
node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error)
defer func() {
cancel()
assert.NilError(t, <-chErr)
}()
go func() {
chErr <- node.Run(ctx)
close(chErr)
}()
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr := nw.ResultChan()
t.Log("Waiting for node to exist")
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
if e.Object == nil {
return false
}
return true
}))
testP.notifyNodeStatus(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"testAnnotation": "value",
},
},
})
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
if e.Object == nil {
return false
}
_, ok := e.Object.(*corev1.Node).Annotations["testAnnotation"]
return ok
}))
newNode, err := nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Assert(t, is.Contains(newNode.Annotations, "testAnnotation"))
assert.Assert(t, is.Contains(newNode.Annotations, "beforeAnnotation"))
}
// Are conditions set by systems outside of VK preserved?
func TestManualConditionsPreserved(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := testclient.NewSimpleClientset()
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
nodes := c.CoreV1().Nodes()
interval := 10 * time.Millisecond
opts := []NodeControllerOpt{
WithNodePingInterval(interval),
}
testNode := testNode(t)
// We have to refer to testNodeCopy during the course of the test. testNode is modified by the node controller
// so it will trigger the race detector.
testNodeCopy := testNode.DeepCopy()
node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error)
defer func() {
cancel()
assert.NilError(t, <-chErr)
}()
go func() {
chErr <- node.Run(ctx)
close(chErr)
}()
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr := nw.ResultChan()
t.Log("Waiting for node to exist")
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
if e.Object == nil {
return false
}
receivedNode := e.Object.(*corev1.Node)
if len(receivedNode.Status.Conditions) != 0 {
return false
}
return true
}))
newNode, err := nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Assert(t, is.Len(newNode.Status.Conditions, 0))
baseCondition := corev1.NodeCondition{
Type: "BaseCondition",
Status: "Ok",
Reason: "NA",
Message: "This is the base condition. It is set by VK, and should always be there.",
}
testP.notifyNodeStatus(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"testAnnotation": "value",
},
},
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
baseCondition,
},
},
})
// Wait for this (node with condition) to show up
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
receivedNode := e.Object.(*corev1.Node)
for _, condition := range receivedNode.Status.Conditions {
if condition.Type == baseCondition.Type {
return true
}
}
return false
}))
newNode, err = nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Assert(t, is.Len(newNode.Status.Conditions, 1))
assert.Assert(t, is.Contains(newNode.Annotations, "testAnnotation"))
// Add a new event manually
manuallyAddedCondition := corev1.NodeCondition{
Type: "ManuallyAddedCondition",
Status: "Ok",
Reason: "NA",
Message: "This is a manually added condition. Outside of VK. It should not be removed.",
}
assert.NilError(t, retry.RetryOnConflict(retry.DefaultRetry, func() error {
newNode, err = nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
if err != nil {
return err
}
newNode.Annotations["manuallyAddedAnnotation"] = "value"
newNode.Status.Conditions = append(newNode.Status.Conditions, manuallyAddedCondition)
_, err = nodes.UpdateStatus(ctx, newNode, metav1.UpdateOptions{})
return err
}))
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
receivedNode := e.Object.(*corev1.Node)
for _, condition := range receivedNode.Status.Conditions {
if condition.Type == manuallyAddedCondition.Type {
return true
}
}
assert.Assert(t, is.Contains(receivedNode.Annotations, "testAnnotation"))
assert.Assert(t, is.Contains(newNode.Annotations, "manuallyAddedAnnotation"))
return false
}))
// Let's have the VK have a new condition.
newCondition := corev1.NodeCondition{
Type: "NewCondition",
Status: "Ok",
Reason: "NA",
Message: "This is a newly added condition. It should only show up *with* / *after* ManuallyAddedCondition. It is set by the VK.",
}
// Everything but node status is ignored here
testP.notifyNodeStatus(&corev1.Node{
// Annotations is left empty
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
baseCondition,
newCondition,
},
},
})
i := 0
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
receivedNode := e.Object.(*corev1.Node)
for _, condition := range receivedNode.Status.Conditions {
if condition.Type == newCondition.Type {
// Wait for 2 updates / patches
if i > 2 {
return true
}
i++
}
}
return false
}))
// Make sure that all three conditions are there.
newNode, err = nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
seenConditionTypes := make([]corev1.NodeConditionType, len(newNode.Status.Conditions))
for idx := range newNode.Status.Conditions {
seenConditionTypes[idx] = newNode.Status.Conditions[idx].Type
}
assert.Assert(t, is.Contains(seenConditionTypes, baseCondition.Type))
assert.Assert(t, is.Contains(seenConditionTypes, newCondition.Type))
assert.Assert(t, is.Contains(seenConditionTypes, manuallyAddedCondition.Type))
assert.Assert(t, is.Equal(newNode.Annotations["testAnnotation"], ""))
assert.Assert(t, is.Contains(newNode.Annotations, "manuallyAddedAnnotation"))
t.Log(newNode.Status.Conditions)
}
func testNode(t *testing.T) *corev1.Node {
n := &corev1.Node{}
n.Name = strings.ToLower(t.Name())
@@ -631,19 +383,16 @@ func testNode(t *testing.T) *corev1.Node {
type testNodeProvider struct {
NodeProvider
statusHandlers []func(*corev1.Node)
// Callback to VK
notifyNodeStatus func(*corev1.Node)
}
func (p *testNodeProvider) NotifyNodeStatus(ctx context.Context, h func(*corev1.Node)) {
p.notifyNodeStatus = h
p.statusHandlers = append(p.statusHandlers, h)
}
func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
for _, h := range p.statusHandlers {
h(n)
}
p.notifyNodeStatus(n)
}
// testNodeProviderPing tracks the maximum time interval between calls to Ping
@@ -667,13 +416,13 @@ func (tnp *testNodeProviderPing) Ping(ctx context.Context) error {
}
type watchGetter interface {
Watch(context.Context, metav1.ListOptions) (watch.Interface, error)
Watch(metav1.ListOptions) (watch.Interface, error)
}
func makeWatch(ctx context.Context, t *testing.T, wc watchGetter, name string) watch.Interface {
func makeWatch(t *testing.T, wc watchGetter, name string) watch.Interface {
t.Helper()
w, err := wc.Watch(ctx, metav1.ListOptions{FieldSelector: "name=" + name})
w, err := wc.Watch(metav1.ListOptions{FieldSelector: "name=" + name})
assert.NilError(t, err)
return w
}

View File

@@ -16,8 +16,6 @@ package node
import (
"context"
"fmt"
"time"
"github.com/google/go-cmp/cmp"
pkgerrors "github.com/pkg/errors"
@@ -26,23 +24,12 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
const (
podStatusReasonProviderFailed = "ProviderFailed"
podEventCreateFailed = "ProviderCreateFailed"
podEventCreateSuccess = "ProviderCreateSuccess"
podEventDeleteFailed = "ProviderDeleteFailed"
podEventDeleteSuccess = "ProviderDeleteSuccess"
podEventUpdateFailed = "ProviderUpdateFailed"
podEventUpdateSuccess = "ProviderUpdateSuccess"
// 151 milliseconds is chosen simply because it is a small prime number; it is the retry interval between
// attempts to match a notification from the provider with a pod known to VK
notificationRetryPeriod = 151 * time.Millisecond
)
func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
@@ -85,22 +72,16 @@ func (pc *PodController) createOrUpdatePod(ctx context.Context, pod *corev1.Pod)
log.G(ctx).Debugf("Pod %s exists, updating pod in provider", podFromProvider.Name)
if origErr := pc.provider.UpdatePod(ctx, podForProvider); origErr != nil {
pc.handleProviderError(ctx, span, origErr, pod)
pc.recorder.Event(pod, corev1.EventTypeWarning, podEventUpdateFailed, origErr.Error())
return origErr
}
log.G(ctx).Info("Updated pod in provider")
pc.recorder.Event(pod, corev1.EventTypeNormal, podEventUpdateSuccess, "Update pod in provider successfully")
}
} else {
if origErr := pc.provider.CreatePod(ctx, podForProvider); origErr != nil {
pc.handleProviderError(ctx, span, origErr, pod)
pc.recorder.Event(pod, corev1.EventTypeWarning, podEventCreateFailed, origErr.Error())
return origErr
}
log.G(ctx).Info("Created pod in provider")
pc.recorder.Event(pod, corev1.EventTypeNormal, podEventCreateSuccess, "Create pod in provider successfully")
}
return nil
}
@@ -126,30 +107,6 @@ func podsEqual(pod1, pod2 *corev1.Pod) bool {
}
func deleteGraceTimeEqual(old, new *int64) bool {
if old == nil && new == nil {
return true
}
if old != nil && new != nil {
return *old == *new
}
return false
}
// podShouldEnqueue checks whether two pods are equal according to the podsEqual func, plus their DeletionTimestamp and DeletionGracePeriodSeconds
func podShouldEnqueue(oldPod, newPod *corev1.Pod) bool {
if !podsEqual(oldPod, newPod) {
return true
}
if !deleteGraceTimeEqual(oldPod.DeletionGracePeriodSeconds, newPod.DeletionGracePeriodSeconds) {
return true
}
if !oldPod.DeletionTimestamp.Equal(newPod.DeletionTimestamp) {
return true
}
return false
}
func (pc *PodController) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) {
podPhase := corev1.PodPending
if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
@@ -166,7 +123,7 @@ func (pc *PodController) handleProviderError(ctx context.Context, span trace.Spa
"reason": pod.Status.Reason,
})
_, err := pc.client.Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{})
_, err := pc.client.Pods(pod.Namespace).UpdateStatus(pod)
if err != nil {
logger.WithError(err).Warn("Failed to update pod status")
} else {
@@ -183,10 +140,9 @@ func (pc *PodController) deletePod(ctx context.Context, pod *corev1.Pod) error {
err := pc.provider.DeletePod(ctx, pod.DeepCopy())
if err != nil {
span.SetStatus(err)
pc.recorder.Event(pod, corev1.EventTypeWarning, podEventDeleteFailed, err.Error())
return err
}
pc.recorder.Event(pod, corev1.EventTypeNormal, podEventDeleteSuccess, "Delete pod in provider successfully")
log.G(ctx).Debug("Deleted pod from provider")
return nil
@@ -216,14 +172,12 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
kPod.Lock()
podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
kPod.Unlock()
if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) {
return nil
}
// We need to do this because the other parts of the pod can be updated elsewhere. Since we're only updating
// the pod status, and we should be the sole writers of the pod status, we can blindly overwrite it. Therefore
// we need to copy the pod and set ResourceVersion to 0.
podFromProvider.ResourceVersion = "0"
if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(ctx, podFromProvider, metav1.UpdateOptions{}); err != nil {
if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(podFromProvider); err != nil {
span.SetStatus(err)
return pkgerrors.Wrap(err, "error while updating pod status in kubernetes")
}
@@ -241,72 +195,17 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
// enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd
// prior to enqueuePodStatusUpdate.
func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
ctx, cancel := context.WithTimeout(ctx, notificationRetryPeriod*maxRetries)
defer cancel()
ctx, span := trace.StartSpan(ctx, "enqueuePodStatusUpdate")
defer span.End()
ctx = span.WithField(ctx, "method", "enqueuePodStatusUpdate")
// TODO (Sargun): Make this asynchronousish. Right now, if we are not cache synced, and we receive notifications
// from the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation.
key, err := cache.MetaNamespaceKeyFunc(pod)
if err != nil {
log.G(ctx).WithError(err).Error("Error getting pod meta namespace key")
span.SetStatus(err)
return
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key")
} else {
if obj, ok := pc.knownPods.Load(key); ok {
kpod := obj.(*knownPod)
kpod.Lock()
kpod.lastPodStatusReceivedFromProvider = pod
kpod.Unlock()
q.AddRateLimited(key)
}
}
ctx = span.WithField(ctx, "key", key)
var obj interface{}
err = wait.PollImmediateUntil(notificationRetryPeriod, func() (bool, error) {
var ok bool
obj, ok = pc.knownPods.Load(key)
if ok {
return true, nil
}
// Blind sync. Partial sync is better than nothing. If this returns false, the poll loop should not be invoked
// again as it means the context has timed out.
if !cache.WaitForNamedCacheSync("enqueuePodStatusUpdate", ctx.Done(), pc.podsInformer.Informer().HasSynced) {
log.G(ctx).Warn("enqueuePodStatusUpdate proceeding with unsynced cache")
}
// The only transient error the pod lister returns is not-found. The only case where not-found
// should happen while the pod *actually* exists is the one above -- where we haven't been able to finish syncing
// before the context times out.
// Any other class of error is non-transient
_, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name)
if err != nil {
return false, err
}
// err is nil, and therefore the pod exists in k8s but does not exist in our known pods map. This likely means
// that we're in some kind of startup synchronization issue where the provider knows about a pod (as it performs
// recovery) that we do not yet know about.
return false, nil
}, ctx.Done())
if err != nil {
if errors.IsNotFound(err) {
err = fmt.Errorf("Pod %q not found in pod lister: %w", key, err)
log.G(ctx).WithError(err).Debug("Not enqueuing pod status update")
} else {
log.G(ctx).WithError(err).Warn("Not enqueuing pod status update due to error from pod lister")
}
span.SetStatus(err)
return
}
kpod := obj.(*knownPod)
kpod.Lock()
if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) {
kpod.Unlock()
return
}
kpod.lastPodStatusReceivedFromProvider = pod
kpod.Unlock()
q.AddRateLimited(key)
}
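
The newer enqueuePodStatusUpdate above bounds its wait with a context and polls the known-pods map on notificationRetryPeriod via wait.PollImmediateUntil. Here is a compact, self-contained illustration of that polling pattern; the map, key, and delay are stand-ins, not the controller's real state.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// knownPods stands in for the controller's map; the key only shows up after a short delay,
	// the same situation the poll loop above is guarding against.
	var knownPods sync.Map
	go func() {
		time.Sleep(100 * time.Millisecond)
		knownPods.Store("default/my-pod", struct{}{})
	}()

	var obj interface{}
	err := wait.PollImmediateUntil(151*time.Millisecond, func() (bool, error) {
		var ok bool
		obj, ok = knownPods.Load("default/my-pod")
		return ok, nil
	}, ctx.Done())
	fmt.Println("found:", obj != nil, "err:", err)
}
```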
func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) {
@@ -380,7 +279,7 @@ func (pc *PodController) deletePodHandler(ctx context.Context, key string) (retE
// We don't check with the provider before doing this delete. At this point, even if an outstanding pod status update
// was in progress,
err = pc.client.Pods(namespace).Delete(ctx, name, *metav1.NewDeleteOptions(0))
err = pc.client.Pods(namespace).Delete(name, metav1.NewDeleteOptions(0))
if errors.IsNotFound(err) {
return nil
}

View File

@@ -23,7 +23,6 @@ import (
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"
@@ -49,11 +48,9 @@ func newTestController() *TestController {
recorder: testutil.FakeEventRecorder(5),
k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletionQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
podStatusQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
done: make(chan struct{}),
ready: make(chan struct{}),
podsInformer: iFactory.Core().V1().Pods(),
podsLister: iFactory.Core().V1().Pods().Lister(),
},
mock: p,
client: fk8s,
@@ -111,76 +108,6 @@ func TestPodsDifferentIgnoreValue(t *testing.T) {
assert.Assert(t, podsEqual(p1, p2))
}
func TestPodShouldEnqueueDifferentDeleteTimeStamp(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
now := v1.NewTime(time.Now())
p2.DeletionTimestamp = &now
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodShouldEnqueueDifferentLabel(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
p2.Labels = map[string]string{"test": "test"}
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodShouldEnqueueDifferentAnnotation(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
p2.Annotations = map[string]string{"test": "test"}
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodShouldNotEnqueueDifferentStatus(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
p2.Status.Phase = corev1.PodSucceeded
assert.Assert(t, !podShouldEnqueue(p1, p2))
}
func TestPodShouldEnqueueDifferentDeleteGraceTime(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
oldTime := v1.NewTime(time.Now().Add(5))
newTime := v1.NewTime(time.Now().Add(10))
oldGraceTime := int64(5)
newGraceTime := int64(10)
p1.DeletionGracePeriodSeconds = &oldGraceTime
p2.DeletionTimestamp = &oldTime
p2.DeletionGracePeriodSeconds = &newGraceTime
p2.DeletionTimestamp = &newTime
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodShouldEnqueueGraceTimeChanged(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
graceTime := int64(30)
p2.DeletionGracePeriodSeconds = &graceTime
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodCreateNewPod(t *testing.T) {
svr := newTestController()

View File

@@ -83,8 +83,7 @@ type PodNotifier interface {
// fashion. The provided pod's PodStatus should be up to date when
// this function is called.
//
// NotifyPods must not block the caller since it is only used to register the callback.
// The callback passed into `NotifyPods` may block when called.
// NotifyPods will not block callers.
NotifyPods(context.Context, func(*corev1.Pod))
}
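
The interface comment above requires NotifyPods to return immediately while allowing the registered callback itself to block. A minimal sketch of a provider side that satisfies this contract is shown below; toyProvider and its channel are illustrative, not a real provider implementation.

```go
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// toyProvider buffers status changes on a channel and fans them out to the callback
// from a single goroutine, so NotifyPods itself never blocks the caller.
type toyProvider struct {
	updates chan *corev1.Pod
}

func (p *toyProvider) NotifyPods(ctx context.Context, cb func(*corev1.Pod)) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case pod := <-p.updates:
				cb(pod) // the callback may block; only this goroutine waits on it
			}
		}
	}()
}

func main() {
	p := &toyProvider{updates: make(chan *corev1.Pod, 8)}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done := make(chan struct{})
	p.NotifyPods(ctx, func(pod *corev1.Pod) {
		fmt.Println("status update for", pod.Name)
		close(done)
	})

	pod := &corev1.Pod{}
	pod.Name = "example"
	p.updates <- pod
	<-done
}
```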
@@ -109,8 +108,6 @@ type PodController struct {
// deletionQ is a queue on which pods are reconciled; after the grace period we check whether they are still in the API server
deletionQ workqueue.RateLimitingInterface
podStatusQ workqueue.RateLimitingInterface
// From the time of creation to termination, the knownPods map will contain the pod's key
// (derived from Kubernetes' cache library) mapped to a *knownPod struct.
knownPods sync.Map
@@ -161,9 +158,6 @@ type PodControllerConfig struct {
ConfigMapInformer corev1informers.ConfigMapInformer
SecretInformer corev1informers.SecretInformer
ServiceInformer corev1informers.ServiceInformer
// RateLimiter defines the rate limit of work queue
RateLimiter workqueue.RateLimiter
}
// NewPodController creates a new pod controller with the provided config.
@@ -189,9 +183,7 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
if cfg.Provider == nil {
return nil, errdefs.InvalidInput("missing provider")
}
if cfg.RateLimiter == nil {
cfg.RateLimiter = workqueue.DefaultControllerRateLimiter()
}
rm, err := manager.NewResourceManager(cfg.PodInformer.Lister(), cfg.SecretInformer.Lister(), cfg.ConfigMapInformer.Lister(), cfg.ServiceInformer.Lister())
if err != nil {
return nil, pkgerrors.Wrap(err, "could not create resource manager")
@@ -206,9 +198,8 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
ready: make(chan struct{}),
done: make(chan struct{}),
recorder: cfg.EventRecorder,
k8sQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"),
deletionQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"),
podStatusQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"),
}
return pc, nil
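
One side of the hunks above carries a PodControllerConfig.RateLimiter field that is defaulted to workqueue.DefaultControllerRateLimiter(), while the other always builds the queues with the default limiter. As a hedged sketch of what a caller could plug in on the code path that has the field, here is a custom limiter with the same shape as the default; the tuning values and queue name are arbitrary.

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same shape as workqueue.DefaultControllerRateLimiter, but with custom numbers:
	// per-item exponential backoff combined with an overall token bucket.
	limiter := workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(50*time.Millisecond, 30*time.Second),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(20), 200)},
	)

	// On the side of the diff that has the RateLimiter field, this limiter is what would be
	// assigned to PodControllerConfig.RateLimiter; here we wire it into a named queue to show the shape.
	q := workqueue.NewNamedRateLimitingQueue(limiter, "syncPodsFromKubernetes")
	defer q.ShutDown()

	q.Add("default/my-pod")
	key, _ := q.Get()
	fmt.Println("processing", key)
	q.Done(key)
}
```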
@@ -251,12 +242,13 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
}
pc.provider = provider
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
provider.NotifyPods(ctx, func(pod *corev1.Pod) {
pc.enqueuePodStatusUpdate(ctx, pc.podStatusQ, pod.DeepCopy())
pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
})
go runProvider(ctx)
defer pc.podStatusQ.ShutDown()
defer podStatusQueue.ShutDown()
// Wait for the caches to be synced *before* starting to do work.
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
@@ -278,16 +270,13 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
},
UpdateFunc: func(oldObj, newObj interface{}) {
// Create a copy of the old and new pod objects so we don't mutate the cache.
oldPod := oldObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod)
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
log.G(ctx).Error(err)
} else {
if podShouldEnqueue(oldPod, newPod) {
pc.k8sQ.AddRateLimited(key)
}
pc.k8sQ.AddRateLimited(key)
}
},
DeleteFunc: func(pod interface{}) {
@@ -316,7 +305,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
workerID := strconv.Itoa(id)
go func() {
defer wg.Done()
pc.runSyncPodStatusFromProviderWorker(ctx, workerID, pc.podStatusQ)
pc.runSyncPodStatusFromProviderWorker(ctx, workerID, podStatusQueue)
}()
}
@@ -344,7 +333,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
<-ctx.Done()
log.G(ctx).Info("shutting down workers")
pc.k8sQ.ShutDown()
pc.podStatusQ.ShutDown()
podStatusQueue.ShutDown()
pc.deletionQ.ShutDown()
wg.Wait()

View File

@@ -1,7 +1,6 @@
package e2e
import (
"context"
"fmt"
"testing"
"time"
@@ -19,64 +18,17 @@ const (
deleteGracePeriodForProvider = 1 * time.Second
)
// TestGetPods tests that the /pods endpoint works, and only returns pods for our kubelet
func (ts *EndToEndTestSuite) TestGetPods(t *testing.T) {
ctx := context.Background()
// Create a pod with prefix "nginx-" having a single container.
podSpec := f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo")
podSpec.Spec.NodeName = f.NodeName
nginx, err := f.CreatePod(ctx, podSpec)
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(ctx, nginx.Namespace, nginx.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
t.Logf("Created pod: %s", nginx.Name)
// Wait for the "nginx-" pod to be reported as running and ready.
if _, err := f.WaitUntilPodReady(nginx.Namespace, nginx.Name); err != nil {
t.Fatal(err)
}
t.Logf("Pod %s ready", nginx.Name)
k8sPods, err := f.GetRunningPodsFromKubernetes(ctx)
if err != nil {
t.Fatal(err)
}
podFound := false
for _, pod := range k8sPods.Items {
if pod.Spec.NodeName != f.NodeName {
t.Fatalf("Found pod with node name %s, whereas expected %s", pod.Spec.NodeName, f.NodeName)
}
if pod.UID == nginx.UID {
podFound = true
}
}
if !podFound {
t.Fatal("Nginx pod not found")
}
}
// TestGetStatsSummary creates a pod having two containers and queries the /stats/summary endpoint of the virtual-kubelet.
// It expects this endpoint to return stats for the current node, as well as for the aforementioned pod and each of its two containers.
func (ts *EndToEndTestSuite) TestGetStatsSummary(t *testing.T) {
ctx := context.Background()
// Create a pod with prefix "nginx-" having three containers.
pod, err := f.CreatePod(ctx, f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo", "bar", "baz"))
pod, err := f.CreatePod(f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo", "bar", "baz"))
if err != nil {
t.Fatal(err)
}
// Delete the "nginx-0-X" pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -87,7 +39,7 @@ func (ts *EndToEndTestSuite) TestGetStatsSummary(t *testing.T) {
}
// Grab the stats from the provider.
stats, err := f.GetStatsSummary(ctx)
stats, err := f.GetStatsSummary()
if err != nil {
t.Fatal(err)
}
@@ -116,19 +68,17 @@ func (ts *EndToEndTestSuite) TestGetStatsSummary(t *testing.T) {
// These verifications are made using the /stats/summary endpoint of the virtual-kubelet, by checking for the presence or absence of the pods.
// Hence, the provider being tested must implement the PodMetricsProvider interface.
func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
ctx := context.Background()
// Create a pod with prefix "nginx-" having a single container.
podSpec := f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo")
podSpec.Spec.NodeName = f.NodeName
pod, err := f.CreatePod(ctx, podSpec)
pod, err := f.CreatePod(podSpec)
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -141,7 +91,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
t.Logf("Pod %s ready", pod.Name)
// Grab the pods from the provider.
pods, err := f.GetRunningPodsFromProvider(ctx)
pods, err := f.GetRunningPods()
assert.NilError(t, err)
// Check if the pod exists in the slice of PodStats.
@@ -162,7 +112,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
}()
// Gracefully delete the "nginx-" pod.
if err := f.DeletePod(ctx, pod.Namespace, pod.Name); err != nil {
if err := f.DeletePod(pod.Namespace, pod.Name); err != nil {
t.Fatal(err)
}
t.Logf("Deleted pod: %s", pod.Name)
@@ -175,7 +125,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
time.Sleep(deleteGracePeriodForProvider)
// Give the provider some time to react to the MODIFIED/DELETED events before proceeding.
// Grab the pods from the provider.
pods, err = f.GetRunningPodsFromProvider(ctx)
pods, err = f.GetRunningPods()
assert.NilError(t, err)
// Make sure the pod DOES NOT exist in the provider's set of running pods
@@ -191,17 +141,15 @@ func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
// and put them in the running lifecycle. It then does a force delete on the pod, and verifies the provider
// has deleted it.
func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
ctx := context.Background()
podSpec := f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo")
// Create a pod with prefix having a single container.
pod, err := f.CreatePod(ctx, podSpec)
pod, err := f.CreatePod(podSpec)
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -214,7 +162,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
t.Logf("Pod %s ready", pod.Name)
// Grab the pods from the provider.
pods, err := f.GetRunningPodsFromProvider(ctx)
pods, err := f.GetRunningPods()
assert.NilError(t, err)
// Check if the pod exists in the slice of Pods.
@@ -240,7 +188,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
time.Sleep(deleteGracePeriodForProvider)
// Forcibly delete the pod.
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil {
t.Logf("Last saw pod in state: %+v", podLast)
t.Fatal(err)
}
@@ -255,7 +203,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
time.Sleep(deleteGracePeriodForProvider)
// Grab the pods from the provider.
pods, err = f.GetRunningPodsFromProvider(ctx)
pods, err = f.GetRunningPods()
assert.NilError(t, err)
// Make sure the "nginx-" pod DOES NOT exist in the slice of Pods anymore.
@@ -268,17 +216,15 @@ func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
// TestCreatePodWithOptionalInexistentSecrets tries to create a pod referencing optional, inexistent secrets.
// It then verifies that the pod is created successfully.
func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentSecrets(t *testing.T) {
ctx := context.Background()
// Create a pod with a single container referencing optional, inexistent secrets.
pod, err := f.CreatePod(ctx, f.CreatePodObjectWithOptionalSecretKey(t.Name()))
pod, err := f.CreatePod(f.CreatePodObjectWithOptionalSecretKey(t.Name()))
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -294,7 +240,7 @@ func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentSecrets(t *testi
}
// Grab the pods from the provider.
pods, err := f.GetRunningPodsFromProvider(ctx)
pods, err := f.GetRunningPods()
assert.NilError(t, err)
// Check if the pod exists in the slice of Pods.
@@ -304,17 +250,15 @@ func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentSecrets(t *testi
// TestCreatePodWithMandatoryInexistentSecrets tries to create a pod referencing inexistent secrets.
// It then verifies that the pod is not created.
func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentSecrets(t *testing.T) {
ctx := context.Background()
// Create a pod with a single container referencing inexistent secrets.
pod, err := f.CreatePod(ctx, f.CreatePodObjectWithMandatorySecretKey(t.Name()))
pod, err := f.CreatePod(f.CreatePodObjectWithMandatorySecretKey(t.Name()))
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -325,7 +269,7 @@ func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentSecrets(t *test
}
// Grab the pods from the provider.
pods, err := f.GetRunningPodsFromProvider(ctx)
pods, err := f.GetRunningPods()
assert.NilError(t, err)
// Check if the pod exists in the slice of PodStats.
@@ -335,17 +279,15 @@ func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentSecrets(t *test
// TestCreatePodWithOptionalInexistentConfigMap tries to create a pod referencing optional, inexistent config map.
// It then verifies that the pod is created successfully.
func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentConfigMap(t *testing.T) {
ctx := context.Background()
// Create a pod with a single container referencing optional, inexistent config map.
pod, err := f.CreatePod(ctx, f.CreatePodObjectWithOptionalConfigMapKey(t.Name()))
pod, err := f.CreatePod(f.CreatePodObjectWithOptionalConfigMapKey(t.Name()))
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -361,7 +303,7 @@ func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentConfigMap(t *tes
}
// Grab the pods from the provider.
pods, err := f.GetRunningPodsFromProvider(ctx)
pods, err := f.GetRunningPods()
assert.NilError(t, err)
// Check if the pod exists in the slice of PodStats.
@@ -371,17 +313,15 @@ func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentConfigMap(t *tes
// TestCreatePodWithMandatoryInexistentConfigMap tries to create a pod referencing inexistent secrets.
// It then verifies that the pod is not created.
func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentConfigMap(t *testing.T) {
ctx := context.Background()
// Create a pod with a single container referencing inexistent config map.
pod, err := f.CreatePod(ctx, f.CreatePodObjectWithMandatoryConfigMapKey(t.Name()))
pod, err := f.CreatePod(f.CreatePodObjectWithMandatoryConfigMapKey(t.Name()))
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -392,7 +332,7 @@ func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentConfigMap(t *te
}
// Grab the pods from the provider.
pods, err := f.GetRunningPodsFromProvider(ctx)
pods, err := f.GetRunningPods()
assert.NilError(t, err)
// Check if the pod exists in the slice of PodStats.

View File

@@ -19,7 +19,7 @@ func (ts *EndToEndTestSuite) TestNodeCreateAfterDelete(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
podList, err := f.KubeClient.CoreV1().Pods(f.Namespace).List(ctx, metav1.ListOptions{
podList, err := f.KubeClient.CoreV1().Pods(f.Namespace).List(metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", f.NodeName).String(),
})
@@ -28,7 +28,7 @@ func (ts *EndToEndTestSuite) TestNodeCreateAfterDelete(t *testing.T) {
chErr := make(chan error, 1)
originalNode, err := f.GetNode(ctx)
originalNode, err := f.GetNode()
assert.NilError(t, err)
ctx, cancel = context.WithTimeout(ctx, time.Minute)
@@ -50,7 +50,7 @@ func (ts *EndToEndTestSuite) TestNodeCreateAfterDelete(t *testing.T) {
chErr <- f.WaitUntilNodeCondition(wait)
}()
assert.NilError(t, f.DeleteNode(ctx))
assert.NilError(t, f.DeleteNode())
select {
case result := <-chErr:

View File

@@ -10,7 +10,7 @@ You can install Virtual Kubelet by building it [from source](#source). First, ma
mkdir -p ${GOPATH}/src/github.com/virtual-kubelet
cd ${GOPATH}/src/github.com/virtual-kubelet
git clone https://github.com/virtual-kubelet/virtual-kubelet
cd virtual-kubelet && make build
make build
```
This method adds a `virtual-kubelet` executable to the `bin` folder. To run it:

View File

@@ -1,7 +1,3 @@
- name: Admiralty Multi-Cluster Scheduler
tag: multicluster-scheduler
org: admiraltyio
vanityImportPath: admiralty.io/multicluster-scheduler
- name: Alibaba Cloud Elastic Container Instance (**ECI**)
tag: alibabacloud-eci
- name: AWS Fargate
@@ -10,9 +6,6 @@
tag: azure-batch
- name: Azure Container Instances (**ACI**)
tag: azure-aci
- name: Elotl Kip
tag: kip
org: elotl
- name: Kubernetes Container Runtime Interface (**CRI**)
tag: cri
- name: Huawei Cloud Container Instance (**CCI**)
@@ -21,5 +14,3 @@
tag: nomad
- name: OpenStack Zun
tag: openstack-zun
- name: Tencent Games Tensile Kube
tag: tensile-kube

View File

@@ -25,7 +25,7 @@
<ul>
{{ range $providers }}
{{ $url := printf "https://github.com/%s/%s/blob/master/README.md#readme" (.org | default "virtual-kubelet") .tag }}
{{ $url := printf "https://github.com/virtual-kubelet/%s/blob/master/README.md#readme" .tag }}
<li class="has-bottom-spacing">
<a class="is-size-5 is-size-6-mobile has-text-grey-lighter has-text-weight-light" href="{{ $url }}" target="_blank">
{{ .name | markdownify }}

View File

@@ -3,9 +3,8 @@
<tbody>
{{ range $providers }}
{{ $name := .name | markdownify }}
{{ $githubPath := printf "github.com/%s/%s" (.org | default "virtual-kubelet") .tag }}
{{ $pkgName := .vanityImportPath | default $githubPath }}
{{ $pkgUrl := printf "https://%s/blob/master/README.md#readme" $githubPath }}
{{ $pkgName := printf "github.com/virtual-kubelet/%s" .tag }}
{{ $pkgUrl := printf "https://github.com/virtual-kubelet/%s/blob/master/README.md#readme" .tag }}
{{ $godocUrl := printf "https://godoc.org/%s" $pkgName }}
<tr>
<td>