Compare commits

...

70 Commits

Author SHA1 Message Date
Sargun Dhillon
5a39c167a6 Merge pull request #861 from curx/patch-1
Rearrange OpenStack/Tensile Part
2020-07-27 09:48:30 -07:00
Thorsten Schifferdecker
c19bac7ed8 Rearrange OpenStack/Tensile Part
Signed-off-by: Thorsten Schifferdecker <schifferdecker@b1-systems.de>
2020-07-26 12:49:46 +02:00
Brian Goff
af1df79088 Merge pull request #851 from virtual-kubelet/race-condition-2nd 2020-07-23 13:53:58 -07:00
Brian Goff
0b7e66d57c Merge pull request #853 from elotl/vilmos-stats-path
Use /stats/summary for metrics handler
2020-07-23 10:04:11 -07:00
Vilmos Nebehaj
56b248c854 Add GetStatsSummary to PodHandlerConfig
If both the metrics routes and the pod routes are attached to the same
mux with the pattern "/", it will panic. Instead, add the stats handler
function to PodHandlerConfig and set up the route if it is not nil.
2020-07-23 09:50:19 -07:00
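A minimal sketch of what this enables for provider authors, assuming the `PodStatsSummaryHandlerFunc` shape `func(context.Context) (*stats.Summary, error)` and the stats types from k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1 (field and function names as they appear in the diffs further down):

```go
package main

import (
	"context"
	"net/http"

	"github.com/virtual-kubelet/virtual-kubelet/node/api"
	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

func main() {
	mux := http.NewServeMux()
	routes := api.PodHandlerConfig{
		// With GetStatsSummary set, AttachPodRoutes wires /stats/summary onto
		// the same mux as the pod routes; registering a second handler for the
		// bare "/" pattern on one mux is what used to panic.
		GetStatsSummary: func(ctx context.Context) (*stats.Summary, error) {
			return &stats.Summary{}, nil // a real provider returns live stats
		},
	}
	api.AttachPodRoutes(routes, mux, true)
	_ = http.ListenAndServe(":10250", mux) // error ignored for brevity
}
```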
Sargun Dhillon
4258c46746 Enhance / cleanup enqueuePodStatusUpdate polling in retry loop 2020-07-22 18:57:27 -07:00
Sargun Dhillon
1e9e055e89 Address concerns with PR
Also, just use Kubernetes waiter library.
2020-07-22 18:57:27 -07:00
Sargun Dhillon
12625131b5 Solve the pod status notification race condition on startup
This solves the race condition as described in
https://github.com/virtual-kubelet/virtual-kubelet/issues/836.

It does this by checking two conditions when the possible race condition
is detected.

If we receive a pod notification from the provider, and it is not
in our known pods list:
1. Is our cache in-sync?
2. Is it known to our pod lister?

The first case can happen because of the order we start the
provider and sync our caches. The second case can happen because
even if the cache reports synced, it does not mean all of the
callbacks on the informer have quiesced.

This slightly changes the behaviour of notifyPods so that it
can block (especially at startup). We can solve this later
by using something like a fair (ticket?) lock.
2020-07-22 18:57:27 -07:00
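A hypothetical sketch of the two checks described above; `acceptProviderNotification`, `knownPods`, and `cacheSynced` are illustrative names, not the actual fields in the pod controller:

```go
package node

import (
	corev1 "k8s.io/api/core/v1"
	listersv1 "k8s.io/client-go/listers/core/v1"
)

// acceptProviderNotification sketches the decision made when a provider
// notifies us about a pod that is not in our known pods list.
func acceptProviderNotification(
	pod *corev1.Pod,
	knownPods map[string]struct{}, // pods the controller already tracks
	cacheSynced func() bool, // the informer's HasSynced
	lister listersv1.PodLister,
) bool {
	if _, ok := knownPods[pod.Namespace+"/"+pod.Name]; ok {
		return true
	}
	// Check 1: the provider can start before our caches are warm, so an
	// unsynced cache means the known-pods set is simply incomplete.
	if !cacheSynced() {
		return false // the caller blocks and retries once the cache syncs
	}
	// Check 2: a synced cache does not guarantee the informer callbacks have
	// quiesced, so also ask the lister whether the pod is supposed to exist.
	_, err := lister.Pods(pod.Namespace).Get(pod.Name)
	return err == nil
}
```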
Brian Goff
ee7f5fa3ef Merge pull request #852 from cpuguy83/fix_running_pods_npe
Fix running pods handler on nil lister
2020-07-14 19:04:50 -07:00
Brian Goff
bcb5dfa11c Fix running pods handler on nil lister
This follows suit with other handlers and returns a NotImplemented
http.HandlerFunc when the lister is nil.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2020-07-14 15:33:59 -07:00
Brian Goff
b7c19cb5a1 Merge pull request #850 from cwdsuzhou/add_website
Add tensile kube to website
2020-07-09 15:50:10 -07:00
wadecai
2f989f5278 Add tensile kube to website 2020-07-09 11:28:54 +08:00
Brian Goff
5455f290a4 Merge pull request #844 from adrienjt/upgrade-k8s-1-18 2020-07-08 15:02:00 -07:00
Adrien Trouillaud
9c6b48c1c3 rm obsolete doc 2020-07-07 21:00:56 -07:00
Adrien Trouillaud
e00e4c2bba make e2e test compatible with go1.13 2020-07-07 21:00:56 -07:00
Adrien Trouillaud
72a0be3f45 upgrade to go 1.13
required by k8s libs at 1.18
2020-07-07 21:00:56 -07:00
Adrien Trouillaud
845b4cd409 upgrade k8s libs to 1.18.4 2020-07-07 21:00:56 -07:00
Brian Goff
f934ded4a2 Merge pull request #838 from sargun/threeway-patch
Introduce three-way patch for proper handling of updates
2020-07-07 12:52:14 -07:00
Brian Goff
364d7a9a74 Merge pull request #848 from cwdsuzhou/add_tensile-kube
Add tensile kube to README
2020-07-06 11:13:44 -07:00
Sargun Dhillon
e805cb744a Introduce three-way patch for proper handling of out-of-band status updates
As described in the patch itself, there is a case where, if a node is updated out of
band (e.g. by node-problem-detector (https://github.com/kubernetes/node-problem-detector)),
we would overwrite that out-of-band update with our typical two-way strategic patch for
node status updates.

The reason why the standard kubelet can do this is because the flow goes:
apiserver->kubelet: Fetch current node
kubelet->kubelet: Update apiserver's snapshot with local state changes
kubelet->apiserver: patch

We don't have this luxury, as we rely on providers making a callback into us
in order to get the most recent node status. They do not have a way
to do that merge operation themselves, and a two-way merge doesn't
give us enough metadata.

In order to work around this, we perform a three-way merge on behalf of
the user. We do this by stashing the contents of the last update inside
of an annotation on the node. We then fetch that stashed status back, and
use it as the base for the next update.

In the upgrade case, or the case where the VK has been created by
"someone else", we do not know which attributes were created or
written by us, so we cannot generate a three-way patch.

In this case, we will do our best to avoid deleting any attributes,
and only overwrite them. We will consider all current API server
values written by "someone else", and not edit them. This is done
by considering the "old node" to be empty.
2020-07-06 11:10:32 -07:00
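A simplified sketch of the mechanism using `strategicpatch.CreateThreeWayMergePatch`; the actual change stashes status and object meta in two separate annotations (see the constants in the node.go diff further down), so this condensed single-annotation version is illustrative only:

```go
package node

import (
	"encoding/json"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

const lastAppliedNodeStatus = "virtual-kubelet.io/last-applied-node-status"

// threeWayNodePatch treats the stashed annotation as "original", our desired
// node as "modified", and the API server's object as "current", so fields
// written out of band survive the merge.
func threeWayNodePatch(apiServerNode, desiredNode *corev1.Node) ([]byte, error) {
	original := []byte(apiServerNode.Annotations[lastAppliedNodeStatus])
	if len(original) == 0 {
		// Upgrade case, or a node created by someone else: with an empty
		// "old node" we may overwrite attributes but never delete them.
		original = []byte("{}")
	}
	modified, err := json.Marshal(desiredNode)
	if err != nil {
		return nil, err
	}
	current, err := json.Marshal(apiServerNode)
	if err != nil {
		return nil, err
	}
	return strategicpatch.CreateThreeWayMergePatch(original, modified, current, &corev1.Node{}, false)
}
```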
wadecai
56a39032e9 add_tensile_kube 2020-07-05 01:03:17 +08:00
Brian Goff
b50302b845 Merge pull request #847 from hustcat/tencent
Add Tencent Games to the adopters
2020-07-02 14:45:38 -07:00
dbyin(尹烨)
da1cb98b5d Add Tencent Games to the adopters 2020-07-02 12:13:48 +08:00
Brian Goff
5306173408 Merge pull request #846 from sargun/add-trace-to-updateStatus
Add instrumentation to node controller (tracing)
2020-07-01 12:53:27 -07:00
Sargun Dhillon
d6a38ca721 Merge pull request #845 from sargun/non-blocking-node-status-update
Make node status updates non-blocking
2020-07-01 12:46:56 -07:00
Sargun Dhillon
30aabe6fcb Add instrumentation to node controller (tracing)
This adds tracing to several sections of the node controller where
it was missing.
2020-07-01 12:40:09 -07:00
Sargun Dhillon
1e8c16877d Make node status updates non-blocking
There's a (somewhat) common case we can get into where the node
status update loop is busy while a provider is trying to send
a node status update. Right now, we block the provider from
creating a notification in this case.
2020-07-01 12:32:54 -07:00
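The shape of the fix, per the node.go diff further down, is simply a one-element buffer on the status update channel; a condensed sketch (type and constructor names are illustrative):

```go
package node

import corev1 "k8s.io/api/core/v1"

// nodeController sketches the relevant field only.
type nodeController struct {
	chStatusUpdate chan *corev1.Node
}

func newNodeController() *nodeController {
	return &nodeController{
		// Buffer of 1: while the update loop is busy, the provider's next
		// notification is parked here instead of blocking the provider.
		chStatusUpdate: make(chan *corev1.Node, 1), // was unbuffered before
	}
}
```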
Brian Goff
bd977cb224 Merge pull request #841 from cwdsuzhou/June/support_queue_define 2020-06-29 15:52:28 -07:00
wadecai
ca417d5239 Expose the queue rate limiter 2020-06-26 10:45:41 +08:00
wadecai
fedffd6f2c Add parameters to support change work queue qps 2020-06-26 10:44:09 +08:00
Brian Goff
e72e31b0d8 Merge pull request #843 from virtual-kubelet/rbitia-zoom-link
Zoom link update
2020-06-24 13:03:12 -07:00
Ria Bhatia
a667c5113b Update README.md
Edit zoom link since we have a new cncf vk zoom account.
2020-06-24 12:51:17 -07:00
Sargun Dhillon
bfd3f51ff3 Merge pull request #842 from sargun/fix-validate
Fix kubernetes version dependency checking script
2020-06-23 09:12:48 -07:00
Sargun Dhillon
5f64eab109 Fix kubernetes version dependency checking script
We had locked to version v1.17.6 when this script was released. At the time,
it was also the current stable release, and what was presented by the
GitHub API. It turns out the GitHub API does not present all tags. This
changes it to fetch the annotated tag from the upstream repo.
2020-06-22 22:53:27 -07:00
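The checker itself is a shell script; purely as an illustration of the idea (asking the upstream repo for its tags rather than trusting the GitHub releases API), a Go sketch might shell out to `git ls-remote`:

```go
package main

import (
	"fmt"
	"os/exec"
)

func main() {
	// List all tags directly from the upstream repository; annotated tags
	// also appear with a peeled ^{} entry in this output.
	out, err := exec.Command(
		"git", "ls-remote", "--tags",
		"https://github.com/kubernetes/kubernetes",
	).Output()
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
}
```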
Weidong Cai
2398504d08 dedup in updatePodStatus (#830)
Co-authored-by: Brian Goff <cpuguy83@gmail.com>
2020-06-15 14:35:14 -07:00
Sargun Dhillon
05fc1a43be Merge pull request #835 from cwdsuzhou/June/avoid_enqueue
Avoid enqueue when status of k8s pods change
2020-06-15 12:25:23 -07:00
wadecai
3db9ab97c6 Avoid enqueue when status of k8s pods change 2020-06-13 13:19:55 +08:00
Sargun Dhillon
dfca10f0dc Merge pull request #834 from sargun/upgrade-to-1.17
Upgrade Kubernetes libraries to 1.17
2020-06-08 11:05:52 -07:00
Sargun Dhillon
ee93284f38 Bump version of kubernetes cluster 2020-06-04 16:41:58 -07:00
Sargun Dhillon
bef6eda40d Add version pinning checker and instructions
This ensures that the version pinning is set up correctly, so all the deps
are pointing at the right underlying versions.
2020-06-04 16:27:04 -07:00
Sargun Dhillon
b3213d6eb2 Update kubernetes dependencies to v1.17.6
This also locks golang.org/x/sys/unix as it's another transitive dep that
flaps.
2020-06-04 16:25:41 -07:00
Brian Goff
a67cfab42b Merge pull request #833 from cpuguy83/fix_stream_idle_timeout_break
Fix stream timeout defaults
2020-06-03 20:32:53 -07:00
Brian Goff
51b9a6c40d Fix stream timeout defaults
This was an unintentional breaking change in
0bdf742303

A timeout of 0 doesn't make any sense, so use the old values of 30s as a
default.
2020-06-03 10:01:34 -07:00
Brian Goff
8fc8b69d8f Merge pull request #806 from elotl/vilmos-followlogs
Add support for v1.PodLogOptions
2020-05-04 11:05:57 -07:00
Vilmos Nebehaj
3e0d03c833 Use errdefs.InvalidInputf() for formatting 2020-04-28 11:19:37 -07:00
Vilmos Nebehaj
7628c13aeb Add tests for parseLogOptions() 2020-04-28 11:19:37 -07:00
Vilmos Nebehaj
8308033eff Add support for v1.PodLogOptions 2020-04-28 11:19:37 -07:00
Brian Goff
d9193e2440 Merge pull request #824 from cwdsuzhou/March/check_pod_equal
Check pods status deep equal before update
2020-04-21 12:54:30 -07:00
wadecai
30e31c0451 Check pod status equal before enqueue 2020-04-21 10:42:29 +08:00
Brian Goff
70f1a93c6e Merge pull request #828 from EDGsheryl/master
Optimize Docs
2020-04-15 06:24:59 -07:00
EDGsheryl
063f1fcdbc Optimize Docs
Signed-off-by: EDGsheryl <edgsheryl@gmail.com>
2020-04-08 15:34:08 +08:00
Brian Goff
de8226751d Merge pull request #826 from elotl/elotl-kip
Add Elotl Kip as a provider
2020-03-23 10:31:31 -07:00
Vilmos Nebehaj
47a353897e Add Elotl Kip as a provider 2020-03-20 15:08:11 -07:00
Brian Goff
3ec3b14e49 Merge pull request #825 from sargun/add-pods-api
Add /pods HTTP endpoint
2020-03-20 14:28:02 -07:00
Sargun Dhillon
5ad12cd476 Add /pods HTTP endpoint 2020-03-20 12:04:00 -07:00
Brian Goff
230ebe1b29 Merge pull request #818 from guoliangshuai/master
add 'GET' method to pod exec handler, so it can support websocket
2020-03-09 13:30:47 -07:00
guoliangshuai
554d30a0b1 add 'GET' method to pod exec handler, so it can support websocket 2020-03-09 14:16:49 +08:00
Ria Bhatia
5c1c3886e0 Changing meeting times (#814)
Changing meeting times to be once a month, and to be held as office hours. 
Changing from Wednesdays at 10:30 to once a month on Thursdays at 10am.
2020-02-19 14:25:21 -08:00
Brian Goff
4fea631791 Merge pull request #810 from adrienjt/add-provider-multicluster-scheduler
add provider admiralty multi-cluster scheduler
2020-02-06 16:18:17 -08:00
Adrien Trouillaud
5995a2a18d add provider admiralty multi-cluster scheduler 2020-02-05 19:05:28 -08:00
Brian Goff
fb33c2e144 Merge pull request #805 from elotl/vilmos-flushlogs
Use correct Flush() prototype from http.Flusher
2020-01-21 08:52:50 -08:00
Vilmos Nebehaj
47112aa5d6 Use correct Flush() prototype from http.Flusher
When calling GetContainerLogs(), a type check is performed to see if the
http.ResponseWriter supports flushing. However, Flush() in http.Flusher
does not return an error, therefore the type check will always fail.

Fix the flushWriter helper interface so flushing the writer will work.
2020-01-20 13:27:36 -08:00
Weidong Cai
0bdf742303 Make exec timeout configurable (#803)
* make exec timeout configurable
2020-01-18 12:11:54 -08:00
Brian Goff
4162bba465 Merge pull request #797 from cwdsuzhou/add_some_event
add some events to pod
2020-01-09 16:12:18 -08:00
wadecai
55f3f17ba0 add some event to pod 2019-11-29 14:33:00 +08:00
Brian Goff
7f2a022915 Merge pull request #793 from cpuguy83/fix_pod_status_panic
[Sync Provider] Fix panic on not found pod status
2019-11-15 14:27:55 -08:00
Brian Goff
6e33b0f084 [Sync Provider] Fix panic on not found pod status 2019-11-15 09:44:29 -08:00
Brian Goff
1a9c4bfb24 Merge pull request #789 from tghartland/fix-notify-status-788
After handling status update, reset update timer with correct duration
2019-11-12 09:49:08 -08:00
Thomas Hartland
c258614d8f After handling status update, reset update timer with correct duration
If the ping timer is being used, it should be reset with the ping update
interval. If the status update interval is used instead, then Ping stops being
called for long enough to cause Kubernetes to mark the node as NotReady.
2019-11-11 14:29:52 +01:00
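A condensed sketch of the fix (names are illustrative; the real code reuses one timer and picks the reset duration once, as the node.go diff further down shows):

```go
package node

import "time"

// resetDuration: when leases are disabled, one timer serves both pings and
// status updates, so it must be reset with the ping interval.
func resetDuration(disableLease bool, pingInterval, statusInterval time.Duration) time.Duration {
	if disableLease {
		// Resetting with statusInterval here would stop Ping long enough
		// for Kubernetes to mark the node NotReady.
		return pingInterval
	}
	return statusInterval
}
```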
Thomas Hartland
3783a39b26 Add test for node ping interval 2019-11-11 14:29:52 +01:00
35 changed files with 1849 additions and 479 deletions


@@ -3,7 +3,7 @@ jobs:
validate:
resource_class: xlarge
docker:
-      - image: circleci/golang:1.12
+      - image: circleci/golang:1.13
environment:
GO111MODULE: "on"
GOPROXY: https://proxy.golang.org
@@ -33,7 +33,7 @@ jobs:
test:
resource_class: xlarge
docker:
-      - image: circleci/golang:1.12
+      - image: circleci/golang:1.13
environment:
GO111MODULE: "on"
working_directory: /go/src/github.com/virtual-kubelet/virtual-kubelet
@@ -61,7 +61,7 @@ jobs:
CHANGE_MINIKUBE_NONE_USER: true
GOPATH: /home/circleci/go
KUBECONFIG: /home/circleci/.kube/config
-KUBERNETES_VERSION: v1.15.2
+KUBERNETES_VERSION: v1.17.6
MINIKUBE_HOME: /home/circleci
MINIKUBE_VERSION: v1.2.0
MINIKUBE_WANTUPDATENOTIFICATION: false
@@ -117,7 +117,7 @@ jobs:
command: |
mkdir $HOME/.go
export PATH=$HOME/.go/bin:${PATH}
-curl -fsSL -o "/tmp/go.tar.gz" "https://dl.google.com/go/go1.12.6.linux-amd64.tar.gz"
+curl -fsSL -o "/tmp/go.tar.gz" "https://dl.google.com/go/go1.13.12.linux-amd64.tar.gz"
tar -C $HOME/.go --strip-components=1 -xzf "/tmp/go.tar.gz"
go version
make e2e


@@ -6,6 +6,9 @@
* VMWare
* Netflix
* Hashi Corp
+* Admiralty
+* Elotl
+* Tencent Games
Since end-users are specific per provider within VK, we have many end-user customers that we don't have permission to list publicly. Please contact ribhatia@microsoft.com for more information.


@@ -1,4 +1,4 @@
-FROM golang:1.12 as builder
+FROM golang:1.13 as builder
ENV PATH /go/bin:/usr/local/go/bin:$PATH
ENV GOPATH /go
COPY . /go/src/github.com/virtual-kubelet/virtual-kubelet


@@ -13,7 +13,7 @@ include Makefile.e2e
# Also, we will want to lock our tool versions using go mod:
# https://github.com/golang/go/wiki/Modules#how-can-i-track-tool-dependencies-for-a-module
gobin_tool ?= $(shell which gobin || echo $(GOPATH)/bin/gobin)
-goimports := golang.org/x/tools/cmd/goimports@release-branch.go1.12
+goimports := golang.org/x/tools/cmd/goimports@release-branch.go1.13
gocovmerge := github.com/wadey/gocovmerge@b5bfa59ec0adc420475f97f89b58045c721d761c
goreleaser := github.com/goreleaser/goreleaser@v0.82.2
gox := github.com/mitchellh/gox@v1.0.1
@@ -119,8 +119,6 @@ format: goimports
$Q find . -iname \*.go | grep -v \
-e "^$$" $(addprefix -e ,$(IGNORED_PACKAGES)) | xargs $(gobin_tool) -run $(goimports) -w
##### =====> Internals <===== #####
.PHONY: setup


@@ -2,7 +2,7 @@
Virtual Kubelet is an open source [Kubernetes kubelet](https://kubernetes.io/docs/reference/generated/kubelet/)
implementation that masquerades as a kubelet for the purposes of connecting Kubernetes to other APIs.
-This allows the nodes to be backed by other services like ACI, AWS Fargate, [IoT Edge](https://github.com/Azure/iot-edge-virtual-kubelet-provider) etc. The primary scenario for VK is enabling the extension of the Kubernetes API into serverless container platforms like ACI and Fargate, though we are open to others. However, it should be noted that VK is explicitly not intended to be an alternative to Kubernetes federation.
+This allows the nodes to be backed by other services like ACI, AWS Fargate, [IoT Edge](https://github.com/Azure/iot-edge-virtual-kubelet-provider), [Tensile Kube](https://github.com/virtual-kubelet/tensile-kube) etc. The primary scenario for VK is enabling the extension of the Kubernetes API into serverless container platforms like ACI and Fargate, though we are open to others. However, it should be noted that VK is explicitly not intended to be an alternative to Kubernetes federation.
Virtual Kubelet features a pluggable architecture and direct use of Kubernetes primitives, making it much easier to build on.
@@ -16,12 +16,15 @@ The best description is "Kubernetes API on top, programmable back."
* [How It Works](#how-it-works)
* [Usage](#usage)
* [Providers](#providers)
+ [Admiralty Multi-Cluster Scheduler](#admiralty-multi-cluster-scheduler)
+ [Alibaba Cloud ECI Provider](#alibaba-cloud-eci-provider)
+ [Azure Container Instances Provider](#azure-container-instances-provider)
+ [Azure Batch GPU Provider](https://github.com/virtual-kubelet/azure-batch/blob/master/README.md)
+ [AWS Fargate Provider](#aws-fargate-provider)
+ [Elotl Kip](#elotl-kip)
+ [HashiCorp Nomad](#hashicorp-nomad-provider)
+ [OpenStack Zun](#openstack-zun-provider)
+ [Tensile Kube Provider](#tensile-kube-provider)
+ [Adding a New Provider via the Provider Interface](#adding-a-new-provider-via-the-provider-interface)
* [Testing](#testing)
+ [Unit tests](#unit-tests)
@@ -73,6 +76,9 @@ Providers must provide the following functionality to be considered a supported
2. Conforms to the current API provided by Virtual Kubelet.
3. Does not have access to the Kubernetes API Server and has a well-defined callback mechanism for getting data like secrets or configmaps.
### Admiralty Multi-Cluster Scheduler
Admiralty Multi-Cluster Scheduler mutates annotated pods into "proxy pods" scheduled on a virtual-kubelet node and creates corresponding "delegate pods" in remote clusters (actually running the containers). A feedback loop updates the statuses and annotations of the proxy pods to reflect the statuses and annotations of the delegate pods. You can find more details in the [Admiralty Multi-Cluster Scheduler documentation](https://github.com/admiraltyio/multicluster-scheduler).
### Alibaba Cloud ECI Provider
@@ -112,6 +118,12 @@ co-exist with pods on regular worker nodes in the same Kubernetes cluster.
Easy instructions and a sample configuration file are available in the [AWS Fargate provider documentation](https://github.com/virtual-kubelet/aws-fargate). Please note that this provider is not currently supported.
### Elotl Kip
[Kip](https://github.com/elotl/kip) is a provider that runs pods in cloud instances, allowing a Kubernetes cluster to transparently scale workloads into a cloud. When a pod is scheduled onto the virtual node, Kip starts a right-sized cloud instance for the pod's workload and dispatches the pod onto the instance. When the pod is finished running, the cloud instance is terminated.
When workloads run on Kip, your cluster size naturally scales with the cluster workload, pods are strongly isolated from each other and the user is freed from managing worker nodes and strategically packing pods onto nodes.
### HashiCorp Nomad Provider
HashiCorp [Nomad](https://nomadproject.io) provider for Virtual Kubelet connects your Kubernetes cluster
@@ -137,6 +149,11 @@ and bind-mount Cinder volumes into a path inside a pod's container.
For detailed instructions, follow the guide [here](https://github.com/virtual-kubelet/openstack-zun/blob/master/README.md).
### Tensile Kube Provider
[Tensile kube](https://github.com/virtual-kubelet/tensile-kube/blob/master/README.md) is contributed by [Tencent
Games](https://game.qq.com). It is a provider for Virtual Kubelet that connects your Kubernetes cluster with other Kubernetes clusters, effectively extending Kubernetes without limits. With this provider, pods that are scheduled on the virtual node registered on Kubernetes will run as jobs on other Kubernetes clusters' nodes.
### Adding a New Provider via the Provider Interface
Providers consume this project as a library which implements the core logic of
@@ -276,7 +293,7 @@ Enable the ServiceNodeExclusion flag, by modifying the Controller Manager manife
Virtual Kubelet follows the [CNCF Code of Conduct](https://github.com/cncf/foundation/blob/master/code-of-conduct.md).
Sign the [CNCF CLA](https://github.com/kubernetes/community/blob/master/CLA.md) to be able to make Pull Requests to this repo.
-Bi-weekly Virtual Kubelet Architecture meetings are held at 11am PST every other Wednesday in this [zoom meeting room](https://zoom.us/j/245165908). Check out the calendar [here](https://calendar.google.com/calendar?cid=bjRtbGMxYWNtNXR0NXQ1a2hqZmRkNTRncGNAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ).
+Monthly Virtual Kubelet Office Hours are held at 10am PST on the last Thursday of every month in this [zoom meeting room](https://zoom.us/j/94701509915). Check out the calendar [here](https://calendar.google.com/calendar?cid=bjRtbGMxYWNtNXR0NXQ1a2hqZmRkNTRncGNAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ).
Our google drive with design specifications and meeting notes are [here](https://drive.google.com/drive/folders/19Ndu11WBCCBDowo9CrrGUHoIfd2L8Ueg?usp=sharing).


@@ -81,6 +81,11 @@ func installFlags(flags *pflag.FlagSet, c *Opts) {
flags.DurationVar(&c.InformerResyncPeriod, "full-resync-period", c.InformerResyncPeriod, "how often to perform a full resync of pods between kubernetes and the provider")
flags.DurationVar(&c.StartupTimeout, "startup-timeout", c.StartupTimeout, "How long to wait for the virtual-kubelet to start")
+flags.DurationVar(&c.StreamIdleTimeout, "stream-idle-timeout", c.StreamIdleTimeout,
+"stream-idle-timeout is the maximum time a streaming connection can be idle before the connection is"+
+" automatically closed, default 30s.")
+flags.DurationVar(&c.StreamCreationTimeout, "stream-creation-timeout", c.StreamCreationTimeout,
+"stream-creation-timeout is the maximum time for streaming connection, default 30s.")
flagset := flag.NewFlagSet("klog", flag.PanicOnError)
klog.InitFlags(flagset)

View File

@@ -22,6 +22,7 @@ import (
"net"
"net/http"
"os"
"time"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
@@ -57,7 +58,7 @@ func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) {
}, nil
}
-func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerConfig) (_ func(), retErr error) {
+func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerConfig, getPodsFromKubernetes api.PodListerFunc) (_ func(), retErr error) {
var closers []io.Closer
cancel := func() {
for _, c := range closers {
@@ -88,10 +89,14 @@ func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerCon
mux := http.NewServeMux()
podRoutes := api.PodHandlerConfig{
-RunInContainer: p.RunInContainer,
-GetContainerLogs: p.GetContainerLogs,
-GetPods: p.GetPods,
+RunInContainer: p.RunInContainer,
+GetContainerLogs: p.GetContainerLogs,
+GetPodsFromKubernetes: getPodsFromKubernetes,
+GetPods: p.GetPods,
+StreamIdleTimeout: cfg.StreamIdleTimeout,
+StreamCreationTimeout: cfg.StreamCreationTimeout,
}
api.AttachPodRoutes(podRoutes, mux, true)
s := &http.Server{
@@ -142,10 +147,12 @@ func serveHTTP(ctx context.Context, s *http.Server, l net.Listener, name string)
}
type apiServerConfig struct {
-CertPath string
-KeyPath string
-Addr string
-MetricsAddr string
+CertPath string
+KeyPath string
+Addr string
+MetricsAddr string
+StreamIdleTimeout time.Duration
+StreamCreationTimeout time.Duration
}
func getAPIConfig(c Opts) (*apiServerConfig, error) {
@@ -156,6 +163,8 @@ func getAPIConfig(c Opts) (*apiServerConfig, error) {
config.Addr = fmt.Sprintf(":%d", c.ListenPort)
config.MetricsAddr = c.MetricsAddr
+config.StreamIdleTimeout = c.StreamIdleTimeout
+config.StreamCreationTimeout = c.StreamCreationTimeout
return &config, nil
}


@@ -36,8 +36,10 @@ const (
DefaultKubeNamespace = corev1.NamespaceAll
DefaultKubeClusterDomain = "cluster.local"
-DefaultTaintEffect = string(corev1.TaintEffectNoSchedule)
-DefaultTaintKey = "virtual-kubelet.io/provider"
+DefaultTaintEffect = string(corev1.TaintEffectNoSchedule)
+DefaultTaintKey = "virtual-kubelet.io/provider"
+DefaultStreamIdleTimeout = 30 * time.Second
+DefaultStreamCreationTimeout = 30 * time.Second
)
// Opts stores all the options for configuring the root virtual-kubelet command.
@@ -84,6 +86,11 @@ type Opts struct {
// Startup Timeout is how long to wait for the kubelet to start
StartupTimeout time.Duration
+// StreamIdleTimeout is the maximum time a streaming connection
+// can be idle before the connection is automatically closed.
+StreamIdleTimeout time.Duration
+// StreamCreationTimeout is the maximum time for streaming connection
+StreamCreationTimeout time.Duration
Version string
}
@@ -152,5 +159,13 @@ func SetDefaultOpts(c *Opts) error {
}
}
+if c.StreamIdleTimeout == 0 {
+c.StreamIdleTimeout = DefaultStreamIdleTimeout
+}
+if c.StreamCreationTimeout == 0 {
+c.StreamCreationTimeout = DefaultStreamCreationTimeout
+}
return nil
}


@@ -161,7 +161,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
log.G(ctx).Debug("node not found")
newNode := pNode.DeepCopy()
newNode.ResourceVersion = ""
-_, err = client.CoreV1().Nodes().Create(newNode)
+_, err = client.CoreV1().Nodes().Create(ctx, newNode, metav1.CreateOptions{})
if err != nil {
return err
}
@@ -193,7 +193,9 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
go podInformerFactory.Start(ctx.Done())
go scmInformerFactory.Start(ctx.Done())
-cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig)
+cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig, func(context.Context) ([]*corev1.Pod, error) {
+return rm.GetPods(), nil
+})
if err != nil {
return err
}

go.mod

@@ -1,6 +1,6 @@
module github.com/virtual-kubelet/virtual-kubelet
-go 1.12
+go 1.13
require (
contrib.go.opencensus.io/exporter/jaeger v0.1.0
@@ -8,79 +8,64 @@ require (
github.com/docker/spdystream v0.0.0-20170912183627-bc6354cbbc29 // indirect
github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f // indirect
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2 // indirect
-github.com/evanphx/json-patch v4.1.0+incompatible // indirect
-github.com/gogo/protobuf v1.2.1 // indirect
-github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect
github.com/google/go-cmp v0.3.1
-github.com/google/gofuzz v1.0.0 // indirect
-github.com/googleapis/gnostic v0.1.0 // indirect
github.com/gorilla/mux v1.7.0
-github.com/hashicorp/golang-lru v0.5.1 // indirect
-github.com/json-iterator/go v1.1.6 // indirect
-github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/mitchellh/go-homedir v1.1.0
-github.com/onsi/ginkgo v1.8.0 // indirect
-github.com/onsi/gomega v1.5.0 // indirect
github.com/pkg/errors v0.8.1
-github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829
-github.com/sirupsen/logrus v1.4.1
-github.com/spf13/cobra v0.0.2
-github.com/spf13/pflag v1.0.3
-github.com/stretchr/testify v1.3.0 // indirect
+github.com/prometheus/client_golang v1.0.0
+github.com/sirupsen/logrus v1.4.2
+github.com/spf13/cobra v0.0.5
+github.com/spf13/pflag v1.0.5
go.opencensus.io v0.21.0
-golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c // indirect
-golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 // indirect
-golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e // indirect
-golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
-google.golang.org/genproto v0.0.0-20190404172233-64821d5d2107 // indirect
-google.golang.org/grpc v1.20.0 // indirect
-gopkg.in/inf.v0 v0.9.1 // indirect
-gopkg.in/yaml.v2 v2.2.2 // indirect
+golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7
gotest.tools v2.2.0+incompatible
-k8s.io/api v0.0.0
-k8s.io/apimachinery v0.0.0
-k8s.io/client-go v10.0.0+incompatible
-k8s.io/klog v0.3.1
-k8s.io/kube-openapi v0.0.0-20190510232812-a01b7d5d6c22 // indirect
-k8s.io/kubernetes v1.15.2
+k8s.io/api v0.18.4
+k8s.io/apimachinery v0.18.4
+k8s.io/client-go v0.18.4
+k8s.io/klog v1.0.0
+k8s.io/kubernetes v1.18.4
)
-replace k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.0.0-20190805144654-3d5bf3a310c1
+replace k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.18.4
-replace k8s.io/cloud-provider => k8s.io/cloud-provider v0.0.0-20190805144409-8484242760e7
+replace k8s.io/cloud-provider => k8s.io/cloud-provider v0.18.4
-replace k8s.io/cli-runtime => k8s.io/cli-runtime v0.0.0-20190805143448-a07e59fb081d
+replace k8s.io/cli-runtime => k8s.io/cli-runtime v0.18.4
-replace k8s.io/apiserver => k8s.io/apiserver v0.0.0-20190805142138-368b2058237c
+replace k8s.io/apiserver => k8s.io/apiserver v0.18.4
-replace k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.0.0-20190805144531-3985229e1802
+replace k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.18.4
-replace k8s.io/cri-api => k8s.io/cri-api v0.0.0-20190531030430-6117653b35f1
+replace k8s.io/cri-api => k8s.io/cri-api v0.18.4
-replace k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.0.0-20190805142416-fd821fbbb94e
+replace k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.18.4
-replace k8s.io/kubelet => k8s.io/kubelet v0.0.0-20190805143852-517ff267f8d1
+replace k8s.io/kubelet => k8s.io/kubelet v0.18.4
-replace k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.0.0-20190805144128-269742da31dd
+replace k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.18.4
-replace k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
+replace k8s.io/apimachinery => k8s.io/apimachinery v0.18.4
-replace k8s.io/api => k8s.io/api v0.0.0-20190805141119-fdd30b57c827
+replace k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.18.4
-replace k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.0.0-20190805144246-c01ee70854a1
+replace k8s.io/kube-proxy => k8s.io/kube-proxy v0.18.4
-replace k8s.io/kube-proxy => k8s.io/kube-proxy v0.0.0-20190805143734-7f1675b90353
+replace k8s.io/component-base => k8s.io/component-base v0.18.4
-replace k8s.io/component-base => k8s.io/component-base v0.0.0-20190805141645-3a5e5ac800ae
+replace k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.18.4
-replace k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.0.0-20190805144012-2a1ed1f3d8a4
+replace k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.18.4
-replace k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.0.0-20190805143126-cdb999c96590
+replace k8s.io/metrics => k8s.io/metrics v0.18.4
-replace k8s.io/metrics => k8s.io/metrics v0.0.0-20190805143318-16b07057415d
+replace k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.18.4
-replace k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.0.0-20190805142637-3b65bc4bb24f
+replace k8s.io/code-generator => k8s.io/code-generator v0.18.4
-replace k8s.io/code-generator => k8s.io/code-generator v0.0.0-20190612205613-18da4a14b22b
+replace k8s.io/client-go => k8s.io/client-go v0.18.4
-replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190805141520-2fe0317bcee0
+replace k8s.io/kubectl => k8s.io/kubectl v0.18.4
+replace k8s.io/api => k8s.io/api v0.18.4

go.sum

File diff suppressed because it is too large.


@@ -5,4 +5,5 @@ import (
// This is a dep that `go mod tidy` keeps removing, because it's a transitive dep that's pulled in via a test
// See: https://github.com/golang/go/issues/29702
_ "github.com/prometheus/client_golang/prometheus"
_ "golang.org/x/sys/unix"
)


@@ -16,23 +16,24 @@ import (
// WaitUntilNodeCondition establishes a watch on the vk node.
// Then, it waits for the specified condition function to be verified.
func (f *Framework) WaitUntilNodeCondition(fn watch.ConditionFunc) error {
+// Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached.
+ctx, cancel := context.WithTimeout(context.Background(), f.WatchTimeout)
+defer cancel()
// Create a field selector that matches the specified Pod resource.
fs := fields.OneTermEqualSelector("metadata.name", f.NodeName).String()
// Create a ListWatch so we can receive events for the matched Pod resource.
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fs
-return f.KubeClient.CoreV1().Nodes().List(options)
+return f.KubeClient.CoreV1().Nodes().List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) {
options.FieldSelector = fs
-return f.KubeClient.CoreV1().Nodes().Watch(options)
+return f.KubeClient.CoreV1().Nodes().Watch(ctx, options)
},
}
-// Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached.
-ctx, cancel := context.WithTimeout(context.Background(), f.WatchTimeout)
-defer cancel()
last, err := watch.UntilWithSync(ctx, lw, &corev1.Node{}, nil, fn)
if err != nil {
return err
@@ -44,17 +45,17 @@ func (f *Framework) WaitUntilNodeCondition(fn watch.ConditionFunc) error {
}
// DeleteNode deletes the vk node used by the framework
-func (f *Framework) DeleteNode() error {
+func (f *Framework) DeleteNode(ctx context.Context) error {
var gracePeriod int64
propagation := metav1.DeletePropagationBackground
opts := metav1.DeleteOptions{
PropagationPolicy: &propagation,
GracePeriodSeconds: &gracePeriod,
}
-return f.KubeClient.CoreV1().Nodes().Delete(f.NodeName, &opts)
+return f.KubeClient.CoreV1().Nodes().Delete(ctx, f.NodeName, opts)
}
// GetNode gets the vk node used by the framework
-func (f *Framework) GetNode() (*corev1.Node, error) {
-return f.KubeClient.CoreV1().Nodes().Get(f.NodeName, metav1.GetOptions{})
+func (f *Framework) GetNode(ctx context.Context) (*corev1.Node, error) {
+return f.KubeClient.CoreV1().Nodes().Get(ctx, f.NodeName, metav1.GetOptions{})
}


@@ -47,21 +47,21 @@ func (f *Framework) CreateDummyPodObjectWithPrefix(testName string, prefix strin
}
// CreatePod creates the specified pod in the Kubernetes API.
-func (f *Framework) CreatePod(pod *corev1.Pod) (*corev1.Pod, error) {
-return f.KubeClient.CoreV1().Pods(f.Namespace).Create(pod)
+func (f *Framework) CreatePod(ctx context.Context, pod *corev1.Pod) (*corev1.Pod, error) {
+return f.KubeClient.CoreV1().Pods(f.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}
// DeletePod deletes the pod with the specified name and namespace in the Kubernetes API using the default grace period.
-func (f *Framework) DeletePod(namespace, name string) error {
-return f.KubeClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{})
+func (f *Framework) DeletePod(ctx context.Context, namespace, name string) error {
+return f.KubeClient.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{})
}
// DeletePodImmediately forcibly deletes the pod with the specified name and namespace in the Kubernetes API.
// This is equivalent to running "kubectl delete --force --grace-period 0 --namespace <namespace> pod <name>".
-func (f *Framework) DeletePodImmediately(namespace, name string) error {
+func (f *Framework) DeletePodImmediately(ctx context.Context, namespace, name string) error {
grace := int64(0)
propagation := metav1.DeletePropagationBackground
-return f.KubeClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{
+return f.KubeClient.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{
GracePeriodSeconds: &grace,
PropagationPolicy: &propagation,
})
@@ -70,22 +70,22 @@ func (f *Framework) DeletePodImmediately(namespace, name string) error {
// WaitUntilPodCondition establishes a watch on the pod with the specified name and namespace.
// Then, it waits for the specified condition function to be verified.
func (f *Framework) WaitUntilPodCondition(namespace, name string, fn watch.ConditionFunc) (*corev1.Pod, error) {
+// Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached.
+ctx, cfn := context.WithTimeout(context.Background(), f.WatchTimeout)
+defer cfn()
// Create a field selector that matches the specified Pod resource.
fs := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.namespace==%s,metadata.name==%s", namespace, name))
// Create a ListWatch so we can receive events for the matched Pod resource.
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fs.String()
-return f.KubeClient.CoreV1().Pods(namespace).List(options)
+return f.KubeClient.CoreV1().Pods(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) {
options.FieldSelector = fs.String()
-return f.KubeClient.CoreV1().Pods(namespace).Watch(options)
+return f.KubeClient.CoreV1().Pods(namespace).Watch(ctx, options)
},
}
-// Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached.
-ctx, cfn := context.WithTimeout(context.Background(), f.WatchTimeout)
-defer cfn()
last, err := watch.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, fn)
if err != nil {
return nil, err
@@ -129,22 +129,24 @@ func (f *Framework) WaitUntilPodInPhase(namespace, name string, phases ...corev1
// WaitUntilPodEventWithReason establishes a watch on events involving the specified pod.
// Then, it waits for an event with the specified reason to be created/updated.
func (f *Framework) WaitUntilPodEventWithReason(pod *corev1.Pod, reason string) error {
+// Watch for updates to the Event resource until fn is satisfied, or until the timeout is reached.
+ctx, cfn := context.WithTimeout(context.Background(), f.WatchTimeout)
+defer cfn()
// Create a field selector that matches Event resources involving the specified pod.
fs := fields.ParseSelectorOrDie(fmt.Sprintf("involvedObject.kind==Pod,involvedObject.uid==%s", pod.UID))
// Create a ListWatch so we can receive events for the matched Event resource.
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fs.String()
-return f.KubeClient.CoreV1().Events(pod.Namespace).List(options)
+return f.KubeClient.CoreV1().Events(pod.Namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) {
options.FieldSelector = fs.String()
-return f.KubeClient.CoreV1().Events(pod.Namespace).Watch(options)
+return f.KubeClient.CoreV1().Events(pod.Namespace).Watch(ctx, options)
},
}
-// Watch for updates to the Event resource until fn is satisfied, or until the timeout is reached.
-ctx, cfn := context.WithTimeout(context.Background(), f.WatchTimeout)
-defer cfn()
last, err := watch.UntilWithSync(ctx, lw, &corev1.Event{}, nil, func(event watchapi.Event) (b bool, e error) {
switch event.Type {
case watchapi.Error:
@@ -164,8 +166,8 @@ func (f *Framework) WaitUntilPodEventWithReason(pod *corev1.Pod, reason string)
return nil
}
-// GetRunningPods gets the running pods from the provider of the virtual kubelet
-func (f *Framework) GetRunningPods() (*corev1.PodList, error) {
+// GetRunningPodsFromProvider gets the running pods from the provider of the virtual kubelet
+func (f *Framework) GetRunningPodsFromProvider(ctx context.Context) (*corev1.PodList, error) {
result := &corev1.PodList{}
err := f.KubeClient.CoreV1().
@@ -175,7 +177,24 @@ func (f *Framework) GetRunningPods() (*corev1.PodList, error) {
Name(f.NodeName).
SubResource("proxy").
Suffix("runningpods/").
-Do().
+Do(ctx).
Into(result)
return result, err
}
+// GetRunningPodsFromKubernetes gets the pods the node is expected to be running, via the virtual kubelet's /pods endpoint
+func (f *Framework) GetRunningPodsFromKubernetes(ctx context.Context) (*corev1.PodList, error) {
+result := &corev1.PodList{}
+err := f.KubeClient.CoreV1().
+RESTClient().
+Get().
+Resource("nodes").
+Name(f.NodeName).
+SubResource("proxy").
+Suffix("pods").
+Do(ctx).
+Into(result)
+return result, err


@@ -1,6 +1,7 @@
package framework
import (
"context"
"encoding/json"
"strconv"
@@ -9,7 +10,7 @@ import (
)
// GetStatsSummary queries the /stats/summary endpoint of the virtual-kubelet and returns the Summary object obtained as a response.
-func (f *Framework) GetStatsSummary() (*stats.Summary, error) {
+func (f *Framework) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
// Query the /stats/summary endpoint.
b, err := f.KubeClient.CoreV1().
RESTClient().
@@ -18,7 +19,7 @@ func (f *Framework) GetStatsSummary() (*stats.Summary, error) {
Resource("pods").
SubResource("proxy").
Name(net.JoinSchemeNamePort("http", f.NodeName, strconv.Itoa(10255))).
Suffix("/stats/summary").DoRaw()
Suffix("/stats/summary").DoRaw(ctx)
if err != nil {
return nil, err
}


@@ -23,6 +23,12 @@ var (
nodeName string
)
+// go1.13 compatibility cf. https://github.com/golang/go/issues/31859
+var _ = func() bool {
+testing.Init()
+return true
+}()
func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "path to the kubeconfig file to use when running the test suite outside a kubernetes cluster")
flag.StringVar(&namespace, "namespace", defaultNamespace, "the name of the kubernetes namespace to use for running the test suite (i.e. where to create pods)")


@@ -52,10 +52,53 @@ type TermSize struct {
// HandleContainerExec makes an http handler func from a Provider which execs a command in a pod's container
// Note that this handler currently depends on gorilla/mux to get url parts as variables.
// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function
-func HandleContainerExec(h ContainerExecHandlerFunc) http.HandlerFunc {
// ContainerExecHandlerConfig is used to pass options to the container exec handler.
type ContainerExecHandlerConfig struct {
// StreamIdleTimeout is the maximum time a streaming connection
// can be idle before the connection is automatically closed.
StreamIdleTimeout time.Duration
// StreamCreationTimeout is the maximum time for streaming connection
StreamCreationTimeout time.Duration
}
// ContainerExecHandlerOption configures a ContainerExecHandlerConfig
// It is used as functional options passed to `HandleContainerExec`
type ContainerExecHandlerOption func(*ContainerExecHandlerConfig)
// WithExecStreamIdleTimeout sets the idle timeout for a container exec stream
func WithExecStreamIdleTimeout(dur time.Duration) ContainerExecHandlerOption {
return func(cfg *ContainerExecHandlerConfig) {
cfg.StreamIdleTimeout = dur
}
}
// WithExecStreamCreationTimeout sets the stream creation timeout for a container exec stream
func WithExecStreamCreationTimeout(dur time.Duration) ContainerExecHandlerOption {
return func(cfg *ContainerExecHandlerConfig) {
cfg.StreamCreationTimeout = dur
}
}
// HandleContainerExec makes an http handler func from a Provider which execs a command in a pod's container
// Note that this handler currently depends on gorilla/mux to get url parts as variables.
// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function
+func HandleContainerExec(h ContainerExecHandlerFunc, opts ...ContainerExecHandlerOption) http.HandlerFunc {
if h == nil {
return NotImplemented
}
var cfg ContainerExecHandlerConfig
for _, o := range opts {
o(&cfg)
}
if cfg.StreamIdleTimeout == 0 {
cfg.StreamIdleTimeout = 30 * time.Second
}
if cfg.StreamCreationTimeout == 0 {
cfg.StreamCreationTimeout = 30 * time.Second
}
return handleError(func(w http.ResponseWriter, req *http.Request) error {
vars := mux.Vars(req)
@@ -73,14 +116,24 @@ func HandleContainerExec(h ContainerExecHandlerFunc) http.HandlerFunc {
return errdefs.AsInvalidInput(err)
}
-idleTimeout := time.Second * 30
-streamCreationTimeout := time.Second * 30
// TODO: Why aren't we using req.Context() here?
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
exec := &containerExecContext{ctx: ctx, h: h, pod: pod, namespace: namespace, container: container}
-remotecommand.ServeExec(w, req, exec, "", "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
+remotecommand.ServeExec(
+w,
+req,
+exec,
+"",
+"",
+container,
+command,
+streamOpts,
+cfg.StreamIdleTimeout,
+cfg.StreamCreationTimeout,
+supportedStreamProtocols,
+)
return nil
})


@@ -56,16 +56,14 @@ type flushWriter struct {
}
type writeFlusher interface {
-Flush() error
+Flush()
Write([]byte) (int, error)
}
func (fw *flushWriter) Write(p []byte) (int, error) {
n, err := fw.w.Write(p)
if n > 0 {
-if err := fw.w.Flush(); err != nil {
-return n, err
-}
+fw.w.Flush()
}
return n, err
}


@@ -18,6 +18,7 @@ import (
"context"
"io"
"net/http"
"net/url"
"strconv"
"time"
@@ -33,10 +34,71 @@ type ContainerLogsHandlerFunc func(ctx context.Context, namespace, podName, cont
// ContainerLogOpts are used to pass along options to be set on the container
// log stream.
type ContainerLogOpts struct {
-Tail int
-Since time.Duration
-LimitBytes int
-Timestamps bool
+Tail int
+LimitBytes int
+Timestamps bool
+Follow bool
+Previous bool
+SinceSeconds int
+SinceTime time.Time
}
func parseLogOptions(q url.Values) (opts ContainerLogOpts, err error) {
if tailLines := q.Get("tailLines"); tailLines != "" {
opts.Tail, err = strconv.Atoi(tailLines)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"tailLines\""))
}
if opts.Tail < 0 {
return opts, errdefs.InvalidInputf("\"tailLines\" is %d", opts.Tail)
}
}
if follow := q.Get("follow"); follow != "" {
opts.Follow, err = strconv.ParseBool(follow)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"follow\""))
}
}
if limitBytes := q.Get("limitBytes"); limitBytes != "" {
opts.LimitBytes, err = strconv.Atoi(limitBytes)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"limitBytes\""))
}
if opts.LimitBytes < 1 {
return opts, errdefs.InvalidInputf("\"limitBytes\" is %d", opts.LimitBytes)
}
}
if previous := q.Get("previous"); previous != "" {
opts.Previous, err = strconv.ParseBool(previous)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"previous\""))
}
}
if sinceSeconds := q.Get("sinceSeconds"); sinceSeconds != "" {
opts.SinceSeconds, err = strconv.Atoi(sinceSeconds)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"sinceSeconds\""))
}
if opts.SinceSeconds < 1 {
return opts, errdefs.InvalidInputf("\"sinceSeconds\" is %d", opts.SinceSeconds)
}
}
if sinceTime := q.Get("sinceTime"); sinceTime != "" {
opts.SinceTime, err = time.Parse(time.RFC3339, sinceTime)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"sinceTime\""))
}
if opts.SinceSeconds > 0 {
return opts, errdefs.InvalidInput("both \"sinceSeconds\" and \"sinceTime\" are set")
}
}
if timestamps := q.Get("timestamps"); timestamps != "" {
opts.Timestamps, err = strconv.ParseBool(timestamps)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"timestamps\""))
}
}
return opts, nil
}
// HandleContainerLogs creates an http handler function from a provider to serve logs from a pod
@@ -55,22 +117,11 @@ func HandleContainerLogs(h ContainerLogsHandlerFunc) http.HandlerFunc {
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
-tail := 10
-q := req.URL.Query()
-if queryTail := q.Get("tailLines"); queryTail != "" {
-t, err := strconv.Atoi(queryTail)
-if err != nil {
-return errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"tailLines\""))
-}
-tail = t
-}
-// TODO(@cpuguy83): support v1.PodLogOptions
-// The kubelet decoding here is not straight forward, so this needs to be disected
-opts := ContainerLogOpts{
-Tail: tail,
+query := req.URL.Query()
+opts, err := parseLogOptions(query)
+if err != nil {
+return err
+}
logs, err := h(ctx, namespace, pod, container, opts)

node/api/logs_test.go (new file)

@@ -0,0 +1,99 @@
package api
import (
"fmt"
"net/url"
"testing"
"time"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
)
//func parseLogOptions(q url.Values) (opts ContainerLogOpts, err error)
func TestParseLogOptions(t *testing.T) {
//tailLines
//follow
//limitBytes
//previous
//sinceSeconds
//sinceTime
//timestamps
sinceTime, _ := time.Parse(time.RFC3339, "2020-03-20T21:07:34Z")
fmt.Printf("%+v\n", sinceTime)
testCases := []struct {
Values url.Values
Failure bool
Result ContainerLogOpts
}{
{
Values: url.Values{},
Failure: false,
Result: ContainerLogOpts{},
},
{
Values: url.Values{
"follow": {"true"},
"limitBytes": {"123"},
"previous": {"true"},
"sinceSeconds": {"10"},
"tailLines": {"99"},
"timestamps": {"true"},
},
Failure: false,
Result: ContainerLogOpts{
Follow: true,
LimitBytes: 123,
Previous: true,
SinceSeconds: 10,
Tail: 99,
Timestamps: true,
},
},
{
Values: url.Values{
"sinceSeconds": {"10"},
"sinceTime": {"2020-03-20T21:07:34Z"},
},
Failure: true,
},
{
Values: url.Values{
"sinceTime": {"2020-03-20T21:07:34Z"},
},
Failure: false,
Result: ContainerLogOpts{
SinceTime: sinceTime,
},
},
{
Values: url.Values{
"tailLines": {"-1"},
},
Failure: true,
},
{
Values: url.Values{
"limitBytes": {"0"},
},
Failure: true,
},
{
Values: url.Values{
"sinceSeconds": {"-10"},
},
Failure: true,
},
}
// follow=true&limitBytes=1&previous=true&sinceSeconds=1&sinceTime=2020-03-20T21%3A07%3A34Z&tailLines=1&timestamps=true
for i, tc := range testCases {
msg := fmt.Sprintf("test case #%d %+v failed", i+1, tc)
result, err := parseLogOptions(tc.Values)
if tc.Failure {
assert.Check(t, is.ErrorContains(err, ""), msg)
} else {
assert.NilError(t, err, msg)
assert.Check(t, is.Equal(result, tc.Result), msg)
}
}
}


@@ -27,6 +27,10 @@ import (
type PodListerFunc func(context.Context) ([]*v1.Pod, error)
func HandleRunningPods(getPods PodListerFunc) http.HandlerFunc {
+if getPods == nil {
+return NotImplemented
+}
scheme := runtime.NewScheme()
v1.SchemeBuilder.AddToScheme(scheme) //nolint:errcheck
codecs := serializer.NewCodecFactory(scheme)


@@ -16,6 +16,7 @@ package api
import (
"net/http"
"time"
"github.com/gorilla/mux"
"github.com/virtual-kubelet/virtual-kubelet/log"
@@ -35,7 +36,13 @@ type ServeMux interface {
type PodHandlerConfig struct {
RunInContainer ContainerExecHandlerFunc
GetContainerLogs ContainerLogsHandlerFunc
-GetPods PodListerFunc
+// GetPods is meant to enumerate the pods that the provider knows about
+GetPods PodListerFunc
+// GetPodsFromKubernetes is meant to enumerate the pods that the node is meant to be running
+GetPodsFromKubernetes PodListerFunc
+GetStatsSummary PodStatsSummaryHandlerFunc
+StreamIdleTimeout time.Duration
+StreamCreationTimeout time.Duration
}
// PodHandler creates an http handler for interacting with pods/containers.
@@ -47,8 +54,24 @@ func PodHandler(p PodHandlerConfig, debug bool) http.Handler {
if debug {
r.HandleFunc("/runningpods/", HandleRunningPods(p.GetPods)).Methods("GET")
}
r.HandleFunc("/pods", HandleRunningPods(p.GetPodsFromKubernetes)).Methods("GET")
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", HandleContainerLogs(p.GetContainerLogs)).Methods("GET")
r.HandleFunc("/exec/{namespace}/{pod}/{container}", HandleContainerExec(p.RunInContainer)).Methods("POST")
r.HandleFunc(
"/exec/{namespace}/{pod}/{container}",
HandleContainerExec(
p.RunInContainer,
WithExecStreamCreationTimeout(p.StreamCreationTimeout),
WithExecStreamIdleTimeout(p.StreamIdleTimeout),
),
).Methods("POST", "GET")
if p.GetStatsSummary != nil {
f := HandlePodStatsSummary(p.GetStatsSummary)
r.HandleFunc("/stats/summary", f).Methods("GET")
r.HandleFunc("/stats/summary/", f).Methods("GET")
}
r.NotFoundHandler = http.HandlerFunc(NotFound)
return r
}


@@ -160,7 +160,7 @@ func TestPodLifecycle(t *testing.T) {
mp.setErrorOnDelete(errors.New("random error"))
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false)
-pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
+pods, err := s.client.CoreV1().Pods(testNamespace).List(ctx, metav1.ListOptions{})
assert.NilError(t, err)
assert.Assert(t, is.Len(pods.Items, 1))
assert.Assert(t, pods.Items[0].DeletionTimestamp != nil)
@@ -329,7 +329,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
p1 := newPod()
p1.Status.Phase = state
// Create the Pod
-_, e := s.client.CoreV1().Pods(testNamespace).Create(p1)
+_, e := s.client.CoreV1().Pods(testNamespace).Create(ctx, p1, metav1.CreateOptions{})
assert.NilError(t, e)
// Start the pod controller
@@ -339,7 +339,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
time.Sleep(10 * time.Millisecond)
}
-p2, err := s.client.CoreV1().Pods(testNamespace).Get(p1.Name, metav1.GetOptions{})
+p2, err := s.client.CoreV1().Pods(testNamespace).Get(ctx, p1.Name, metav1.GetOptions{})
assert.NilError(t, err)
// Make sure the pods have not changed
@@ -367,7 +367,7 @@ func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testin
FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.ObjectMeta.Name).String(),
}
// Setup a watch (prior to pod creation, and pod controller startup)
-watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
+watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
assert.NilError(t, err)
defer watcher.Stop()
@@ -379,7 +379,7 @@ func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testin
podCopyWithDeletionTimestamp.DeletionGracePeriodSeconds = &deletionGracePeriod
deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
podCopyWithDeletionTimestamp.DeletionTimestamp = &deletionTimestamp
-_, e := s.client.CoreV1().Pods(testNamespace).Create(podCopyWithDeletionTimestamp)
+_, e := s.client.CoreV1().Pods(testNamespace).Create(ctx, podCopyWithDeletionTimestamp, metav1.CreateOptions{})
assert.NilError(t, e)
// Start the pod controller
@@ -415,7 +415,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
watchErrCh := make(chan error)
// Setup a watch (prior to pod creation, and pod controller startup)
-watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
+watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
assert.NilError(t, err)
defer watcher.Stop()
// This ensures that the pod is created.
@@ -432,7 +432,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
}()
// Create the Pod
-_, e := s.client.CoreV1().Pods(testNamespace).Create(p)
+_, e := s.client.CoreV1().Pods(testNamespace).Create(ctx, p, metav1.CreateOptions{})
assert.NilError(t, e)
log.G(ctx).Debug("Created pod")
@@ -446,7 +446,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
}
// Setup a watch to check if the pod is in running
-watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
+watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
assert.NilError(t, err)
defer watcher.Stop()
go func() {
@@ -471,7 +471,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
}
// Setup a watch to look for the pod eventually going away completely
-watcher2, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
+watcher2, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
assert.NilError(t, err)
defer watcher2.Stop()
waitDeleteCh := make(chan error)
@@ -483,7 +483,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
}()
// Setup a watch prior to pod deletion
-watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
+watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
assert.NilError(t, err)
defer watcher.Stop()
go func() {
@@ -493,7 +493,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
// Delete the pod via deletiontimestamp
// 1. Get the pod
-currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(p.Name, metav1.GetOptions{})
+currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(ctx, p.Name, metav1.GetOptions{})
assert.NilError(t, err)
// 2. Set the pod's deletion timestamp, version, and so on
var deletionGracePeriod int64 = 10
@@ -501,7 +501,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
currentPod.DeletionTimestamp = &deletionTimestamp
// 3. Update (overwrite) the pod
-_, err = s.client.CoreV1().Pods(testNamespace).Update(currentPod)
+_, err = s.client.CoreV1().Pods(testNamespace).Update(ctx, currentPod, metav1.UpdateOptions{})
assert.NilError(t, err)
select {
@@ -535,11 +535,11 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
watchErrCh := make(chan error)
// Create a Pod
-_, e := s.client.CoreV1().Pods(testNamespace).Create(p)
+_, e := s.client.CoreV1().Pods(testNamespace).Create(ctx, p, metav1.CreateOptions{})
assert.NilError(t, e)
// Setup a watch to check if the pod is in running
-watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
+watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions)
assert.NilError(t, err)
defer watcher.Stop()
go func() {
@@ -576,7 +576,7 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
p.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
log.G(ctx).WithField("pod", p).Info("Updating pod")
-_, err = s.client.CoreV1().Pods(p.Namespace).Update(p)
+_, err = s.client.CoreV1().Pods(p.Namespace).Update(ctx, p, metav1.UpdateOptions{})
assert.NilError(t, err)
assert.NilError(t, m.getUpdates().until(ctx, func(v int) bool { return v > 0 }))
}
@@ -603,7 +603,7 @@ func benchmarkCreatePods(ctx context.Context, b *testing.B, s *system) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
pod := newPod(randomizeUID, randomizeName)
-_, err := s.client.CoreV1().Pods(pod.Namespace).Create(pod)
+_, err := s.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
assert.NilError(b, err)
assert.NilError(b, ctx.Err())
}


@@ -17,7 +17,6 @@ package node
import (
"context"
"encoding/json"
"fmt"
"time"
pkgerrors "github.com/pkg/errors"
@@ -31,6 +30,14 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
)
const (
// Annotations with the JSON-serialized last-applied node status and object meta. Based on kubectl apply. Used to calculate
// the three-way patch
virtualKubeletLastNodeAppliedNodeStatus = "virtual-kubelet.io/last-applied-node-status"
virtualKubeletLastNodeAppliedObjectMeta = "virtual-kubelet.io/last-applied-object-meta"
)
// NodeProvider is the interface used for registering a node and updating its
@@ -185,7 +192,7 @@ func (n *NodeController) Run(ctx context.Context) error {
n.statusInterval = DefaultStatusUpdateInterval
}
n.chStatusUpdate = make(chan *corev1.Node)
n.chStatusUpdate = make(chan *corev1.Node, 1)
n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) {
n.chStatusUpdate <- node
})
@@ -216,13 +223,19 @@ func (n *NodeController) Run(ctx context.Context) error {
return n.controlLoop(ctx)
}
func (n *NodeController) ensureNode(ctx context.Context) error {
err := n.updateStatus(ctx, true)
func (n *NodeController) ensureNode(ctx context.Context) (err error) {
ctx, span := trace.StartSpan(ctx, "node.ensureNode")
defer span.End()
defer func() {
span.SetStatus(err)
}()
err = n.updateStatus(ctx, true)
if err == nil || !errors.IsNotFound(err) {
return err
}
node, err := n.nodes.Create(n.n)
node, err := n.nodes.Create(ctx, n.n, metav1.CreateOptions{})
if err != nil {
return pkgerrors.Wrap(err, "error registering node with kubernetes")
}
@@ -244,7 +257,12 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
statusTimer := time.NewTimer(n.statusInterval)
defer statusTimer.Stop()
timerResetDuration := n.statusInterval
if n.disableLease {
// when resetting the timer after processing a status update, reset it to the ping interval
// (since it will be the ping timer as n.disableLease == true)
timerResetDuration = n.pingInterval
// hack to make sure this channel always blocks since we won't be using it
if !statusTimer.Stop() {
<-statusTimer.C
@@ -253,10 +271,13 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
close(n.chReady)
for {
loop := func() bool {
ctx, span := trace.StartSpan(ctx, "node.controlLoop.loop")
defer span.End()
select {
case <-ctx.Done():
return nil
return true
case updated := <-n.chStatusUpdate:
var t *time.Timer
if n.disableLease {
@@ -273,10 +294,17 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
}
n.n.Status = updated.Status
n.n.ObjectMeta = metav1.ObjectMeta{
Annotations: updated.Annotations,
Labels: updated.Labels,
Name: n.n.ObjectMeta.Name,
Namespace: n.n.ObjectMeta.Namespace,
UID: n.n.ObjectMeta.UID,
}
if err := n.updateStatus(ctx, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
t.Reset(n.statusInterval)
t.Reset(timerResetDuration)
case <-statusTimer.C:
if err := n.updateStatus(ctx, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
@@ -290,6 +318,13 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
}
pingTimer.Reset(n.pingInterval)
}
return false
}
for {
shouldTerminate := loop()
if shouldTerminate {
return nil
}
}
}
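The loop body is extracted into a closure here, evidently so that each iteration gets its own trace span whose deferred End runs at the end of that pass instead of accumulating until controlLoop returns. A toy, self-contained illustration of that defer behaviour (names are illustrative only):

```go
package main

import "fmt"

func main() {
	// Each call to loop gets its own defer scope, mirroring how
	// controlLoop ends one trace span per iteration.
	loop := func(i int) (shouldTerminate bool) {
		defer fmt.Printf("span for iteration %d ended\n", i)
		return i >= 2
	}
	for i := 0; ; i++ {
		if loop(i) {
			return
		}
	}
}
```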
@@ -321,7 +356,13 @@ func (n *NodeController) updateLease(ctx context.Context) error {
return nil
}
func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) error {
func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) (err error) {
ctx, span := trace.StartSpan(ctx, "node.updateStatus")
defer span.End()
defer func() {
span.SetStatus(err)
}()
updateNodeStatusHeartbeat(n.n)
node, err := updateNodeStatus(ctx, n.nodes, n.n)
@@ -344,18 +385,18 @@ func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) err
}
func ensureLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *coord.Lease) (*coord.Lease, error) {
l, err := leases.Create(lease)
l, err := leases.Create(ctx, lease, metav1.CreateOptions{})
if err != nil {
switch {
case errors.IsNotFound(err):
log.G(ctx).WithError(err).Info("Node lease not supported")
return nil, err
case errors.IsAlreadyExists(err):
if err := leases.Delete(lease.Name, nil); err != nil && !errors.IsNotFound(err) {
if err := leases.Delete(ctx, lease.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
log.G(ctx).WithError(err).Error("could not delete old node lease")
return nil, pkgerrors.Wrap(err, "old lease exists but could not delete it")
}
l, err = leases.Create(lease)
l, err = leases.Create(ctx, lease, metav1.CreateOptions{})
}
}
@@ -380,7 +421,7 @@ func updateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *
ctx = span.WithField(ctx, "lease.expiresSeconds", *lease.Spec.LeaseDurationSeconds)
}
l, err := leases.Update(lease)
l, err := leases.Update(ctx, lease, metav1.UpdateOptions{})
if err != nil {
if errors.IsNotFound(err) {
log.G(ctx).Debug("lease not found")
@@ -401,42 +442,117 @@ func updateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *
// just so we don't have to allocate this on every get request
var emptyGetOptions = metav1.GetOptions{}
// patchNodeStatus patches node status.
// Copied from github.com/kubernetes/kubernetes/pkg/util/node
func patchNodeStatus(nodes v1.NodeInterface, nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) (*corev1.Node, []byte, error) {
patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
if err != nil {
return nil, nil, err
func prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode *corev1.Node) ([]byte, error) {
// We use these two values to calculate a patch. We use a three-way patch in order to avoid
// causing state regression server side. For example, let's consider the scenario:
/*
UML Source:
@startuml
participant VK
participant K8s
participant ExternalUpdater
note right of VK: Updates internal node conditions to [A, B]
VK->K8s: Patch Upsert [A, B]
note left of K8s: Node conditions are [A, B]
ExternalUpdater->K8s: Patch Upsert [C]
note left of K8s: Node Conditions are [A, B, C]
note right of VK: Updates internal node conditions to [A]
VK->K8s: Patch: delete B, upsert A\nThis is where things go wrong,\nbecause the patch is written to replace all node conditions\nit overwrites (drops) [C]
note left of K8s: Node Conditions are [A]\nNode condition C from ExternalUpdater is no longer present
@enduml
,--. ,---. ,---------------.
|VK| |K8s| |ExternalUpdater|
`+-' `-+-' `-------+-------'
| ,------------------------------------------!. | |
| |Updates internal node conditions to [A, B]|_\ | |
| `--------------------------------------------' | |
| Patch Upsert [A, B] | |
| -----------------------------------------------------------> |
| | |
| ,--------------------------!. | |
| |Node conditions are [A, B]|_\| |
| `----------------------------'| |
| | Patch Upsert [C] |
| | <-------------------
| | |
| ,-----------------------------!. | |
| |Node Conditions are [A, B, C]|_\| |
| `-------------------------------'| |
| ,---------------------------------------!. | |
| |Updates internal node conditions to [A]|_\ | |
| `-----------------------------------------' | |
| Patch: delete B, upsert A | |
| This is where things go wrong, | |
| because the patch is written to replace all node conditions| |
| it overwrites (drops) [C] | |
| -----------------------------------------------------------> |
| | |
,----------------------------------------------------------!. | |
|Node Conditions are [A] |_\| |
|Node condition C from ExternalUpdater is no longer present || |
`------------------------------------------------------------'+-. ,-------+-------.
|VK| |K8s| |ExternalUpdater|
`--' `---' `---------------'
*/
// In order to calculate that last patch to delete B, and upsert C, we need to know that C was added by
// "someone else". So, we keep track of our last applied value, and our current value. We then generate
// our patch based on the diff of these and *not* server side state.
oldVKStatus, ok1 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus]
oldVKObjectMeta, ok2 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta]
oldNode := corev1.Node{}
// Check if the annotations are missing, which means someone else probably created the node, or this is an upgrade. Either way, we will consider
// ourselves as never having written the node object before, so oldNode will be left empty. We will overwrite values if
// our new node conditions / status / objectmeta have them
if ok1 && ok2 {
err := json.Unmarshal([]byte(oldVKObjectMeta), &oldNode.ObjectMeta)
if err != nil {
return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node object metadata (key: %q): %q", virtualKubeletLastNodeAppliedObjectMeta, oldVKObjectMeta)
}
err = json.Unmarshal([]byte(oldVKStatus), &oldNode.Status)
if err != nil {
return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node status (key: %q): %q", virtualKubeletLastNodeAppliedNodeStatus, oldVKStatus)
}
}
updatedNode, err := nodes.Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
if err != nil {
return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
}
return updatedNode, patchBytes, nil
}
// newNode is the representation of the node the provider "wants"
newNode := corev1.Node{}
newNode.ObjectMeta = simplestObjectMetadata(&apiServerNode.ObjectMeta, &nodeFromProvider.ObjectMeta)
nodeFromProvider.Status.DeepCopyInto(&newNode.Status)
func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) ([]byte, error) {
oldData, err := json.Marshal(oldNode)
// virtualKubeletLastNodeAppliedObjectMeta must always be set before virtualKubeletLastNodeAppliedNodeStatus,
// otherwise we capture virtualKubeletLastNodeAppliedNodeStatus in virtualKubeletLastNodeAppliedObjectMeta,
// which is wrong
virtualKubeletLastNodeAppliedObjectMetaBytes, err := json.Marshal(newNode.ObjectMeta)
if err != nil {
return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err)
return nil, pkgerrors.Wrap(err, "Cannot marshal object meta from provider")
}
newNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta] = string(virtualKubeletLastNodeAppliedObjectMetaBytes)
// Reset spec to make sure only patch for Status or ObjectMeta is generated.
// Note that we don't reset ObjectMeta here, because:
// 1. This aligns with Nodes().UpdateStatus().
// 2. Some component does use this to update node annotations.
newNode.Spec = oldNode.Spec
newData, err := json.Marshal(newNode)
virtualKubeletLastNodeAppliedNodeStatusBytes, err := json.Marshal(newNode.Status)
if err != nil {
return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err)
return nil, pkgerrors.Wrap(err, "Cannot marshal node status from provider")
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
newNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus] = string(virtualKubeletLastNodeAppliedNodeStatusBytes)
// Generate three way patch from oldNode -> newNode, without deleting the changes in api server
// Should we use the Kubernetes serialization / deserialization libraries here?
oldNodeBytes, err := json.Marshal(oldNode)
if err != nil {
return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err)
return nil, pkgerrors.Wrap(err, "Cannot marshal old node bytes")
}
return patchBytes, nil
newNodeBytes, err := json.Marshal(newNode)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal new node bytes")
}
apiServerNodeBytes, err := json.Marshal(apiServerNode)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal api server node")
}
schema, err := strategicpatch.NewPatchMetaFromStruct(&corev1.Node{})
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot get patch schema from node")
}
return strategicpatch.CreateThreeWayMergePatch(oldNodeBytes, newNodeBytes, apiServerNodeBytes, schema, true)
}
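To make the scenario in the comment concrete, here is a self-contained sketch of the same strategicpatch calls, using the condition types A/B/C from the UML (illustrative values, not real node conditions). The generated patch removes B, which VK previously applied, but says nothing about C, which VK never wrote:

```go
package main

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func nodeWithConditions(types ...corev1.NodeConditionType) *corev1.Node {
	n := &corev1.Node{}
	for _, t := range types {
		n.Status.Conditions = append(n.Status.Conditions, corev1.NodeCondition{Type: t})
	}
	return n
}

func main() {
	oldBytes, _ := json.Marshal(nodeWithConditions("A", "B"))          // VK's last-applied state
	desiredBytes, _ := json.Marshal(nodeWithConditions("A"))           // what VK wants now
	currentBytes, _ := json.Marshal(nodeWithConditions("A", "B", "C")) // API server state; C was set externally

	schema, err := strategicpatch.NewPatchMetaFromStruct(&corev1.Node{})
	if err != nil {
		panic(err)
	}
	patch, err := strategicpatch.CreateThreeWayMergePatch(oldBytes, desiredBytes, currentBytes, schema, true)
	if err != nil {
		panic(err)
	}
	// The patch deletes B and leaves C untouched, avoiding the state
	// regression shown in the sequence diagram.
	fmt.Println(string(patch))
}
```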
// updateNodeStatus triggers an update to the node status in Kubernetes.
@@ -445,37 +561,44 @@ func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Nod
//
// If you use this function, it is up to you to synchronize this with other operations.
// This reduces the time to second-level precision.
func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (_ *corev1.Node, retErr error) {
func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, nodeFromProvider *corev1.Node) (_ *corev1.Node, retErr error) {
ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus")
defer func() {
span.End()
span.SetStatus(retErr)
}()
var node *corev1.Node
var updatedNode *corev1.Node
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
apiServerNode, err := nodes.Get(ctx, nodeFromProvider.Name, emptyGetOptions)
if err != nil {
return err
}
ctx = addNodeAttributes(ctx, span, apiServerNode)
log.G(ctx).Debug("got node from api server")
patchBytes, err := prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode)
if err != nil {
return pkgerrors.Wrap(err, "Cannot generate patch")
}
log.G(ctx).WithError(err).WithField("patch", string(patchBytes)).Debug("Generated three way patch")
updatedNode, err = nodes.Patch(ctx, nodeFromProvider.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
// We cannot wrap this error because the kubernetes error module doesn't understand wrapping
log.G(ctx).WithField("patch", string(patchBytes)).WithError(err).Warn("Failed to patch node status")
return err
}
return nil
})
oldNode, err := nodes.Get(n.Name, emptyGetOptions)
if err != nil {
return nil, err
}
log.G(ctx).Debug("got node from api server")
node = oldNode.DeepCopy()
node.ResourceVersion = ""
node.Status = n.Status
ctx = addNodeAttributes(ctx, span, node)
// Patch the node status to merge other changes on the node.
updated, _, err := patchNodeStatus(nodes, types.NodeName(n.Name), oldNode, node)
if err != nil {
return nil, err
}
log.G(ctx).WithField("node.resourceVersion", updated.ResourceVersion).
WithField("node.Status.Conditions", updated.Status.Conditions).
log.G(ctx).WithField("node.resourceVersion", updatedNode.ResourceVersion).
WithField("node.Status.Conditions", updatedNode.Status.Conditions).
Debug("updated node status in api server")
return updated, nil
return updatedNode, nil
}
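updateStatus now wraps the get-and-patch cycle in retry.RetryOnConflict, which re-runs the closure (with backoff) only while it returns a Conflict error, as happens when the node's resourceVersion changed between the Get and the Patch. A small standalone sketch of that helper's semantics, with a simulated conflict:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/util/retry"
)

func main() {
	attempts := 0
	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		attempts++
		if attempts < 3 {
			// Simulate a stale-resourceVersion write being rejected.
			return errors.NewConflict(schema.GroupResource{Resource: "nodes"}, "mynode",
				fmt.Errorf("the object has been modified"))
		}
		return nil // third attempt "wins"
	})
	fmt.Println(attempts, err) // 3 <nil>
}
```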
func newLease(base *coord.Lease) *coord.Lease {
@@ -550,3 +673,31 @@ func addNodeAttributes(ctx context.Context, span trace.Span, n *corev1.Node) con
"node.taints": taintsStringer(n.Spec.Taints),
})
}
// simplestObjectMetadata provides the simplest object metadata that matches the previous object: name, namespace, and UID. It copies labels and
// annotations from the second object if defined, excluding the special patch-tracking annotations
func simplestObjectMetadata(baseObjectMeta, objectMetaWithLabelsAndAnnotations *metav1.ObjectMeta) metav1.ObjectMeta {
ret := metav1.ObjectMeta{
Namespace: baseObjectMeta.Namespace,
Name: baseObjectMeta.Name,
UID: baseObjectMeta.UID,
Annotations: make(map[string]string),
}
if objectMetaWithLabelsAndAnnotations != nil {
if objectMetaWithLabelsAndAnnotations.Labels != nil {
ret.Labels = objectMetaWithLabelsAndAnnotations.Labels
} else {
ret.Labels = make(map[string]string)
}
if objectMetaWithLabelsAndAnnotations.Annotations != nil {
// We want to copy over all annotations except the special embedded ones.
for key := range objectMetaWithLabelsAndAnnotations.Annotations {
if key == virtualKubeletLastNodeAppliedNodeStatus || key == virtualKubeletLastNodeAppliedObjectMeta {
continue
}
ret.Annotations[key] = objectMetaWithLabelsAndAnnotations.Annotations[key]
}
}
}
return ret
}
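As a quick illustration of what the helper keeps and filters, here is a hypothetical in-package test (a sketch only, not part of this diff, and it assumes it lives in the node package since simplestObjectMetadata is unexported):

```go
package node

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestSimplestObjectMetadataSketch shows that identity fields come from
// the base object while labels/annotations come from the provider object,
// minus the patch-tracking annotations.
func TestSimplestObjectMetadataSketch(t *testing.T) {
	base := metav1.ObjectMeta{Name: "vk-node", UID: "1234"}
	fromProvider := metav1.ObjectMeta{
		Labels: map[string]string{"kubernetes.io/role": "agent"},
		Annotations: map[string]string{
			"custom":                                "kept",
			virtualKubeletLastNodeAppliedNodeStatus: "dropped",
		},
	}
	got := simplestObjectMetadata(&base, &fromProvider)
	if _, ok := got.Annotations[virtualKubeletLastNodeAppliedNodeStatus]; ok {
		t.Fatal("tracking annotation should have been filtered out")
	}
	if got.Annotations["custom"] != "kept" || got.Name != "vk-node" {
		t.Fatalf("unexpected metadata: %+v", got)
	}
}
```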


@@ -22,12 +22,14 @@ import (
"gotest.tools/assert"
"gotest.tools/assert/cmp"
is "gotest.tools/assert/cmp"
coord "k8s.io/api/coordination/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
watch "k8s.io/apimachinery/pkg/watch"
testclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/retry"
)
func TestNodeRun(t *testing.T) {
@@ -72,11 +74,11 @@ func testNodeRun(t *testing.T, enableLease bool) {
close(chErr)
}()
nw := makeWatch(t, nodes, testNodeCopy.Name)
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr := nw.ResultChan()
lw := makeWatch(t, leases, testNodeCopy.Name)
lw := makeWatch(ctx, t, leases, testNodeCopy.Name)
defer lw.Stop()
lr := lw.ResultChan()
@@ -130,7 +132,7 @@ func testNodeRun(t *testing.T, enableLease bool) {
}
// trigger an async node status update
n, err := nodes.Get(testNode.Name, metav1.GetOptions{})
n, err := nodes.Get(ctx, testNode.Name, metav1.GetOptions{})
assert.NilError(t, err)
newCondition := corev1.NodeCondition{
Type: corev1.NodeConditionType("UPDATED"),
@@ -138,7 +140,7 @@ func testNodeRun(t *testing.T, enableLease bool) {
}
n.Status.Conditions = append(n.Status.Conditions, newCondition)
nw = makeWatch(t, nodes, testNodeCopy.Name)
nw = makeWatch(ctx, t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr = nw.ResultChan()
@@ -205,7 +207,7 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
case <-node.Ready():
}
err = nodes.Delete(node.n.Name, nil)
err = nodes.Delete(ctx, node.n.Name, metav1.DeleteOptions{})
assert.NilError(t, err)
testP.triggerStatusUpdate(node.n.DeepCopy())
@@ -251,7 +253,7 @@ func TestUpdateNodeStatus(t *testing.T) {
updated, err := updateNodeStatus(ctx, nodes, n.DeepCopy())
assert.Equal(t, errors.IsNotFound(err), true, err)
_, err = nodes.Create(n)
_, err = nodes.Create(ctx, n, metav1.CreateOptions{})
assert.NilError(t, err)
updated, err = updateNodeStatus(ctx, nodes, n.DeepCopy())
@@ -265,10 +267,10 @@ func TestUpdateNodeStatus(t *testing.T) {
assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(n.Status, updated.Status))
err = nodes.Delete(n.Name, nil)
err = nodes.Delete(ctx, n.Name, metav1.DeleteOptions{})
assert.NilError(t, err)
_, err = nodes.Get(n.Name, metav1.GetOptions{})
_, err = nodes.Get(ctx, n.Name, metav1.GetOptions{})
assert.Equal(t, errors.IsNotFound(err), true, err)
_, err = updateNodeStatus(ctx, nodes, updated.DeepCopy())
@@ -287,7 +289,7 @@ func TestUpdateNodeLease(t *testing.T) {
assert.Equal(t, l.Name, lease.Name)
assert.Assert(t, cmp.DeepEqual(l.Spec.HolderIdentity, lease.Spec.HolderIdentity))
compare, err := leases.Get(l.Name, emptyGetOptions)
compare, err := leases.Get(ctx, l.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Equal(t, l.Spec.RenewTime.Time.Unix(), compare.Spec.RenewTime.Time.Unix())
assert.Equal(t, compare.Name, lease.Name)
@@ -302,6 +304,324 @@ func TestUpdateNodeLease(t *testing.T) {
assert.Assert(t, cmp.DeepEqual(compare.Spec.HolderIdentity, lease.Spec.HolderIdentity))
}
// TestPingAfterStatusUpdate checks that Ping continues to be called with the specified interval
// after a node status update occurs, when leases are disabled.
//
// Timing ratios used in this test:
// ping interval (10 ms)
// maximum allowed interval = 2.5 * ping interval
// status update interval = 6 * ping interval
//
// The allowed maximum time is 2.5 times the ping interval because
// the status update resets the ping interval timer, meaning
// that there can be a full two interval durations between
// successive calls to Ping. The extra half is to allow
// for timing variations when using such short durations.
//
// Once the node controller is ready:
// send status update after 10 * ping interval
// end test after another 10 * ping interval
func TestPingAfterStatusUpdate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := testclient.NewSimpleClientset()
nodes := c.CoreV1().Nodes()
testP := &testNodeProviderPing{}
interval := 10 * time.Millisecond
maxAllowedInterval := time.Duration(2.5 * float64(interval.Nanoseconds()))
opts := []NodeControllerOpt{
WithNodePingInterval(interval),
WithNodeStatusUpdateInterval(interval * time.Duration(6)),
}
testNode := testNode(t)
testNodeCopy := testNode.DeepCopy()
node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error, 1)
go func() {
chErr <- node.Run(ctx)
}()
timer := time.NewTimer(10 * time.Second)
defer timer.Stop()
// wait for the node to be ready
select {
case <-timer.C:
t.Fatal("timeout waiting for node to be ready")
case err := <-chErr:
t.Fatalf("node.Run returned earlier than expected: %v", err)
case <-node.Ready():
}
// wait before triggering the status update, then give pings time to accumulate
<-time.After(interval * time.Duration(10))
testP.triggerStatusUpdate(testNodeCopy)
<-time.After(interval * time.Duration(10))
assert.Assert(t, testP.maxPingInterval < maxAllowedInterval, "maximum time between node pings (%v) was greater than the maximum expected interval (%v)", testP.maxPingInterval, maxAllowedInterval)
}
// Are annotations that were created before the VK existed preserved?
func TestBeforeAnnotationsPreserved(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := testclient.NewSimpleClientset()
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
nodes := c.CoreV1().Nodes()
interval := 10 * time.Millisecond
opts := []NodeControllerOpt{
WithNodePingInterval(interval),
}
testNode := testNode(t)
testNodeCreateCopy := testNode.DeepCopy()
testNodeCreateCopy.Annotations = map[string]string{
"beforeAnnotation": "value",
}
_, err := nodes.Create(ctx, testNodeCreateCopy, metav1.CreateOptions{})
assert.NilError(t, err)
// We have to refer to testNodeCopy during the course of the test. testNode is modified by the node controller,
// so referring to it directly would trigger the race detector.
testNodeCopy := testNode.DeepCopy()
node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error)
defer func() {
cancel()
assert.NilError(t, <-chErr)
}()
go func() {
chErr <- node.Run(ctx)
close(chErr)
}()
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr := nw.ResultChan()
t.Log("Waiting for node to exist")
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
if e.Object == nil {
return false
}
return true
}))
testP.notifyNodeStatus(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"testAnnotation": "value",
},
},
})
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
if e.Object == nil {
return false
}
_, ok := e.Object.(*corev1.Node).Annotations["testAnnotation"]
return ok
}))
newNode, err := nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Assert(t, is.Contains(newNode.Annotations, "testAnnotation"))
assert.Assert(t, is.Contains(newNode.Annotations, "beforeAnnotation"))
}
// Are conditions set by systems outside of VK preserved?
func TestManualConditionsPreserved(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := testclient.NewSimpleClientset()
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
nodes := c.CoreV1().Nodes()
interval := 10 * time.Millisecond
opts := []NodeControllerOpt{
WithNodePingInterval(interval),
}
testNode := testNode(t)
// We have to refer to testNodeCopy during the course of the test. testNode is modified by the node controller,
// so referring to it directly would trigger the race detector.
testNodeCopy := testNode.DeepCopy()
node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error)
defer func() {
cancel()
assert.NilError(t, <-chErr)
}()
go func() {
chErr <- node.Run(ctx)
close(chErr)
}()
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr := nw.ResultChan()
t.Log("Waiting for node to exist")
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
if e.Object == nil {
return false
}
receivedNode := e.Object.(*corev1.Node)
if len(receivedNode.Status.Conditions) != 0 {
return false
}
return true
}))
newNode, err := nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Assert(t, is.Len(newNode.Status.Conditions, 0))
baseCondition := corev1.NodeCondition{
Type: "BaseCondition",
Status: "Ok",
Reason: "NA",
Message: "This is the base condition. It is set by VK, and should always be there.",
}
testP.notifyNodeStatus(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"testAnnotation": "value",
},
},
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
baseCondition,
},
},
})
// Wait for this (node with condition) to show up
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
receivedNode := e.Object.(*corev1.Node)
for _, condition := range receivedNode.Status.Conditions {
if condition.Type == baseCondition.Type {
return true
}
}
return false
}))
newNode, err = nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Assert(t, is.Len(newNode.Status.Conditions, 1))
assert.Assert(t, is.Contains(newNode.Annotations, "testAnnotation"))
// Add a new event manually
manuallyAddedCondition := corev1.NodeCondition{
Type: "ManuallyAddedCondition",
Status: "Ok",
Reason: "NA",
Message: "This is a manually added condition. Outside of VK. It should not be removed.",
}
assert.NilError(t, retry.RetryOnConflict(retry.DefaultRetry, func() error {
newNode, err = nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
if err != nil {
return err
}
newNode.Annotations["manuallyAddedAnnotation"] = "value"
newNode.Status.Conditions = append(newNode.Status.Conditions, manuallyAddedCondition)
_, err = nodes.UpdateStatus(ctx, newNode, metav1.UpdateOptions{})
return err
}))
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
receivedNode := e.Object.(*corev1.Node)
for _, condition := range receivedNode.Status.Conditions {
if condition.Type == manuallyAddedCondition.Type {
return true
}
}
assert.Assert(t, is.Contains(receivedNode.Annotations, "testAnnotation"))
assert.Assert(t, is.Contains(receivedNode.Annotations, "manuallyAddedAnnotation"))
return false
}))
// Let's have the VK have a new condition.
newCondition := corev1.NodeCondition{
Type: "NewCondition",
Status: "Ok",
Reason: "NA",
Message: "This is a newly added condition. It should only show up *with* / *after* ManuallyAddedCondition. It is set by the VK.",
}
// Everything but node status is ignored here
testP.notifyNodeStatus(&corev1.Node{
// Annotations is left empty
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
baseCondition,
newCondition,
},
},
})
i := 0
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
receivedNode := e.Object.(*corev1.Node)
for _, condition := range receivedNode.Status.Conditions {
if condition.Type == newCondition.Type {
// Wait until we have seen this condition on several (more than 2) updates / patches
if i > 2 {
return true
}
i++
}
}
return false
}))
// Make sure that all three conditions are there.
newNode, err = nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
seenConditionTypes := make([]corev1.NodeConditionType, len(newNode.Status.Conditions))
for idx := range newNode.Status.Conditions {
seenConditionTypes[idx] = newNode.Status.Conditions[idx].Type
}
assert.Assert(t, is.Contains(seenConditionTypes, baseCondition.Type))
assert.Assert(t, is.Contains(seenConditionTypes, newCondition.Type))
assert.Assert(t, is.Contains(seenConditionTypes, manuallyAddedCondition.Type))
assert.Assert(t, is.Equal(newNode.Annotations["testAnnotation"], ""))
assert.Assert(t, is.Contains(newNode.Annotations, "manuallyAddedAnnotation"))
t.Log(newNode.Status.Conditions)
}
func testNode(t *testing.T) *corev1.Node {
n := &corev1.Node{}
n.Name = strings.ToLower(t.Name())
@@ -311,26 +631,49 @@ func testNode(t *testing.T) *corev1.Node {
type testNodeProvider struct {
NodeProvider
statusHandlers []func(*corev1.Node)
// Callback to VK
notifyNodeStatus func(*corev1.Node)
}
func (p *testNodeProvider) NotifyNodeStatus(ctx context.Context, h func(*corev1.Node)) {
p.statusHandlers = append(p.statusHandlers, h)
p.notifyNodeStatus = h
}
func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
for _, h := range p.statusHandlers {
h(n)
}
p.notifyNodeStatus(n)
}
// testNodeProviderPing tracks the maximum time interval between calls to Ping
type testNodeProviderPing struct {
testNodeProvider
lastPingTime time.Time
maxPingInterval time.Duration
}
func (tnp *testNodeProviderPing) Ping(ctx context.Context) error {
now := time.Now()
if tnp.lastPingTime.IsZero() {
tnp.lastPingTime = now
return nil
}
if now.Sub(tnp.lastPingTime) > tnp.maxPingInterval {
tnp.maxPingInterval = now.Sub(tnp.lastPingTime)
}
tnp.lastPingTime = now
return nil
}
type watchGetter interface {
Watch(metav1.ListOptions) (watch.Interface, error)
Watch(context.Context, metav1.ListOptions) (watch.Interface, error)
}
func makeWatch(t *testing.T, wc watchGetter, name string) watch.Interface {
func makeWatch(ctx context.Context, t *testing.T, wc watchGetter, name string) watch.Interface {
t.Helper()
w, err := wc.Watch(metav1.ListOptions{FieldSelector: "name=" + name})
w, err := wc.Watch(ctx, metav1.ListOptions{FieldSelector: "name=" + name})
assert.NilError(t, err)
return w
}


@@ -16,6 +16,8 @@ package node
import (
"context"
"fmt"
"time"
"github.com/google/go-cmp/cmp"
pkgerrors "github.com/pkg/errors"
@@ -24,12 +26,23 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
const (
podStatusReasonProviderFailed = "ProviderFailed"
podEventCreateFailed = "ProviderCreateFailed"
podEventCreateSuccess = "ProviderCreateSuccess"
podEventDeleteFailed = "ProviderDeleteFailed"
podEventDeleteSuccess = "ProviderDeleteSuccess"
podEventUpdateFailed = "ProviderUpdateFailed"
podEventUpdateSuccess = "ProviderUpdateSuccess"
// 151 milliseconds is chosen (a small prime number) as the retry period between
// attempts to get a notification from the provider to VK
notificationRetryPeriod = 151 * time.Millisecond
)
func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
@@ -72,16 +85,22 @@ func (pc *PodController) createOrUpdatePod(ctx context.Context, pod *corev1.Pod)
log.G(ctx).Debugf("Pod %s exists, updating pod in provider", podFromProvider.Name)
if origErr := pc.provider.UpdatePod(ctx, podForProvider); origErr != nil {
pc.handleProviderError(ctx, span, origErr, pod)
pc.recorder.Event(pod, corev1.EventTypeWarning, podEventUpdateFailed, origErr.Error())
return origErr
}
log.G(ctx).Info("Updated pod in provider")
pc.recorder.Event(pod, corev1.EventTypeNormal, podEventUpdateSuccess, "Updated pod in provider successfully")
}
} else {
if origErr := pc.provider.CreatePod(ctx, podForProvider); origErr != nil {
pc.handleProviderError(ctx, span, origErr, pod)
pc.recorder.Event(pod, corev1.EventTypeWarning, podEventCreateFailed, origErr.Error())
return origErr
}
log.G(ctx).Info("Created pod in provider")
pc.recorder.Event(pod, corev1.EventTypeNormal, podEventCreateSuccess, "Created pod in provider successfully")
}
return nil
}
@@ -107,6 +126,30 @@ func podsEqual(pod1, pod2 *corev1.Pod) bool {
}
func deleteGraceTimeEqual(old, new *int64) bool {
if old == nil && new == nil {
return true
}
if old != nil && new != nil {
return *old == *new
}
return false
}
// podShouldEnqueue checks whether two pods are equal according to the podsEqual func, the DeletionTimestamp, and the DeletionGracePeriodSeconds
func podShouldEnqueue(oldPod, newPod *corev1.Pod) bool {
if !podsEqual(oldPod, newPod) {
return true
}
if !deleteGraceTimeEqual(oldPod.DeletionGracePeriodSeconds, newPod.DeletionGracePeriodSeconds) {
return true
}
if !oldPod.DeletionTimestamp.Equal(newPod.DeletionTimestamp) {
return true
}
return false
}
func (pc *PodController) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) {
podPhase := corev1.PodPending
if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
@@ -123,7 +166,7 @@ func (pc *PodController) handleProviderError(ctx context.Context, span trace.Spa
"reason": pod.Status.Reason,
})
_, err := pc.client.Pods(pod.Namespace).UpdateStatus(pod)
_, err := pc.client.Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{})
if err != nil {
logger.WithError(err).Warn("Failed to update pod status")
} else {
@@ -140,9 +183,10 @@ func (pc *PodController) deletePod(ctx context.Context, pod *corev1.Pod) error {
err := pc.provider.DeletePod(ctx, pod.DeepCopy())
if err != nil {
span.SetStatus(err)
pc.recorder.Event(pod, corev1.EventTypeWarning, podEventDeleteFailed, err.Error())
return err
}
pc.recorder.Event(pod, corev1.EventTypeNormal, podEventDeleteSuccess, "Deleted pod in provider successfully")
log.G(ctx).Debug("Deleted pod from provider")
return nil
@@ -172,12 +216,14 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
kPod.Lock()
podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
kPod.Unlock()
if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) {
return nil
}
// We need to do this because the other parts of the pod can be updated elsewhere. Since we're only updating
// the pod status, and we should be the sole writers of the pod status, we can blind overwrite it. Therefore
// we need to copy the pod and set ResourceVersion to 0.
podFromProvider.ResourceVersion = "0"
if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(podFromProvider); err != nil {
if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(ctx, podFromProvider, metav1.UpdateOptions{}); err != nil {
span.SetStatus(err)
return pkgerrors.Wrap(err, "error while updating pod status in kubernetes")
}
@@ -195,17 +241,72 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
// enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd
// prior to enqueuePodStatusUpdate.
func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key")
} else {
if obj, ok := pc.knownPods.Load(key); ok {
kpod := obj.(*knownPod)
kpod.Lock()
kpod.lastPodStatusReceivedFromProvider = pod
kpod.Unlock()
q.AddRateLimited(key)
}
ctx, cancel := context.WithTimeout(ctx, notificationRetryPeriod*maxRetries)
defer cancel()
ctx, span := trace.StartSpan(ctx, "enqueuePodStatusUpdate")
defer span.End()
ctx = span.WithField(ctx, "method", "enqueuePodStatusUpdate")
// TODO (Sargun): Make this (more) asynchronous. Right now, if we are not cache synced, and we receive notifications
// from the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation.
key, err := cache.MetaNamespaceKeyFunc(pod)
if err != nil {
log.G(ctx).WithError(err).Error("Error getting pod meta namespace key")
span.SetStatus(err)
return
}
ctx = span.WithField(ctx, "key", key)
var obj interface{}
err = wait.PollImmediateUntil(notificationRetryPeriod, func() (bool, error) {
var ok bool
obj, ok = pc.knownPods.Load(key)
if ok {
return true, nil
}
// Blind sync. Partial sync is better than nothing. If this returns false, the poll loop should not be invoked
// again as it means the context has timed out.
if !cache.WaitForNamedCacheSync("enqueuePodStatusUpdate", ctx.Done(), pc.podsInformer.Informer().HasSynced) {
log.G(ctx).Warn("enqueuePodStatusUpdate proceeding with unsynced cache")
}
// The only transient error the pod lister returns is NotFound. The only case where NotFound
// happens while the pod *actually* exists is the one above -- where we haven't been able to finish syncing
// before the context times out.
// Any other class of errors is non-transient
_, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name)
if err != nil {
return false, err
}
// err is nil, and therefore the pod exists in k8s but does not exist in our known pods map. This likely means
// that we're in some kind of startup synchronization issue where the provider knows about a pod (e.g. from
// performing recovery) that we do not yet know about.
return false, nil
}, ctx.Done())
if err != nil {
if errors.IsNotFound(err) {
err = fmt.Errorf("Pod %q not found in pod lister: %w", key, err)
log.G(ctx).WithError(err).Debug("Not enqueuing pod status update")
} else {
log.G(ctx).WithError(err).Warn("Not enqueuing pod status update due to error from pod lister")
}
span.SetStatus(err)
return
}
kpod := obj.(*knownPod)
kpod.Lock()
if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) {
kpod.Unlock()
return
}
kpod.lastPodStatusReceivedFromProvider = pod
kpod.Unlock()
q.AddRateLimited(key)
}
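The retry loop above leans on wait.PollImmediateUntil, which runs the condition immediately, then once per period, until it returns true, returns an error, or the stop channel closes. A standalone sketch of those semantics with made-up numbers:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	attempts := 0
	err := wait.PollImmediateUntil(151*time.Millisecond, func() (bool, error) {
		attempts++
		// Pretend the pod shows up in the known-pods map on the third try.
		return attempts >= 3, nil
	}, ctx.Done())
	fmt.Println(attempts, err) // 3 <nil>
}
```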
func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) {
@@ -279,7 +380,7 @@ func (pc *PodController) deletePodHandler(ctx context.Context, key string) (retE
// We don't check with the provider before doing this delete. At this point, even if an outstanding pod status update
// was in progress,
err = pc.client.Pods(namespace).Delete(name, metav1.NewDeleteOptions(0))
err = pc.client.Pods(namespace).Delete(ctx, name, *metav1.NewDeleteOptions(0))
if errors.IsNotFound(err) {
return nil
}


@@ -23,6 +23,7 @@ import (
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"
@@ -48,9 +49,11 @@ func newTestController() *TestController {
recorder: testutil.FakeEventRecorder(5),
k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletionQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
podStatusQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
done: make(chan struct{}),
ready: make(chan struct{}),
podsInformer: iFactory.Core().V1().Pods(),
podsLister: iFactory.Core().V1().Pods().Lister(),
},
mock: p,
client: fk8s,
@@ -108,6 +111,76 @@ func TestPodsDifferentIgnoreValue(t *testing.T) {
assert.Assert(t, podsEqual(p1, p2))
}
func TestPodShouldEnqueueDifferentDeleteTimeStamp(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
now := v1.NewTime(time.Now())
p2.DeletionTimestamp = &now
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodShouldEnqueueDifferentLabel(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
p2.Labels = map[string]string{"test": "test"}
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodShouldEnqueueDifferentAnnotation(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
p2.Annotations = map[string]string{"test": "test"}
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodShouldNotEnqueueDifferentStatus(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
p2.Status.Phase = corev1.PodSucceeded
assert.Assert(t, !podShouldEnqueue(p1, p2))
}
func TestPodShouldEnqueueDifferentDeleteGraceTime(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
oldTime := v1.NewTime(time.Now().Add(5))
newTime := v1.NewTime(time.Now().Add(10))
oldGraceTime := int64(5)
newGraceTime := int64(10)
p1.DeletionGracePeriodSeconds = &oldGraceTime
p1.DeletionTimestamp = &oldTime
p2.DeletionGracePeriodSeconds = &newGraceTime
p2.DeletionTimestamp = &newTime
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodShouldEnqueueGraceTimeChanged(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
graceTime := int64(30)
p2.DeletionGracePeriodSeconds = &graceTime
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodCreateNewPod(t *testing.T) {
svr := newTestController()


@@ -83,7 +83,8 @@ type PodNotifier interface {
// fashion. The provided pod's PodStatus should be up to date when
// this function is called.
//
// NotifyPods will not block callers.
// NotifyPods must not block the caller since it is only used to register the callback.
// The callback passed into `NotifyPods` may block when called.
NotifyPods(context.Context, func(*corev1.Pod))
}
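A sketch of what this contract looks like from a provider's side (mockProvider and its update channel are hypothetical, not from this repo): NotifyPods only stores the callback and returns; the provider invokes it later from its own goroutine and must tolerate it blocking.

```go
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
)

type mockProvider struct {
	notify func(*corev1.Pod)
}

// NotifyPods registers the callback and returns immediately, as the
// interface requires.
func (p *mockProvider) NotifyPods(_ context.Context, cb func(*corev1.Pod)) {
	p.notify = cb
}

// statusLoop delivers provider-side status changes. The callback may
// block (e.g. during VK's startup cache sync), so it is invoked from a
// dedicated goroutine rather than from NotifyPods itself.
func (p *mockProvider) statusLoop(ctx context.Context, updates <-chan *corev1.Pod) {
	for {
		select {
		case <-ctx.Done():
			return
		case pod := <-updates:
			p.notify(pod)
		}
	}
}
```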
@@ -108,6 +109,8 @@ type PodController struct {
// deletionQ is a queue on which pods are reconciled, and we check if pods are in API server after grace period
deletionQ workqueue.RateLimitingInterface
podStatusQ workqueue.RateLimitingInterface
// From the time of creation to termination, the knownPods map contains the pod's key
// (derived from Kubernetes' cache library) -> a *knownPod struct.
knownPods sync.Map
@@ -158,6 +161,9 @@ type PodControllerConfig struct {
ConfigMapInformer corev1informers.ConfigMapInformer
SecretInformer corev1informers.SecretInformer
ServiceInformer corev1informers.ServiceInformer
// RateLimiter defines the rate limiter for the work queues
RateLimiter workqueue.RateLimiter
}
// NewPodController creates a new pod controller with the provided config.
@@ -183,7 +189,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
if cfg.Provider == nil {
return nil, errdefs.InvalidInput("missing provider")
}
if cfg.RateLimiter == nil {
cfg.RateLimiter = workqueue.DefaultControllerRateLimiter()
}
rm, err := manager.NewResourceManager(cfg.PodInformer.Lister(), cfg.SecretInformer.Lister(), cfg.ConfigMapInformer.Lister(), cfg.ServiceInformer.Lister())
if err != nil {
return nil, pkgerrors.Wrap(err, "could not create resource manager")
@@ -198,8 +206,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
ready: make(chan struct{}),
done: make(chan struct{}),
recorder: cfg.EventRecorder,
k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"),
k8sQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"),
deletionQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"),
podStatusQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
}
return pc, nil
@@ -242,13 +251,12 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
}
pc.provider = provider
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
provider.NotifyPods(ctx, func(pod *corev1.Pod) {
pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
pc.enqueuePodStatusUpdate(ctx, pc.podStatusQ, pod.DeepCopy())
})
go runProvider(ctx)
defer podStatusQueue.ShutDown()
defer pc.podStatusQ.ShutDown()
// Wait for the caches to be synced *before* starting to do work.
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
@@ -270,13 +278,16 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
},
UpdateFunc: func(oldObj, newObj interface{}) {
// Create a copy of the old and new pod objects so we don't mutate the cache.
oldPod := oldObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod)
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
log.G(ctx).Error(err)
} else {
pc.k8sQ.AddRateLimited(key)
if podShouldEnqueue(oldPod, newPod) {
pc.k8sQ.AddRateLimited(key)
}
}
},
DeleteFunc: func(pod interface{}) {
@@ -305,7 +316,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
workerID := strconv.Itoa(id)
go func() {
defer wg.Done()
pc.runSyncPodStatusFromProviderWorker(ctx, workerID, podStatusQueue)
pc.runSyncPodStatusFromProviderWorker(ctx, workerID, pc.podStatusQ)
}()
}
@@ -333,7 +344,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
<-ctx.Done()
log.G(ctx).Info("shutting down workers")
pc.k8sQ.ShutDown()
podStatusQueue.ShutDown()
pc.podStatusQ.ShutDown()
pc.deletionQ.ShutDown()
wg.Wait()


@@ -152,12 +152,14 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
defer span.End()
ctx = addPodAttributes(ctx, span, podFromKubernetes)
var statusErr error
podStatus, err := p.PodLifecycleHandler.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
if err != nil {
if !errdefs.IsNotFound(err) {
span.SetStatus(err)
return err
}
statusErr = err
}
if podStatus != nil {
pod := podFromKubernetes.DeepCopy()
@@ -168,6 +170,7 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
key, err := cache.MetaNamespaceKeyFunc(podFromKubernetes)
if err != nil {
span.SetStatus(err)
return err
}
@@ -176,32 +179,35 @@ func (p *syncProviderWrapper) updatePodStatus(ctx context.Context, podFromKubern
return nil
}
// Only change the status when the pod was already up.
// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
podStatus = podFromKubernetes.Status.DeepCopy()
podStatus.Phase = corev1.PodFailed
podStatus.Reason = podStatusReasonNotFound
podStatus.Message = podStatusMessageNotFound
now := metav1.NewTime(time.Now())
for i, c := range podStatus.ContainerStatuses {
if c.State.Running == nil {
continue
}
podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
ExitCode: containerStatusExitCodeNotFound,
Reason: containerStatusReasonNotFound,
Message: containerStatusMessageNotFound,
FinishedAt: now,
StartedAt: c.State.Running.StartedAt,
ContainerID: c.ContainerID,
}
podStatus.ContainerStatuses[i].State.Running = nil
}
log.G(ctx).Debug("Setting pod not found on pod status")
if podFromKubernetes.Status.Phase != corev1.PodRunning && time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) <= time.Minute {
span.SetStatus(statusErr)
return statusErr
}
// Only change the status when the pod was already up.
// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
podStatus = podFromKubernetes.Status.DeepCopy()
podStatus.Phase = corev1.PodFailed
podStatus.Reason = podStatusReasonNotFound
podStatus.Message = podStatusMessageNotFound
now := metav1.NewTime(time.Now())
for i, c := range podStatus.ContainerStatuses {
if c.State.Running == nil {
continue
}
podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
ExitCode: containerStatusExitCodeNotFound,
Reason: containerStatusReasonNotFound,
Message: containerStatusMessageNotFound,
FinishedAt: now,
StartedAt: c.State.Running.StartedAt,
ContainerID: c.ContainerID,
}
podStatus.ContainerStatuses[i].State.Running = nil
}
log.G(ctx).Debug("Setting pod not found on pod status")
pod := podFromKubernetes.DeepCopy()
podStatus.DeepCopyInto(&pod.Status)
p.notify(pod)


@@ -1,6 +1,7 @@
package e2e
import (
"context"
"fmt"
"testing"
"time"
@@ -18,17 +19,64 @@ const (
deleteGracePeriodForProvider = 1 * time.Second
)
// TestGetPods tests that the /pods endpoint works, and only returns pods for our kubelet
func (ts *EndToEndTestSuite) TestGetPods(t *testing.T) {
ctx := context.Background()
// Create a pod with prefix "nginx-" having a single container.
podSpec := f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo")
podSpec.Spec.NodeName = f.NodeName
nginx, err := f.CreatePod(ctx, podSpec)
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(ctx, nginx.Namespace, nginx.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
t.Logf("Created pod: %s", nginx.Name)
// Wait for the "nginx-" pod to be reported as running and ready.
if _, err := f.WaitUntilPodReady(nginx.Namespace, nginx.Name); err != nil {
t.Fatal(err)
}
t.Logf("Pod %s ready", nginx.Name)
k8sPods, err := f.GetRunningPodsFromKubernetes(ctx)
if err != nil {
t.Fatal(err)
}
podFound := false
for _, pod := range k8sPods.Items {
if pod.Spec.NodeName != f.NodeName {
t.Fatalf("Found pod with node name %s, whereas expected %s", pod.Spec.NodeName, f.NodeName)
}
if pod.UID == nginx.UID {
podFound = true
}
}
if !podFound {
t.Fatal("Nginx pod not found")
}
}
// TestGetStatsSummary creates a pod having two containers and queries the /stats/summary endpoint of the virtual-kubelet.
// It expects this endpoint to return stats for the current node, as well as for the aforementioned pod and each of its two containers.
func (ts *EndToEndTestSuite) TestGetStatsSummary(t *testing.T) {
ctx := context.Background()
// Create a pod with prefix "nginx-" having three containers.
pod, err := f.CreatePod(f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo", "bar", "baz"))
pod, err := f.CreatePod(ctx, f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo", "bar", "baz"))
if err != nil {
t.Fatal(err)
}
// Delete the "nginx-0-X" pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -39,7 +87,7 @@ func (ts *EndToEndTestSuite) TestGetStatsSummary(t *testing.T) {
}
// Grab the stats from the provider.
stats, err := f.GetStatsSummary()
stats, err := f.GetStatsSummary(ctx)
if err != nil {
t.Fatal(err)
}
@@ -68,17 +116,19 @@ func (ts *EndToEndTestSuite) TestGetStatsSummary(t *testing.T) {
// These verifications are made using the /stats/summary endpoint of the virtual-kubelet, by checking for the presence or absence of the pods.
// Hence, the provider being tested must implement the PodMetricsProvider interface.
func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
ctx := context.Background()
// Create a pod with prefix "nginx-" having a single container.
podSpec := f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo")
podSpec.Spec.NodeName = f.NodeName
pod, err := f.CreatePod(podSpec)
pod, err := f.CreatePod(ctx, podSpec)
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -91,7 +141,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
t.Logf("Pod %s ready", pod.Name)
// Grab the pods from the provider.
pods, err := f.GetRunningPods()
pods, err := f.GetRunningPodsFromProvider(ctx)
assert.NilError(t, err)
// Check if the pod exists in the slice of PodStats.
@@ -112,7 +162,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
}()
// Gracefully delete the "nginx-" pod.
if err := f.DeletePod(pod.Namespace, pod.Name); err != nil {
if err := f.DeletePod(ctx, pod.Namespace, pod.Name); err != nil {
t.Fatal(err)
}
t.Logf("Deleted pod: %s", pod.Name)
@@ -125,7 +175,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
time.Sleep(deleteGracePeriodForProvider)
// Give the provider some time to react to the MODIFIED/DELETED events before proceeding.
// Grab the pods from the provider.
pods, err = f.GetRunningPods()
pods, err = f.GetRunningPodsFromProvider(ctx)
assert.NilError(t, err)
// Make sure the pod DOES NOT exist in the provider's set of running pods
@@ -141,15 +191,17 @@ func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
// and put them in the running lifecycle. It then does a force delete on the pod, and verifies the provider
// has deleted it.
func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
ctx := context.Background()
podSpec := f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo")
// Create a pod with prefix having a single container.
pod, err := f.CreatePod(podSpec)
pod, err := f.CreatePod(ctx, podSpec)
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -162,7 +214,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
t.Logf("Pod %s ready", pod.Name)
// Grab the pods from the provider.
pods, err := f.GetRunningPods()
pods, err := f.GetRunningPodsFromProvider(ctx)
assert.NilError(t, err)
// Check if the pod exists in the slice of Pods.
@@ -188,7 +240,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
time.Sleep(deleteGracePeriodForProvider)
// Forcibly delete the pod.
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil {
t.Logf("Last saw pod in state: %+v", podLast)
t.Fatal(err)
}
@@ -203,7 +255,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
time.Sleep(deleteGracePeriodForProvider)
// Grab the pods from the provider.
pods, err = f.GetRunningPods()
pods, err = f.GetRunningPodsFromProvider(ctx)
assert.NilError(t, err)
// Make sure the "nginx-" pod DOES NOT exist in the slice of Pods anymore.
@@ -216,15 +268,17 @@ func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
// TestCreatePodWithOptionalInexistentSecrets tries to create a pod referencing optional, inexistent secrets.
// It then verifies that the pod is created successfully.
func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentSecrets(t *testing.T) {
ctx := context.Background()
// Create a pod with a single container referencing optional, inexistent secrets.
pod, err := f.CreatePod(f.CreatePodObjectWithOptionalSecretKey(t.Name()))
pod, err := f.CreatePod(ctx, f.CreatePodObjectWithOptionalSecretKey(t.Name()))
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -240,7 +294,7 @@ func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentSecrets(t *testi
}
// Grab the pods from the provider.
pods, err := f.GetRunningPods()
pods, err := f.GetRunningPodsFromProvider(ctx)
assert.NilError(t, err)
// Check if the pod exists in the slice of Pods.
@@ -250,15 +304,17 @@ func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentSecrets(t *testi
// TestCreatePodWithMandatoryInexistentSecrets tries to create a pod referencing inexistent secrets.
// It then verifies that the pod is not created.
func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentSecrets(t *testing.T) {
ctx := context.Background()
// Create a pod with a single container referencing inexistent secrets.
pod, err := f.CreatePod(f.CreatePodObjectWithMandatorySecretKey(t.Name()))
pod, err := f.CreatePod(ctx, f.CreatePodObjectWithMandatorySecretKey(t.Name()))
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -269,7 +325,7 @@ func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentSecrets(t *test
}
// Grab the pods from the provider.
pods, err := f.GetRunningPods()
pods, err := f.GetRunningPodsFromProvider(ctx)
assert.NilError(t, err)
// Check if the pod exists in the slice of PodStats.
@@ -279,15 +335,17 @@ func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentSecrets(t *test
// TestCreatePodWithOptionalInexistentConfigMap tries to create a pod referencing optional, inexistent config map.
// It then verifies that the pod is created successfully.
func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentConfigMap(t *testing.T) {
ctx := context.Background()
// Create a pod with a single container referencing optional, inexistent config map.
pod, err := f.CreatePod(f.CreatePodObjectWithOptionalConfigMapKey(t.Name()))
pod, err := f.CreatePod(ctx, f.CreatePodObjectWithOptionalConfigMapKey(t.Name()))
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -303,7 +361,7 @@ func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentConfigMap(t *tes
}
// Grab the pods from the provider.
pods, err := f.GetRunningPods()
pods, err := f.GetRunningPodsFromProvider(ctx)
assert.NilError(t, err)
// Check if the pod exists in the slice of Pods.
@@ -313,15 +371,17 @@ func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentConfigMap(t *tes
// TestCreatePodWithMandatoryInexistentConfigMap tries to create a pod referencing an inexistent config map.
// It then verifies that the pod is not created.
func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentConfigMap(t *testing.T) {
ctx := context.Background()
// Create a pod with a single container referencing an inexistent config map.
pod, err := f.CreatePod(f.CreatePodObjectWithMandatoryConfigMapKey(t.Name()))
pod, err := f.CreatePod(ctx, f.CreatePodObjectWithMandatoryConfigMapKey(t.Name()))
if err != nil {
t.Fatal(err)
}
// Delete the pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
@@ -332,7 +392,7 @@ func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentConfigMap(t *te
}
// Grab the pods from the provider.
pods, err := f.GetRunningPods()
pods, err := f.GetRunningPodsFromProvider(ctx)
assert.NilError(t, err)
// Check if the pod exists in the slice of Pods.
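All of the hunks above make the same mechanical change: client-go 1.18 added a `context.Context` parameter (plus typed options structs) to every generated clientset method, so the e2e framework helpers now accept a context as their first argument and forward it. A minimal sketch of the updated helper shape, assuming a simplified `Framework` type (the real one lives in the repo's e2e framework package):

```go
package framework

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// Framework approximates the e2e test fixture used above (assumed shape).
type Framework struct {
	KubeClient kubernetes.Interface
	Namespace  string
}

// CreatePod forwards the caller's context to the clientset; since
// client-go 1.18, Create takes ctx followed by metav1.CreateOptions.
func (f *Framework) CreatePod(ctx context.Context, pod *v1.Pod) (*v1.Pod, error) {
	return f.KubeClient.CoreV1().Pods(f.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}
```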


@@ -19,7 +19,7 @@ func (ts *EndToEndTestSuite) TestNodeCreateAfterDelete(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
podList, err := f.KubeClient.CoreV1().Pods(f.Namespace).List(metav1.ListOptions{
podList, err := f.KubeClient.CoreV1().Pods(f.Namespace).List(ctx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", f.NodeName).String(),
})
@@ -28,7 +28,7 @@ func (ts *EndToEndTestSuite) TestNodeCreateAfterDelete(t *testing.T) {
chErr := make(chan error, 1)
originalNode, err := f.GetNode()
originalNode, err := f.GetNode(ctx)
assert.NilError(t, err)
ctx, cancel = context.WithTimeout(ctx, time.Minute)
@@ -50,7 +50,7 @@ func (ts *EndToEndTestSuite) TestNodeCreateAfterDelete(t *testing.T) {
chErr <- f.WaitUntilNodeCondition(wait)
}()
assert.NilError(t, f.DeleteNode())
assert.NilError(t, f.DeleteNode(ctx))
select {
case result := <-chErr:
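The node-lifecycle test gets the same treatment; note that `List` now takes the context ahead of the `metav1.ListOptions`. As a standalone sketch, this is the field-selector query the test performs (the function name and parameters are illustrative, not from the repo):

```go
package framework

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
)

// ListPodsOnNode lists only the pods bound to the given node by filtering
// on spec.nodeName, mirroring the selector built in the test above.
func ListPodsOnNode(ctx context.Context, client kubernetes.Interface, namespace, nodeName string) (*v1.PodList, error) {
	return client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
	})
}
```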


@@ -10,7 +10,7 @@ You can install Virtual Kubelet by building it [from source](#source). First, ma
mkdir -p ${GOPATH}/src/github.com/virtual-kubelet
cd ${GOPATH}/src/github.com/virtual-kubelet
git clone https://github.com/virtual-kubelet/virtual-kubelet
make build
cd virtual-kubelet && make build
```
This method adds a `virtual-kubelet` executable to the `bin` folder. To run it:
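A minimal invocation, assuming the binary's standard `--provider` flag (the value and any provider-specific flags depend on your setup; `<your-provider>` is a placeholder):

```bash
bin/virtual-kubelet --provider <your-provider>
```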


@@ -1,3 +1,7 @@
- name: Admiralty Multi-Cluster Scheduler
tag: multicluster-scheduler
org: admiraltyio
vanityImportPath: admiralty.io/multicluster-scheduler
- name: Alibaba Cloud Elastic Container Instance (**ECI**)
tag: alibabacloud-eci
- name: AWS Fargate
@@ -6,6 +10,9 @@
tag: azure-batch
- name: Azure Container Instances (**ACI**)
tag: azure-aci
- name: Elotl Kip
tag: kip
org: elotl
- name: Kubernetes Container Runtime Interface (**CRI**)
tag: cri
- name: Huawei Cloud Container Instance (**CCI**)
@@ -14,3 +21,5 @@
tag: nomad
- name: OpenStack Zun
tag: openstack-zun
- name: Tencent Games Tensile Kube
tag: tensile-kube
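Both new optional keys feed the website templates shown below: `org` selects the GitHub organization (falling back to `virtual-kubelet`), and `vanityImportPath` overrides the Go import path used for the godoc link. A hypothetical entry exercising both fields:

```yaml
- name: Example Provider            # display name (illustrative only)
  tag: example-provider             # repository name under the org
  org: example-org                  # optional; defaults to virtual-kubelet
  vanityImportPath: example.io/vk   # optional; defaults to github.com/<org>/<tag>
```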


@@ -25,7 +25,7 @@
<ul>
{{ range $providers }}
{{ $url := printf "https://github.com/virtual-kubelet/%s/blob/master/README.md#readme" .tag }}
{{ $url := printf "https://github.com/%s/%s/blob/master/README.md#readme" (.org | default "virtual-kubelet") .tag }}
<li class="has-bottom-spacing">
<a class="is-size-5 is-size-6-mobile has-text-grey-lighter has-text-weight-light" href="{{ $url }}" target="_blank">
{{ .name | markdownify }}
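Hugo's `default` function supplies `virtual-kubelet` whenever an entry omits `org`, so existing providers keep their old README links while entries that set `org` resolve to their own organization. For the two kinds of entry in the data file above, `$url` renders as:

```
multicluster-scheduler (org: admiraltyio) -> https://github.com/admiraltyio/multicluster-scheduler/blob/master/README.md#readme
azure-aci (no org) -> https://github.com/virtual-kubelet/azure-aci/blob/master/README.md#readme
```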


@@ -3,8 +3,9 @@
<tbody>
{{ range $providers }}
{{ $name := .name | markdownify }}
{{ $pkgName := printf "github.com/virtual-kubelet/%s" .tag }}
{{ $pkgUrl := printf "https://github.com/virtual-kubelet/%s/blob/master/README.md#readme" .tag }}
{{ $githubPath := printf "github.com/%s/%s" (.org | default "virtual-kubelet") .tag }}
{{ $pkgName := .vanityImportPath | default $githubPath }}
{{ $pkgUrl := printf "https://%s/blob/master/README.md#readme" $githubPath }}
{{ $godocUrl := printf "https://godoc.org/%s" $pkgName }}
<tr>
<td>
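The same defaulting applies here, with one addition: `$pkgName` prefers an entry's `vanityImportPath` when one is set. For the Admiralty entry, `$pkgName` therefore resolves to `admiralty.io/multicluster-scheduler` and the godoc link to `https://godoc.org/admiralty.io/multicluster-scheduler`, while entries that set neither field keep the original `github.com/virtual-kubelet/<tag>` behavior.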