tests: introduce e2e suite (#422)

* mock: implement GetStatsSummary

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* make: use skaffold to deploy vk

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* test: add an e2e test suite

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* test: add vendored code

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* docs: update README.md

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* ci: run e2e on circleci

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* make: improve the skaffold target

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* e2e: fix defer pod deletion

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* e2e: improve instructions

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* makefile: default shell is bash

Signed-off-by: Paulo Pires <pjpires@gmail.com>
Paulo Pires
2018-11-28 17:01:36 +00:00
committed by Brian Goff
parent 688c10fa8b
commit 579823e6a5
22 changed files with 1487 additions and 21 deletions


@@ -36,9 +36,66 @@ jobs:
name: Tests
command: V=1 CI=1 SKIP_AWS_E2E=1 make test
e2e:
machine:
image: circleci/classic:201808-01
working_directory: /home/circleci/go/src/github.com/virtual-kubelet/virtual-kubelet
environment:
CHANGE_MINIKUBE_NONE_USER: true
GOPATH: /home/circleci/go
KUBECONFIG: /home/circleci/.kube/config
KUBERNETES_VERSION: v1.12.3
MINIKUBE_HOME: /home/circleci
MINIKUBE_VERSION: v0.30.0
MINIKUBE_WANTUPDATENOTIFICATION: false
MINIKUBE_WANTREPORTERRORPROMPT: false
SKAFFOLD_VERSION: v0.18.0
steps:
- checkout
- run:
name: Install kubectl
command: |
curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl
chmod +x kubectl
sudo mv kubectl /usr/local/bin/
mkdir -p ${HOME}/.kube
touch ${HOME}/.kube/config
- run:
name: Install Skaffold
command: |
curl -Lo skaffold https://github.com/GoogleContainerTools/skaffold/releases/download/${SKAFFOLD_VERSION}/skaffold-linux-amd64
chmod +x skaffold
sudo mv skaffold /usr/local/bin/
- run:
name: Install Minikube
command: |
curl -Lo minikube https://github.com/kubernetes/minikube/releases/download/${MINIKUBE_VERSION}/minikube-linux-amd64
chmod +x minikube
sudo mv minikube /usr/local/bin/
- run:
name: Start Minikube
command: |
sudo -E minikube start --vm-driver=none --cpus 2 --memory 2048 --kubernetes-version=${KUBERNETES_VERSION}
- run:
name: Wait for Minikube
command: |
JSONPATH='{range .items[*]}{@.metadata.name}:{range @.status.conditions[*]}{@.type}={@.status};{end}{end}';
until kubectl get nodes -o jsonpath="$JSONPATH" 2>&1 | grep -q "Ready=True"; do
sleep 1;
done
- run:
name: Deploy virtual-kubelet
command: |
make skaffold MODE=run
- run:
name: Run the end-to-end test suite
command: |
make e2e
workflows:
version: 2
validate_and_test:
jobs:
- validate
- test
- e2e

.gitignore vendored

@@ -7,6 +7,8 @@ bin/
# Certificates
*.pem
!hack/skaffold/virtual-kubelet/vkubelet-mock-0-crt.pem
!hack/skaffold/virtual-kubelet/vkubelet-mock-0-key.pem
# Test binary, build with `go test -c`
*.test
@@ -21,7 +23,6 @@ bin/
/dist
/build
/cover
/test
# Test credentials file
credentials.json

Gopkg.lock generated

@@ -1005,7 +1005,7 @@
[[projects]]
branch = "feature/wolfpack"
digest = "1:68a024f57bd12460c4a50d28f21066401fceaf741088252fea358815b8afc855"
digest = "1:79b933ce82b0a5da2678624d52d714deff91854904e8649889dc83be36907dc5"
name = "github.com/vmware/vic"
packages = [
"lib/apiservers/engine/backends/cache",
@@ -1051,7 +1051,7 @@
"pkg/vsphere/sys",
]
pruneopts = "NUT"
revision = "0fd01769dd5f4077763f3426a830709a9e986f1e"
revision = "c7d40ac878b09c577c307bc5331c7dd39619ed7c"
[[projects]]
branch = "master"
@@ -1406,7 +1406,7 @@
revision = "25e79651c7e569b0ebcd97affc0fe7f1ffcbee94"
[[projects]]
digest = "1:57983b132a06bc5956ed950993abf7071610c00a738d8270a2f3d89d19daa019"
digest = "1:f3060ab1e0f3fea59d642658b0f5fbf353e22bbe96e94175c02075ba831f6f3c"
name = "k8s.io/client-go"
packages = [
"discovery",
@@ -1494,6 +1494,7 @@
"tools/pager",
"tools/reference",
"tools/remotecommand",
"tools/watch",
"transport",
"transport/spdy",
"util/buffer",
@@ -1518,9 +1519,10 @@
revision = "9dfdf9be683f61f82cda12362c44c784e0778b56"
[[projects]]
digest = "1:8802ddb9cfe9b59a4c5388d5ab5f81e1cd59af1a7a7fb90162bdba838fa37ce2"
digest = "1:8e43e15194de65c9e9223df5488e3f400b76d33b68085a2e70b2c6ab84132d2c"
name = "k8s.io/kubernetes"
packages = [
"pkg/api/v1/pod",
"pkg/apis/core",
"pkg/kubelet/apis/cri/runtime/v1alpha2",
"pkg/kubelet/apis/stats/v1alpha1",
@@ -1634,6 +1636,7 @@
"k8s.io/apimachinery/pkg/runtime",
"k8s.io/apimachinery/pkg/types",
"k8s.io/apimachinery/pkg/util/intstr",
"k8s.io/apimachinery/pkg/util/net",
"k8s.io/apimachinery/pkg/util/uuid",
"k8s.io/apimachinery/pkg/watch",
"k8s.io/client-go/kubernetes",
@@ -1643,6 +1646,8 @@
"k8s.io/client-go/tools/clientcmd",
"k8s.io/client-go/tools/clientcmd/api/v1",
"k8s.io/client-go/tools/remotecommand",
"k8s.io/client-go/tools/watch",
"k8s.io/kubernetes/pkg/api/v1/pod",
"k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2",
"k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1",
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand",


@@ -1,3 +1,4 @@
SHELL := /bin/bash
IMPORT_PATH := github.com/virtual-kubelet/virtual-kubelet
DOCKER_IMAGE := virtual-kubelet
exec := $(DOCKER_IMAGE)
@@ -108,6 +109,44 @@ format: $(GOPATH)/bin/goimports
$Q find . -iname \*.go | grep -v \
-e "^$$" $(addprefix -e ,$(IGNORED_PACKAGES)) | xargs goimports -w
# skaffold deploys the virtual-kubelet to the Kubernetes cluster targeted by the current kubeconfig using skaffold.
# The current context (as indicated by "kubectl config current-context") must be one of "minikube" or "docker-for-desktop".
# MODE must be set to one of "dev" (default), "delete" or "run", and is used as the skaffold command to be run.
.PHONY: skaffold
skaffold: MODE ?= dev
skaffold: PROFILE := local
skaffold: VK_BUILD_TAGS ?= no_alicloud_provider no_aws_provider no_azure_provider no_azurebatch_provider no_cri_provider no_huawei_provider no_hyper_provider no_vic_provider no_web_provider
skaffold:
@if [[ ! "minikube,docker-for-desktop" =~ .*"$(kubectl_context)".* ]]; then \
echo current-context is [$(kubectl_context)]. Must be one of [minikube,docker-for-desktop]; false; \
fi
@if [[ ! "$(MODE)" == "delete" ]]; then \
GOOS=linux GOARCH=amd64 VK_BUILD_TAGS="$(VK_BUILD_TAGS)" $(MAKE) build; \
fi
@skaffold $(MODE) \
-f $(PWD)/hack/skaffold/virtual-kubelet/skaffold.yml \
-p $(PROFILE)
# e2e runs the end-to-end test suite against the Kubernetes cluster targeted by the current kubeconfig.
# It is assumed that the virtual-kubelet node to be tested is running as a pod called NODE_NAME inside this Kubernetes cluster.
# It is also assumed that this virtual-kubelet node has been started with the "--node-name" flag set to NODE_NAME.
# Finally, running the e2e suite is not guaranteed to succeed against a provider other than "mock".
.PHONY: e2e
e2e: KUBECONFIG ?= $(HOME)/.kube/config
e2e: NAMESPACE ?= default
e2e: NODE_NAME ?= vkubelet-mock-0
e2e: TAINT_KEY ?= virtual-kubelet.io/provider
e2e: TAINT_VALUE ?= mock
e2e: TAINT_EFFECT ?= NoSchedule
e2e:
@cd $(PWD)/test/e2e && go test -v -tags e2e ./... \
-kubeconfig=$(KUBECONFIG) \
-namespace=$(NAMESPACE) \
-node-name=$(NODE_NAME) \
-taint-key=$(TAINT_KEY) \
-taint-value=$(TAINT_VALUE) \
-taint-effect=$(TAINT_EFFECT)
##### =====> Internals <===== #####
.PHONY: setup
@@ -118,12 +157,6 @@ setup: clean
fi
if ! grep "/cover" .gitignore > /dev/null 2>&1; then \
echo "/cover" >> .gitignore; \
fi
if ! grep "/bin" .gitignore > /dev/null 2>&1; then \
echo "/bin" >> .gitignore; \
fi
if ! grep "/test" .gitignore > /dev/null 2>&1; then \
echo "/test" >> .gitignore; \
fi
mkdir -p cover
mkdir -p bin
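As a side note, every variable the `e2e` target declares with `?=` can be overridden on the `make` command line. For illustration only (the namespace and node name below are hypothetical), an invocation against a custom deployment might look like:

```console
$ make e2e NAMESPACE=testing NODE_NAME=vkubelet-mock-1
```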


@@ -27,6 +27,8 @@ The best description is "Kubernetes API on top, programmable back."
+ [Service Fabric Mesh Provider](#service-fabric-mesh-provider)
+ [Adding a New Provider via the Provider Interface](#adding-a-new-provider-via-the-provider-interface)
* [Testing](#testing)
+ [Unit tests](#unit-tests)
+ [End-to-end tests](#end-to-end-tests)
+ [Testing the Azure Provider Client](#testing-the-azure-provider-client)
* [Known quirks and workarounds](#known-quirks-and-workarounds)
* [Contributing](#contributing)
@@ -238,8 +240,68 @@ type PodMetricsProvider interface {
## Testing
### Unit tests
Running the unit tests locally is as simple as `make test`.
### End-to-end tests
Virtual Kubelet includes an end-to-end (e2e) test suite which is used to validate its implementation.
The current e2e suite runs **only** against the `mock` provider.
To run the e2e suite, three things are required:
- a local Kubernetes cluster (we have tested with [Docker for Mac](https://docs.docker.com/docker-for-mac/install/) and [Minikube](https://github.com/kubernetes/minikube));
- a _kubeconfig_ whose current context points at that local Kubernetes cluster;
- [`skaffold`](https://github.com/GoogleContainerTools/skaffold).
Since our CI uses Minikube, we describe below how to run the e2e suite on top of it.
To create a Minikube cluster, run the following command after [installing Minikube](https://github.com/kubernetes/minikube#installation):
```console
$ minikube start
```
The e2e suite requires Virtual Kubelet to be running as a pod inside the Kubernetes cluster.
To make the deployment process easier, the build toolchain relies on `skaffold`.
In order to deploy the Virtual Kubelet, run the following command after [installing `skaffold`](https://github.com/GoogleContainerTools/skaffold#installation):
```console
$ make skaffold
```
By default, this will run `skaffold` in [_development_ mode](https://github.com/GoogleContainerTools/skaffold#skaffold-dev).
In this mode, `skaffold` watches `hack/skaffold/virtual-kubelet/Dockerfile` and its dependencies for changes, re-deploying the Virtual Kubelet whenever they change.
It also streams logs from the Virtual Kubelet pod.
As an alternative, and if you are not concerned about continuous deployment and log streaming, you can run the following command instead:
```console
$ make skaffold MODE=run
```
This will deploy the Virtual Kubelet and return immediately.
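At this point the Virtual Kubelet should register itself as a node in the cluster. As a quick sanity check (the node name below matches the defaults used throughout this change), you can run:

```console
$ kubectl get node vkubelet-mock-0
```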
To run the e2e test suite, you can now run the following command:
```console
$ make e2e
```
When you're done testing, you can run the following command to cleanup the resources created by `skaffold`:
```console
$ make skaffold MODE=delete
```
Please note that this will not unregister the Virtual Kubelet as a node in the Kubernetes cluster.
In order to do so, you should run:
```console
$ kubectl delete node vkubelet-mock-0
```
### Testing the Azure Provider Client
The unit tests for the [`azure`](providers/azure/) provider require a `credentials.json`


@@ -0,0 +1,16 @@
FROM gcr.io/distroless/base
ENV APISERVER_CERT_LOCATION /vkubelet-mock-0-crt.pem
ENV APISERVER_KEY_LOCATION /vkubelet-mock-0-key.pem
ENV KUBELET_PORT 10250
# Use the pre-built binary in "bin/virtual-kubelet".
COPY bin/virtual-kubelet /virtual-kubelet
# Copy the configuration file for the mock provider.
COPY hack/skaffold/virtual-kubelet/vkubelet-mock-0-cfg.json /vkubelet-mock-0-cfg.json
# Copy the certificate for the HTTPS server.
COPY hack/skaffold/virtual-kubelet/vkubelet-mock-0-crt.pem /vkubelet-mock-0-crt.pem
# Copy the private key for the HTTPS server.
COPY hack/skaffold/virtual-kubelet/vkubelet-mock-0-key.pem /vkubelet-mock-0-key.pem
CMD ["/virtual-kubelet"]


@@ -0,0 +1,52 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: virtual-kubelet
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: virtual-kubelet
rules:
- apiGroups:
- ""
resources:
- configmaps
- pods
- secrets
verbs:
- list
- watch
- apiGroups:
- ""
resources:
- nodes
verbs:
- create
- get
- apiGroups:
- ""
resources:
- nodes/status
verbs:
- get
- update
- apiGroups:
- ""
resources:
- pods/status
verbs:
- update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: virtual-kubelet
subjects:
- kind: ServiceAccount
name: virtual-kubelet
namespace: default
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: virtual-kubelet


@@ -0,0 +1,26 @@
apiVersion: v1
kind: Pod
metadata:
name: vkubelet-mock-0
spec:
containers:
- name: vkubelet-mock-0
image: virtual-kubelet
# "IfNotPresent" is used to prevent Minikube from trying to pull from the registry (and failing) in the first place.
imagePullPolicy: IfNotPresent
args:
- /virtual-kubelet
- --nodename
- vkubelet-mock-0
- --provider
- mock
- --provider-config
- /vkubelet-mock-0-cfg.json
ports:
- name: metrics
containerPort: 10255
readinessProbe:
httpGet:
path: /stats/summary
port: metrics
serviceAccountName: virtual-kubelet


@@ -0,0 +1,18 @@
apiVersion: skaffold/v1alpha5
kind: Config
build:
artifacts:
- image: virtual-kubelet
docker:
# Use a Dockerfile specific for development only.
dockerfile: hack/skaffold/virtual-kubelet/Dockerfile
deploy:
kubectl:
manifests:
- hack/skaffold/virtual-kubelet/base.yml
- hack/skaffold/virtual-kubelet/pod.yml
profiles:
- name: local
build:
# For the "local" profile, we must perform the build locally.
local: {}


@@ -0,0 +1,7 @@
{
"vkubelet-mock-0": {
"cpu": "2",
"memory": "32Gi",
"pods": "128"
}
}


@@ -0,0 +1,23 @@
-----BEGIN CERTIFICATE-----
MIID3jCCAsagAwIBAgIIUa45eqfb4sEwDQYJKoZIhvcNAQELBQAwfzELMAkGA1UE
BhMCVVMxDzANBgNVBAgTBk9yZWdvbjERMA8GA1UEBxMIUG9ydGxhbmQxGDAWBgNV
BAoTD3ZrdWJlbGV0LW1vY2stMDEYMBYGA1UECxMPdmt1YmVsZXQtbW9jay0wMRgw
FgYDVQQDEw92a3ViZWxldC1tb2NrLTAwHhcNMTgxMTI2MTIwMzIzWhcNMTkwMjI1
MTgwODIzWjB/MQswCQYDVQQGEwJVUzEPMA0GA1UECBMGT3JlZ29uMREwDwYDVQQH
EwhQb3J0bGFuZDEYMBYGA1UEChMPdmt1YmVsZXQtbW9jay0wMRgwFgYDVQQLEw92
a3ViZWxldC1tb2NrLTAxGDAWBgNVBAMTD3ZrdWJlbGV0LW1vY2stMDCCASIwDQYJ
KoZIhvcNAQEBBQADggEPADCCAQoCggEBALryHvK3UBBBqGV2Fpwymf0p/YKGQA9r
Nu0N6f2+RkUXLuQXG+WdFQl3ZQybPLfCE2hwFcl3IF+3hCzY3/2UIyGBloBIft7K
YFLM3YWJDy5ElKDg1bNDSLzF6tkpNLDnVlgkPPITzpEHIAu+BT5DZGWhYAWO/Dir
XdxoJBOhPZZCcBCV+kwQQPbsXzZy+q7Qhx270CRMIXso9C5LJhGYL9fwsxmukAOR
56SmfsAaml7UOlzHITRDwD5AQ1BkTSEFy08dk6JAYL8LDLhgaLoWoV0Ge2gOIepR
jpl87dGbSVGyBHmTXv4o6utqT6S6nU76Ln9NSi7YhMqj8uWv0pTDlYcCAwEAAaNe
MFwwDgYDVR0PAQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcD
AjAMBgNVHRMBAf8EAjAAMB0GA1UdDgQWBBQVHwU1sy7Qnw1WvVvFLcZrhoT40DAN
BgkqhkiG9w0BAQsFAAOCAQEAsNGNKz1Jwfwg7rYaO7VF/zan01XXZFP1bnFYnXJu
15RzhOBMsp3KvWCVhwUfxNe8GhUDSx2tmS5EA/8oaEngLFl3jtR3pnUNOwDVlzly
QOCN3rlOi4+p26LvMiAFp5hxXAv3LORs6Dzr6h3/QTtlV5jDShUOXZdFdOPJdZ2n
g4birrG7MO6vwvR8CiNcQ26b+b8p9BGXbE8bsJoHmcsqya8fbVs2n6CdEJeI+4hD
N6xlo5SvhjH5tFII7eCVedyZGl0BKvkocOigLgq8X+JzFxj1wtdmtXv7sjdKcB9r
6TWGJRrZVxoxUOzZhpxUj3j/pLaRcDmttSJCuDu3NAtkgQ==
-----END CERTIFICATE-----


@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAuvIe8rdQEEGoZXYWnDKZ/Sn9goZAD2s27Q3p/b5GRRcu5Bcb
5Z0VCXdlDJs8t8ITaHAVyXcgX7eELNjf/ZQjIYGWgEh+3spgUszdhYkPLkSUoODV
s0NIvMXq2Sk0sOdWWCQ88hPOkQcgC74FPkNkZaFgBY78OKtd3GgkE6E9lkJwEJX6
TBBA9uxfNnL6rtCHHbvQJEwheyj0LksmEZgv1/CzGa6QA5HnpKZ+wBqaXtQ6XMch
NEPAPkBDUGRNIQXLTx2TokBgvwsMuGBouhahXQZ7aA4h6lGOmXzt0ZtJUbIEeZNe
/ijq62pPpLqdTvouf01KLtiEyqPy5a/SlMOVhwIDAQABAoIBAEN84tVGfh3QRiWS
sujij5rITN+Q7ZFjaCm96yoSRbXtf50SBp0mzxBizNT3Ob0wz+bVB9h6K/LCAnJa
PMqDbwdKi/V1tm9hadKaaKIrb5KJaYqGgD893AViAb0x1fbDHPWm52WQ5vKOOvBi
QexPUfAqiMqY6s7ednz6D4QSonQamxCUPBPYvudmayHtPlc8Qb6eY0V+pcdFnW08
SDZXYOxey3/IAjZydcA7XgvNSc+6XOwmhKsGAW71uFTTagJvzX3ePCY14rkGJmDG
m/10hoW6NMKGeV/RyX3dX0jJmDk1VfxAQW3xpOipZfgfvgavCOqHnKA6I8dK3zhg
vE9BleECgYEA87X/ztQZDI4qOTA9CW/nMXfwAy9QO1K6bGhBHUu7Js4pqgxuH8Fk
hQgQK7V8iasny/dCyj6Cu3QJNofxudAvLLQKkquyQOa+zqFCUpVid7JVRMcRLJlt
3HlyCNvVlhfjDT0cI2RdU45q8MnZoy1f3DPZB16cHb3HL9z1gQZTiXECgYEAxF9a
68SbxmWFBK7PaobI8wVfDoTirHmoAvnypYK0oQkAX8VmEmtEEs2+N1moKjSTPr+t
us4JKguA8z2tuLk5j+eF+zDl/2U+7djTF8FCNprwz3sXr427GCIGL5YvpIBZ+TL8
Bji2uyoo8k9SAWMb4ObOzfGm4teCvciS99gw0ncCgYAt5GbAVtZEs/ylejz0KvtZ
KGGs59ru4Nw0D8m7L4iVfRsBZ4fROQSpvGP3JxzFe9JpqS0NkonhrK8TcrQFLnvD
qj+XcPeHGyxxEpK/pFu/eHhwFCBayqWSb9gWbPciZWsfEhPbYknksxvWLdxqyt+T
QrwqlBlHzHXWwIAGhN90MQKBgQC5CYkpBFgsuFiBMx+rJ1qO9I6/paPaFcClHVTx
dJoz68F4fQ9TZ9P7S/djPI5jRqtAw2k2zxJ/ldtqWMIrgA2ndegf69GtuH91q4wt
pCN6RMGJIFoPSCP194mQqZo3DeK6GLq2OhalgnKW8Ps652LLp3FTSdORiLVfk3I5
LHPEvQKBgDCxa/3vneG8vgs8ArEjN89B/YxO1qIU5mxJe6Zafb81NdhYUjfRAVro
ALTofiApMsnDbJDHMiwvwcDUHbPLpruK80R//zmX7Xen+F+5obfSQ8j0GSmmeWFQ
SVG6ApNtktLPI0nK2nEIH/Qx4ouGC9N0pADRClQQPSxEPmDvf4xf
-----END RSA PRIVATE KEY-----


@@ -7,15 +7,18 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
"math/rand"
"time" "time"
"github.com/cpuguy83/strongerrors" "github.com/cpuguy83/strongerrors"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/tools/remotecommand"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"github.com/virtual-kubelet/virtual-kubelet/providers"
) )
const ( const (
@@ -33,6 +36,7 @@ type MockProvider struct {
daemonEndpointPort int32
pods map[string]*v1.Pod
config MockConfig
startTime time.Time
}
// MockConfig contains a mock virtual-kubelet's configurable parameters.
@@ -56,6 +60,7 @@ func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalI
daemonEndpointPort: daemonEndpointPort,
pods: make(map[string]*v1.Pod),
config: config,
startTime: time.Now(),
}
return &provider, nil
}
@@ -326,6 +331,80 @@ func (p *MockProvider) OperatingSystem() string {
return providers.OperatingSystemLinux
}
// GetStatsSummary returns dummy stats for all pods known by this provider.
func (p *MockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) {
// Grab the current timestamp so we can report it as the time the stats were generated.
time := metav1.NewTime(time.Now())
// Create the Summary object that will later be populated with node and pod stats.
res := &stats.Summary{}
// Populate the Summary object with basic node stats.
res.Node = stats.NodeStats{
NodeName: p.nodeName,
StartTime: metav1.NewTime(p.startTime),
}
// Populate the Summary object with dummy stats for each pod known by this provider.
for _, pod := range p.pods {
var (
// totalUsageNanoCores will be populated with the sum of the values of UsageNanoCores computed across all containers in the pod.
totalUsageNanoCores uint64
// totalUsageBytes will be populated with the sum of the values of UsageBytes computed across all containers in the pod.
totalUsageBytes uint64
)
// Create a PodStats object to populate with pod stats.
pss := stats.PodStats{
PodRef: stats.PodReference{
Name: pod.Name,
Namespace: pod.Namespace,
UID: string(pod.UID),
},
StartTime: pod.CreationTimestamp,
}
// Iterate over all containers in the current pod to compute dummy stats.
for _, container := range pod.Spec.Containers {
// Grab a dummy value to be used as the total CPU usage.
// The value should fit a uint32 in order to avoid overflows later on when computing pod stats.
dummyUsageNanoCores := uint64(rand.Uint32())
totalUsageNanoCores += dummyUsageNanoCores
// Create a dummy value to be used as the total RAM usage.
// The value should fit a uint32 in order to avoid overflows later on when computing pod stats.
dummyUsageBytes := uint64(rand.Uint32())
totalUsageBytes += dummyUsageBytes
// Append a ContainerStats object containing the dummy stats to the PodStats object.
pss.Containers = append(pss.Containers, stats.ContainerStats{
Name: container.Name,
StartTime: pod.CreationTimestamp,
CPU: &stats.CPUStats{
Time: time,
UsageNanoCores: &dummyUsageNanoCores,
},
Memory: &stats.MemoryStats{
Time: time,
UsageBytes: &dummyUsageBytes,
},
})
}
// Populate the CPU and RAM stats for the pod and append the PodStats object to the Summary object to be returned.
pss.CPU = &stats.CPUStats{
Time: time,
UsageNanoCores: &totalUsageNanoCores,
}
pss.Memory = &stats.MemoryStats{
Time: time,
UsageBytes: &totalUsageBytes,
}
res.Pods = append(res.Pods, pss)
}
// Return the dummy stats.
return res, nil
}
func buildKeyFromNames(namespace string, name string) (string, error) {
return fmt.Sprintf("%s-%s", namespace, name), nil
}
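To make the shape of the returned object concrete, here is a minimal sketch (not part of this change; it only assumes the vendored stats package used above) of how a client might walk the Summary that GetStatsSummary produces:

```go
package example

import (
	"fmt"

	stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

// printSummary walks a stats.Summary the way a monitoring client might:
// node-level stats first, then the aggregated CPU/memory usage of each pod.
func printSummary(s *stats.Summary) {
	fmt.Printf("node %s (started %s)\n", s.Node.NodeName, s.Node.StartTime)
	for _, p := range s.Pods {
		fmt.Printf("pod %s/%s (%d containers)\n", p.PodRef.Namespace, p.PodRef.Name, len(p.Containers))
		// CPU and Memory are pointers and may be nil for providers that do not report them.
		if p.CPU != nil && p.CPU.UsageNanoCores != nil {
			fmt.Printf("  cpu: %d nanocores\n", *p.CPU.UsageNanoCores)
		}
		if p.Memory != nil && p.Memory.UsageBytes != nil {
			fmt.Printf("  memory: %d bytes\n", *p.Memory.UsageBytes)
		}
	}
}
```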

test/e2e/basic_test.go Normal file

@@ -0,0 +1,144 @@
// +build e2e
package e2e
import (
"fmt"
"testing"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
// TestGetStatsSummary creates a pod having three containers and queries the /stats/summary endpoint of the virtual-kubelet.
// It expects this endpoint to return stats for the current node, as well as for the aforementioned pod and each of its three containers.
func TestGetStatsSummary(t *testing.T) {
// Create a pod with prefix "nginx-0-" having three containers.
pod, err := f.CreatePod(f.CreateDummyPodObjectWithPrefix("nginx-0-", "foo", "bar", "baz"))
if err != nil {
t.Fatal(err)
}
// Delete the "nginx-0-X" pod after the test finishes.
defer func() {
if err := f.DeletePod(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
// Wait for the "nginx-0-X" pod to be reported as running and ready.
if err := f.WaitUntilPodReady(pod.Namespace, pod.Name); err != nil {
t.Fatal(err)
}
// Grab the stats from the provider.
stats, err := f.GetStatsSummary()
if err != nil {
t.Fatal(err)
}
// Make sure that we've got stats for the current node.
if stats.Node.NodeName != f.NodeName {
t.Fatalf("expected stats for node %s, got stats for node %s", f.NodeName, stats.Node.NodeName)
}
// Make sure the "nginx-0-X" pod exists in the slice of PodStats.
idx, err := findPodInPodStats(stats, pod)
if err != nil {
t.Fatal(err)
}
// Make sure that we've got stats for all the containers in the "nginx-0-X" pod.
desiredContainerStatsCount := len(pod.Spec.Containers)
currentContainerStatsCount := len(stats.Pods[idx].Containers)
if currentContainerStatsCount != desiredContainerStatsCount {
t.Fatalf("expected stats for %d containers, got stats for %d containers", desiredContainerStatsCount, currentContainerStatsCount)
}
}
// TestPodLifecycle creates two pods and verifies that the provider has been asked to create them.
// Then, it deletes one of the pods and verifies that the provider has been asked to delete it.
// These verifications are made using the /stats/summary endpoint of the virtual-kubelet, by checking for the presence or absence of the pods.
// Hence, the provider being tested must implement the PodMetricsProvider interface.
func TestPodLifecycle(t *testing.T) {
// Create a pod with prefix "nginx-0-" having a single container.
pod0, err := f.CreatePod(f.CreateDummyPodObjectWithPrefix("nginx-0-", "foo"))
if err != nil {
t.Fatal(err)
}
// Delete the "nginx-0-X" pod after the test finishes.
defer func() {
if err := f.DeletePod(pod0.Namespace, pod0.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
// Create a pod with prefix "nginx-1-" having a single container.
pod1, err := f.CreatePod(f.CreateDummyPodObjectWithPrefix("nginx-1-", "bar"))
if err != nil {
t.Fatal(err)
}
// Delete the "nginx-1-Y" pod after the test finishes.
defer func() {
if err := f.DeletePod(pod1.Namespace, pod1.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()
// Wait for the "nginx-0-X" pod to be reported as running and ready.
if err := f.WaitUntilPodReady(pod0.Namespace, pod0.Name); err != nil {
t.Fatal(err)
}
// Wait for the "nginx-1-Y" pod to be reported as running and ready.
if err := f.WaitUntilPodReady(pod1.Namespace, pod1.Name); err != nil {
t.Fatal(err)
}
// Grab the stats from the provider.
stats, err := f.GetStatsSummary()
if err != nil {
t.Fatal(err)
}
// Make sure the "nginx-0-X" pod exists in the slice of PodStats.
if _, err := findPodInPodStats(stats, pod0); err != nil {
t.Fatal(err)
}
// Make sure the "nginx-1-Y" pod exists in the slice of PodStats.
if _, err := findPodInPodStats(stats, pod1); err != nil {
t.Fatal(err)
}
// Delete the "nginx-1" pod.
if err := f.DeletePod(pod1.Namespace, pod1.Name); err != nil {
t.Fatal(err)
}
// Wait for the "nginx-1-Y" pod to be reported as having been marked for deletion.
if err := f.WaitUntilPodDeleted(pod1.Namespace, pod1.Name); err != nil {
t.Fatal(err)
}
// Grab the stats from the provider.
stats, err = f.GetStatsSummary()
if err != nil {
t.Fatal(err)
}
// Make sure the "nginx-1-Y" pod DOES NOT exist in the slice of PodStats anymore.
if _, err := findPodInPodStats(stats, pod1); err == nil {
t.Fatalf("expected to NOT find pod \"%s/%s\" in the slice of pod stats", pod1.Namespace, pod1.Name)
}
}
// findPodInPodStats returns the index of the specified pod in the .pods field of the specified Summary object.
// It returns an error if the specified pod is not found.
func findPodInPodStats(summary *v1alpha1.Summary, pod *v1.Pod) (int, error) {
for i, p := range summary.Pods {
if p.PodRef.Namespace == pod.Namespace && p.PodRef.Name == pod.Name && string(p.PodRef.UID) == string(pod.UID) {
return i, nil
}
}
return -1, fmt.Errorf("failed to find pod \"%s/%s\" in the slice of pod stats", pod.Namespace, pod.Name)
}


@@ -0,0 +1,47 @@
package framework
import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// Framework encapsulates the configuration for the current run, and provides helper methods to be used during testing.
type Framework struct {
KubeClient kubernetes.Interface
Namespace string
NodeName string
TaintKey string
TaintValue string
TaintEffect string
}
// NewTestingFramework returns a new instance of the testing framework.
func NewTestingFramework(kubeconfig, namespace, nodeName, taintKey, taintValue, taintEffect string) *Framework {
return &Framework{
KubeClient: createKubeClient(kubeconfig),
Namespace: namespace,
NodeName: nodeName,
TaintKey: taintKey,
TaintValue: taintValue,
TaintEffect: taintEffect,
}
}
// createKubeClient creates a new Kubernetes client based on the specified kubeconfig file.
// If no value for kubeconfig is specified, in-cluster configuration is assumed.
func createKubeClient(kubeconfig string) *kubernetes.Clientset {
var (
cfg *rest.Config
err error
)
if kubeconfig == "" {
cfg, err = rest.InClusterConfig()
} else {
cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
}
if err != nil {
panic(err)
}
return kubernetes.NewForConfigOrDie(cfg)
}

test/e2e/framework/pod.go Normal file

@@ -0,0 +1,108 @@
package framework
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
watchapi "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/watch"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)
const (
defaultWatchTimeout = 2 * time.Minute
hostnameNodeSelectorLabel = "kubernetes.io/hostname"
)
// CreateDummyPodObjectWithPrefix creates a dummy pod object using the specified prefix as the value of .metadata.generateName.
// A variable number of strings can be provided.
// For each one of these strings, a container that uses the string as its image will be appended to the pod.
// This method DOES NOT create the pod in the Kubernetes API.
func (f *Framework) CreateDummyPodObjectWithPrefix(prefix string, images ...string) *corev1.Pod {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: prefix,
Namespace: f.Namespace,
},
Spec: corev1.PodSpec{
NodeSelector: map[string]string{
hostnameNodeSelectorLabel: f.NodeName,
},
Tolerations: []corev1.Toleration{
{
Key: f.TaintKey,
Value: f.TaintValue,
Effect: corev1.TaintEffect(f.TaintEffect),
},
},
},
}
for idx, img := range images {
pod.Spec.Containers = append(pod.Spec.Containers, corev1.Container{
Name: fmt.Sprintf("%s%d", prefix, idx),
Image: img,
})
}
return pod
}
// CreatePod creates the specified pod in the Kubernetes API.
func (f *Framework) CreatePod(pod *corev1.Pod) (*corev1.Pod, error) {
return f.KubeClient.CoreV1().Pods(f.Namespace).Create(pod)
}
// DeletePod deletes the pod with the specified name and namespace in the Kubernetes API.
func (f *Framework) DeletePod(namespace, name string) error {
return f.KubeClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{})
}
// WaitUntilPodCondition establishes a watch on the pod with the specified name and namespace.
// Then, it waits for the specified condition function to be verified.
func (f *Framework) WaitUntilPodCondition(namespace, name string, fn watch.ConditionFunc) error {
// Create a field selector that matches the specified Pod resource.
fs := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.namespace==%s,metadata.name==%s", namespace, name))
// Create a ListWatch so we can receive events for the matched Pod resource.
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fs.String()
return f.KubeClient.CoreV1().Pods(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) {
options.FieldSelector = fs.String()
return f.KubeClient.CoreV1().Pods(namespace).Watch(options)
},
}
// Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached.
ctx, cfn := context.WithTimeout(context.Background(), defaultWatchTimeout)
defer cfn()
last, err := watch.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, fn)
if err != nil {
return err
}
if last == nil {
return fmt.Errorf("no events received for pod %q", name)
}
return nil
}
// WaitUntilPodReady blocks until the pod with the specified name and namespace is reported to be running and ready.
func (f *Framework) WaitUntilPodReady(namespace, name string) error {
return f.WaitUntilPodCondition(namespace, name, func(event watchapi.Event) (bool, error) {
pod := event.Object.(*corev1.Pod)
return pod.Status.Phase == corev1.PodRunning && podutil.IsPodReady(pod) && pod.Status.PodIP != "", nil
})
}
// WaitUntilPodDeleted blocks until the pod with the specified name and namespace is marked for deletion (or, alternatively, effectively deleted).
func (f *Framework) WaitUntilPodDeleted(namespace, name string) error {
return f.WaitUntilPodCondition(namespace, name, func(event watchapi.Event) (bool, error) {
pod := event.Object.(*corev1.Pod)
return event.Type == watchapi.Deleted || pod.DeletionTimestamp != nil, nil
})
}
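Since WaitUntilPodCondition accepts an arbitrary condition function, the two helpers above extend naturally. As a sketch only (a hypothetical helper, not part of this change, assuming the same imports as this file), waiting for a pod to be assigned an IP could look like:

```go
// WaitUntilPodHasIP blocks until the pod with the specified name and namespace
// is reported to have been assigned an IP address. Hypothetical example only.
func (f *Framework) WaitUntilPodHasIP(namespace, name string) error {
	return f.WaitUntilPodCondition(namespace, name, func(event watchapi.Event) (bool, error) {
		pod := event.Object.(*corev1.Pod)
		return pod.Status.PodIP != "", nil
	})
}
```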


@@ -0,0 +1,31 @@
package framework
import (
"encoding/json"
"strconv"
"k8s.io/apimachinery/pkg/util/net"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
// GetStatsSummary queries the /stats/summary endpoint of the virtual-kubelet and returns the Summary object obtained as a response.
func (f *Framework) GetStatsSummary() (*stats.Summary, error) {
// Query the /stats/summary endpoint.
b, err := f.KubeClient.CoreV1().
RESTClient().
Get().
Namespace(f.Namespace).
Resource("pods").
SubResource("proxy").
Name(net.JoinSchemeNamePort("http", f.NodeName, strconv.Itoa(10255))).
Suffix("/stats/summary").DoRaw()
if err != nil {
return nil, err
}
// Unmarshal the response as a Summary object and return it.
res := &stats.Summary{}
if err := json.Unmarshal(b, res); err != nil {
return nil, err
}
return res, nil
}
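For reference, the request built above goes through the API server's pod proxy, so the same endpoint can be exercised by hand. Assuming `kubectl proxy` is running on its default port and the default node name from this change:

```console
$ kubectl proxy &
$ curl http://127.0.0.1:8001/api/v1/namespaces/default/pods/http:vkubelet-mock-0:10255/proxy/stats/summary
```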

test/e2e/main_test.go Normal file

@@ -0,0 +1,81 @@
// +build e2e
package e2e
import (
"flag"
"os"
"testing"
"k8s.io/api/core/v1"
"github.com/virtual-kubelet/virtual-kubelet/test/e2e/framework"
)
const (
defaultNamespace = v1.NamespaceDefault
defaultNodeName = "vkubelet-mock-0"
defaultTaintKey = "virtual-kubelet.io/provider"
defaultTaintValue = "mock"
defaultTaintEffect = string(v1.TaintEffectNoSchedule)
)
var (
// f is the testing framework used for running the test suite.
f *framework.Framework
// kubeconfig is the path to the kubeconfig file to use when running the test suite outside a Kubernetes cluster.
kubeconfig string
// namespace is the name of the Kubernetes namespace to use for running the test suite (i.e. where to create pods).
namespace string
// nodeName is the name of the virtual-kubelet node to test.
nodeName string
// taintKey is the key of the taint that is expected to be associated with the virtual-kubelet node to test.
taintKey string
// taintValue is the value of the taint that is expected to be associated with the virtual-kubelet node to test.
taintValue string
// taintEffect is the effect of the taint that is expected to be associated with the virtual-kubelet node to test.
taintEffect string
)
func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "path to the kubeconfig file to use when running the test suite outside a kubernetes cluster")
flag.StringVar(&namespace, "namespace", defaultNamespace, "the name of the kubernetes namespace to use for running the test suite (i.e. where to create pods)")
flag.StringVar(&nodeName, "node-name", defaultNodeName, "the name of the virtual-kubelet node to test")
flag.StringVar(&taintKey, "taint-key", defaultTaintKey, "the key of the taint that is expected to be associated with the virtual-kubelet node to test")
flag.StringVar(&taintValue, "taint-value", defaultTaintValue, "the value of the taint that is expected to be associated with the virtual-kubelet node to test")
flag.StringVar(&taintEffect, "taint-effect", defaultTaintEffect, "the effect of the taint that is expected to be associated with the virtual-kubelet node to test")
flag.Parse()
}
func TestMain(m *testing.M) {
// Set sane defaults in case no values (or empty ones) have been provided.
setDefaults()
// Create a new instance of the test framework targeting the specified node.
f = framework.NewTestingFramework(kubeconfig, namespace, nodeName, taintKey, taintValue, taintEffect)
// Wait for the virtual-kubelet pod to be ready.
if err := f.WaitUntilPodReady(namespace, nodeName); err != nil {
panic(err)
}
// Run the test suite.
os.Exit(m.Run())
}
// setDefaults sets sane defaults in case no values (or empty ones) have been provided.
func setDefaults() {
if namespace == "" {
namespace = defaultNamespace
}
if nodeName == "" {
nodeName = defaultNodeName
}
if taintKey == "" {
taintKey = defaultTaintKey
}
if taintValue == "" {
taintValue = defaultTaintValue
}
if taintEffect == "" {
taintEffect = defaultTaintEffect
}
}
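These flags mirror the variables of the Makefile's `e2e` target, so the suite can also be invoked directly with `go test` when iterating on a single test, for example:

```console
$ cd test/e2e && go test -v -tags e2e ./... \
    -kubeconfig=$HOME/.kube/config \
    -node-name=vkubelet-mock-0 \
    -run TestGetStatsSummary
```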


@@ -52,13 +52,17 @@ var (
cbpLock sync.Mutex
ContainerByPort map[string]string // port:containerID
once sync.Once
)
func init() {
portMapper = portmap.NewPortMapper()
btbRules = make(map[string][]string)
ContainerByPort = make(map[string]string)
}
func Init() {
once.Do(func() {
l, err := netlink.LinkByName(publicIfaceName)
if l == nil {
l, err = netlink.LinkByAlias(publicIfaceName)
@@ -70,6 +74,7 @@ func init() {
// don't use interface alias for iptables rules
publicIfaceName = l.Attrs().Name
})
}
// requestHostPort finds a free port on the host
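The change above moves the netlink lookup out of `init()` and behind a `sync.Once`, so merely importing the package no longer touches the network stack, and the lookup runs at most once however many callers invoke `Init()`. A minimal, self-contained sketch of the same pattern (names are illustrative, not taken from this code):

```go
package main

import (
	"fmt"
	"sync"
)

var (
	once  sync.Once
	iface string
)

// Init performs one-time, potentially expensive setup. Repeated and concurrent
// calls are safe: sync.Once guarantees the closure runs exactly once.
func Init() {
	once.Do(func() {
		iface = "eth0" // stands in for the real netlink lookup
		fmt.Println("initialized")
	})
}

func main() {
	Init()
	Init() // no-op: initialization already happened
	fmt.Println("using interface:", iface)
}
```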

vendor/k8s.io/client-go/tools/watch/informerwatcher.go generated vendored Normal file

@@ -0,0 +1,114 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"sync"
"sync/atomic"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
func newTicketer() *ticketer {
return &ticketer{
cond: sync.NewCond(&sync.Mutex{}),
}
}
type ticketer struct {
counter uint64
cond *sync.Cond
current uint64
}
func (t *ticketer) GetTicket() uint64 {
// -1 to start from 0
return atomic.AddUint64(&t.counter, 1) - 1
}
func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
t.cond.L.Lock()
defer t.cond.L.Unlock()
for ticket != t.current {
t.cond.Wait()
}
f()
t.current++
t.cond.Broadcast()
}
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface) {
ch := make(chan watch.Event)
w := watch.NewProxyWatcher(ch)
t := newTicketer()
indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
go t.WaitForTicket(t.GetTicket(), func() {
select {
case ch <- watch.Event{
Type: watch.Added,
Object: obj.(runtime.Object),
}:
case <-w.StopChan():
}
})
},
UpdateFunc: func(old, new interface{}) {
go t.WaitForTicket(t.GetTicket(), func() {
select {
case ch <- watch.Event{
Type: watch.Modified,
Object: new.(runtime.Object),
}:
case <-w.StopChan():
}
})
},
DeleteFunc: func(obj interface{}) {
go t.WaitForTicket(t.GetTicket(), func() {
staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
if stale {
// We have no means of passing the additional information down using watch API based on watch.Event
// but the caller can filter such objects by checking if metadata.deletionTimestamp is set
obj = staleObj
}
select {
case ch <- watch.Event{
Type: watch.Deleted,
Object: obj.(runtime.Object),
}:
case <-w.StopChan():
}
})
},
}, cache.Indexers{})
go func() {
informer.Run(w.StopChan())
}()
return indexer, informer, w
}

vendor/k8s.io/client-go/tools/watch/until.go generated vendored Normal file

@@ -0,0 +1,225 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"context"
"errors"
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
// PreconditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition failed or detected an error state.
type PreconditionFunc func(store cache.Store) (bool, error)
// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
// from false to true).
type ConditionFunc func(event watch.Event) (bool, error)
// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry.
var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout")
// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
// If no event has been received, the returned event will be nil.
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
// Waits until context deadline or until context is canceled.
//
// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!!
// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error.
// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below,
// Warning: solving such issues.
// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone.
func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) {
ch := watcher.ResultChan()
defer watcher.Stop()
var lastEvent *watch.Event
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
continue
}
}
ConditionSucceeded:
for {
select {
case event, ok := <-ch:
if !ok {
return lastEvent, ErrWatchClosed
}
lastEvent = &event
done, err := condition(event)
if err != nil {
return lastEvent, err
}
if done {
break ConditionSucceeded
}
case <-ctx.Done():
return lastEvent, wait.ErrWaitTimeout
}
}
}
return lastEvent, nil
}
// UntilWithSync creates an informer from lw, optionally checks precondition when the store is synced,
// and watches the output until each provided condition succeeds, in a way that is identical
// to function UntilWithoutRetry. (See above.)
// UntilWithSync can deal with all errors like API timeout, lost connections and 'Resource version too old'.
// It is the only function that can recover from 'Resource version too old', Until and UntilWithoutRetry will
// just fail in that case. On the other hand it can't provide you with guarantees as strong as using simple
// Watch method with Until. It can skip some intermediate events in case of watch function failing but it will
// re-list to recover and you always get an event, if there has been a change, after recovery.
// Also with the current implementation based on DeltaFIFO, order of the events you receive is guaranteed only for
// particular object, not between more of them even it's the same resource.
// The most frequent usage would be a command that needs to watch the "state of the world" and shouldn't fail, like:
// waiting for object reaching a state, "small" controllers, ...
func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
indexer, informer, watcher := NewIndexerInformerWatcher(lw, objType)
// Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and
// let UntilWithoutRetry to stop it
defer watcher.Stop()
if precondition != nil {
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
return nil, fmt.Errorf("UntilWithSync: unable to sync caches: %v", ctx.Err())
}
done, err := precondition(indexer)
if err != nil {
return nil, err
}
if done {
return nil, nil
}
}
return UntilWithoutRetry(ctx, watcher, conditions...)
}
// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout < 0 {
// This should be handled in validation
glog.Errorf("Timeout for context shall not be negative!")
timeout = 0
}
if timeout == 0 {
return context.WithCancel(parent)
}
return context.WithTimeout(parent, timeout)
}
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
// TODO: remove when no longer used
//
// Deprecated: Use UntilWithSync instead.
func ListWatchUntil(timeout time.Duration, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) {
if len(conditions) == 0 {
return nil, nil
}
list, err := lw.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
initialItems, err := meta.ExtractList(list)
if err != nil {
return nil, err
}
// use the initial items as simulated "adds"
var lastEvent *watch.Event
currIndex := 0
passedConditions := 0
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
continue
}
}
ConditionSucceeded:
for currIndex < len(initialItems) {
lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
currIndex++
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
break ConditionSucceeded
}
}
}
if passedConditions == len(conditions) {
return lastEvent, nil
}
remainingConditions := conditions[passedConditions:]
metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
currResourceVersion := metaObj.GetResourceVersion()
watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion})
if err != nil {
return nil, err
}
ctx, cancel := ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
evt, err := UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
if err == ErrWatchClosed {
// present a consistent error interface to callers
err = wait.ErrWaitTimeout
}
return evt, err
}

vendor/k8s.io/kubernetes/pkg/api/v1/pod/util.go generated vendored Normal file

@@ -0,0 +1,305 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pod
import (
"fmt"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)
// FindPort locates the container port for the given pod and portName. If the
// targetPort is a number, use that. If the targetPort is a string, look that
// string up in all named ports in all containers in the target pod. If no
// match is found, fail.
func FindPort(pod *v1.Pod, svcPort *v1.ServicePort) (int, error) {
portName := svcPort.TargetPort
switch portName.Type {
case intstr.String:
name := portName.StrVal
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.Name == name && port.Protocol == svcPort.Protocol {
return int(port.ContainerPort), nil
}
}
}
case intstr.Int:
return portName.IntValue(), nil
}
return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID)
}
// Visitor is called with each object name, and returns true if visiting should continue
type Visitor func(name string) (shouldContinue bool)
// VisitPodSecretNames invokes the visitor function with the name of every secret
// referenced by the pod spec. If visitor returns false, visiting is short-circuited.
// Transitive references (e.g. pod -> pvc -> pv -> secret) are not visited.
// Returns true if visiting completed, false if visiting was short-circuited.
func VisitPodSecretNames(pod *v1.Pod, visitor Visitor) bool {
for _, reference := range pod.Spec.ImagePullSecrets {
if !visitor(reference.Name) {
return false
}
}
for i := range pod.Spec.InitContainers {
if !visitContainerSecretNames(&pod.Spec.InitContainers[i], visitor) {
return false
}
}
for i := range pod.Spec.Containers {
if !visitContainerSecretNames(&pod.Spec.Containers[i], visitor) {
return false
}
}
var source *v1.VolumeSource
for i := range pod.Spec.Volumes {
source = &pod.Spec.Volumes[i].VolumeSource
switch {
case source.AzureFile != nil:
if len(source.AzureFile.SecretName) > 0 && !visitor(source.AzureFile.SecretName) {
return false
}
case source.CephFS != nil:
if source.CephFS.SecretRef != nil && !visitor(source.CephFS.SecretRef.Name) {
return false
}
case source.Cinder != nil:
if source.Cinder.SecretRef != nil && !visitor(source.Cinder.SecretRef.Name) {
return false
}
case source.FlexVolume != nil:
if source.FlexVolume.SecretRef != nil && !visitor(source.FlexVolume.SecretRef.Name) {
return false
}
case source.Projected != nil:
for j := range source.Projected.Sources {
if source.Projected.Sources[j].Secret != nil {
if !visitor(source.Projected.Sources[j].Secret.Name) {
return false
}
}
}
case source.RBD != nil:
if source.RBD.SecretRef != nil && !visitor(source.RBD.SecretRef.Name) {
return false
}
case source.Secret != nil:
if !visitor(source.Secret.SecretName) {
return false
}
case source.ScaleIO != nil:
if source.ScaleIO.SecretRef != nil && !visitor(source.ScaleIO.SecretRef.Name) {
return false
}
case source.ISCSI != nil:
if source.ISCSI.SecretRef != nil && !visitor(source.ISCSI.SecretRef.Name) {
return false
}
case source.StorageOS != nil:
if source.StorageOS.SecretRef != nil && !visitor(source.StorageOS.SecretRef.Name) {
return false
}
}
}
return true
}
func visitContainerSecretNames(container *v1.Container, visitor Visitor) bool {
for _, env := range container.EnvFrom {
if env.SecretRef != nil {
if !visitor(env.SecretRef.Name) {
return false
}
}
}
for _, envVar := range container.Env {
if envVar.ValueFrom != nil && envVar.ValueFrom.SecretKeyRef != nil {
if !visitor(envVar.ValueFrom.SecretKeyRef.Name) {
return false
}
}
}
return true
}
// VisitPodConfigmapNames invokes the visitor function with the name of every configmap
// referenced by the pod spec. If visitor returns false, visiting is short-circuited.
// Transitive references (e.g. pod -> pvc -> pv -> secret) are not visited.
// Returns true if visiting completed, false if visiting was short-circuited.
func VisitPodConfigmapNames(pod *v1.Pod, visitor Visitor) bool {
for i := range pod.Spec.InitContainers {
if !visitContainerConfigmapNames(&pod.Spec.InitContainers[i], visitor) {
return false
}
}
for i := range pod.Spec.Containers {
if !visitContainerConfigmapNames(&pod.Spec.Containers[i], visitor) {
return false
}
}
var source *v1.VolumeSource
for i := range pod.Spec.Volumes {
source = &pod.Spec.Volumes[i].VolumeSource
switch {
case source.Projected != nil:
for j := range source.Projected.Sources {
if source.Projected.Sources[j].ConfigMap != nil {
if !visitor(source.Projected.Sources[j].ConfigMap.Name) {
return false
}
}
}
case source.ConfigMap != nil:
if !visitor(source.ConfigMap.Name) {
return false
}
}
}
return true
}
func visitContainerConfigmapNames(container *v1.Container, visitor Visitor) bool {
for _, env := range container.EnvFrom {
if env.ConfigMapRef != nil {
if !visitor(env.ConfigMapRef.Name) {
return false
}
}
}
for _, envVar := range container.Env {
if envVar.ValueFrom != nil && envVar.ValueFrom.ConfigMapKeyRef != nil {
if !visitor(envVar.ValueFrom.ConfigMapKeyRef.Name) {
return false
}
}
}
return true
}
// GetContainerStatus extracts the status of container "name" from "statuses".
// It also returns if "name" exists.
func GetContainerStatus(statuses []v1.ContainerStatus, name string) (v1.ContainerStatus, bool) {
for i := range statuses {
if statuses[i].Name == name {
return statuses[i], true
}
}
return v1.ContainerStatus{}, false
}
// GetExistingContainerStatus extracts the status of container "name" from "statuses",
// It also returns if "name" exists.
func GetExistingContainerStatus(statuses []v1.ContainerStatus, name string) v1.ContainerStatus {
status, _ := GetContainerStatus(statuses, name)
return status
}
// IsPodAvailable returns true if a pod is available; false otherwise.
// Precondition for an available pod is that it must be ready. On top
// of that, there are two cases when a pod can be considered available:
// 1. minReadySeconds == 0, or
// 2. LastTransitionTime (is set) + minReadySeconds < current time
func IsPodAvailable(pod *v1.Pod, minReadySeconds int32, now metav1.Time) bool {
if !IsPodReady(pod) {
return false
}
c := GetPodReadyCondition(pod.Status)
minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second
if minReadySeconds == 0 || !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now.Time) {
return true
}
return false
}
// IsPodReady returns true if a pod is ready; false otherwise.
func IsPodReady(pod *v1.Pod) bool {
return IsPodReadyConditionTrue(pod.Status)
}
// IsPodReadyConditionTrue returns true if a pod's Ready condition is true; false otherwise.
func IsPodReadyConditionTrue(status v1.PodStatus) bool {
condition := GetPodReadyCondition(status)
return condition != nil && condition.Status == v1.ConditionTrue
}
// Extracts the pod ready condition from the given status and returns that.
// Returns nil if the condition is not present.
func GetPodReadyCondition(status v1.PodStatus) *v1.PodCondition {
_, condition := GetPodCondition(&status, v1.PodReady)
return condition
}
// GetPodCondition extracts the provided condition from the given status and returns that.
// Returns nil and -1 if the condition is not present, and the index of the located condition.
func GetPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
if status == nil {
return -1, nil
}
return GetPodConditionFromList(status.Conditions, conditionType)
}
// GetPodConditionFromList extracts the provided condition from the given list of condition and
// returns the index of the condition and the condition. Returns -1 and nil if the condition is not present.
func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
if conditions == nil {
return -1, nil
}
for i := range conditions {
if conditions[i].Type == conditionType {
return i, &conditions[i]
}
}
return -1, nil
}
// Updates existing pod condition or creates a new one. Sets LastTransitionTime to now if the
// status has changed.
// Returns true if pod condition has changed or has been added.
func UpdatePodCondition(status *v1.PodStatus, condition *v1.PodCondition) bool {
condition.LastTransitionTime = metav1.Now()
// Try to find this pod condition.
conditionIndex, oldCondition := GetPodCondition(status, condition.Type)
if oldCondition == nil {
// We are adding new pod condition.
status.Conditions = append(status.Conditions, *condition)
return true
} else {
// We are updating an existing condition, so we need to check if it has changed.
if condition.Status == oldCondition.Status {
condition.LastTransitionTime = oldCondition.LastTransitionTime
}
isEqual := condition.Status == oldCondition.Status &&
condition.Reason == oldCondition.Reason &&
condition.Message == oldCondition.Message &&
condition.LastProbeTime.Equal(&oldCondition.LastProbeTime) &&
condition.LastTransitionTime.Equal(&oldCondition.LastTransitionTime)
status.Conditions[conditionIndex] = *condition
// Return true if one of the fields have changed.
return !isEqual
}
}