Compare commits


6 Commits

Author SHA1 Message Date
Brian Goff
4e399f93b1 Merge pull request #868 from cpuguy83/1.2_fix_missing_stats_route
Add GetStatsSummary to PodHandlerConfig
2020-07-30 14:02:41 -07:00
Vilmos Nebehaj
c621acc8d2 Add GetStatsSummary to PodHandlerConfig
If both the metrics routes and the pod routes are attached to the same
mux with the pattern "/", it will panic. Instead, add the stats handler
function to PodHandlerConfig and set up the route if it is not nil.

(cherry picked from commit 56b248c854)
Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2020-07-30 10:25:34 -07:00
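For orientation, a minimal sketch of the idea behind this fix, using hypothetical names (statsRouteConfig, attachStatsRoute) rather than the project's real PodHandlerConfig/HandlePodStatsSummary API, and with the handler type simplified to a plain http.HandlerFunc: the stats handler becomes an optional field on the pod-routes config, and the /stats/summary route is registered only when that field is non-nil, so the metrics routes no longer have to share the "/" pattern on the same mux (the source of the panic).

```go
package api

import (
	"net/http"

	"github.com/gorilla/mux"
)

// statsRouteConfig is a hypothetical stand-in for the pod-routes config; only
// the field relevant to this fix is shown, and the handler type is simplified.
type statsRouteConfig struct {
	// GetStatsSummary is optional; when nil, no /stats/summary route is added.
	GetStatsSummary http.HandlerFunc
}

// attachStatsRoute registers the stats route only when a handler was supplied.
// Registering it here avoids mounting a second handler tree at "/" on the same
// mux, which is what panics (net/http's ServeMux rejects duplicate patterns).
func attachStatsRoute(cfg statsRouteConfig, r *mux.Router) {
	if cfg.GetStatsSummary != nil {
		r.HandleFunc("/stats/summary", cfg.GetStatsSummary).Methods("GET")
	}
}
```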
Brian Goff
e6e1dbed87 Merge pull request #794 from cpuguy83/cherry_picks_1.2.1
Cherry picks 1.2.1
2019-11-15 15:19:11 -08:00
Thomas Hartland
eb9498cdde Add test for node ping interval
(cherry picked from commit 3783a39b26)
2019-11-15 14:31:04 -08:00
Thomas Hartland
df16317a89 After handling status update, reset update timer with correct duration
If the ping timer is being used, it should be reset with the ping update
interval. If the status update interval is used then Ping stops being
called for long enough to cause kubernetes to mark the node as NotReady.

(cherry picked from commit c258614d8f)
2019-11-15 14:30:54 -08:00
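As a hedged illustration of this fix (the names below are invented for the sketch and are not the node controller's actual identifiers): after a status update is handled, the update timer must be re-armed with whichever interval actually drives Ping; resetting it with the status-update interval starves the ping loop and the node eventually goes NotReady.

```go
package node

import "time"

// resetUpdateTimer is a hypothetical helper illustrating the corrected reset:
// when a dedicated ping interval is configured, the timer is reset with that
// interval; only otherwise does the status-update interval apply.
func resetUpdateTimer(t *time.Timer, pingInterval, statusUpdateInterval time.Duration) {
	interval := statusUpdateInterval
	if pingInterval > 0 {
		// Ping is driven by this timer, so resetting with the (typically longer)
		// status-update interval would delay Ping past the point where Kubernetes
		// marks the node NotReady.
		interval = pingInterval
	}
	t.Reset(interval)
}
```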
Brian Goff
7585e11542 [Sync Provider] Fix panic on not found pod status
(cherry picked from commit 6e33b0f084)
2019-11-15 14:30:22 -08:00
34 changed files with 455 additions and 1715 deletions


@@ -3,7 +3,7 @@ jobs:
validate: validate:
resource_class: xlarge resource_class: xlarge
docker: docker:
- image: circleci/golang:1.13 - image: circleci/golang:1.12
environment: environment:
GO111MODULE: "on" GO111MODULE: "on"
GOPROXY: https://proxy.golang.org GOPROXY: https://proxy.golang.org
@@ -33,7 +33,7 @@ jobs:
test: test:
resource_class: xlarge resource_class: xlarge
docker: docker:
- image: circleci/golang:1.13 - image: circleci/golang:1.12
environment: environment:
GO111MODULE: "on" GO111MODULE: "on"
working_directory: /go/src/github.com/virtual-kubelet/virtual-kubelet working_directory: /go/src/github.com/virtual-kubelet/virtual-kubelet
@@ -61,7 +61,7 @@ jobs:
CHANGE_MINIKUBE_NONE_USER: true CHANGE_MINIKUBE_NONE_USER: true
GOPATH: /home/circleci/go GOPATH: /home/circleci/go
KUBECONFIG: /home/circleci/.kube/config KUBECONFIG: /home/circleci/.kube/config
KUBERNETES_VERSION: v1.17.6 KUBERNETES_VERSION: v1.15.2
MINIKUBE_HOME: /home/circleci MINIKUBE_HOME: /home/circleci
MINIKUBE_VERSION: v1.2.0 MINIKUBE_VERSION: v1.2.0
MINIKUBE_WANTUPDATENOTIFICATION: false MINIKUBE_WANTUPDATENOTIFICATION: false
@@ -117,7 +117,7 @@ jobs:
command: | command: |
mkdir $HOME/.go mkdir $HOME/.go
export PATH=$HOME/.go/bin:${PATH} export PATH=$HOME/.go/bin:${PATH}
curl -fsSL -o "/tmp/go.tar.gz" "https://dl.google.com/go/go1.13.12.linux-amd64.tar.gz" curl -fsSL -o "/tmp/go.tar.gz" "https://dl.google.com/go/go1.12.6.linux-amd64.tar.gz"
tar -C $HOME/.go --strip-components=1 -xzf "/tmp/go.tar.gz" tar -C $HOME/.go --strip-components=1 -xzf "/tmp/go.tar.gz"
go version go version
make e2e make e2e


@@ -6,9 +6,6 @@
* VMWare * VMWare
* Netflix * Netflix
* Hashi Corp * Hashi Corp
* Admiralty
* Elotl
* Tencent Games
Since end-users are specific per provider within VK we have many end-user customers that we don't have permission to list publically. Please contact ribhatia@microsoft.com for more informtation. Since end-users are specific per provider within VK we have many end-user customers that we don't have permission to list publically. Please contact ribhatia@microsoft.com for more informtation.


@@ -1,4 +1,4 @@
FROM golang:1.13 as builder FROM golang:1.12 as builder
ENV PATH /go/bin:/usr/local/go/bin:$PATH ENV PATH /go/bin:/usr/local/go/bin:$PATH
ENV GOPATH /go ENV GOPATH /go
COPY . /go/src/github.com/virtual-kubelet/virtual-kubelet COPY . /go/src/github.com/virtual-kubelet/virtual-kubelet


@@ -13,7 +13,7 @@ include Makefile.e2e
# Also, we will want to lock our tool versions using go mod: # Also, we will want to lock our tool versions using go mod:
# https://github.com/golang/go/wiki/Modules#how-can-i-track-tool-dependencies-for-a-module # https://github.com/golang/go/wiki/Modules#how-can-i-track-tool-dependencies-for-a-module
gobin_tool ?= $(shell which gobin || echo $(GOPATH)/bin/gobin) gobin_tool ?= $(shell which gobin || echo $(GOPATH)/bin/gobin)
goimports := golang.org/x/tools/cmd/goimports@release-branch.go1.13 goimports := golang.org/x/tools/cmd/goimports@release-branch.go1.12
gocovmerge := github.com/wadey/gocovmerge@b5bfa59ec0adc420475f97f89b58045c721d761c gocovmerge := github.com/wadey/gocovmerge@b5bfa59ec0adc420475f97f89b58045c721d761c
goreleaser := github.com/goreleaser/goreleaser@v0.82.2 goreleaser := github.com/goreleaser/goreleaser@v0.82.2
gox := github.com/mitchellh/gox@v1.0.1 gox := github.com/mitchellh/gox@v1.0.1
@@ -119,6 +119,8 @@ format: goimports
$Q find . -iname \*.go | grep -v \ $Q find . -iname \*.go | grep -v \
-e "^$$" $(addprefix -e ,$(IGNORED_PACKAGES)) | xargs $(gobin_tool) -run $(goimports) -w -e "^$$" $(addprefix -e ,$(IGNORED_PACKAGES)) | xargs $(gobin_tool) -run $(goimports) -w
##### =====> Internals <===== ##### ##### =====> Internals <===== #####
.PHONY: setup .PHONY: setup


@@ -2,7 +2,7 @@
Virtual Kubelet is an open source [Kubernetes kubelet](https://kubernetes.io/docs/reference/generated/kubelet/) Virtual Kubelet is an open source [Kubernetes kubelet](https://kubernetes.io/docs/reference/generated/kubelet/)
implementation that masquerades as a kubelet for the purposes of connecting Kubernetes to other APIs. implementation that masquerades as a kubelet for the purposes of connecting Kubernetes to other APIs.
This allows the nodes to be backed by other services like ACI, AWS Fargate, [IoT Edge](https://github.com/Azure/iot-edge-virtual-kubelet-provider), [Tensile Kube](https://github.com/virtual-kubelet/tensile-kube) etc. The primary scenario for VK is enabling the extension of the Kubernetes API into serverless container platforms like ACI and Fargate, though we are open to others. However, it should be noted that VK is explicitly not intended to be an alternative to Kubernetes federation. This allows the nodes to be backed by other services like ACI, AWS Fargate, [IoT Edge](https://github.com/Azure/iot-edge-virtual-kubelet-provider) etc. The primary scenario for VK is enabling the extension of the Kubernetes API into serverless container platforms like ACI and Fargate, though we are open to others. However, it should be noted that VK is explicitly not intended to be an alternative to Kubernetes federation.
Virtual Kubelet features a pluggable architecture and direct use of Kubernetes primitives, making it much easier to build on. Virtual Kubelet features a pluggable architecture and direct use of Kubernetes primitives, making it much easier to build on.
@@ -16,15 +16,12 @@ The best description is "Kubernetes API on top, programmable back."
* [How It Works](#how-it-works) * [How It Works](#how-it-works)
* [Usage](#usage) * [Usage](#usage)
* [Providers](#providers) * [Providers](#providers)
+ [Admiralty Multi-Cluster Scheduler](#admiralty-multi-cluster-scheduler)
+ [Alibaba Cloud ECI Provider](#alibaba-cloud-eci-provider) + [Alibaba Cloud ECI Provider](#alibaba-cloud-eci-provider)
+ [Azure Container Instances Provider](#azure-container-instances-provider) + [Azure Container Instances Provider](#azure-container-instances-provider)
+ [Azure Batch GPU Provider](https://github.com/virtual-kubelet/azure-batch/blob/master/README.md) + [Azure Batch GPU Provider](https://github.com/virtual-kubelet/azure-batch/blob/master/README.md)
+ [AWS Fargate Provider](#aws-fargate-provider) + [AWS Fargate Provider](#aws-fargate-provider)
+ [Elotl Kip](#elotl-kip)
+ [HashiCorp Nomad](#hashicorp-nomad-provider) + [HashiCorp Nomad](#hashicorp-nomad-provider)
+ [OpenStack Zun](#openstack-zun-provider) + [OpenStack Zun](#openstack-zun-provider)
+ [Tensile Kube Provider](#tensile-kube-provider)
+ [Adding a New Provider via the Provider Interface](#adding-a-new-provider-via-the-provider-interface) + [Adding a New Provider via the Provider Interface](#adding-a-new-provider-via-the-provider-interface)
* [Testing](#testing) * [Testing](#testing)
+ [Unit tests](#unit-tests) + [Unit tests](#unit-tests)
@@ -76,9 +73,6 @@ Providers must provide the following functionality to be considered a supported
2. Conforms to the current API provided by Virtual Kubelet. 2. Conforms to the current API provided by Virtual Kubelet.
3. Does not have access to the Kubernetes API Server and has a well-defined callback mechanism for getting data like secrets or configmaps. 3. Does not have access to the Kubernetes API Server and has a well-defined callback mechanism for getting data like secrets or configmaps.
### Admiralty Multi-Cluster Scheduler
Admiralty Multi-Cluster Scheduler mutates annotated pods into "proxy pods" scheduled on a virtual-kubelet node and creates corresponding "delegate pods" in remote clusters (actually running the containers). A feedback loop updates the statuses and annotations of the proxy pods to reflect the statuses and annotations of the delegate pods. You can find more details in the [Admiralty Multi-Cluster Scheduler documentation](https://github.com/admiraltyio/multicluster-scheduler).
### Alibaba Cloud ECI Provider ### Alibaba Cloud ECI Provider
@@ -118,12 +112,6 @@ co-exist with pods on regular worker nodes in the same Kubernetes cluster.
Easy instructions and a sample configuration file is available in the [AWS Fargate provider documentation](https://github.com/virtual-kubelet/aws-fargate). Please note that this provider is not currently supported. Easy instructions and a sample configuration file is available in the [AWS Fargate provider documentation](https://github.com/virtual-kubelet/aws-fargate). Please note that this provider is not currently supported.
### Elotl Kip
[Kip](https://github.com/elotl/kip) is a provider that runs pods in cloud instances, allowing a Kubernetes cluster to transparently scale workloads into a cloud. When a pod is scheduled onto the virtual node, Kip starts a right-sized cloud instance for the pod's workload and dispatches the pod onto the instance. When the pod is finished running, the cloud instance is terminated.
When workloads run on Kip, your cluster size naturally scales with the cluster workload, pods are strongly isolated from each other and the user is freed from managing worker nodes and strategically packing pods onto nodes.
### HashiCorp Nomad Provider ### HashiCorp Nomad Provider
HashiCorp [Nomad](https://nomadproject.io) provider for Virtual Kubelet connects your Kubernetes cluster HashiCorp [Nomad](https://nomadproject.io) provider for Virtual Kubelet connects your Kubernetes cluster
@@ -149,11 +137,6 @@ and bind-mount Cinder volumes into a path inside a pod's container.
For detailed instructions, follow the guide [here](https://github.com/virtual-kubelet/openstack-zun/blob/master/README.md). For detailed instructions, follow the guide [here](https://github.com/virtual-kubelet/openstack-zun/blob/master/README.md).
### Tensile Kube Provider
[Tensile kube](https://github.com/virtual-kubelet/tensile-kube/blob/master/README.md) is contributed by [tencent
games](https://game.qq.com), which is provider for Virtual Kubelet connects your Kubernetes cluster with other Kubernetes clusters. This provider enables us extending Kubernetes to an unlimited one. By using the provider, pods that are scheduled on the virtual node registered on Kubernetes will run as jobs on other Kubernetes clusters' nodes.
### Adding a New Provider via the Provider Interface ### Adding a New Provider via the Provider Interface
Providers consume this project as a library which implements the core logic of Providers consume this project as a library which implements the core logic of
@@ -293,7 +276,7 @@ Enable the ServiceNodeExclusion flag, by modifying the Controller Manager manife
Virtual Kubelet follows the [CNCF Code of Conduct](https://github.com/cncf/foundation/blob/master/code-of-conduct.md). Virtual Kubelet follows the [CNCF Code of Conduct](https://github.com/cncf/foundation/blob/master/code-of-conduct.md).
Sign the [CNCF CLA](https://github.com/kubernetes/community/blob/master/CLA.md) to be able to make Pull Requests to this repo. Sign the [CNCF CLA](https://github.com/kubernetes/community/blob/master/CLA.md) to be able to make Pull Requests to this repo.
Monthly Virtual Kubelet Office Hours are held at 10am PST on the last Thursday of every month in this [zoom meeting room](https://zoom.us/j/94701509915). Check out the calendar [here](https://calendar.google.com/calendar?cid=bjRtbGMxYWNtNXR0NXQ1a2hqZmRkNTRncGNAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ). Bi-weekly Virtual Kubelet Architecture meetings are held at 11am PST every other Wednesday in this [zoom meeting room](https://zoom.us/j/245165908). Check out the calendar [here](https://calendar.google.com/calendar?cid=bjRtbGMxYWNtNXR0NXQ1a2hqZmRkNTRncGNAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ).
Our google drive with design specifications and meeting notes are [here](https://drive.google.com/drive/folders/19Ndu11WBCCBDowo9CrrGUHoIfd2L8Ueg?usp=sharing). Our google drive with design specifications and meeting notes are [here](https://drive.google.com/drive/folders/19Ndu11WBCCBDowo9CrrGUHoIfd2L8Ueg?usp=sharing).


@@ -81,11 +81,6 @@ func installFlags(flags *pflag.FlagSet, c *Opts) {
flags.DurationVar(&c.InformerResyncPeriod, "full-resync-period", c.InformerResyncPeriod, "how often to perform a full resync of pods between kubernetes and the provider") flags.DurationVar(&c.InformerResyncPeriod, "full-resync-period", c.InformerResyncPeriod, "how often to perform a full resync of pods between kubernetes and the provider")
flags.DurationVar(&c.StartupTimeout, "startup-timeout", c.StartupTimeout, "How long to wait for the virtual-kubelet to start") flags.DurationVar(&c.StartupTimeout, "startup-timeout", c.StartupTimeout, "How long to wait for the virtual-kubelet to start")
flags.DurationVar(&c.StreamIdleTimeout, "stream-idle-timeout", c.StreamIdleTimeout,
"stream-idle-timeout is the maximum time a streaming connection can be idle before the connection is"+
" automatically closed, default 30s.")
flags.DurationVar(&c.StreamCreationTimeout, "stream-creation-timeout", c.StreamCreationTimeout,
"stream-creation-timeout is the maximum time for streaming connection, default 30s.")
flagset := flag.NewFlagSet("klog", flag.PanicOnError) flagset := flag.NewFlagSet("klog", flag.PanicOnError)
klog.InitFlags(flagset) klog.InitFlags(flagset)


@@ -22,7 +22,6 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider" "github.com/virtual-kubelet/virtual-kubelet/cmd/virtual-kubelet/internal/provider"
@@ -58,7 +57,7 @@ func loadTLSConfig(certPath, keyPath string) (*tls.Config, error) {
}, nil }, nil
} }
func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerConfig, getPodsFromKubernetes api.PodListerFunc) (_ func(), retErr error) { func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerConfig) (_ func(), retErr error) {
var closers []io.Closer var closers []io.Closer
cancel := func() { cancel := func() {
for _, c := range closers { for _, c := range closers {
@@ -91,12 +90,8 @@ func setupHTTPServer(ctx context.Context, p provider.Provider, cfg *apiServerCon
podRoutes := api.PodHandlerConfig{ podRoutes := api.PodHandlerConfig{
RunInContainer: p.RunInContainer, RunInContainer: p.RunInContainer,
GetContainerLogs: p.GetContainerLogs, GetContainerLogs: p.GetContainerLogs,
GetPodsFromKubernetes: getPodsFromKubernetes,
GetPods: p.GetPods, GetPods: p.GetPods,
StreamIdleTimeout: cfg.StreamIdleTimeout,
StreamCreationTimeout: cfg.StreamCreationTimeout,
} }
api.AttachPodRoutes(podRoutes, mux, true) api.AttachPodRoutes(podRoutes, mux, true)
s := &http.Server{ s := &http.Server{
@@ -151,8 +146,6 @@ type apiServerConfig struct {
KeyPath string KeyPath string
Addr string Addr string
MetricsAddr string MetricsAddr string
StreamIdleTimeout time.Duration
StreamCreationTimeout time.Duration
} }
func getAPIConfig(c Opts) (*apiServerConfig, error) { func getAPIConfig(c Opts) (*apiServerConfig, error) {
@@ -163,8 +156,6 @@ func getAPIConfig(c Opts) (*apiServerConfig, error) {
config.Addr = fmt.Sprintf(":%d", c.ListenPort) config.Addr = fmt.Sprintf(":%d", c.ListenPort)
config.MetricsAddr = c.MetricsAddr config.MetricsAddr = c.MetricsAddr
config.StreamIdleTimeout = c.StreamIdleTimeout
config.StreamCreationTimeout = c.StreamCreationTimeout
return &config, nil return &config, nil
} }


@@ -38,8 +38,6 @@ const (
DefaultTaintEffect = string(corev1.TaintEffectNoSchedule) DefaultTaintEffect = string(corev1.TaintEffectNoSchedule)
DefaultTaintKey = "virtual-kubelet.io/provider" DefaultTaintKey = "virtual-kubelet.io/provider"
DefaultStreamIdleTimeout = 30 * time.Second
DefaultStreamCreationTimeout = 30 * time.Second
) )
// Opts stores all the options for configuring the root virtual-kubelet command. // Opts stores all the options for configuring the root virtual-kubelet command.
@@ -86,11 +84,6 @@ type Opts struct {
// Startup Timeout is how long to wait for the kubelet to start // Startup Timeout is how long to wait for the kubelet to start
StartupTimeout time.Duration StartupTimeout time.Duration
// StreamIdleTimeout is the maximum time a streaming connection
// can be idle before the connection is automatically closed.
StreamIdleTimeout time.Duration
// StreamCreationTimeout is the maximum time for streaming connection
StreamCreationTimeout time.Duration
Version string Version string
} }
@@ -159,13 +152,5 @@ func SetDefaultOpts(c *Opts) error {
} }
} }
if c.StreamIdleTimeout == 0 {
c.StreamIdleTimeout = DefaultStreamIdleTimeout
}
if c.StreamCreationTimeout == 0 {
c.StreamCreationTimeout = DefaultStreamCreationTimeout
}
return nil return nil
} }


@@ -161,7 +161,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
log.G(ctx).Debug("node not found") log.G(ctx).Debug("node not found")
newNode := pNode.DeepCopy() newNode := pNode.DeepCopy()
newNode.ResourceVersion = "" newNode.ResourceVersion = ""
_, err = client.CoreV1().Nodes().Create(ctx, newNode, metav1.CreateOptions{}) _, err = client.CoreV1().Nodes().Create(newNode)
if err != nil { if err != nil {
return err return err
} }
@@ -193,9 +193,7 @@ func runRootCommand(ctx context.Context, s *provider.Store, c Opts) error {
go podInformerFactory.Start(ctx.Done()) go podInformerFactory.Start(ctx.Done())
go scmInformerFactory.Start(ctx.Done()) go scmInformerFactory.Start(ctx.Done())
cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig, func(context.Context) ([]*corev1.Pod, error) { cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig)
return rm.GetPods(), nil
})
if err != nil { if err != nil {
return err return err
} }

go.mod (81 changed lines)

@@ -1,6 +1,6 @@
module github.com/virtual-kubelet/virtual-kubelet module github.com/virtual-kubelet/virtual-kubelet
go 1.13 go 1.12
require ( require (
contrib.go.opencensus.io/exporter/jaeger v0.1.0 contrib.go.opencensus.io/exporter/jaeger v0.1.0
@@ -8,64 +8,79 @@ require (
github.com/docker/spdystream v0.0.0-20170912183627-bc6354cbbc29 // indirect github.com/docker/spdystream v0.0.0-20170912183627-bc6354cbbc29 // indirect
github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f // indirect github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f // indirect
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2 // indirect github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2 // indirect
github.com/evanphx/json-patch v4.1.0+incompatible // indirect
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect
github.com/google/go-cmp v0.3.1 github.com/google/go-cmp v0.3.1
github.com/google/gofuzz v1.0.0 // indirect
github.com/googleapis/gnostic v0.1.0 // indirect
github.com/gorilla/mux v1.7.0 github.com/gorilla/mux v1.7.0
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/json-iterator/go v1.1.6 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/go-homedir v1.1.0
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
github.com/pkg/errors v0.8.1 github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.0.0 github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829
github.com/sirupsen/logrus v1.4.2 github.com/sirupsen/logrus v1.4.1
github.com/spf13/cobra v0.0.5 github.com/spf13/cobra v0.0.2
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.3
github.com/stretchr/testify v1.3.0 // indirect
go.opencensus.io v0.21.0 go.opencensus.io v0.21.0
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c // indirect
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 // indirect
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
google.golang.org/genproto v0.0.0-20190404172233-64821d5d2107 // indirect
google.golang.org/grpc v1.20.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
gotest.tools v2.2.0+incompatible gotest.tools v2.2.0+incompatible
k8s.io/api v0.18.4 k8s.io/api v0.0.0
k8s.io/apimachinery v0.18.4 k8s.io/apimachinery v0.0.0
k8s.io/client-go v0.18.4 k8s.io/client-go v10.0.0+incompatible
k8s.io/klog v1.0.0 k8s.io/klog v0.3.1
k8s.io/kubernetes v1.18.4 k8s.io/kube-openapi v0.0.0-20190510232812-a01b7d5d6c22 // indirect
k8s.io/kubernetes v1.15.2
) )
replace k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.18.4 replace k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.0.0-20190805144654-3d5bf3a310c1
replace k8s.io/cloud-provider => k8s.io/cloud-provider v0.18.4 replace k8s.io/cloud-provider => k8s.io/cloud-provider v0.0.0-20190805144409-8484242760e7
replace k8s.io/cli-runtime => k8s.io/cli-runtime v0.18.4 replace k8s.io/cli-runtime => k8s.io/cli-runtime v0.0.0-20190805143448-a07e59fb081d
replace k8s.io/apiserver => k8s.io/apiserver v0.18.4 replace k8s.io/apiserver => k8s.io/apiserver v0.0.0-20190805142138-368b2058237c
replace k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.18.4 replace k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.0.0-20190805144531-3985229e1802
replace k8s.io/cri-api => k8s.io/cri-api v0.18.4 replace k8s.io/cri-api => k8s.io/cri-api v0.0.0-20190531030430-6117653b35f1
replace k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.18.4 replace k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.0.0-20190805142416-fd821fbbb94e
replace k8s.io/kubelet => k8s.io/kubelet v0.18.4 replace k8s.io/kubelet => k8s.io/kubelet v0.0.0-20190805143852-517ff267f8d1
replace k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.18.4 replace k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.0.0-20190805144128-269742da31dd
replace k8s.io/apimachinery => k8s.io/apimachinery v0.18.4 replace k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
replace k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.18.4 replace k8s.io/api => k8s.io/api v0.0.0-20190805141119-fdd30b57c827
replace k8s.io/kube-proxy => k8s.io/kube-proxy v0.18.4 replace k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.0.0-20190805144246-c01ee70854a1
replace k8s.io/component-base => k8s.io/component-base v0.18.4 replace k8s.io/kube-proxy => k8s.io/kube-proxy v0.0.0-20190805143734-7f1675b90353
replace k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.18.4 replace k8s.io/component-base => k8s.io/component-base v0.0.0-20190805141645-3a5e5ac800ae
replace k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.18.4 replace k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.0.0-20190805144012-2a1ed1f3d8a4
replace k8s.io/metrics => k8s.io/metrics v0.18.4 replace k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.0.0-20190805143126-cdb999c96590
replace k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.18.4 replace k8s.io/metrics => k8s.io/metrics v0.0.0-20190805143318-16b07057415d
replace k8s.io/code-generator => k8s.io/code-generator v0.18.4 replace k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.0.0-20190805142637-3b65bc4bb24f
replace k8s.io/client-go => k8s.io/client-go v0.18.4 replace k8s.io/code-generator => k8s.io/code-generator v0.0.0-20190612205613-18da4a14b22b
replace k8s.io/kubectl => k8s.io/kubectl v0.18.4 replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190805141520-2fe0317bcee0
replace k8s.io/api => k8s.io/api v0.18.4

go.sum (685 changed lines)

File diff suppressed because it is too large.


@@ -5,5 +5,4 @@ import (
// This is a dep that `go mod tidy` keeps removing, because it's a transitive dep that's pulled in via a test // This is a dep that `go mod tidy` keeps removing, because it's a transitive dep that's pulled in via a test
// See: https://github.com/golang/go/issues/29702 // See: https://github.com/golang/go/issues/29702
_ "github.com/prometheus/client_golang/prometheus" _ "github.com/prometheus/client_golang/prometheus"
_ "golang.org/x/sys/unix"
) )


@@ -16,24 +16,23 @@ import (
// WaitUntilNodeCondition establishes a watch on the vk node. // WaitUntilNodeCondition establishes a watch on the vk node.
// Then, it waits for the specified condition function to be verified. // Then, it waits for the specified condition function to be verified.
func (f *Framework) WaitUntilNodeCondition(fn watch.ConditionFunc) error { func (f *Framework) WaitUntilNodeCondition(fn watch.ConditionFunc) error {
// Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached.
ctx, cancel := context.WithTimeout(context.Background(), f.WatchTimeout)
defer cancel()
// Create a field selector that matches the specified Pod resource. // Create a field selector that matches the specified Pod resource.
fs := fields.OneTermEqualSelector("metadata.name", f.NodeName).String() fs := fields.OneTermEqualSelector("metadata.name", f.NodeName).String()
// Create a ListWatch so we can receive events for the matched Pod resource. // Create a ListWatch so we can receive events for the matched Pod resource.
lw := &cache.ListWatch{ lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fs options.FieldSelector = fs
return f.KubeClient.CoreV1().Nodes().List(ctx, options) return f.KubeClient.CoreV1().Nodes().List(options)
}, },
WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) {
options.FieldSelector = fs options.FieldSelector = fs
return f.KubeClient.CoreV1().Nodes().Watch(ctx, options) return f.KubeClient.CoreV1().Nodes().Watch(options)
}, },
} }
// Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached.
ctx, cancel := context.WithTimeout(context.Background(), f.WatchTimeout)
defer cancel()
last, err := watch.UntilWithSync(ctx, lw, &corev1.Node{}, nil, fn) last, err := watch.UntilWithSync(ctx, lw, &corev1.Node{}, nil, fn)
if err != nil { if err != nil {
return err return err
@@ -45,17 +44,17 @@ func (f *Framework) WaitUntilNodeCondition(fn watch.ConditionFunc) error {
} }
// DeleteNode deletes the vk node used by the framework // DeleteNode deletes the vk node used by the framework
func (f *Framework) DeleteNode(ctx context.Context) error { func (f *Framework) DeleteNode() error {
var gracePeriod int64 var gracePeriod int64
propagation := metav1.DeletePropagationBackground propagation := metav1.DeletePropagationBackground
opts := metav1.DeleteOptions{ opts := metav1.DeleteOptions{
PropagationPolicy: &propagation, PropagationPolicy: &propagation,
GracePeriodSeconds: &gracePeriod, GracePeriodSeconds: &gracePeriod,
} }
return f.KubeClient.CoreV1().Nodes().Delete(ctx, f.NodeName, opts) return f.KubeClient.CoreV1().Nodes().Delete(f.NodeName, &opts)
} }
// GetNode gets the vk nodeused by the framework // GetNode gets the vk nodeused by the framework
func (f *Framework) GetNode(ctx context.Context) (*corev1.Node, error) { func (f *Framework) GetNode() (*corev1.Node, error) {
return f.KubeClient.CoreV1().Nodes().Get(ctx, f.NodeName, metav1.GetOptions{}) return f.KubeClient.CoreV1().Nodes().Get(f.NodeName, metav1.GetOptions{})
} }


@@ -47,21 +47,21 @@ func (f *Framework) CreateDummyPodObjectWithPrefix(testName string, prefix strin
} }
// CreatePod creates the specified pod in the Kubernetes API. // CreatePod creates the specified pod in the Kubernetes API.
func (f *Framework) CreatePod(ctx context.Context, pod *corev1.Pod) (*corev1.Pod, error) { func (f *Framework) CreatePod(pod *corev1.Pod) (*corev1.Pod, error) {
return f.KubeClient.CoreV1().Pods(f.Namespace).Create(ctx, pod, metav1.CreateOptions{}) return f.KubeClient.CoreV1().Pods(f.Namespace).Create(pod)
} }
// DeletePod deletes the pod with the specified name and namespace in the Kubernetes API using the default grace period. // DeletePod deletes the pod with the specified name and namespace in the Kubernetes API using the default grace period.
func (f *Framework) DeletePod(ctx context.Context, namespace, name string) error { func (f *Framework) DeletePod(namespace, name string) error {
return f.KubeClient.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{}) return f.KubeClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{})
} }
// DeletePodImmediately forcibly deletes the pod with the specified name and namespace in the Kubernetes API. // DeletePodImmediately forcibly deletes the pod with the specified name and namespace in the Kubernetes API.
// This is equivalent to running "kubectl delete --force --grace-period 0 --namespace <namespace> pod <name>". // This is equivalent to running "kubectl delete --force --grace-period 0 --namespace <namespace> pod <name>".
func (f *Framework) DeletePodImmediately(ctx context.Context, namespace, name string) error { func (f *Framework) DeletePodImmediately(namespace, name string) error {
grace := int64(0) grace := int64(0)
propagation := metav1.DeletePropagationBackground propagation := metav1.DeletePropagationBackground
return f.KubeClient.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{ return f.KubeClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{
GracePeriodSeconds: &grace, GracePeriodSeconds: &grace,
PropagationPolicy: &propagation, PropagationPolicy: &propagation,
}) })
@@ -70,22 +70,22 @@ func (f *Framework) DeletePodImmediately(ctx context.Context, namespace, name st
// WaitUntilPodCondition establishes a watch on the pod with the specified name and namespace. // WaitUntilPodCondition establishes a watch on the pod with the specified name and namespace.
// Then, it waits for the specified condition function to be verified. // Then, it waits for the specified condition function to be verified.
func (f *Framework) WaitUntilPodCondition(namespace, name string, fn watch.ConditionFunc) (*corev1.Pod, error) { func (f *Framework) WaitUntilPodCondition(namespace, name string, fn watch.ConditionFunc) (*corev1.Pod, error) {
// Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached.
ctx, cfn := context.WithTimeout(context.Background(), f.WatchTimeout)
defer cfn()
// Create a field selector that matches the specified Pod resource. // Create a field selector that matches the specified Pod resource.
fs := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.namespace==%s,metadata.name==%s", namespace, name)) fs := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.namespace==%s,metadata.name==%s", namespace, name))
// Create a ListWatch so we can receive events for the matched Pod resource. // Create a ListWatch so we can receive events for the matched Pod resource.
lw := &cache.ListWatch{ lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fs.String() options.FieldSelector = fs.String()
return f.KubeClient.CoreV1().Pods(namespace).List(ctx, options) return f.KubeClient.CoreV1().Pods(namespace).List(options)
}, },
WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) {
options.FieldSelector = fs.String() options.FieldSelector = fs.String()
return f.KubeClient.CoreV1().Pods(namespace).Watch(ctx, options) return f.KubeClient.CoreV1().Pods(namespace).Watch(options)
}, },
} }
// Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached.
ctx, cfn := context.WithTimeout(context.Background(), f.WatchTimeout)
defer cfn()
last, err := watch.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, fn) last, err := watch.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, fn)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -129,24 +129,22 @@ func (f *Framework) WaitUntilPodInPhase(namespace, name string, phases ...corev1
// WaitUntilPodEventWithReason establishes a watch on events involving the specified pod. // WaitUntilPodEventWithReason establishes a watch on events involving the specified pod.
// Then, it waits for an event with the specified reason to be created/updated. // Then, it waits for an event with the specified reason to be created/updated.
func (f *Framework) WaitUntilPodEventWithReason(pod *corev1.Pod, reason string) error { func (f *Framework) WaitUntilPodEventWithReason(pod *corev1.Pod, reason string) error {
// Watch for updates to the Event resource until fn is satisfied, or until the timeout is reached.
ctx, cfn := context.WithTimeout(context.Background(), f.WatchTimeout)
defer cfn()
// Create a field selector that matches Event resources involving the specified pod. // Create a field selector that matches Event resources involving the specified pod.
fs := fields.ParseSelectorOrDie(fmt.Sprintf("involvedObject.kind==Pod,involvedObject.uid==%s", pod.UID)) fs := fields.ParseSelectorOrDie(fmt.Sprintf("involvedObject.kind==Pod,involvedObject.uid==%s", pod.UID))
// Create a ListWatch so we can receive events for the matched Event resource. // Create a ListWatch so we can receive events for the matched Event resource.
lw := &cache.ListWatch{ lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fs.String() options.FieldSelector = fs.String()
return f.KubeClient.CoreV1().Events(pod.Namespace).List(ctx, options) return f.KubeClient.CoreV1().Events(pod.Namespace).List(options)
}, },
WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) {
options.FieldSelector = fs.String() options.FieldSelector = fs.String()
return f.KubeClient.CoreV1().Events(pod.Namespace).Watch(ctx, options) return f.KubeClient.CoreV1().Events(pod.Namespace).Watch(options)
}, },
} }
// Watch for updates to the Event resource until fn is satisfied, or until the timeout is reached.
ctx, cfn := context.WithTimeout(context.Background(), f.WatchTimeout)
defer cfn()
last, err := watch.UntilWithSync(ctx, lw, &corev1.Event{}, nil, func(event watchapi.Event) (b bool, e error) { last, err := watch.UntilWithSync(ctx, lw, &corev1.Event{}, nil, func(event watchapi.Event) (b bool, e error) {
switch event.Type { switch event.Type {
case watchapi.Error: case watchapi.Error:
@@ -166,8 +164,8 @@ func (f *Framework) WaitUntilPodEventWithReason(pod *corev1.Pod, reason string)
return nil return nil
} }
// GetRunningPodsFromProvider gets the running pods from the provider of the virtual kubelet // GetRunningPods gets the running pods from the provider of the virtual kubelet
func (f *Framework) GetRunningPodsFromProvider(ctx context.Context) (*corev1.PodList, error) { func (f *Framework) GetRunningPods() (*corev1.PodList, error) {
result := &corev1.PodList{} result := &corev1.PodList{}
err := f.KubeClient.CoreV1(). err := f.KubeClient.CoreV1().
@@ -177,24 +175,7 @@ func (f *Framework) GetRunningPodsFromProvider(ctx context.Context) (*corev1.Pod
Name(f.NodeName). Name(f.NodeName).
SubResource("proxy"). SubResource("proxy").
Suffix("runningpods/"). Suffix("runningpods/").
Do(ctx). Do().
Into(result)
return result, err
}
// GetRunningPodsFromProvider gets the running pods from the provider of the virtual kubelet
func (f *Framework) GetRunningPodsFromKubernetes(ctx context.Context) (*corev1.PodList, error) {
result := &corev1.PodList{}
err := f.KubeClient.CoreV1().
RESTClient().
Get().
Resource("nodes").
Name(f.NodeName).
SubResource("proxy").
Suffix("pods").
Do(ctx).
Into(result) Into(result)
return result, err return result, err


@@ -1,7 +1,6 @@
package framework package framework
import ( import (
"context"
"encoding/json" "encoding/json"
"strconv" "strconv"
@@ -10,7 +9,7 @@ import (
) )
// GetStatsSummary queries the /stats/summary endpoint of the virtual-kubelet and returns the Summary object obtained as a response. // GetStatsSummary queries the /stats/summary endpoint of the virtual-kubelet and returns the Summary object obtained as a response.
func (f *Framework) GetStatsSummary(ctx context.Context) (*stats.Summary, error) { func (f *Framework) GetStatsSummary() (*stats.Summary, error) {
// Query the /stats/summary endpoint. // Query the /stats/summary endpoint.
b, err := f.KubeClient.CoreV1(). b, err := f.KubeClient.CoreV1().
RESTClient(). RESTClient().
@@ -19,7 +18,7 @@ func (f *Framework) GetStatsSummary(ctx context.Context) (*stats.Summary, error)
Resource("pods"). Resource("pods").
SubResource("proxy"). SubResource("proxy").
Name(net.JoinSchemeNamePort("http", f.NodeName, strconv.Itoa(10255))). Name(net.JoinSchemeNamePort("http", f.NodeName, strconv.Itoa(10255))).
Suffix("/stats/summary").DoRaw(ctx) Suffix("/stats/summary").DoRaw()
if err != nil { if err != nil {
return nil, err return nil, err
} }


@@ -23,12 +23,6 @@ var (
nodeName string nodeName string
) )
// go1.13 compatibility cf. https://github.com/golang/go/issues/31859
var _ = func() bool {
testing.Init()
return true
}()
func init() { func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "path to the kubeconfig file to use when running the test suite outside a kubernetes cluster") flag.StringVar(&kubeconfig, "kubeconfig", "", "path to the kubeconfig file to use when running the test suite outside a kubernetes cluster")
flag.StringVar(&namespace, "namespace", defaultNamespace, "the name of the kubernetes namespace to use for running the test suite (i.e. where to create pods)") flag.StringVar(&namespace, "namespace", defaultNamespace, "the name of the kubernetes namespace to use for running the test suite (i.e. where to create pods)")


@@ -52,53 +52,10 @@ type TermSize struct {
// HandleContainerExec makes an http handler func from a Provider which execs a command in a pod's container // HandleContainerExec makes an http handler func from a Provider which execs a command in a pod's container
// Note that this handler currently depends on gorrilla/mux to get url parts as variables. // Note that this handler currently depends on gorrilla/mux to get url parts as variables.
// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function // TODO(@cpuguy83): don't force gorilla/mux on consumers of this function
// ContainerExecHandlerConfig is used to pass options to options to the container exec handler. func HandleContainerExec(h ContainerExecHandlerFunc) http.HandlerFunc {
type ContainerExecHandlerConfig struct {
// StreamIdleTimeout is the maximum time a streaming connection
// can be idle before the connection is automatically closed.
StreamIdleTimeout time.Duration
// StreamCreationTimeout is the maximum time for streaming connection
StreamCreationTimeout time.Duration
}
// ContainerExecHandlerOption configures a ContainerExecHandlerConfig
// It is used as functional options passed to `HandleContainerExec`
type ContainerExecHandlerOption func(*ContainerExecHandlerConfig)
// WithExecStreamIdleTimeout sets the idle timeout for a container exec stream
func WithExecStreamIdleTimeout(dur time.Duration) ContainerExecHandlerOption {
return func(cfg *ContainerExecHandlerConfig) {
cfg.StreamIdleTimeout = dur
}
}
// WithExecStreamIdleTimeout sets the idle timeout for a container exec stream
func WithExecStreamCreationTimeout(dur time.Duration) ContainerExecHandlerOption {
return func(cfg *ContainerExecHandlerConfig) {
cfg.StreamCreationTimeout = dur
}
}
// HandleContainerExec makes an http handler func from a Provider which execs a command in a pod's container
// Note that this handler currently depends on gorrilla/mux to get url parts as variables.
// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function
func HandleContainerExec(h ContainerExecHandlerFunc, opts ...ContainerExecHandlerOption) http.HandlerFunc {
if h == nil { if h == nil {
return NotImplemented return NotImplemented
} }
var cfg ContainerExecHandlerConfig
for _, o := range opts {
o(&cfg)
}
if cfg.StreamIdleTimeout == 0 {
cfg.StreamIdleTimeout = 30 * time.Second
}
if cfg.StreamCreationTimeout == 0 {
cfg.StreamCreationTimeout = 30 * time.Second
}
return handleError(func(w http.ResponseWriter, req *http.Request) error { return handleError(func(w http.ResponseWriter, req *http.Request) error {
vars := mux.Vars(req) vars := mux.Vars(req)
@@ -116,24 +73,14 @@ func HandleContainerExec(h ContainerExecHandlerFunc, opts ...ContainerExecHandle
return errdefs.AsInvalidInput(err) return errdefs.AsInvalidInput(err)
} }
// TODO: Why aren't we using req.Context() here? idleTimeout := time.Second * 30
streamCreationTimeout := time.Second * 30
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
defer cancel() defer cancel()
exec := &containerExecContext{ctx: ctx, h: h, pod: pod, namespace: namespace, container: container} exec := &containerExecContext{ctx: ctx, h: h, pod: pod, namespace: namespace, container: container}
remotecommand.ServeExec( remotecommand.ServeExec(w, req, exec, "", "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
w,
req,
exec,
"",
"",
container,
command,
streamOpts,
cfg.StreamIdleTimeout,
cfg.StreamCreationTimeout,
supportedStreamProtocols,
)
return nil return nil
}) })


@@ -56,14 +56,16 @@ type flushWriter struct {
} }
type writeFlusher interface { type writeFlusher interface {
Flush() Flush() error
Write([]byte) (int, error) Write([]byte) (int, error)
} }
func (fw *flushWriter) Write(p []byte) (int, error) { func (fw *flushWriter) Write(p []byte) (int, error) {
n, err := fw.w.Write(p) n, err := fw.w.Write(p)
if n > 0 { if n > 0 {
fw.w.Flush() if err := fw.w.Flush(); err != nil {
return n, err
}
} }
return n, err return n, err
} }


@@ -18,7 +18,6 @@ import (
"context" "context"
"io" "io"
"net/http" "net/http"
"net/url"
"strconv" "strconv"
"time" "time"
@@ -35,70 +34,9 @@ type ContainerLogsHandlerFunc func(ctx context.Context, namespace, podName, cont
// log stream. // log stream.
type ContainerLogOpts struct { type ContainerLogOpts struct {
Tail int Tail int
Since time.Duration
LimitBytes int LimitBytes int
Timestamps bool Timestamps bool
Follow bool
Previous bool
SinceSeconds int
SinceTime time.Time
}
func parseLogOptions(q url.Values) (opts ContainerLogOpts, err error) {
if tailLines := q.Get("tailLines"); tailLines != "" {
opts.Tail, err = strconv.Atoi(tailLines)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"tailLines\""))
}
if opts.Tail < 0 {
return opts, errdefs.InvalidInputf("\"tailLines\" is %d", opts.Tail)
}
}
if follow := q.Get("follow"); follow != "" {
opts.Follow, err = strconv.ParseBool(follow)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"follow\""))
}
}
if limitBytes := q.Get("limitBytes"); limitBytes != "" {
opts.LimitBytes, err = strconv.Atoi(limitBytes)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"limitBytes\""))
}
if opts.LimitBytes < 1 {
return opts, errdefs.InvalidInputf("\"limitBytes\" is %d", opts.LimitBytes)
}
}
if previous := q.Get("previous"); previous != "" {
opts.Previous, err = strconv.ParseBool(previous)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"previous\""))
}
}
if sinceSeconds := q.Get("sinceSeconds"); sinceSeconds != "" {
opts.SinceSeconds, err = strconv.Atoi(sinceSeconds)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"sinceSeconds\""))
}
if opts.SinceSeconds < 1 {
return opts, errdefs.InvalidInputf("\"sinceSeconds\" is %d", opts.SinceSeconds)
}
}
if sinceTime := q.Get("sinceTime"); sinceTime != "" {
opts.SinceTime, err = time.Parse(time.RFC3339, sinceTime)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"sinceTime\""))
}
if opts.SinceSeconds > 0 {
return opts, errdefs.InvalidInput("both \"sinceSeconds\" and \"sinceTime\" are set")
}
}
if timestamps := q.Get("timestamps"); timestamps != "" {
opts.Timestamps, err = strconv.ParseBool(timestamps)
if err != nil {
return opts, errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"timestamps\""))
}
}
return opts, nil
} }
// HandleContainerLogs creates an http handler function from a provider to serve logs from a pod // HandleContainerLogs creates an http handler function from a provider to serve logs from a pod
@@ -117,11 +55,22 @@ func HandleContainerLogs(h ContainerLogsHandlerFunc) http.HandlerFunc {
namespace := vars["namespace"] namespace := vars["namespace"]
pod := vars["pod"] pod := vars["pod"]
container := vars["container"] container := vars["container"]
tail := 10
q := req.URL.Query()
query := req.URL.Query() if queryTail := q.Get("tailLines"); queryTail != "" {
opts, err := parseLogOptions(query) t, err := strconv.Atoi(queryTail)
if err != nil { if err != nil {
return err return errdefs.AsInvalidInput(errors.Wrap(err, "could not parse \"tailLines\""))
}
tail = t
}
// TODO(@cpuguy83): support v1.PodLogOptions
// The kubelet decoding here is not straight forward, so this needs to be disected
opts := ContainerLogOpts{
Tail: tail,
} }
logs, err := h(ctx, namespace, pod, container, opts) logs, err := h(ctx, namespace, pod, container, opts)


@@ -1,99 +0,0 @@
package api
import (
"fmt"
"net/url"
"testing"
"time"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
)
//func parseLogOptions(q url.Values) (opts ContainerLogOpts, err error)
func TestParseLogOptions(t *testing.T) {
//tailLines
//follow
//limitBytes
//previous
//sinceSeconds
//sinceTime
//timestamps
sinceTime, _ := time.Parse(time.RFC3339, "2020-03-20T21:07:34Z")
fmt.Printf("%+v\n", sinceTime)
testCases := []struct {
Values url.Values
Failure bool
Result ContainerLogOpts
}{
{
Values: url.Values{},
Failure: false,
Result: ContainerLogOpts{},
},
{
Values: url.Values{
"follow": {"true"},
"limitBytes": {"123"},
"previous": {"true"},
"sinceSeconds": {"10"},
"tailLines": {"99"},
"timestamps": {"true"},
},
Failure: false,
Result: ContainerLogOpts{
Follow: true,
LimitBytes: 123,
Previous: true,
SinceSeconds: 10,
Tail: 99,
Timestamps: true,
},
},
{
Values: url.Values{
"sinceSeconds": {"10"},
"sinceTime": {"2020-03-20T21:07:34Z"},
},
Failure: true,
},
{
Values: url.Values{
"sinceTime": {"2020-03-20T21:07:34Z"},
},
Failure: false,
Result: ContainerLogOpts{
SinceTime: sinceTime,
},
},
{
Values: url.Values{
"tailLines": {"-1"},
},
Failure: true,
},
{
Values: url.Values{
"limitBytes": {"0"},
},
Failure: true,
},
{
Values: url.Values{
"sinceSeconds": {"-10"},
},
Failure: true,
},
}
// follow=true&limitBytes=1&previous=true&sinceSeconds=1&sinceTime=2020-03-20T21%3A07%3A34Z&tailLines=1&timestamps=true
for i, tc := range testCases {
msg := fmt.Sprintf("test case #%d %+v failed", i+1, tc)
result, err := parseLogOptions(tc.Values)
if tc.Failure {
assert.Check(t, is.ErrorContains(err, ""), msg)
} else {
assert.NilError(t, err, msg)
assert.Check(t, is.Equal(result, tc.Result), msg)
}
}
}


@@ -27,10 +27,6 @@ import (
type PodListerFunc func(context.Context) ([]*v1.Pod, error) type PodListerFunc func(context.Context) ([]*v1.Pod, error)
func HandleRunningPods(getPods PodListerFunc) http.HandlerFunc { func HandleRunningPods(getPods PodListerFunc) http.HandlerFunc {
if getPods == nil {
return NotImplemented
}
scheme := runtime.NewScheme() scheme := runtime.NewScheme()
v1.SchemeBuilder.AddToScheme(scheme) //nolint:errcheck v1.SchemeBuilder.AddToScheme(scheme) //nolint:errcheck
codecs := serializer.NewCodecFactory(scheme) codecs := serializer.NewCodecFactory(scheme)


@@ -16,7 +16,6 @@ package api
import ( import (
"net/http" "net/http"
"time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/log"
@@ -36,13 +35,8 @@ type ServeMux interface {
type PodHandlerConfig struct { type PodHandlerConfig struct {
RunInContainer ContainerExecHandlerFunc RunInContainer ContainerExecHandlerFunc
GetContainerLogs ContainerLogsHandlerFunc GetContainerLogs ContainerLogsHandlerFunc
// GetPods is meant to enumerate the pods that the provider knows about
GetPods PodListerFunc GetPods PodListerFunc
// GetPodsFromKubernetes is meant to enumerate the pods that the node is meant to be running
GetPodsFromKubernetes PodListerFunc
GetStatsSummary PodStatsSummaryHandlerFunc GetStatsSummary PodStatsSummaryHandlerFunc
StreamIdleTimeout time.Duration
StreamCreationTimeout time.Duration
} }
// PodHandler creates an http handler for interacting with pods/containers. // PodHandler creates an http handler for interacting with pods/containers.
@@ -54,18 +48,8 @@ func PodHandler(p PodHandlerConfig, debug bool) http.Handler {
if debug { if debug {
r.HandleFunc("/runningpods/", HandleRunningPods(p.GetPods)).Methods("GET") r.HandleFunc("/runningpods/", HandleRunningPods(p.GetPods)).Methods("GET")
} }
r.HandleFunc("/pods", HandleRunningPods(p.GetPodsFromKubernetes)).Methods("GET")
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", HandleContainerLogs(p.GetContainerLogs)).Methods("GET") r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", HandleContainerLogs(p.GetContainerLogs)).Methods("GET")
r.HandleFunc( r.HandleFunc("/exec/{namespace}/{pod}/{container}", HandleContainerExec(p.RunInContainer)).Methods("POST")
"/exec/{namespace}/{pod}/{container}",
HandleContainerExec(
p.RunInContainer,
WithExecStreamCreationTimeout(p.StreamCreationTimeout),
WithExecStreamIdleTimeout(p.StreamIdleTimeout),
),
).Methods("POST", "GET")
if p.GetStatsSummary != nil { if p.GetStatsSummary != nil {
f := HandlePodStatsSummary(p.GetStatsSummary) f := HandlePodStatsSummary(p.GetStatsSummary)
r.HandleFunc("/stats/summary", f).Methods("GET") r.HandleFunc("/stats/summary", f).Methods("GET")


@@ -160,7 +160,7 @@ func TestPodLifecycle(t *testing.T) {
mp.setErrorOnDelete(errors.New("random error")) mp.setErrorOnDelete(errors.New("random error"))
assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) { assert.NilError(t, wireUpSystem(ctx, mp, func(ctx context.Context, s *system) {
testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false) testCreateStartDeleteScenario(ctx, t, s, deletionFunc, false)
pods, err := s.client.CoreV1().Pods(testNamespace).List(ctx, metav1.ListOptions{}) pods, err := s.client.CoreV1().Pods(testNamespace).List(metav1.ListOptions{})
assert.NilError(t, err) assert.NilError(t, err)
assert.Assert(t, is.Len(pods.Items, 1)) assert.Assert(t, is.Len(pods.Items, 1))
assert.Assert(t, pods.Items[0].DeletionTimestamp != nil) assert.Assert(t, pods.Items[0].DeletionTimestamp != nil)
@@ -329,7 +329,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
p1 := newPod() p1 := newPod()
p1.Status.Phase = state p1.Status.Phase = state
// Create the Pod // Create the Pod
_, e := s.client.CoreV1().Pods(testNamespace).Create(ctx, p1, metav1.CreateOptions{}) _, e := s.client.CoreV1().Pods(testNamespace).Create(p1)
assert.NilError(t, e) assert.NilError(t, e)
// Start the pod controller // Start the pod controller
@@ -339,7 +339,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
} }
p2, err := s.client.CoreV1().Pods(testNamespace).Get(ctx, p1.Name, metav1.GetOptions{}) p2, err := s.client.CoreV1().Pods(testNamespace).Get(p1.Name, metav1.GetOptions{})
assert.NilError(t, err) assert.NilError(t, err)
// Make sure the pods have not changed // Make sure the pods have not changed
@@ -367,7 +367,7 @@ func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testin
FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.ObjectMeta.Name).String(), FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.ObjectMeta.Name).String(),
} }
// Setup a watch (prior to pod creation, and pod controller startup) // Setup a watch (prior to pod creation, and pod controller startup)
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions) watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err) assert.NilError(t, err)
defer watcher.Stop() defer watcher.Stop()
@@ -379,7 +379,7 @@ func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testin
podCopyWithDeletionTimestamp.DeletionGracePeriodSeconds = &deletionGracePeriod podCopyWithDeletionTimestamp.DeletionGracePeriodSeconds = &deletionGracePeriod
deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod))) deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
podCopyWithDeletionTimestamp.DeletionTimestamp = &deletionTimestamp podCopyWithDeletionTimestamp.DeletionTimestamp = &deletionTimestamp
_, e := s.client.CoreV1().Pods(testNamespace).Create(ctx, podCopyWithDeletionTimestamp, metav1.CreateOptions{}) _, e := s.client.CoreV1().Pods(testNamespace).Create(podCopyWithDeletionTimestamp)
assert.NilError(t, e) assert.NilError(t, e)
// Start the pod controller // Start the pod controller
@@ -415,7 +415,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
watchErrCh := make(chan error) watchErrCh := make(chan error)
// Setup a watch (prior to pod creation, and pod controller startup) // Setup a watch (prior to pod creation, and pod controller startup)
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions) watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err) assert.NilError(t, err)
defer watcher.Stop() defer watcher.Stop()
// This ensures that the pod is created. // This ensures that the pod is created.
@@ -432,7 +432,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
}() }()
// Create the Pod // Create the Pod
_, e := s.client.CoreV1().Pods(testNamespace).Create(ctx, p, metav1.CreateOptions{}) _, e := s.client.CoreV1().Pods(testNamespace).Create(p)
assert.NilError(t, e) assert.NilError(t, e)
log.G(ctx).Debug("Created pod") log.G(ctx).Debug("Created pod")
@@ -446,7 +446,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
} }
// Setup a watch to check if the pod is in running // Setup a watch to check if the pod is in running
watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions) watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err) assert.NilError(t, err)
defer watcher.Stop() defer watcher.Stop()
go func() { go func() {
@@ -471,7 +471,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
} }
// Setup a watch to look for the pod eventually going away completely // Setup a watch to look for the pod eventually going away completely
watcher2, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions) watcher2, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err) assert.NilError(t, err)
defer watcher2.Stop() defer watcher2.Stop()
waitDeleteCh := make(chan error) waitDeleteCh := make(chan error)
@@ -483,7 +483,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
}() }()
// Setup a watch prior to pod deletion // Setup a watch prior to pod deletion
watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions) watcher, err = s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err) assert.NilError(t, err)
defer watcher.Stop() defer watcher.Stop()
go func() { go func() {
@@ -493,7 +493,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
// Delete the pod via deletiontimestamp // Delete the pod via deletiontimestamp
// 1. Get the pod // 1. Get the pod
currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(ctx, p.Name, metav1.GetOptions{}) currentPod, err := s.client.CoreV1().Pods(testNamespace).Get(p.Name, metav1.GetOptions{})
assert.NilError(t, err) assert.NilError(t, err)
// 2. Set the pod's deletion timestamp, version, and so on // 2. Set the pod's deletion timestamp, version, and so on
var deletionGracePeriod int64 = 10 var deletionGracePeriod int64 = 10
@@ -501,7 +501,7 @@ func testCreateStartDeleteScenario(ctx context.Context, t *testing.T, s *system,
deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod))) deletionTimestamp := metav1.NewTime(time.Now().Add(time.Second * time.Duration(deletionGracePeriod)))
currentPod.DeletionTimestamp = &deletionTimestamp currentPod.DeletionTimestamp = &deletionTimestamp
// 3. Update (overwrite) the pod // 3. Update (overwrite) the pod
_, err = s.client.CoreV1().Pods(testNamespace).Update(ctx, currentPod, metav1.UpdateOptions{}) _, err = s.client.CoreV1().Pods(testNamespace).Update(currentPod)
assert.NilError(t, err) assert.NilError(t, err)
select { select {
@@ -535,11 +535,11 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
watchErrCh := make(chan error) watchErrCh := make(chan error)
// Create a Pod // Create a Pod
_, e := s.client.CoreV1().Pods(testNamespace).Create(ctx, p, metav1.CreateOptions{}) _, e := s.client.CoreV1().Pods(testNamespace).Create(p)
assert.NilError(t, e) assert.NilError(t, e)
// Setup a watch to check if the pod is in running // Setup a watch to check if the pod is in running
watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(ctx, listOptions) watcher, err := s.client.CoreV1().Pods(testNamespace).Watch(listOptions)
assert.NilError(t, err) assert.NilError(t, err)
defer watcher.Stop() defer watcher.Stop()
go func() { go func() {
@@ -576,7 +576,7 @@ func testUpdatePodWhileRunningScenario(ctx context.Context, t *testing.T, s *sys
p.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds p.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
log.G(ctx).WithField("pod", p).Info("Updating pod") log.G(ctx).WithField("pod", p).Info("Updating pod")
_, err = s.client.CoreV1().Pods(p.Namespace).Update(ctx, p, metav1.UpdateOptions{}) _, err = s.client.CoreV1().Pods(p.Namespace).Update(p)
assert.NilError(t, err) assert.NilError(t, err)
assert.NilError(t, m.getUpdates().until(ctx, func(v int) bool { return v > 0 })) assert.NilError(t, m.getUpdates().until(ctx, func(v int) bool { return v > 0 }))
} }
@@ -603,7 +603,7 @@ func benchmarkCreatePods(ctx context.Context, b *testing.B, s *system) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
pod := newPod(randomizeUID, randomizeName) pod := newPod(randomizeUID, randomizeName)
_, err := s.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}) _, err := s.client.CoreV1().Pods(pod.Namespace).Create(pod)
assert.NilError(b, err) assert.NilError(b, err)
assert.NilError(b, ctx.Err()) assert.NilError(b, ctx.Err())
} }
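The pod lifecycle tests above all rely on the same ordering: the watch is opened before the pod is created (and before the pod controller is started) so that no ADDED or MODIFIED event can be missed. The following is a minimal, hypothetical distillation of that pattern against a fake clientset, written with the pre-1.18 client-go signatures used on this branch (no context arguments); the package and test names are invented for illustration.

package sketch

import (
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/watch"
	testclient "k8s.io/client-go/kubernetes/fake"
)

// TestWatchBeforeCreateSketch opens the watch first, creates the pod second,
// and then waits for the ADDED event, mirroring the ordering used above.
func TestWatchBeforeCreateSketch(t *testing.T) {
	client := testclient.NewSimpleClientset()
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}

	listOptions := metav1.ListOptions{
		FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.Name).String(),
	}
	// The watch is set up before Create so the creation event cannot be lost.
	w, err := client.CoreV1().Pods(pod.Namespace).Watch(listOptions)
	if err != nil {
		t.Fatal(err)
	}
	defer w.Stop()

	if _, err := client.CoreV1().Pods(pod.Namespace).Create(pod); err != nil {
		t.Fatal(err)
	}

	select {
	case ev := <-w.ResultChan():
		if ev.Type != watch.Added {
			t.Fatalf("expected ADDED event, got %v", ev.Type)
		}
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for the pod event")
	}
}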


@@ -17,6 +17,7 @@ package node
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"time" "time"
pkgerrors "github.com/pkg/errors" pkgerrors "github.com/pkg/errors"
@@ -30,14 +31,6 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes/typed/coordination/v1beta1" "k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1" v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
)
const (
// Annotation with the JSON-serialized last applied node conditions. Based on kube ctl apply. Used to calculate
// the three-way patch
virtualKubeletLastNodeAppliedNodeStatus = "virtual-kubelet.io/last-applied-node-status"
virtualKubeletLastNodeAppliedObjectMeta = "virtual-kubelet.io/last-applied-object-meta"
) )
// NodeProvider is the interface used for registering a node and updating its // NodeProvider is the interface used for registering a node and updating its
@@ -192,7 +185,7 @@ func (n *NodeController) Run(ctx context.Context) error {
n.statusInterval = DefaultStatusUpdateInterval n.statusInterval = DefaultStatusUpdateInterval
} }
n.chStatusUpdate = make(chan *corev1.Node, 1) n.chStatusUpdate = make(chan *corev1.Node)
n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) { n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) {
n.chStatusUpdate <- node n.chStatusUpdate <- node
}) })
@@ -223,19 +216,13 @@ func (n *NodeController) Run(ctx context.Context) error {
return n.controlLoop(ctx) return n.controlLoop(ctx)
} }
func (n *NodeController) ensureNode(ctx context.Context) (err error) { func (n *NodeController) ensureNode(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "node.ensureNode") err := n.updateStatus(ctx, true)
defer span.End()
defer func() {
span.SetStatus(err)
}()
err = n.updateStatus(ctx, true)
if err == nil || !errors.IsNotFound(err) { if err == nil || !errors.IsNotFound(err) {
return err return err
} }
node, err := n.nodes.Create(ctx, n.n, metav1.CreateOptions{}) node, err := n.nodes.Create(n.n)
if err != nil { if err != nil {
return pkgerrors.Wrap(err, "error registering node with kubernetes") return pkgerrors.Wrap(err, "error registering node with kubernetes")
} }
@@ -271,13 +258,10 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
close(n.chReady) close(n.chReady)
loop := func() bool { for {
ctx, span := trace.StartSpan(ctx, "node.controlLoop.loop")
defer span.End()
select { select {
case <-ctx.Done(): case <-ctx.Done():
return true return nil
case updated := <-n.chStatusUpdate: case updated := <-n.chStatusUpdate:
var t *time.Timer var t *time.Timer
if n.disableLease { if n.disableLease {
@@ -294,13 +278,6 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
} }
n.n.Status = updated.Status n.n.Status = updated.Status
n.n.ObjectMeta = metav1.ObjectMeta{
Annotations: updated.Annotations,
Labels: updated.Labels,
Name: n.n.ObjectMeta.Name,
Namespace: n.n.ObjectMeta.Namespace,
UID: n.n.ObjectMeta.UID,
}
if err := n.updateStatus(ctx, false); err != nil { if err := n.updateStatus(ctx, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update") log.G(ctx).WithError(err).Error("Error handling node status update")
} }
@@ -318,13 +295,6 @@ func (n *NodeController) controlLoop(ctx context.Context) error {
} }
pingTimer.Reset(n.pingInterval) pingTimer.Reset(n.pingInterval)
} }
return false
}
for {
shouldTerminate := loop()
if shouldTerminate {
return nil
}
} }
} }
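The hunk above also touches the timer logic that the cherry-picked ping-interval fix is about: one timer is paused while an asynchronous status update is handled, and it has to be reset with the interval that matches it. The snippet below is a stripped-down, hypothetical sketch of that shape (controlLoopSketch and its parameters are invented for illustration), not the controller itself.

package sketch

import (
	"context"
	"time"
)

// controlLoopSketch shows the essential timer handling: after draining a
// status notification, reset whichever timer was paused with its own
// interval. Resetting the ping timer with the much longer status interval is
// exactly the bug that lets the node drift into NotReady.
func controlLoopSketch(ctx context.Context, statusUpdates <-chan struct{},
	pingInterval, statusInterval time.Duration, disableLease bool,
	ping, updateStatus func(context.Context) error) error {

	pingTimer := time.NewTimer(pingInterval)
	defer pingTimer.Stop()
	statusTimer := time.NewTimer(statusInterval)
	defer statusTimer.Stop()
	if !disableLease {
		// Periodic status updates are only needed when node leases are unavailable.
		statusTimer.Stop()
	}

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-statusUpdates:
			// Pick the timer that is in use and remember its matching interval.
			t, d := pingTimer, pingInterval
			if disableLease {
				t, d = statusTimer, statusInterval
			}
			if !t.Stop() {
				// Drain a timer that already fired so Reset starts clean.
				select {
				case <-t.C:
				default:
				}
			}
			_ = updateStatus(ctx) // errors are logged (not fatal) in the real controller
			t.Reset(d)            // reset with the matching interval, never unconditionally with statusInterval
		case <-statusTimer.C:
			_ = updateStatus(ctx)
			statusTimer.Reset(statusInterval)
		case <-pingTimer.C:
			_ = ping(ctx)
			pingTimer.Reset(pingInterval)
		}
	}
}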
@@ -356,13 +326,7 @@ func (n *NodeController) updateLease(ctx context.Context) error {
return nil return nil
} }
func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) (err error) { func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) error {
ctx, span := trace.StartSpan(ctx, "node.updateStatus")
defer span.End()
defer func() {
span.SetStatus(err)
}()
updateNodeStatusHeartbeat(n.n) updateNodeStatusHeartbeat(n.n)
node, err := updateNodeStatus(ctx, n.nodes, n.n) node, err := updateNodeStatus(ctx, n.nodes, n.n)
@@ -385,18 +349,18 @@ func (n *NodeController) updateStatus(ctx context.Context, skipErrorCb bool) (er
} }
func ensureLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *coord.Lease) (*coord.Lease, error) { func ensureLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *coord.Lease) (*coord.Lease, error) {
l, err := leases.Create(ctx, lease, metav1.CreateOptions{}) l, err := leases.Create(lease)
if err != nil { if err != nil {
switch { switch {
case errors.IsNotFound(err): case errors.IsNotFound(err):
log.G(ctx).WithError(err).Info("Node lease not supported") log.G(ctx).WithError(err).Info("Node lease not supported")
return nil, err return nil, err
case errors.IsAlreadyExists(err): case errors.IsAlreadyExists(err):
if err := leases.Delete(ctx, lease.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { if err := leases.Delete(lease.Name, nil); err != nil && !errors.IsNotFound(err) {
log.G(ctx).WithError(err).Error("could not delete old node lease") log.G(ctx).WithError(err).Error("could not delete old node lease")
return nil, pkgerrors.Wrap(err, "old lease exists but could not delete it") return nil, pkgerrors.Wrap(err, "old lease exists but could not delete it")
} }
l, err = leases.Create(ctx, lease, metav1.CreateOptions{}) l, err = leases.Create(lease)
} }
} }
@@ -421,7 +385,7 @@ func updateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *
ctx = span.WithField(ctx, "lease.expiresSeconds", *lease.Spec.LeaseDurationSeconds) ctx = span.WithField(ctx, "lease.expiresSeconds", *lease.Spec.LeaseDurationSeconds)
} }
l, err := leases.Update(ctx, lease, metav1.UpdateOptions{}) l, err := leases.Update(lease)
if err != nil { if err != nil {
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
log.G(ctx).Debug("lease not found") log.G(ctx).Debug("lease not found")
@@ -442,117 +406,42 @@ func updateNodeLease(ctx context.Context, leases v1beta1.LeaseInterface, lease *
// just so we don't have to allocate this on every get request // just so we don't have to allocate this on every get request
var emptyGetOptions = metav1.GetOptions{} var emptyGetOptions = metav1.GetOptions{}
func prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode *corev1.Node) ([]byte, error) { // patchNodeStatus patches node status.
// We use these two values to calculate a patch. We use a three-way patch in order to avoid // Copied from github.com/kubernetes/kubernetes/pkg/util/node
// causing state regression server side. For example, let's consider the scenario: func patchNodeStatus(nodes v1.NodeInterface, nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) (*corev1.Node, []byte, error) {
/* patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
UML Source:
@startuml
participant VK
participant K8s
participant ExternalUpdater
note right of VK: Updates internal node conditions to [A, B]
VK->K8s: Patch Upsert [A, B]
note left of K8s: Node conditions are [A, B]
ExternalUpdater->K8s: Patch Upsert [C]
note left of K8s: Node Conditions are [A, B, C]
note right of VK: Updates internal node conditions to [A]
VK->K8s: Patch: delete B, upsert A\nThis is where things go wrong,\nbecause the patch is written to replace all node conditions\nit overwrites (drops) [C]
note left of K8s: Node Conditions are [A]\nNode condition C from ExternalUpdater is no longer present
@enduml
	   (ASCII rendering of the PlantUML sequence diagram above: VK upserts conditions [A, B], ExternalUpdater upserts [C], and VK's later replace-style patch down to [A] overwrites and drops C. Regenerate from the @startuml source if needed.)
*/
// In order to calculate that last patch to delete B, and upsert C, we need to know that C was added by
// "someone else". So, we keep track of our last applied value, and our current value. We then generate
// our patch based on the diff of these and *not* server side state.
oldVKStatus, ok1 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus]
oldVKObjectMeta, ok2 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta]
oldNode := corev1.Node{}
// Check if there were no labels, which means someone else probably created the node, or this is an upgrade. Either way, we will consider
// ourselves as never having written the node object before, so oldNode will be left empty. We will overwrite values if
// our new node conditions / status / objectmeta have them
if ok1 && ok2 {
err := json.Unmarshal([]byte(oldVKObjectMeta), &oldNode.ObjectMeta)
if err != nil { if err != nil {
return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node object metadata (key: %q): %q", virtualKubeletLastNodeAppliedObjectMeta, oldVKObjectMeta) return nil, nil, err
}
err = json.Unmarshal([]byte(oldVKStatus), &oldNode.Status)
if err != nil {
return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node status (key: %q): %q", virtualKubeletLastNodeAppliedNodeStatus, oldVKStatus)
}
} }
// newNode is the representation of the node the provider "wants" updatedNode, err := nodes.Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
newNode := corev1.Node{} if err != nil {
newNode.ObjectMeta = simplestObjectMetadata(&apiServerNode.ObjectMeta, &nodeFromProvider.ObjectMeta) return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
nodeFromProvider.Status.DeepCopyInto(&newNode.Status) }
return updatedNode, patchBytes, nil
}
// virtualKubeletLastNodeAppliedObjectMeta must always be set before virtualKubeletLastNodeAppliedNodeStatus, func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *corev1.Node, newNode *corev1.Node) ([]byte, error) {
// otherwise we capture virtualKubeletLastNodeAppliedNodeStatus in virtualKubeletLastNodeAppliedObjectMeta, oldData, err := json.Marshal(oldNode)
// which is wrong
virtualKubeletLastNodeAppliedObjectMetaBytes, err := json.Marshal(newNode.ObjectMeta)
if err != nil { if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal object meta from provider") return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err)
} }
newNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta] = string(virtualKubeletLastNodeAppliedObjectMetaBytes)
virtualKubeletLastNodeAppliedNodeStatusBytes, err := json.Marshal(newNode.Status) // Reset spec to make sure only patch for Status or ObjectMeta is generated.
// Note that we don't reset ObjectMeta here, because:
// 1. This aligns with Nodes().UpdateStatus().
// 2. Some component does use this to update node annotations.
newNode.Spec = oldNode.Spec
newData, err := json.Marshal(newNode)
if err != nil { if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal node status from provider") return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err)
} }
newNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus] = string(virtualKubeletLastNodeAppliedNodeStatusBytes)
// Generate three way patch from oldNode -> newNode, without deleting the changes in api server patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
// Should we use the Kubernetes serialization / deserialization libraries here?
oldNodeBytes, err := json.Marshal(oldNode)
if err != nil { if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal old node bytes") return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err)
} }
newNodeBytes, err := json.Marshal(newNode) return patchBytes, nil
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal new node bytes")
}
apiServerNodeBytes, err := json.Marshal(apiServerNode)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal api server node")
}
schema, err := strategicpatch.NewPatchMetaFromStruct(&corev1.Node{})
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot get patch schema from node")
}
return strategicpatch.CreateThreeWayMergePatch(oldNodeBytes, newNodeBytes, apiServerNodeBytes, schema, true)
} }
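To make the trade-off between the two versions above concrete, here is a small standalone sketch of the [A, B] / [C] scenario from the comment, using only the strategicpatch package that both sides already import; the helper and variable names are invented for illustration. A plain two-way patch computed against the live object deletes the externally owned condition C, while the three-way patch anchored on the last-applied state only deletes B.

package main

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func nodeWithConditions(conditionTypes ...corev1.NodeConditionType) *corev1.Node {
	n := &corev1.Node{}
	for _, ct := range conditionTypes {
		n.Status.Conditions = append(n.Status.Conditions, corev1.NodeCondition{Type: ct, Status: "True"})
	}
	return n
}

func main() {
	lastApplied := nodeWithConditions("A", "B")  // what this kubelet last wrote
	desired := nodeWithConditions("A")           // what this kubelet wants now
	current := nodeWithConditions("A", "B", "C") // live object; "C" was added by someone else

	lastB, _ := json.Marshal(lastApplied)
	wantB, _ := json.Marshal(desired)
	liveB, _ := json.Marshal(current)

	// Two-way (live vs. desired): the generated patch deletes both B and C,
	// clobbering the externally owned condition.
	twoWay, err := strategicpatch.CreateTwoWayMergePatch(liveB, wantB, corev1.Node{})
	if err != nil {
		panic(err)
	}

	// Three-way (last-applied, desired, live): only B, which this kubelet
	// previously wrote, is deleted; C survives.
	schema, err := strategicpatch.NewPatchMetaFromStruct(&corev1.Node{})
	if err != nil {
		panic(err)
	}
	threeWay, err := strategicpatch.CreateThreeWayMergePatch(lastB, wantB, liveB, schema, true)
	if err != nil {
		panic(err)
	}

	fmt.Printf("two-way patch:   %s\nthree-way patch: %s\n", twoWay, threeWay)
}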
// updateNodeStatus triggers an update to the node status in Kubernetes. // updateNodeStatus triggers an update to the node status in Kubernetes.
@@ -561,44 +450,37 @@ func prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode *cor
// //
// If you use this function, it is up to you to synchronize this with other operations. // If you use this function, it is up to you to synchronize this with other operations.
// This reduces the time to second-level precision. // This reduces the time to second-level precision.
func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, nodeFromProvider *corev1.Node) (_ *corev1.Node, retErr error) { func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, n *corev1.Node) (_ *corev1.Node, retErr error) {
ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus") ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus")
defer func() { defer func() {
span.End() span.End()
span.SetStatus(retErr) span.SetStatus(retErr)
}() }()
var updatedNode *corev1.Node var node *corev1.Node
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
apiServerNode, err := nodes.Get(ctx, nodeFromProvider.Name, emptyGetOptions)
if err != nil {
return err
}
ctx = addNodeAttributes(ctx, span, apiServerNode)
log.G(ctx).Debug("got node from api server")
patchBytes, err := prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode)
if err != nil {
return pkgerrors.Wrap(err, "Cannot generate patch")
}
log.G(ctx).WithError(err).WithField("patch", string(patchBytes)).Debug("Generated three way patch")
updatedNode, err = nodes.Patch(ctx, nodeFromProvider.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
// We cannot wrap this error because the kubernetes error module doesn't understand wrapping
log.G(ctx).WithField("patch", string(patchBytes)).WithError(err).Warn("Failed to patch node status")
return err
}
return nil
})
oldNode, err := nodes.Get(n.Name, emptyGetOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.G(ctx).WithField("node.resourceVersion", updatedNode.ResourceVersion).
WithField("node.Status.Conditions", updatedNode.Status.Conditions). log.G(ctx).Debug("got node from api server")
node = oldNode.DeepCopy()
node.ResourceVersion = ""
node.Status = n.Status
ctx = addNodeAttributes(ctx, span, node)
// Patch the node status to merge other changes on the node.
updated, _, err := patchNodeStatus(nodes, types.NodeName(n.Name), oldNode, node)
if err != nil {
return nil, err
}
log.G(ctx).WithField("node.resourceVersion", updated.ResourceVersion).
WithField("node.Status.Conditions", updated.Status.Conditions).
Debug("updated node status in api server") Debug("updated node status in api server")
return updatedNode, nil return updated, nil
} }
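The left-hand version of updateNodeStatus wraps the get-and-patch cycle in client-go's retry helper so that 409 conflicts are retried against fresh server state. Below is a condensed, hypothetical sketch of that shape; patchStatusWithRetry and makePatch are invented names, and it assumes the context-aware client-go signatures shown on the left-hand side.

package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/util/retry"
)

// patchStatusWithRetry re-reads the live node and re-generates the patch on
// every attempt, so a conflict never patches against stale state.
func patchStatusWithRetry(ctx context.Context, nodes v1.NodeInterface, name string,
	makePatch func(live *corev1.Node) ([]byte, error)) (*corev1.Node, error) {

	var updated *corev1.Node
	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		live, err := nodes.Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		patch, err := makePatch(live)
		if err != nil {
			return err
		}
		// Patch only the "status" subresource; conflicts are retried by the wrapper.
		updated, err = nodes.Patch(ctx, name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status")
		return err
	})
	return updated, err
}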
func newLease(base *coord.Lease) *coord.Lease { func newLease(base *coord.Lease) *coord.Lease {
@@ -673,31 +555,3 @@ func addNodeAttributes(ctx context.Context, span trace.Span, n *corev1.Node) con
"node.taints": taintsStringer(n.Spec.Taints), "node.taints": taintsStringer(n.Spec.Taints),
}) })
} }
// simplestObjectMetadata provides the minimal object metadata needed to match the existing object: name, namespace, and UID. It copies labels and
// annotations from the second object when they are set, but skips the virtual-kubelet last-applied annotations
func simplestObjectMetadata(baseObjectMeta, objectMetaWithLabelsAndAnnotations *metav1.ObjectMeta) metav1.ObjectMeta {
ret := metav1.ObjectMeta{
Namespace: baseObjectMeta.Namespace,
Name: baseObjectMeta.Name,
UID: baseObjectMeta.UID,
Annotations: make(map[string]string),
}
if objectMetaWithLabelsAndAnnotations != nil {
if objectMetaWithLabelsAndAnnotations.Labels != nil {
ret.Labels = objectMetaWithLabelsAndAnnotations.Labels
} else {
ret.Labels = make(map[string]string)
}
if objectMetaWithLabelsAndAnnotations.Annotations != nil {
// We want to copy over all annotations except the special embedded ones.
for key := range objectMetaWithLabelsAndAnnotations.Annotations {
if key == virtualKubeletLastNodeAppliedNodeStatus || key == virtualKubeletLastNodeAppliedObjectMeta {
continue
}
ret.Annotations[key] = objectMetaWithLabelsAndAnnotations.Annotations[key]
}
}
}
return ret
}


@@ -22,14 +22,12 @@ import (
"gotest.tools/assert" "gotest.tools/assert"
"gotest.tools/assert/cmp" "gotest.tools/assert/cmp"
is "gotest.tools/assert/cmp"
coord "k8s.io/api/coordination/v1beta1" coord "k8s.io/api/coordination/v1beta1"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
watch "k8s.io/apimachinery/pkg/watch" watch "k8s.io/apimachinery/pkg/watch"
testclient "k8s.io/client-go/kubernetes/fake" testclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/retry"
) )
func TestNodeRun(t *testing.T) { func TestNodeRun(t *testing.T) {
@@ -74,11 +72,11 @@ func testNodeRun(t *testing.T, enableLease bool) {
close(chErr) close(chErr)
}() }()
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name) nw := makeWatch(t, nodes, testNodeCopy.Name)
defer nw.Stop() defer nw.Stop()
nr := nw.ResultChan() nr := nw.ResultChan()
lw := makeWatch(ctx, t, leases, testNodeCopy.Name) lw := makeWatch(t, leases, testNodeCopy.Name)
defer lw.Stop() defer lw.Stop()
lr := lw.ResultChan() lr := lw.ResultChan()
@@ -132,7 +130,7 @@ func testNodeRun(t *testing.T, enableLease bool) {
} }
// trigger an async node status update // trigger an async node status update
n, err := nodes.Get(ctx, testNode.Name, metav1.GetOptions{}) n, err := nodes.Get(testNode.Name, metav1.GetOptions{})
assert.NilError(t, err) assert.NilError(t, err)
newCondition := corev1.NodeCondition{ newCondition := corev1.NodeCondition{
Type: corev1.NodeConditionType("UPDATED"), Type: corev1.NodeConditionType("UPDATED"),
@@ -140,7 +138,7 @@ func testNodeRun(t *testing.T, enableLease bool) {
} }
n.Status.Conditions = append(n.Status.Conditions, newCondition) n.Status.Conditions = append(n.Status.Conditions, newCondition)
nw = makeWatch(ctx, t, nodes, testNodeCopy.Name) nw = makeWatch(t, nodes, testNodeCopy.Name)
defer nw.Stop() defer nw.Stop()
nr = nw.ResultChan() nr = nw.ResultChan()
@@ -207,7 +205,7 @@ func TestNodeCustomUpdateStatusErrorHandler(t *testing.T) {
case <-node.Ready(): case <-node.Ready():
} }
err = nodes.Delete(ctx, node.n.Name, metav1.DeleteOptions{}) err = nodes.Delete(node.n.Name, nil)
assert.NilError(t, err) assert.NilError(t, err)
testP.triggerStatusUpdate(node.n.DeepCopy()) testP.triggerStatusUpdate(node.n.DeepCopy())
@@ -253,7 +251,7 @@ func TestUpdateNodeStatus(t *testing.T) {
updated, err := updateNodeStatus(ctx, nodes, n.DeepCopy()) updated, err := updateNodeStatus(ctx, nodes, n.DeepCopy())
assert.Equal(t, errors.IsNotFound(err), true, err) assert.Equal(t, errors.IsNotFound(err), true, err)
_, err = nodes.Create(ctx, n, metav1.CreateOptions{}) _, err = nodes.Create(n)
assert.NilError(t, err) assert.NilError(t, err)
updated, err = updateNodeStatus(ctx, nodes, n.DeepCopy()) updated, err = updateNodeStatus(ctx, nodes, n.DeepCopy())
@@ -267,10 +265,10 @@ func TestUpdateNodeStatus(t *testing.T) {
assert.NilError(t, err) assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(n.Status, updated.Status)) assert.Check(t, cmp.DeepEqual(n.Status, updated.Status))
err = nodes.Delete(ctx, n.Name, metav1.DeleteOptions{}) err = nodes.Delete(n.Name, nil)
assert.NilError(t, err) assert.NilError(t, err)
_, err = nodes.Get(ctx, n.Name, metav1.GetOptions{}) _, err = nodes.Get(n.Name, metav1.GetOptions{})
assert.Equal(t, errors.IsNotFound(err), true, err) assert.Equal(t, errors.IsNotFound(err), true, err)
_, err = updateNodeStatus(ctx, nodes, updated.DeepCopy()) _, err = updateNodeStatus(ctx, nodes, updated.DeepCopy())
@@ -289,7 +287,7 @@ func TestUpdateNodeLease(t *testing.T) {
assert.Equal(t, l.Name, lease.Name) assert.Equal(t, l.Name, lease.Name)
assert.Assert(t, cmp.DeepEqual(l.Spec.HolderIdentity, lease.Spec.HolderIdentity)) assert.Assert(t, cmp.DeepEqual(l.Spec.HolderIdentity, lease.Spec.HolderIdentity))
compare, err := leases.Get(ctx, l.Name, emptyGetOptions) compare, err := leases.Get(l.Name, emptyGetOptions)
assert.NilError(t, err) assert.NilError(t, err)
assert.Equal(t, l.Spec.RenewTime.Time.Unix(), compare.Spec.RenewTime.Time.Unix()) assert.Equal(t, l.Spec.RenewTime.Time.Unix(), compare.Spec.RenewTime.Time.Unix())
assert.Equal(t, compare.Name, lease.Name) assert.Equal(t, compare.Name, lease.Name)
@@ -376,252 +374,6 @@ func TestPingAfterStatusUpdate(t *testing.T) {
assert.Assert(t, testP.maxPingInterval < maxAllowedInterval, "maximum time between node pings (%v) was greater than the maximum expected interval (%v)", testP.maxPingInterval, maxAllowedInterval) assert.Assert(t, testP.maxPingInterval < maxAllowedInterval, "maximum time between node pings (%v) was greater than the maximum expected interval (%v)", testP.maxPingInterval, maxAllowedInterval)
} }
// Are annotations that were created before the VK existed preserved?
func TestBeforeAnnotationsPreserved(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := testclient.NewSimpleClientset()
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
nodes := c.CoreV1().Nodes()
interval := 10 * time.Millisecond
opts := []NodeControllerOpt{
WithNodePingInterval(interval),
}
testNode := testNode(t)
testNodeCreateCopy := testNode.DeepCopy()
testNodeCreateCopy.Annotations = map[string]string{
"beforeAnnotation": "value",
}
_, err := nodes.Create(ctx, testNodeCreateCopy, metav1.CreateOptions{})
assert.NilError(t, err)
// We have to refer to testNodeCopy during the course of the test. testNode is modified by the node controller
// so it will trigger the race detector.
testNodeCopy := testNode.DeepCopy()
node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error)
defer func() {
cancel()
assert.NilError(t, <-chErr)
}()
go func() {
chErr <- node.Run(ctx)
close(chErr)
}()
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr := nw.ResultChan()
t.Log("Waiting for node to exist")
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
if e.Object == nil {
return false
}
return true
}))
testP.notifyNodeStatus(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"testAnnotation": "value",
},
},
})
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
if e.Object == nil {
return false
}
_, ok := e.Object.(*corev1.Node).Annotations["testAnnotation"]
return ok
}))
newNode, err := nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Assert(t, is.Contains(newNode.Annotations, "testAnnotation"))
assert.Assert(t, is.Contains(newNode.Annotations, "beforeAnnotation"))
}
// Are conditions set by systems outside of VK preserved?
func TestManualConditionsPreserved(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := testclient.NewSimpleClientset()
testP := &testNodeProvider{NodeProvider: &NaiveNodeProvider{}}
nodes := c.CoreV1().Nodes()
interval := 10 * time.Millisecond
opts := []NodeControllerOpt{
WithNodePingInterval(interval),
}
testNode := testNode(t)
// We have to refer to testNodeCopy during the course of the test. testNode is modified by the node controller
// so it will trigger the race detector.
testNodeCopy := testNode.DeepCopy()
node, err := NewNodeController(testP, testNode, nodes, opts...)
assert.NilError(t, err)
chErr := make(chan error)
defer func() {
cancel()
assert.NilError(t, <-chErr)
}()
go func() {
chErr <- node.Run(ctx)
close(chErr)
}()
nw := makeWatch(ctx, t, nodes, testNodeCopy.Name)
defer nw.Stop()
nr := nw.ResultChan()
t.Log("Waiting for node to exist")
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
if e.Object == nil {
return false
}
receivedNode := e.Object.(*corev1.Node)
if len(receivedNode.Status.Conditions) != 0 {
return false
}
return true
}))
newNode, err := nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Assert(t, is.Len(newNode.Status.Conditions, 0))
baseCondition := corev1.NodeCondition{
Type: "BaseCondition",
Status: "Ok",
Reason: "NA",
Message: "This is the base condition. It is set by VK, and should always be there.",
}
testP.notifyNodeStatus(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"testAnnotation": "value",
},
},
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
baseCondition,
},
},
})
// Wait for this (node with condition) to show up
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
receivedNode := e.Object.(*corev1.Node)
for _, condition := range receivedNode.Status.Conditions {
if condition.Type == baseCondition.Type {
return true
}
}
return false
}))
newNode, err = nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
assert.Assert(t, is.Len(newNode.Status.Conditions, 1))
assert.Assert(t, is.Contains(newNode.Annotations, "testAnnotation"))
// Add a new event manually
manuallyAddedCondition := corev1.NodeCondition{
Type: "ManuallyAddedCondition",
Status: "Ok",
Reason: "NA",
Message: "This is a manually added condition. Outside of VK. It should not be removed.",
}
assert.NilError(t, retry.RetryOnConflict(retry.DefaultRetry, func() error {
newNode, err = nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
if err != nil {
return err
}
newNode.Annotations["manuallyAddedAnnotation"] = "value"
newNode.Status.Conditions = append(newNode.Status.Conditions, manuallyAddedCondition)
_, err = nodes.UpdateStatus(ctx, newNode, metav1.UpdateOptions{})
return err
}))
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
receivedNode := e.Object.(*corev1.Node)
for _, condition := range receivedNode.Status.Conditions {
if condition.Type == manuallyAddedCondition.Type {
return true
}
}
assert.Assert(t, is.Contains(receivedNode.Annotations, "testAnnotation"))
assert.Assert(t, is.Contains(newNode.Annotations, "manuallyAddedAnnotation"))
return false
}))
// Let's have the VK have a new condition.
newCondition := corev1.NodeCondition{
Type: "NewCondition",
Status: "Ok",
Reason: "NA",
Message: "This is a newly added condition. It should only show up *with* / *after* ManuallyAddedCondition. It is set by the VK.",
}
// Everything but node status is ignored here
testP.notifyNodeStatus(&corev1.Node{
// Annotations is left empty
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
baseCondition,
newCondition,
},
},
})
i := 0
assert.NilError(t, <-waitForEvent(ctx, nr, func(e watch.Event) bool {
receivedNode := e.Object.(*corev1.Node)
for _, condition := range receivedNode.Status.Conditions {
if condition.Type == newCondition.Type {
// Wait for 2 updates / patches
if i > 2 {
return true
}
i++
}
}
return false
}))
// Make sure that all three conditions are there.
newNode, err = nodes.Get(ctx, testNodeCopy.Name, emptyGetOptions)
assert.NilError(t, err)
seenConditionTypes := make([]corev1.NodeConditionType, len(newNode.Status.Conditions))
for idx := range newNode.Status.Conditions {
seenConditionTypes[idx] = newNode.Status.Conditions[idx].Type
}
assert.Assert(t, is.Contains(seenConditionTypes, baseCondition.Type))
assert.Assert(t, is.Contains(seenConditionTypes, newCondition.Type))
assert.Assert(t, is.Contains(seenConditionTypes, manuallyAddedCondition.Type))
assert.Assert(t, is.Equal(newNode.Annotations["testAnnotation"], ""))
assert.Assert(t, is.Contains(newNode.Annotations, "manuallyAddedAnnotation"))
t.Log(newNode.Status.Conditions)
}
func testNode(t *testing.T) *corev1.Node { func testNode(t *testing.T) *corev1.Node {
n := &corev1.Node{} n := &corev1.Node{}
n.Name = strings.ToLower(t.Name()) n.Name = strings.ToLower(t.Name())
@@ -631,19 +383,16 @@ func testNode(t *testing.T) *corev1.Node {
type testNodeProvider struct { type testNodeProvider struct {
NodeProvider NodeProvider
statusHandlers []func(*corev1.Node) statusHandlers []func(*corev1.Node)
// Callback to VK
notifyNodeStatus func(*corev1.Node)
} }
func (p *testNodeProvider) NotifyNodeStatus(ctx context.Context, h func(*corev1.Node)) { func (p *testNodeProvider) NotifyNodeStatus(ctx context.Context, h func(*corev1.Node)) {
p.notifyNodeStatus = h p.statusHandlers = append(p.statusHandlers, h)
} }
func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) { func (p *testNodeProvider) triggerStatusUpdate(n *corev1.Node) {
for _, h := range p.statusHandlers { for _, h := range p.statusHandlers {
h(n) h(n)
} }
p.notifyNodeStatus(n)
} }
// testNodeProviderPing tracks the maximum time interval between calls to Ping // testNodeProviderPing tracks the maximum time interval between calls to Ping
@@ -667,13 +416,13 @@ func (tnp *testNodeProviderPing) Ping(ctx context.Context) error {
} }
type watchGetter interface { type watchGetter interface {
Watch(context.Context, metav1.ListOptions) (watch.Interface, error) Watch(metav1.ListOptions) (watch.Interface, error)
} }
func makeWatch(ctx context.Context, t *testing.T, wc watchGetter, name string) watch.Interface { func makeWatch(t *testing.T, wc watchGetter, name string) watch.Interface {
t.Helper() t.Helper()
w, err := wc.Watch(ctx, metav1.ListOptions{FieldSelector: "name=" + name}) w, err := wc.Watch(metav1.ListOptions{FieldSelector: "name=" + name})
assert.NilError(t, err) assert.NilError(t, err)
return w return w
} }


@@ -16,8 +16,6 @@ package node
import ( import (
"context" "context"
"fmt"
"time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
pkgerrors "github.com/pkg/errors" pkgerrors "github.com/pkg/errors"
@@ -26,23 +24,12 @@ import (
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
) )
const ( const (
podStatusReasonProviderFailed = "ProviderFailed" podStatusReasonProviderFailed = "ProviderFailed"
podEventCreateFailed = "ProviderCreateFailed"
podEventCreateSuccess = "ProviderCreateSuccess"
podEventDeleteFailed = "ProviderDeleteFailed"
podEventDeleteSuccess = "ProviderDeleteSuccess"
podEventUpdateFailed = "ProviderUpdateFailed"
podEventUpdateSuccess = "ProviderUpdateSuccess"
// 151 milliseconds is just chosen as a small prime number to retry between
// attempts to get a notification from the provider to VK
notificationRetryPeriod = 151 * time.Millisecond
) )
func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context { func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
@@ -85,22 +72,16 @@ func (pc *PodController) createOrUpdatePod(ctx context.Context, pod *corev1.Pod)
log.G(ctx).Debugf("Pod %s exists, updating pod in provider", podFromProvider.Name) log.G(ctx).Debugf("Pod %s exists, updating pod in provider", podFromProvider.Name)
if origErr := pc.provider.UpdatePod(ctx, podForProvider); origErr != nil { if origErr := pc.provider.UpdatePod(ctx, podForProvider); origErr != nil {
pc.handleProviderError(ctx, span, origErr, pod) pc.handleProviderError(ctx, span, origErr, pod)
pc.recorder.Event(pod, corev1.EventTypeWarning, podEventUpdateFailed, origErr.Error())
return origErr return origErr
} }
log.G(ctx).Info("Updated pod in provider") log.G(ctx).Info("Updated pod in provider")
pc.recorder.Event(pod, corev1.EventTypeNormal, podEventUpdateSuccess, "Update pod in provider successfully")
} }
} else { } else {
if origErr := pc.provider.CreatePod(ctx, podForProvider); origErr != nil { if origErr := pc.provider.CreatePod(ctx, podForProvider); origErr != nil {
pc.handleProviderError(ctx, span, origErr, pod) pc.handleProviderError(ctx, span, origErr, pod)
pc.recorder.Event(pod, corev1.EventTypeWarning, podEventCreateFailed, origErr.Error())
return origErr return origErr
} }
log.G(ctx).Info("Created pod in provider") log.G(ctx).Info("Created pod in provider")
pc.recorder.Event(pod, corev1.EventTypeNormal, podEventCreateSuccess, "Create pod in provider successfully")
} }
return nil return nil
} }
@@ -126,30 +107,6 @@ func podsEqual(pod1, pod2 *corev1.Pod) bool {
} }
func deleteGraceTimeEqual(old, new *int64) bool {
if old == nil && new == nil {
return true
}
if old != nil && new != nil {
return *old == *new
}
return false
}
// podShouldEnqueue checks whether a pod needs to be re-enqueued, i.e. whether the two pods differ according to podsEqual, DeletionGracePeriodSeconds, or DeletionTimestamp
func podShouldEnqueue(oldPod, newPod *corev1.Pod) bool {
if !podsEqual(oldPod, newPod) {
return true
}
if !deleteGraceTimeEqual(oldPod.DeletionGracePeriodSeconds, newPod.DeletionGracePeriodSeconds) {
return true
}
if !oldPod.DeletionTimestamp.Equal(newPod.DeletionTimestamp) {
return true
}
return false
}
func (pc *PodController) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) { func (pc *PodController) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) {
podPhase := corev1.PodPending podPhase := corev1.PodPending
if pod.Spec.RestartPolicy == corev1.RestartPolicyNever { if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
@@ -166,7 +123,7 @@ func (pc *PodController) handleProviderError(ctx context.Context, span trace.Spa
"reason": pod.Status.Reason, "reason": pod.Status.Reason,
}) })
_, err := pc.client.Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}) _, err := pc.client.Pods(pod.Namespace).UpdateStatus(pod)
if err != nil { if err != nil {
logger.WithError(err).Warn("Failed to update pod status") logger.WithError(err).Warn("Failed to update pod status")
} else { } else {
@@ -183,10 +140,9 @@ func (pc *PodController) deletePod(ctx context.Context, pod *corev1.Pod) error {
err := pc.provider.DeletePod(ctx, pod.DeepCopy()) err := pc.provider.DeletePod(ctx, pod.DeepCopy())
if err != nil { if err != nil {
span.SetStatus(err) span.SetStatus(err)
pc.recorder.Event(pod, corev1.EventTypeWarning, podEventDeleteFailed, err.Error())
return err return err
} }
pc.recorder.Event(pod, corev1.EventTypeNormal, podEventDeleteSuccess, "Delete pod in provider successfully")
log.G(ctx).Debug("Deleted pod from provider") log.G(ctx).Debug("Deleted pod from provider")
return nil return nil
@@ -216,14 +172,12 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
kPod.Lock() kPod.Lock()
podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy() podFromProvider := kPod.lastPodStatusReceivedFromProvider.DeepCopy()
kPod.Unlock() kPod.Unlock()
if cmp.Equal(podFromKubernetes.Status, podFromProvider.Status) {
return nil
}
// We need to do this because the other parts of the pod can be updated elsewhere. Since we're only updating // We need to do this because the other parts of the pod can be updated elsewhere. Since we're only updating
// the pod status, and we should be the sole writers of the pod status, we can blind overwrite it. Therefore // the pod status, and we should be the sole writers of the pod status, we can blind overwrite it. Therefore
// we need to copy the pod and set ResourceVersion to 0. // we need to copy the pod and set ResourceVersion to 0.
podFromProvider.ResourceVersion = "0" podFromProvider.ResourceVersion = "0"
if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(ctx, podFromProvider, metav1.UpdateOptions{}); err != nil {
if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(podFromProvider); err != nil {
span.SetStatus(err) span.SetStatus(err)
return pkgerrors.Wrap(err, "error while updating pod status in kubernetes") return pkgerrors.Wrap(err, "error while updating pod status in kubernetes")
} }
@@ -241,72 +195,17 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
// enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd // enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd
// prior to enqueuePodStatusUpdate. // prior to enqueuePodStatusUpdate.
func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) { func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
ctx, cancel := context.WithTimeout(ctx, notificationRetryPeriod*maxRetries) if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
defer cancel() log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key")
ctx, span := trace.StartSpan(ctx, "enqueuePodStatusUpdate")
defer span.End()
ctx = span.WithField(ctx, "method", "enqueuePodStatusUpdate")
// TODO (Sargun): Make this asynchronousish. Right now, if we are not cache synced, and we receive notifications
// from the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation.
key, err := cache.MetaNamespaceKeyFunc(pod)
if err != nil {
log.G(ctx).WithError(err).Error("Error getting pod meta namespace key")
span.SetStatus(err)
return
}
ctx = span.WithField(ctx, "key", key)
var obj interface{}
err = wait.PollImmediateUntil(notificationRetryPeriod, func() (bool, error) {
var ok bool
obj, ok = pc.knownPods.Load(key)
if ok {
return true, nil
}
// Blind sync. Partial sync is better than nothing. If this returns false, the poll loop should not be invoked
// again as it means the context has timed out.
if !cache.WaitForNamedCacheSync("enqueuePodStatusUpdate", ctx.Done(), pc.podsInformer.Informer().HasSynced) {
log.G(ctx).Warn("enqueuePodStatusUpdate proceeding with unsynced cache")
}
// The only transient error that pod lister returns is not found. The only case where not found
// should happen, and the pod *actually* exists is the above -- where we haven't been able to finish sync
// before context times out.
// The other class of errors is non-transient
_, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name)
if err != nil {
return false, err
}
// err is nil, and therefore the pod exists in k8s, but does not exist in our known pods map. This likely means
// that we're in some kind of startup synchronization issue where the provider knows about a pod (as it performs
// recover, that we do not yet know about).
return false, nil
}, ctx.Done())
if err != nil {
if errors.IsNotFound(err) {
err = fmt.Errorf("Pod %q not found in pod lister: %w", key, err)
log.G(ctx).WithError(err).Debug("Not enqueuing pod status update")
} else { } else {
log.G(ctx).WithError(err).Warn("Not enqueuing pod status update due to error from pod lister") if obj, ok := pc.knownPods.Load(key); ok {
}
span.SetStatus(err)
return
}
kpod := obj.(*knownPod) kpod := obj.(*knownPod)
kpod.Lock() kpod.Lock()
if cmp.Equal(kpod.lastPodStatusReceivedFromProvider, pod) {
kpod.Unlock()
return
}
kpod.lastPodStatusReceivedFromProvider = pod kpod.lastPodStatusReceivedFromProvider = pod
kpod.Unlock() kpod.Unlock()
q.AddRateLimited(key) q.AddRateLimited(key)
}
}
} }
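Both versions of enqueuePodStatusUpdate reduce to the same core idea: remember the newest status the provider reported for each pod key, and mark that key dirty in a rate-limited workqueue so a sync worker can push it to the API server later. The following is a condensed, hypothetical sketch of that core; the type and function names are invented.

package sketch

import (
	"sync"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// knownPodSketch holds the last status received from the provider for one pod.
type knownPodSketch struct {
	sync.Mutex
	lastStatusFromProvider *corev1.Pod
}

// enqueueStatusSketch records the provider's latest view of the pod and marks
// the pod key dirty so a worker will sync the status to the API server.
func enqueueStatusSketch(known *sync.Map, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
	key, err := cache.MetaNamespaceKeyFunc(pod) // "namespace/name"
	if err != nil {
		return // the real controller logs this error
	}
	obj, ok := known.Load(key)
	if !ok {
		// Pod not tracked yet; the newer version above waits for the informer
		// cache to sync before giving up.
		return
	}
	kp := obj.(*knownPodSketch)
	kp.Lock()
	kp.lastStatusFromProvider = pod
	kp.Unlock()
	q.AddRateLimited(key)
}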
func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) { func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) {
@@ -380,7 +279,7 @@ func (pc *PodController) deletePodHandler(ctx context.Context, key string) (retE
// We don't check with the provider before doing this delete. At this point, even if an outstanding pod status update // We don't check with the provider before doing this delete. At this point, even if an outstanding pod status update
// was in progress, // was in progress,
err = pc.client.Pods(namespace).Delete(ctx, name, *metav1.NewDeleteOptions(0)) err = pc.client.Pods(namespace).Delete(name, metav1.NewDeleteOptions(0))
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
return nil return nil
} }


@@ -23,7 +23,6 @@ import (
"gotest.tools/assert" "gotest.tools/assert"
is "gotest.tools/assert/cmp" is "gotest.tools/assert/cmp"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
@@ -49,11 +48,9 @@ func newTestController() *TestController {
recorder: testutil.FakeEventRecorder(5), recorder: testutil.FakeEventRecorder(5),
k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletionQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), deletionQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
podStatusQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
done: make(chan struct{}), done: make(chan struct{}),
ready: make(chan struct{}), ready: make(chan struct{}),
podsInformer: iFactory.Core().V1().Pods(), podsInformer: iFactory.Core().V1().Pods(),
podsLister: iFactory.Core().V1().Pods().Lister(),
}, },
mock: p, mock: p,
client: fk8s, client: fk8s,
@@ -111,76 +108,6 @@ func TestPodsDifferentIgnoreValue(t *testing.T) {
assert.Assert(t, podsEqual(p1, p2)) assert.Assert(t, podsEqual(p1, p2))
} }
func TestPodShouldEnqueueDifferentDeleteTimeStamp(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
now := v1.NewTime(time.Now())
p2.DeletionTimestamp = &now
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodShouldEnqueueDifferentLabel(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
p2.Labels = map[string]string{"test": "test"}
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodShouldEnqueueDifferentAnnotation(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
p2.Annotations = map[string]string{"test": "test"}
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodShouldNotEnqueueDifferentStatus(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
p2.Status.Phase = corev1.PodSucceeded
assert.Assert(t, !podShouldEnqueue(p1, p2))
}
func TestPodShouldEnqueueDifferentDeleteGraceTime(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
oldTime := v1.NewTime(time.Now().Add(5))
newTime := v1.NewTime(time.Now().Add(10))
oldGraceTime := int64(5)
newGraceTime := int64(10)
p1.DeletionGracePeriodSeconds = &oldGraceTime
p2.DeletionTimestamp = &oldTime
p2.DeletionGracePeriodSeconds = &newGraceTime
p2.DeletionTimestamp = &newTime
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodShouldEnqueueGraceTimeChanged(t *testing.T) {
p1 := &corev1.Pod{
Spec: newPodSpec(),
}
p2 := p1.DeepCopy()
graceTime := int64(30)
p2.DeletionGracePeriodSeconds = &graceTime
assert.Assert(t, podShouldEnqueue(p1, p2))
}
func TestPodCreateNewPod(t *testing.T) { func TestPodCreateNewPod(t *testing.T) {
svr := newTestController() svr := newTestController()


@@ -83,8 +83,7 @@ type PodNotifier interface {
// fashion. The provided pod's PodStatus should be up to date when // fashion. The provided pod's PodStatus should be up to date when
// this function is called. // this function is called.
// //
// NotifyPods must not block the caller since it is only used to register the callback. // NotifyPods will not block callers.
// The callback passed into `NotifyPods` may block when called.
NotifyPods(context.Context, func(*corev1.Pod)) NotifyPods(context.Context, func(*corev1.Pod))
} }
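The doc-comment change above tightens the NotifyPods contract: the call itself must return promptly because it only registers the callback, while the callback may block when it is later invoked. A hypothetical provider sketch that satisfies this (sketchProvider is an invented type, not part of the repository):

package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
)

// sketchProvider delivers pod status changes on its own goroutine, so
// registering the callback never blocks the pod controller.
type sketchProvider struct {
	updates chan *corev1.Pod
}

func (p *sketchProvider) NotifyPods(ctx context.Context, cb func(*corev1.Pod)) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case pod := <-p.updates:
				// cb may block; that is acceptable here because we are off the
				// caller's goroutine.
				cb(pod)
			}
		}
	}()
}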
@@ -109,8 +108,6 @@ type PodController struct {
// deletionQ is a queue on which pods are reconciled, and we check if pods are in API server after grace period // deletionQ is a queue on which pods are reconciled, and we check if pods are in API server after grace period
deletionQ workqueue.RateLimitingInterface deletionQ workqueue.RateLimitingInterface
podStatusQ workqueue.RateLimitingInterface
// From the time of creation, to termination the knownPods map will contain the pods key // From the time of creation, to termination the knownPods map will contain the pods key
// (derived from Kubernetes' cache library) -> a *knownPod struct. // (derived from Kubernetes' cache library) -> a *knownPod struct.
knownPods sync.Map knownPods sync.Map
@@ -161,9 +158,6 @@ type PodControllerConfig struct {
ConfigMapInformer corev1informers.ConfigMapInformer ConfigMapInformer corev1informers.ConfigMapInformer
SecretInformer corev1informers.SecretInformer SecretInformer corev1informers.SecretInformer
ServiceInformer corev1informers.ServiceInformer ServiceInformer corev1informers.ServiceInformer
// RateLimiter defines the rate limit of work queue
RateLimiter workqueue.RateLimiter
} }
// NewPodController creates a new pod controller with the provided config. // NewPodController creates a new pod controller with the provided config.
@@ -189,9 +183,7 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
if cfg.Provider == nil { if cfg.Provider == nil {
return nil, errdefs.InvalidInput("missing provider") return nil, errdefs.InvalidInput("missing provider")
} }
if cfg.RateLimiter == nil {
cfg.RateLimiter = workqueue.DefaultControllerRateLimiter()
}
rm, err := manager.NewResourceManager(cfg.PodInformer.Lister(), cfg.SecretInformer.Lister(), cfg.ConfigMapInformer.Lister(), cfg.ServiceInformer.Lister()) rm, err := manager.NewResourceManager(cfg.PodInformer.Lister(), cfg.SecretInformer.Lister(), cfg.ConfigMapInformer.Lister(), cfg.ServiceInformer.Lister())
if err != nil { if err != nil {
return nil, pkgerrors.Wrap(err, "could not create resource manager") return nil, pkgerrors.Wrap(err, "could not create resource manager")
@@ -206,9 +198,8 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
ready: make(chan struct{}), ready: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
recorder: cfg.EventRecorder, recorder: cfg.EventRecorder,
k8sQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"), k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
deletionQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"), deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"),
podStatusQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
} }
return pc, nil return pc, nil
@@ -251,12 +242,13 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
} }
pc.provider = provider pc.provider = provider
podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
provider.NotifyPods(ctx, func(pod *corev1.Pod) { provider.NotifyPods(ctx, func(pod *corev1.Pod) {
pc.enqueuePodStatusUpdate(ctx, pc.podStatusQ, pod.DeepCopy()) pc.enqueuePodStatusUpdate(ctx, podStatusQueue, pod.DeepCopy())
}) })
go runProvider(ctx) go runProvider(ctx)
defer pc.podStatusQ.ShutDown() defer podStatusQueue.ShutDown()
// Wait for the caches to be synced *before* starting to do work. // Wait for the caches to be synced *before* starting to do work.
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok { if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
@@ -278,17 +270,14 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
}, },
UpdateFunc: func(oldObj, newObj interface{}) { UpdateFunc: func(oldObj, newObj interface{}) {
// Create a copy of the old and new pod objects so we don't mutate the cache. // Create a copy of the old and new pod objects so we don't mutate the cache.
oldPod := oldObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod) newPod := newObj.(*corev1.Pod)
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod. // At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil { if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
log.G(ctx).Error(err) log.G(ctx).Error(err)
} else { } else {
if podShouldEnqueue(oldPod, newPod) {
pc.k8sQ.AddRateLimited(key) pc.k8sQ.AddRateLimited(key)
} }
}
}, },
DeleteFunc: func(pod interface{}) { DeleteFunc: func(pod interface{}) {
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil { if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
@@ -316,7 +305,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
workerID := strconv.Itoa(id) workerID := strconv.Itoa(id)
go func() { go func() {
defer wg.Done() defer wg.Done()
pc.runSyncPodStatusFromProviderWorker(ctx, workerID, pc.podStatusQ) pc.runSyncPodStatusFromProviderWorker(ctx, workerID, podStatusQueue)
}() }()
} }
@@ -344,7 +333,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
<-ctx.Done() <-ctx.Done()
log.G(ctx).Info("shutting down workers") log.G(ctx).Info("shutting down workers")
pc.k8sQ.ShutDown() pc.k8sQ.ShutDown()
pc.podStatusQ.ShutDown() podStatusQueue.ShutDown()
pc.deletionQ.ShutDown() pc.deletionQ.ShutDown()
wg.Wait() wg.Wait()
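For readers less familiar with client-go controllers, the Run method above boils down to a standard workqueue and worker-pool loop: a named rate-limited queue, a fixed number of workers draining it, per-key retry with backoff on error, and an orderly shutdown. A minimal, hypothetical sketch of that loop follows (runWorkersSketch is an invented name; in a full controller, informer event handlers would be the ones calling AddRateLimited).

package sketch

import (
	"context"
	"sync"

	"k8s.io/client-go/util/workqueue"
)

// runWorkersSketch drains one rate-limited queue with a pool of workers until
// the context is cancelled.
func runWorkersSketch(ctx context.Context, workers int, handle func(ctx context.Context, key string) error) {
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "sketch")
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				item, shutdown := q.Get()
				if shutdown {
					return
				}
				key := item.(string)
				if err := handle(ctx, key); err != nil {
					q.AddRateLimited(key) // retry later with backoff
				} else {
					q.Forget(key) // success: clear the item's backoff counter
				}
				q.Done(item)
			}
		}()
	}
	<-ctx.Done()
	q.ShutDown()
	wg.Wait()
}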


@@ -1,7 +1,6 @@
 package e2e
 import (
-"context"
 "fmt"
 "testing"
 "time"
@@ -19,64 +18,17 @@ const (
 deleteGracePeriodForProvider = 1 * time.Second
 )
-// TestGetPods tests that the /pods endpoint works, and only returns pods for our kubelet
-func (ts *EndToEndTestSuite) TestGetPods(t *testing.T) {
-ctx := context.Background()
-// Create a pod with prefix "nginx-" having a single container.
-podSpec := f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo")
-podSpec.Spec.NodeName = f.NodeName
-nginx, err := f.CreatePod(ctx, podSpec)
-if err != nil {
-t.Fatal(err)
-}
-// Delete the pod after the test finishes.
-defer func() {
-if err := f.DeletePodImmediately(ctx, nginx.Namespace, nginx.Name); err != nil && !apierrors.IsNotFound(err) {
-t.Error(err)
-}
-}()
-t.Logf("Created pod: %s", nginx.Name)
-// Wait for the "nginx-" pod to be reported as running and ready.
-if _, err := f.WaitUntilPodReady(nginx.Namespace, nginx.Name); err != nil {
-t.Fatal(err)
-}
-t.Logf("Pod %s ready", nginx.Name)
-k8sPods, err := f.GetRunningPodsFromKubernetes(ctx)
-if err != nil {
-t.Fatal(err)
-}
-podFound := false
-for _, pod := range k8sPods.Items {
-if pod.Spec.NodeName != f.NodeName {
-t.Fatalf("Found pod with node name %s, whereas expected %s", pod.Spec.NodeName, f.NodeName)
-}
-if pod.UID == nginx.UID {
-podFound = true
-}
-}
-if !podFound {
-t.Fatal("Nginx pod not found")
-}
-}
 // TestGetStatsSummary creates a pod having two containers and queries the /stats/summary endpoint of the virtual-kubelet.
 // It expects this endpoint to return stats for the current node, as well as for the aforementioned pod and each of its two containers.
 func (ts *EndToEndTestSuite) TestGetStatsSummary(t *testing.T) {
-ctx := context.Background()
 // Create a pod with prefix "nginx-" having three containers.
-pod, err := f.CreatePod(ctx, f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo", "bar", "baz"))
+pod, err := f.CreatePod(f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo", "bar", "baz"))
 if err != nil {
 t.Fatal(err)
 }
 // Delete the "nginx-0-X" pod after the test finishes.
 defer func() {
-if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
+if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
 t.Error(err)
 }
 }()
@@ -87,7 +39,7 @@ func (ts *EndToEndTestSuite) TestGetStatsSummary(t *testing.T) {
 }
 // Grab the stats from the provider.
-stats, err := f.GetStatsSummary(ctx)
+stats, err := f.GetStatsSummary()
 if err != nil {
 t.Fatal(err)
 }
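TestGetStatsSummary asserts that the summary returned by the framework's GetStatsSummary helper contains stats for the node, for the created pod, and for each of its containers. As a rough sketch of what that payload looks like on the wire, the snippet below decodes a /stats/summary response into a trimmed-down struct; the endpoint address, port, and struct layout are assumptions based on the standard kubelet Summary API, not taken from this repository.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// summary mirrors only the fields the e2e assertions care about.
type summary struct {
	Node struct {
		NodeName string `json:"nodeName"`
	} `json:"node"`
	Pods []struct {
		PodRef struct {
			Name      string `json:"name"`
			Namespace string `json:"namespace"`
			UID       string `json:"uid"`
		} `json:"podRef"`
		Containers []struct {
			Name string `json:"name"`
		} `json:"containers"`
	} `json:"pods"`
}

func main() {
	// Assumed address; in the test suite the virtual-kubelet's own API port is used instead.
	resp, err := http.Get("http://localhost:10255/stats/summary")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var s summary
	if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
		panic(err)
	}

	// The test checks exactly this: node stats are present, the created pod is listed,
	// and every one of its containers ("foo", "bar", "baz") reports stats.
	fmt.Println("node:", s.Node.NodeName)
	for _, p := range s.Pods {
		fmt.Printf("pod %s/%s has stats for %d container(s)\n", p.PodRef.Namespace, p.PodRef.Name, len(p.Containers))
	}
}
```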
@@ -116,19 +68,17 @@ func (ts *EndToEndTestSuite) TestGetStatsSummary(t *testing.T) {
 // These verifications are made using the /stats/summary endpoint of the virtual-kubelet, by checking for the presence or absence of the pods.
 // Hence, the provider being tested must implement the PodMetricsProvider interface.
 func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
-ctx := context.Background()
 // Create a pod with prefix "nginx-" having a single container.
 podSpec := f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo")
 podSpec.Spec.NodeName = f.NodeName
-pod, err := f.CreatePod(ctx, podSpec)
+pod, err := f.CreatePod(podSpec)
 if err != nil {
 t.Fatal(err)
 }
 // Delete the pod after the test finishes.
 defer func() {
-if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
+if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
 t.Error(err)
 }
 }()
@@ -141,7 +91,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
 t.Logf("Pod %s ready", pod.Name)
 // Grab the pods from the provider.
-pods, err := f.GetRunningPodsFromProvider(ctx)
+pods, err := f.GetRunningPods()
 assert.NilError(t, err)
 // Check if the pod exists in the slice of PodStats.
@@ -162,7 +112,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
 }()
 // Gracefully delete the "nginx-" pod.
-if err := f.DeletePod(ctx, pod.Namespace, pod.Name); err != nil {
+if err := f.DeletePod(pod.Namespace, pod.Name); err != nil {
 t.Fatal(err)
 }
 t.Logf("Deleted pod: %s", pod.Name)
@@ -175,7 +125,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
 time.Sleep(deleteGracePeriodForProvider)
 // Give the provider some time to react to the MODIFIED/DELETED events before proceeding.
 // Grab the pods from the provider.
-pods, err = f.GetRunningPodsFromProvider(ctx)
+pods, err = f.GetRunningPods()
 assert.NilError(t, err)
 // Make sure the pod DOES NOT exist in the provider's set of running pods
@@ -191,17 +141,15 @@ func (ts *EndToEndTestSuite) TestPodLifecycleGracefulDelete(t *testing.T) {
 // and put them in the running lifecycle. It then does a force delete on the pod, and verifies the provider
 // has deleted it.
 func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
-ctx := context.Background()
 podSpec := f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo")
 // Create a pod with prefix having a single container.
-pod, err := f.CreatePod(ctx, podSpec)
+pod, err := f.CreatePod(podSpec)
 if err != nil {
 t.Fatal(err)
 }
 // Delete the pod after the test finishes.
 defer func() {
-if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
+if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
 t.Error(err)
 }
 }()
@@ -214,7 +162,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
 t.Logf("Pod %s ready", pod.Name)
 // Grab the pods from the provider.
-pods, err := f.GetRunningPodsFromProvider(ctx)
+pods, err := f.GetRunningPods()
 assert.NilError(t, err)
 // Check if the pod exists in the slice of Pods.
@@ -240,7 +188,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
 time.Sleep(deleteGracePeriodForProvider)
 // Forcibly delete the pod.
-if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil {
+if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil {
 t.Logf("Last saw pod in state: %+v", podLast)
 t.Fatal(err)
 }
@@ -255,7 +203,7 @@ func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
 time.Sleep(deleteGracePeriodForProvider)
 // Grab the pods from the provider.
-pods, err = f.GetRunningPodsFromProvider(ctx)
+pods, err = f.GetRunningPods()
 assert.NilError(t, err)
 // Make sure the "nginx-" pod DOES NOT exist in the slice of Pods anymore.
@@ -268,17 +216,15 @@ func (ts *EndToEndTestSuite) TestPodLifecycleForceDelete(t *testing.T) {
 // TestCreatePodWithOptionalInexistentSecrets tries to create a pod referencing optional, inexistent secrets.
 // It then verifies that the pod is created successfully.
 func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentSecrets(t *testing.T) {
-ctx := context.Background()
 // Create a pod with a single container referencing optional, inexistent secrets.
-pod, err := f.CreatePod(ctx, f.CreatePodObjectWithOptionalSecretKey(t.Name()))
+pod, err := f.CreatePod(f.CreatePodObjectWithOptionalSecretKey(t.Name()))
 if err != nil {
 t.Fatal(err)
 }
 // Delete the pod after the test finishes.
 defer func() {
-if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
+if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
 t.Error(err)
 }
 }()
@@ -294,7 +240,7 @@ func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentSecrets(t *testi
 }
 // Grab the pods from the provider.
-pods, err := f.GetRunningPodsFromProvider(ctx)
+pods, err := f.GetRunningPods()
 assert.NilError(t, err)
 // Check if the pod exists in the slice of Pods.
@@ -304,17 +250,15 @@ func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentSecrets(t *testi
 // TestCreatePodWithMandatoryInexistentSecrets tries to create a pod referencing inexistent secrets.
 // It then verifies that the pod is not created.
 func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentSecrets(t *testing.T) {
-ctx := context.Background()
 // Create a pod with a single container referencing inexistent secrets.
-pod, err := f.CreatePod(ctx, f.CreatePodObjectWithMandatorySecretKey(t.Name()))
+pod, err := f.CreatePod(f.CreatePodObjectWithMandatorySecretKey(t.Name()))
 if err != nil {
 t.Fatal(err)
 }
 // Delete the pod after the test finishes.
 defer func() {
-if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
+if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
 t.Error(err)
 }
 }()
@@ -325,7 +269,7 @@ func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentSecrets(t *test
 }
 // Grab the pods from the provider.
-pods, err := f.GetRunningPodsFromProvider(ctx)
+pods, err := f.GetRunningPods()
 assert.NilError(t, err)
 // Check if the pod exists in the slice of PodStats.
@@ -335,17 +279,15 @@ func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentSecrets(t *test
 // TestCreatePodWithOptionalInexistentConfigMap tries to create a pod referencing optional, inexistent config map.
 // It then verifies that the pod is created successfully.
 func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentConfigMap(t *testing.T) {
-ctx := context.Background()
 // Create a pod with a single container referencing optional, inexistent config map.
-pod, err := f.CreatePod(ctx, f.CreatePodObjectWithOptionalConfigMapKey(t.Name()))
+pod, err := f.CreatePod(f.CreatePodObjectWithOptionalConfigMapKey(t.Name()))
 if err != nil {
 t.Fatal(err)
 }
 // Delete the pod after the test finishes.
 defer func() {
-if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
+if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
 t.Error(err)
 }
 }()
@@ -361,7 +303,7 @@ func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentConfigMap(t *tes
 }
 // Grab the pods from the provider.
-pods, err := f.GetRunningPodsFromProvider(ctx)
+pods, err := f.GetRunningPods()
 assert.NilError(t, err)
 // Check if the pod exists in the slice of PodStats.
@@ -371,17 +313,15 @@ func (ts *EndToEndTestSuite) TestCreatePodWithOptionalInexistentConfigMap(t *tes
 // TestCreatePodWithMandatoryInexistentConfigMap tries to create a pod referencing inexistent secrets.
 // It then verifies that the pod is not created.
 func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentConfigMap(t *testing.T) {
-ctx := context.Background()
 // Create a pod with a single container referencing inexistent config map.
-pod, err := f.CreatePod(ctx, f.CreatePodObjectWithMandatoryConfigMapKey(t.Name()))
+pod, err := f.CreatePod(f.CreatePodObjectWithMandatoryConfigMapKey(t.Name()))
 if err != nil {
 t.Fatal(err)
 }
 // Delete the pod after the test finishes.
 defer func() {
-if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
+if err := f.DeletePodImmediately(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
 t.Error(err)
 }
 }()
@@ -392,7 +332,7 @@ func (ts *EndToEndTestSuite) TestCreatePodWithMandatoryInexistentConfigMap(t *te
 }
 // Grab the pods from the provider.
-pods, err := f.GetRunningPodsFromProvider(ctx)
+pods, err := f.GetRunningPods()
 assert.NilError(t, err)
 // Check if the pod exists in the slice of PodStats.
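The four tests above hinge on whether the referenced secret or config map is marked optional. The framework's CreatePodObjectWith*Key helpers are not shown in this diff, but the distinction they encode comes down to the Optional flag on the key selector. A rough sketch of the relevant corev1 fields follows; the env var and function names are illustrative only.

```go
package e2eutil

import corev1 "k8s.io/api/core/v1"

// envFromSecret builds an env var sourced from a (possibly nonexistent) secret key.
// With optional=true the container can start even if the secret is missing; with
// optional=false (the default) it cannot, which is why the mandatory-secret test
// expects the pod never to reach the provider's running set.
func envFromSecret(secretName, key string, optional bool) corev1.EnvVar {
	return corev1.EnvVar{
		Name: "ENV_FROM_SECRET", // illustrative name
		ValueFrom: &corev1.EnvVarSource{
			SecretKeyRef: &corev1.SecretKeySelector{
				LocalObjectReference: corev1.LocalObjectReference{Name: secretName},
				Key:                  key,
				Optional:             &optional,
			},
		},
	}
}

// envFromConfigMap is the config-map analogue exercised by the last two tests.
func envFromConfigMap(configMapName, key string, optional bool) corev1.EnvVar {
	return corev1.EnvVar{
		Name: "ENV_FROM_CONFIGMAP", // illustrative name
		ValueFrom: &corev1.EnvVarSource{
			ConfigMapKeyRef: &corev1.ConfigMapKeySelector{
				LocalObjectReference: corev1.LocalObjectReference{Name: configMapName},
				Key:                  key,
				Optional:             &optional,
			},
		},
	}
}
```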

View File

@@ -19,7 +19,7 @@ func (ts *EndToEndTestSuite) TestNodeCreateAfterDelete(t *testing.T) {
 ctx, cancel := context.WithCancel(context.Background())
 defer cancel()
-podList, err := f.KubeClient.CoreV1().Pods(f.Namespace).List(ctx, metav1.ListOptions{
+podList, err := f.KubeClient.CoreV1().Pods(f.Namespace).List(metav1.ListOptions{
 FieldSelector: fields.OneTermEqualSelector("spec.nodeName", f.NodeName).String(),
 })
@@ -28,7 +28,7 @@ func (ts *EndToEndTestSuite) TestNodeCreateAfterDelete(t *testing.T) {
 chErr := make(chan error, 1)
-originalNode, err := f.GetNode(ctx)
+originalNode, err := f.GetNode()
 assert.NilError(t, err)
 ctx, cancel = context.WithTimeout(ctx, time.Minute)
@@ -50,7 +50,7 @@ func (ts *EndToEndTestSuite) TestNodeCreateAfterDelete(t *testing.T) {
 chErr <- f.WaitUntilNodeCondition(wait)
 }()
-assert.NilError(t, f.DeleteNode(ctx))
+assert.NilError(t, f.DeleteNode())
 select {
 case result := <-chErr:

View File

@@ -10,7 +10,7 @@ You can install Virtual Kubelet by building it [from source](#source). First, ma
 mkdir -p ${GOPATH}/src/github.com/virtual-kubelet
 cd ${GOPATH}/src/github.com/virtual-kubelet
 git clone https://github.com/virtual-kubelet/virtual-kubelet
-cd virtual-kubelet && make build
+make build
 ```
 This method adds a `virtual-kubelet` executable to the `bin` folder. To run it:

View File

@@ -1,7 +1,3 @@
-- name: Admiralty Multi-Cluster Scheduler
-tag: multicluster-scheduler
-org: admiraltyio
-vanityImportPath: admiralty.io/multicluster-scheduler
 - name: Alibaba Cloud Elastic Container Instance (**ECI**)
 tag: alibabacloud-eci
 - name: AWS Fargate
@@ -10,9 +6,6 @@
 tag: azure-batch
 - name: Azure Container Instances (**ACI**)
 tag: azure-aci
-- name: Elotl Kip
-tag: kip
-org: elotl
 - name: Kubernetes Container Runtime Interface (**CRI**)
 tag: cri
 - name: Huawei Cloud Container Instance (**CCI**)
@@ -21,5 +14,3 @@
 tag: nomad
 - name: OpenStack Zun
 tag: openstack-zun
-- name: Tencent Games Tensile Kube
-tag: tensile-kube

View File

@@ -25,7 +25,7 @@
 <ul>
 {{ range $providers }}
-{{ $url := printf "https://github.com/%s/%s/blob/master/README.md#readme" (.org | default "virtual-kubelet") .tag }}
+{{ $url := printf "https://github.com/virtual-kubelet/%s/blob/master/README.md#readme" .tag }}
 <li class="has-bottom-spacing">
 <a class="is-size-5 is-size-6-mobile has-text-grey-lighter has-text-weight-light" href="{{ $url }}" target="_blank">
 {{ .name | markdownify }}

View File

@@ -3,9 +3,8 @@
 <tbody>
 {{ range $providers }}
 {{ $name := .name | markdownify }}
-{{ $githubPath := printf "github.com/%s/%s" (.org | default "virtual-kubelet") .tag }}
-{{ $pkgName := .vanityImportPath | default $githubPath }}
-{{ $pkgUrl := printf "https://%s/blob/master/README.md#readme" $githubPath }}
+{{ $pkgName := printf "github.com/virtual-kubelet/%s" .tag }}
+{{ $pkgUrl := printf "https://github.com/virtual-kubelet/%s/blob/master/README.md#readme" .tag }}
 {{ $godocUrl := printf "https://godoc.org/%s" $pkgName }}
 <tr>
 <td>
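The providers.yaml entries and the two template hunks above work together: on the base side of this comparison each provider may carry an `org` and a `vanityImportPath`, and the templates fall back to the `virtual-kubelet` org and the GitHub path when those fields are absent. The sketch below roughly reproduces that resolution logic with Go's text/template (Hugo's `default` filter is emulated with `if not`); the provider data is taken only from entries visible in this diff.

```go
package main

import (
	"os"
	"text/template"
)

// provider mirrors the providers.yaml fields referenced by the templates.
type provider struct {
	Name             string
	Tag              string
	Org              string // empty means the default "virtual-kubelet" org
	VanityImportPath string // empty means "github.com/<org>/<tag>"
}

// linkTmpl resolves the README and godoc links the way the base branch's templates do.
const linkTmpl = `{{ $org := .Org }}{{ if not $org }}{{ $org = "virtual-kubelet" }}{{ end }}` +
	`{{ $githubPath := printf "github.com/%s/%s" $org .Tag }}` +
	`{{ $pkgName := .VanityImportPath }}{{ if not $pkgName }}{{ $pkgName = $githubPath }}{{ end }}` +
	`{{ .Name }}: https://{{ $githubPath }}/blob/master/README.md#readme (godoc: https://godoc.org/{{ $pkgName }})
`

func main() {
	t := template.Must(template.New("provider").Parse(linkTmpl))
	providers := []provider{
		// Entries taken from the removed and retained providers.yaml lines above.
		{Name: "Admiralty Multi-Cluster Scheduler", Tag: "multicluster-scheduler", Org: "admiraltyio", VanityImportPath: "admiralty.io/multicluster-scheduler"},
		{Name: "OpenStack Zun", Tag: "openstack-zun"},
	}
	for _, p := range providers {
		if err := t.Execute(os.Stdout, p); err != nil {
			panic(err)
		}
	}
}
```

Running it prints one link line per provider, showing why dropping the `org`/`vanityImportPath` handling on this branch also required removing the Admiralty, Elotl, and Tencent entries, which depend on those fields.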