diff --git a/.circleci/config.yml b/.circleci/config.yml index 886666bfb..7ccfce0fe 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -36,9 +36,66 @@ jobs: name: Tests command: V=1 CI=1 SKIP_AWS_E2E=1 make test + e2e: + machine: + image: circleci/classic:201808-01 + working_directory: /home/circleci/go/src/github.com/virtual-kubelet/virtual-kubelet + environment: + CHANGE_MINIKUBE_NONE_USER: true + GOPATH: /home/circleci/go + KUBECONFIG: /home/circleci/.kube/config + KUBERNETES_VERSION: v1.12.3 + MINIKUBE_HOME: /home/circleci + MINIKUBE_VERSION: v0.30.0 + MINIKUBE_WANTUPDATENOTIFICATION: false + MINIKUBE_WANTREPORTERRORPROMPT: false + SKAFFOLD_VERSION: v0.18.0 + steps: + - checkout + - run: + name: Install kubectl + command: | + curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl + chmod +x kubectl + sudo mv kubectl /usr/local/bin/ + mkdir -p ${HOME}/.kube + touch ${HOME}/.kube/config + - run: + name: Install Skaffold + command: | + curl -Lo skaffold https://github.com/GoogleContainerTools/skaffold/releases/download/${SKAFFOLD_VERSION}/skaffold-linux-amd64 + chmod +x skaffold + sudo mv skaffold /usr/local/bin/ + - run: + name: Install Minikube + command: | + curl -Lo minikube https://github.com/kubernetes/minikube/releases/download/${MINIKUBE_VERSION}/minikube-linux-amd64 + chmod +x minikube + sudo mv minikube /usr/local/bin/ + - run: + name: Start Minikube + command: | + sudo -E minikube start --vm-driver=none --cpus 2 --memory 2048 --kubernetes-version=${KUBERNETES_VERSION} + - run: + name: Wait for Minikube + command: | + JSONPATH='{range .items[*]}{@.metadata.name}:{range @.status.conditions[*]}{@.type}={@.status};{end}{end}'; + until kubectl get nodes -o jsonpath="$JSONPATH" 2>&1 | grep -q "Ready=True"; do + sleep 1; + done + - run: + name: Deploy virtual-kubelet + command: | + make skaffold MODE=run + - run: + name: Run the end-to-end test suite + command: | + make e2e + workflows: version: 2 validate_and_test: jobs: - validate - test + - e2e \ No newline at end of file diff --git a/.gitignore b/.gitignore index 7ef1ffcf7..0cdcc51c1 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,8 @@ bin/ # Certificates *.pem +!hack/skaffold/virtual-kubelet/vkubelet-mock-0-crt.pem +!hack/skaffold/virtual-kubelet/vkubelet-mock-0-key.pem # Test binary, build with `go test -c` *.test @@ -21,7 +23,6 @@ bin/ /dist /build /cover -/test # Test credentials file credentials.json diff --git a/Gopkg.lock b/Gopkg.lock index 290a65532..3fdcf476f 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1005,7 +1005,7 @@ [[projects]] branch = "feature/wolfpack" - digest = "1:68a024f57bd12460c4a50d28f21066401fceaf741088252fea358815b8afc855" + digest = "1:79b933ce82b0a5da2678624d52d714deff91854904e8649889dc83be36907dc5" name = "github.com/vmware/vic" packages = [ "lib/apiservers/engine/backends/cache", @@ -1051,7 +1051,7 @@ "pkg/vsphere/sys", ] pruneopts = "NUT" - revision = "0fd01769dd5f4077763f3426a830709a9e986f1e" + revision = "c7d40ac878b09c577c307bc5331c7dd39619ed7c" [[projects]] branch = "master" @@ -1406,7 +1406,7 @@ revision = "25e79651c7e569b0ebcd97affc0fe7f1ffcbee94" [[projects]] - digest = "1:57983b132a06bc5956ed950993abf7071610c00a738d8270a2f3d89d19daa019" + digest = "1:f3060ab1e0f3fea59d642658b0f5fbf353e22bbe96e94175c02075ba831f6f3c" name = "k8s.io/client-go" packages = [ "discovery", @@ -1494,6 +1494,7 @@ "tools/pager", "tools/reference", "tools/remotecommand", + "tools/watch", "transport", "transport/spdy", "util/buffer", @@ -1518,9 +1519,10 @@ revision = "9dfdf9be683f61f82cda12362c44c784e0778b56" [[projects]] - digest = "1:8802ddb9cfe9b59a4c5388d5ab5f81e1cd59af1a7a7fb90162bdba838fa37ce2" + digest = "1:8e43e15194de65c9e9223df5488e3f400b76d33b68085a2e70b2c6ab84132d2c" name = "k8s.io/kubernetes" packages = [ + "pkg/api/v1/pod", "pkg/apis/core", "pkg/kubelet/apis/cri/runtime/v1alpha2", "pkg/kubelet/apis/stats/v1alpha1", @@ -1634,6 +1636,7 @@ "k8s.io/apimachinery/pkg/runtime", "k8s.io/apimachinery/pkg/types", "k8s.io/apimachinery/pkg/util/intstr", + "k8s.io/apimachinery/pkg/util/net", "k8s.io/apimachinery/pkg/util/uuid", "k8s.io/apimachinery/pkg/watch", "k8s.io/client-go/kubernetes", @@ -1643,6 +1646,8 @@ "k8s.io/client-go/tools/clientcmd", "k8s.io/client-go/tools/clientcmd/api/v1", "k8s.io/client-go/tools/remotecommand", + "k8s.io/client-go/tools/watch", + "k8s.io/kubernetes/pkg/api/v1/pod", "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2", "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1", "k8s.io/kubernetes/pkg/kubelet/server/remotecommand", diff --git a/Makefile b/Makefile index fb03c953f..da2aa8268 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,4 @@ +SHELL := /bin/bash IMPORT_PATH := github.com/virtual-kubelet/virtual-kubelet DOCKER_IMAGE := virtual-kubelet exec := $(DOCKER_IMAGE) @@ -108,6 +109,44 @@ format: $(GOPATH)/bin/goimports $Q find . -iname \*.go | grep -v \ -e "^$$" $(addprefix -e ,$(IGNORED_PACKAGES)) | xargs goimports -w +# skaffold deploys the virtual-kubelet to the Kubernetes cluster targeted by the current kubeconfig using skaffold. +# The current context (as indicated by "kubectl config current-context") must be one of "minikube" or "docker-for-desktop". +# MODE must be set to one of "dev" (default), "delete" or "run", and is used as the skaffold command to be run. +.PHONY: skaffold +skaffold: MODE ?= dev +skaffold: PROFILE := local +skaffold: VK_BUILD_TAGS ?= no_alicloud_provider no_aws_provider no_azure_provider no_azurebatch_provider no_cri_provider no_huawei_provider no_hyper_provider no_vic_provider no_web_provider +skaffold: + @if [[ ! "minikube,docker-for-desktop" =~ .*"$(kubectl_context)".* ]]; then \ + echo current-context is [$(kubectl_context)]. Must be one of [minikube,docker-for-desktop]; false; \ + fi + @if [[ ! "$(MODE)" == "delete" ]]; then \ + GOOS=linux GOARCH=amd64 VK_BUILD_TAGS="$(VK_BUILD_TAGS)" $(MAKE) build; \ + fi + @skaffold $(MODE) \ + -f $(PWD)/hack/skaffold/virtual-kubelet/skaffold.yml \ + -p $(PROFILE) + +# e2e runs the end-to-end test suite against the Kubernetes cluster targeted by the current kubeconfig. +# It is assumed that the virtual-kubelet node to be tested is running as a pod called NODE_NAME inside this Kubernetes cluster. +# It is also assumed that this virtual-kubelet node has been started with the "--node-name" flag set to NODE_NAME. +# Finally, running the e2e suite is not guaranteed to succeed against a provider other than "mock". +.PHONY: e2e +e2e: KUBECONFIG ?= $(HOME)/.kube/config +e2e: NAMESPACE ?= default +e2e: NODE_NAME ?= vkubelet-mock-0 +e2e: TAINT_KEY ?= virtual-kubelet.io/provider +e2e: TAINT_VALUE ?= mock +e2e: TAINT_EFFECT ?= NoSchedule +e2e: + @cd $(PWD)/test/e2e && go test -v -tags e2e ./... \ + -kubeconfig=$(KUBECONFIG) \ + -namespace=$(NAMESPACE) \ + -node-name=$(NODE_NAME) \ + -taint-key=$(TAINT_KEY) \ + -taint-value=$(TAINT_VALUE) \ + -taint-effect=$(TAINT_EFFECT) + ##### =====> Internals <===== ##### .PHONY: setup @@ -118,12 +157,6 @@ setup: clean fi if ! grep "/cover" .gitignore > /dev/null 2>&1; then \ echo "/cover" >> .gitignore; \ - fi - if ! grep "/bin" .gitignore > /dev/null 2>&1; then \ - echo "/bin" >> .gitignore; \ - fi - if ! grep "/test" .gitignore > /dev/null 2>&1; then \ - echo "/test" >> .gitignore; \ fi mkdir -p cover mkdir -p bin diff --git a/README.md b/README.md index 4fafb78f3..96f385614 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,8 @@ The best description is "Kubernetes API on top, programmable back." + [Service Fabric Mesh Provider](#service-fabric-mesh-provider) + [Adding a New Provider via the Provider Interface](#adding-a-new-provider-via-the-provider-interface) * [Testing](#testing) + + [Unit tests](#unit-tests) + + [End-to-end tests](#end-to-end-tests) + [Testing the Azure Provider Client](#testing-the-azure-provider-client) * [Known quirks and workarounds](#known-quirks-and-workarounds) * [Contributing](#contributing) @@ -238,8 +240,68 @@ type PodMetricsProvider interface { ## Testing +### Unit tests + Running the unit tests locally is as simple as `make test`. +### End-to-end tests + +Virtual Kubelet includes an end-to-end (e2e) test suite which is used to validate its implementation. +The current e2e suite **does not** run for any providers other than the `mock` provider. + +To run the e2e suite, three things are required: +- a local Kubernetes cluster (we have tested with [Docker for Mac](https://docs.docker.com/docker-for-mac/install/) and [Minikube](https://github.com/kubernetes/minikube)); +- Your _kubeconfig_ default context points to the local Kubernetes cluster; +- [`skaffold`](https://github.com/GoogleContainerTools/skaffold). + +Since our CI uses Minikube, we describe below how to run e2e on top of it. + +To create a Minikube cluster, run the following command after [installing Minikube](https://github.com/kubernetes/minikube#installation): + +```console +$ minikube start +``` + +The e2e suite requires Virtual Kubelet to be running as a pod inside the Kubernetes cluster. +To make the deployment process easier, the build toolchain leverages on `skaffold`. + +In order to deploy the Virtual Kubelet, run the following command after [installing `skaffold`](https://github.com/GoogleContainerTools/skaffold#installation): + +```console +$ make skaffold +``` + +By default, this will run `skaffold` in [_development_ mode](https://github.com/GoogleContainerTools/skaffold#skaffold-dev). +This will make `skaffold` watch `hack/skaffold/virtual-kubelet/Dockerfile` and its dependencies for changes and re-deploy the Virtual Kubelet when said changes happen. +It will also make `skaffold` stream logs from the Virtual Kubelet pod. + +As an alternative, and if you are not concerned about continuous deployment and log streaming, you can run the following command instead: + +```console +$ make skaffold MODE=run +``` + +This will deploy the Virtual Kubelet and return immediately. + +To run the e2e test suite, you can now run the following command: + +```console +$ make e2e +``` + +When you're done testing, you can run the following command to cleanup the resources created by `skaffold`: + +```console +$ make skaffold MODE=delete +``` + +Please note that this will not unregister the Virtual Kubelet as a node in the Kubernetes cluster. +In order to do so, you should run: + +```console +$ kubectl delete node vkubelet-mock-0 +``` + ### Testing the Azure Provider Client The unit tests for the [`azure`](providers/azure/) provider require a `credentials.json` diff --git a/hack/skaffold/virtual-kubelet/Dockerfile b/hack/skaffold/virtual-kubelet/Dockerfile new file mode 100644 index 000000000..f1e9951a7 --- /dev/null +++ b/hack/skaffold/virtual-kubelet/Dockerfile @@ -0,0 +1,16 @@ +FROM gcr.io/distroless/base + +ENV APISERVER_CERT_LOCATION /vkubelet-mock-0-crt.pem +ENV APISERVER_KEY_LOCATION /vkubelet-mock-0-key.pem +ENV KUBELET_PORT 10250 + +# Use the pre-built binary in "bin/virtual-kubelet". +COPY bin/virtual-kubelet /virtual-kubelet +# Copy the configuration file for the mock provider. +COPY hack/skaffold/virtual-kubelet/vkubelet-mock-0-cfg.json /vkubelet-mock-0-cfg.json +# Copy the certificate for the HTTPS server. +COPY hack/skaffold/virtual-kubelet/vkubelet-mock-0-crt.pem /vkubelet-mock-0-crt.pem +# Copy the private key for the HTTPS server. +COPY hack/skaffold/virtual-kubelet/vkubelet-mock-0-key.pem /vkubelet-mock-0-key.pem + +CMD ["/virtual-kubelet"] diff --git a/hack/skaffold/virtual-kubelet/base.yml b/hack/skaffold/virtual-kubelet/base.yml new file mode 100644 index 000000000..fc856874f --- /dev/null +++ b/hack/skaffold/virtual-kubelet/base.yml @@ -0,0 +1,52 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: virtual-kubelet +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: virtual-kubelet +rules: +- apiGroups: + - "" + resources: + - configmaps + - pods + - secrets + verbs: + - list + - watch +- apiGroups: + - "" + resources: + - nodes + verbs: + - create + - get +- apiGroups: + - "" + resources: + - nodes/status + verbs: + - get + - update +- apiGroups: + - "" + resources: + - pods/status + verbs: + - update +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: virtual-kubelet +subjects: +- kind: ServiceAccount + name: virtual-kubelet + namespace: default +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: virtual-kubelet diff --git a/hack/skaffold/virtual-kubelet/pod.yml b/hack/skaffold/virtual-kubelet/pod.yml new file mode 100644 index 000000000..a5a8ea46b --- /dev/null +++ b/hack/skaffold/virtual-kubelet/pod.yml @@ -0,0 +1,26 @@ +apiVersion: v1 +kind: Pod +metadata: + name: vkubelet-mock-0 +spec: + containers: + - name: vkubelet-mock-0 + image: virtual-kubelet + # "IfNotPresent" is used to prevent Minikube from trying to pull from the registry (and failing) in the first place. + imagePullPolicy: IfNotPresent + args: + - /virtual-kubelet + - --nodename + - vkubelet-mock-0 + - --provider + - mock + - --provider-config + - /vkubelet-mock-0-cfg.json + ports: + - name: metrics + containerPort: 10255 + readinessProbe: + httpGet: + path: /stats/summary + port: metrics + serviceAccountName: virtual-kubelet diff --git a/hack/skaffold/virtual-kubelet/skaffold.yml b/hack/skaffold/virtual-kubelet/skaffold.yml new file mode 100644 index 000000000..ba72b45f3 --- /dev/null +++ b/hack/skaffold/virtual-kubelet/skaffold.yml @@ -0,0 +1,18 @@ +apiVersion: skaffold/v1alpha5 +kind: Config +build: + artifacts: + - image: virtual-kubelet + docker: + # Use a Dockerfile specific for development only. + dockerfile: hack/skaffold/virtual-kubelet/Dockerfile +deploy: + kubectl: + manifests: + - hack/skaffold/virtual-kubelet/base.yml + - hack/skaffold/virtual-kubelet/pod.yml +profiles: +- name: local + build: + # For the "local" profile, we must perform the build locally. + local: {} diff --git a/hack/skaffold/virtual-kubelet/vkubelet-mock-0-cfg.json b/hack/skaffold/virtual-kubelet/vkubelet-mock-0-cfg.json new file mode 100644 index 000000000..1120ac184 --- /dev/null +++ b/hack/skaffold/virtual-kubelet/vkubelet-mock-0-cfg.json @@ -0,0 +1,7 @@ +{ + "vkubelet-mock-0": { + "cpu": "2", + "memory": "32Gi", + "pods": "128" + } +} diff --git a/hack/skaffold/virtual-kubelet/vkubelet-mock-0-crt.pem b/hack/skaffold/virtual-kubelet/vkubelet-mock-0-crt.pem new file mode 100644 index 000000000..ae66da31d --- /dev/null +++ b/hack/skaffold/virtual-kubelet/vkubelet-mock-0-crt.pem @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIID3jCCAsagAwIBAgIIUa45eqfb4sEwDQYJKoZIhvcNAQELBQAwfzELMAkGA1UE +BhMCVVMxDzANBgNVBAgTBk9yZWdvbjERMA8GA1UEBxMIUG9ydGxhbmQxGDAWBgNV +BAoTD3ZrdWJlbGV0LW1vY2stMDEYMBYGA1UECxMPdmt1YmVsZXQtbW9jay0wMRgw +FgYDVQQDEw92a3ViZWxldC1tb2NrLTAwHhcNMTgxMTI2MTIwMzIzWhcNMTkwMjI1 +MTgwODIzWjB/MQswCQYDVQQGEwJVUzEPMA0GA1UECBMGT3JlZ29uMREwDwYDVQQH +EwhQb3J0bGFuZDEYMBYGA1UEChMPdmt1YmVsZXQtbW9jay0wMRgwFgYDVQQLEw92 +a3ViZWxldC1tb2NrLTAxGDAWBgNVBAMTD3ZrdWJlbGV0LW1vY2stMDCCASIwDQYJ +KoZIhvcNAQEBBQADggEPADCCAQoCggEBALryHvK3UBBBqGV2Fpwymf0p/YKGQA9r +Nu0N6f2+RkUXLuQXG+WdFQl3ZQybPLfCE2hwFcl3IF+3hCzY3/2UIyGBloBIft7K +YFLM3YWJDy5ElKDg1bNDSLzF6tkpNLDnVlgkPPITzpEHIAu+BT5DZGWhYAWO/Dir +XdxoJBOhPZZCcBCV+kwQQPbsXzZy+q7Qhx270CRMIXso9C5LJhGYL9fwsxmukAOR +56SmfsAaml7UOlzHITRDwD5AQ1BkTSEFy08dk6JAYL8LDLhgaLoWoV0Ge2gOIepR +jpl87dGbSVGyBHmTXv4o6utqT6S6nU76Ln9NSi7YhMqj8uWv0pTDlYcCAwEAAaNe +MFwwDgYDVR0PAQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcD +AjAMBgNVHRMBAf8EAjAAMB0GA1UdDgQWBBQVHwU1sy7Qnw1WvVvFLcZrhoT40DAN +BgkqhkiG9w0BAQsFAAOCAQEAsNGNKz1Jwfwg7rYaO7VF/zan01XXZFP1bnFYnXJu +15RzhOBMsp3KvWCVhwUfxNe8GhUDSx2tmS5EA/8oaEngLFl3jtR3pnUNOwDVlzly +QOCN3rlOi4+p26LvMiAFp5hxXAv3LORs6Dzr6h3/QTtlV5jDShUOXZdFdOPJdZ2n +g4birrG7MO6vwvR8CiNcQ26b+b8p9BGXbE8bsJoHmcsqya8fbVs2n6CdEJeI+4hD +N6xlo5SvhjH5tFII7eCVedyZGl0BKvkocOigLgq8X+JzFxj1wtdmtXv7sjdKcB9r +6TWGJRrZVxoxUOzZhpxUj3j/pLaRcDmttSJCuDu3NAtkgQ== +-----END CERTIFICATE----- diff --git a/hack/skaffold/virtual-kubelet/vkubelet-mock-0-key.pem b/hack/skaffold/virtual-kubelet/vkubelet-mock-0-key.pem new file mode 100644 index 000000000..37b974979 --- /dev/null +++ b/hack/skaffold/virtual-kubelet/vkubelet-mock-0-key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAuvIe8rdQEEGoZXYWnDKZ/Sn9goZAD2s27Q3p/b5GRRcu5Bcb +5Z0VCXdlDJs8t8ITaHAVyXcgX7eELNjf/ZQjIYGWgEh+3spgUszdhYkPLkSUoODV +s0NIvMXq2Sk0sOdWWCQ88hPOkQcgC74FPkNkZaFgBY78OKtd3GgkE6E9lkJwEJX6 +TBBA9uxfNnL6rtCHHbvQJEwheyj0LksmEZgv1/CzGa6QA5HnpKZ+wBqaXtQ6XMch +NEPAPkBDUGRNIQXLTx2TokBgvwsMuGBouhahXQZ7aA4h6lGOmXzt0ZtJUbIEeZNe +/ijq62pPpLqdTvouf01KLtiEyqPy5a/SlMOVhwIDAQABAoIBAEN84tVGfh3QRiWS +sujij5rITN+Q7ZFjaCm96yoSRbXtf50SBp0mzxBizNT3Ob0wz+bVB9h6K/LCAnJa +PMqDbwdKi/V1tm9hadKaaKIrb5KJaYqGgD893AViAb0x1fbDHPWm52WQ5vKOOvBi +QexPUfAqiMqY6s7ednz6D4QSonQamxCUPBPYvudmayHtPlc8Qb6eY0V+pcdFnW08 +SDZXYOxey3/IAjZydcA7XgvNSc+6XOwmhKsGAW71uFTTagJvzX3ePCY14rkGJmDG +m/10hoW6NMKGeV/RyX3dX0jJmDk1VfxAQW3xpOipZfgfvgavCOqHnKA6I8dK3zhg +vE9BleECgYEA87X/ztQZDI4qOTA9CW/nMXfwAy9QO1K6bGhBHUu7Js4pqgxuH8Fk +hQgQK7V8iasny/dCyj6Cu3QJNofxudAvLLQKkquyQOa+zqFCUpVid7JVRMcRLJlt +3HlyCNvVlhfjDT0cI2RdU45q8MnZoy1f3DPZB16cHb3HL9z1gQZTiXECgYEAxF9a +68SbxmWFBK7PaobI8wVfDoTirHmoAvnypYK0oQkAX8VmEmtEEs2+N1moKjSTPr+t +us4JKguA8z2tuLk5j+eF+zDl/2U+7djTF8FCNprwz3sXr427GCIGL5YvpIBZ+TL8 +Bji2uyoo8k9SAWMb4ObOzfGm4teCvciS99gw0ncCgYAt5GbAVtZEs/ylejz0KvtZ +KGGs59ru4Nw0D8m7L4iVfRsBZ4fROQSpvGP3JxzFe9JpqS0NkonhrK8TcrQFLnvD +qj+XcPeHGyxxEpK/pFu/eHhwFCBayqWSb9gWbPciZWsfEhPbYknksxvWLdxqyt+T +QrwqlBlHzHXWwIAGhN90MQKBgQC5CYkpBFgsuFiBMx+rJ1qO9I6/paPaFcClHVTx +dJoz68F4fQ9TZ9P7S/djPI5jRqtAw2k2zxJ/ldtqWMIrgA2ndegf69GtuH91q4wt +pCN6RMGJIFoPSCP194mQqZo3DeK6GLq2OhalgnKW8Ps652LLp3FTSdORiLVfk3I5 +LHPEvQKBgDCxa/3vneG8vgs8ArEjN89B/YxO1qIU5mxJe6Zafb81NdhYUjfRAVro +ALTofiApMsnDbJDHMiwvwcDUHbPLpruK80R//zmX7Xen+F+5obfSQ8j0GSmmeWFQ +SVG6ApNtktLPI0nK2nEIH/Qx4ouGC9N0pADRClQQPSxEPmDvf4xf +-----END RSA PRIVATE KEY----- diff --git a/providers/mock/mock.go b/providers/mock/mock.go index efb8582ee..65fc06abd 100644 --- a/providers/mock/mock.go +++ b/providers/mock/mock.go @@ -7,15 +7,18 @@ import ( "io" "io/ioutil" "log" + "math/rand" "time" "github.com/cpuguy83/strongerrors" - "github.com/virtual-kubelet/virtual-kubelet/providers" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/remotecommand" + stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" + + "github.com/virtual-kubelet/virtual-kubelet/providers" ) const ( @@ -33,6 +36,7 @@ type MockProvider struct { daemonEndpointPort int32 pods map[string]*v1.Pod config MockConfig + startTime time.Time } // MockConfig contains a mock virtual-kubelet's configurable parameters. @@ -56,6 +60,7 @@ func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalI daemonEndpointPort: daemonEndpointPort, pods: make(map[string]*v1.Pod), config: config, + startTime: time.Now(), } return &provider, nil } @@ -326,6 +331,80 @@ func (p *MockProvider) OperatingSystem() string { return providers.OperatingSystemLinux } +// GetStatsSummary returns dummy stats for all pods known by this provider. +func (p *MockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) { + // Grab the current timestamp so we can report it as the time the stats were generated. + time := metav1.NewTime(time.Now()) + + // Create the Summary object that will later be populated with node and pod stats. + res := &stats.Summary{} + + // Populate the Summary object with basic node stats. + res.Node = stats.NodeStats{ + NodeName: p.nodeName, + StartTime: metav1.NewTime(p.startTime), + } + + // Populate the Summary object with dummy stats for each pod known by this provider. + for _, pod := range p.pods { + var ( + // totalUsageNanoCores will be populated with the sum of the values of UsageNanoCores computes across all containers in the pod. + totalUsageNanoCores uint64 + // totalUsageBytes will be populated with the sum of the values of UsageBytes computed across all containers in the pod. + totalUsageBytes uint64 + ) + + // Create a PodStats object to populate with pod stats. + pss := stats.PodStats{ + PodRef: stats.PodReference{ + Name: pod.Name, + Namespace: pod.Namespace, + UID: string(pod.UID), + }, + StartTime: pod.CreationTimestamp, + } + + // Iterate over all containers in the current pod to compute dummy stats. + for _, container := range pod.Spec.Containers { + // Grab a dummy value to be used as the total CPU usage. + // The value should fit a uint32 in order to avoid overflows later on when computing pod stats. + dummyUsageNanoCores := uint64(rand.Uint32()) + totalUsageNanoCores += dummyUsageNanoCores + // Create a dummy value to be used as the total RAM usage. + // The value should fit a uint32 in order to avoid overflows later on when computing pod stats. + dummyUsageBytes := uint64(rand.Uint32()) + totalUsageBytes += dummyUsageBytes + // Append a ContainerStats object containing the dummy stats to the PodStats object. + pss.Containers = append(pss.Containers, stats.ContainerStats{ + Name: container.Name, + StartTime: pod.CreationTimestamp, + CPU: &stats.CPUStats{ + Time: time, + UsageNanoCores: &dummyUsageNanoCores, + }, + Memory: &stats.MemoryStats{ + Time: time, + UsageBytes: &dummyUsageBytes, + }, + }) + } + + // Populate the CPU and RAM stats for the pod and append the PodsStats object to the Summary object to be returned. + pss.CPU = &stats.CPUStats{ + Time: time, + UsageNanoCores: &totalUsageNanoCores, + } + pss.Memory = &stats.MemoryStats{ + Time: time, + UsageBytes: &totalUsageBytes, + } + res.Pods = append(res.Pods, pss) + } + + // Return the dummy stats. + return res, nil +} + func buildKeyFromNames(namespace string, name string) (string, error) { return fmt.Sprintf("%s-%s", namespace, name), nil } diff --git a/test/e2e/basic_test.go b/test/e2e/basic_test.go new file mode 100644 index 000000000..3584fd659 --- /dev/null +++ b/test/e2e/basic_test.go @@ -0,0 +1,144 @@ +// +build e2e + +package e2e + +import ( + "fmt" + "testing" + + "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" +) + +// TestGetStatsSummary creates a pod having two containers and queries the /stats/summary endpoint of the virtual-kubelet. +// It expects this endpoint to return stats for the current node, as well as for the aforementioned pod and each of its two containers. +func TestGetStatsSummary(t *testing.T) { + // Create a pod with prefix "nginx-0-" having three containers. + pod, err := f.CreatePod(f.CreateDummyPodObjectWithPrefix("nginx-0-", "foo", "bar", "baz")) + if err != nil { + t.Fatal(err) + } + // Delete the "nginx-0-X" pod after the test finishes. + defer func() { + if err := f.DeletePod(pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) { + t.Error(err) + } + }() + + // Wait for the "nginx-0-X" pod to be reported as running and ready. + if err := f.WaitUntilPodReady(pod.Namespace, pod.Name); err != nil { + t.Fatal(err) + } + + // Grab the stats from the provider. + stats, err := f.GetStatsSummary() + if err != nil { + t.Fatal(err) + } + + // Make sure that we've got stats for the current node. + if stats.Node.NodeName != f.NodeName { + t.Fatalf("expected stats for node %s, got stats for node %s", f.NodeName, stats.Node.NodeName) + } + + // Make sure the "nginx-0-X" pod exists in the slice of PodStats. + idx, err := findPodInPodStats(stats, pod) + if err != nil { + t.Fatal(err) + } + + // Make sure that we've got stats for all the containers in the "nginx-0-X" pod. + desiredContainerStatsCount := len(pod.Spec.Containers) + currentContainerStatsCount := len(stats.Pods[idx].Containers) + if currentContainerStatsCount != desiredContainerStatsCount { + t.Fatalf("expected stats for %d containers, got stats for %d containers", desiredContainerStatsCount, currentContainerStatsCount) + } +} + +// TestPodLifecycle creates two pods and verifies that the provider has been asked to create them. +// Then, it deletes one of the pods and verifies that the provider has been asked to delete it. +// These verifications are made using the /stats/summary endpoint of the virtual-kubelet, by checking for the presence or absence of the pods. +// Hence, the provider being tested must implement the PodMetricsProvider interface. +func TestPodLifecycle(t *testing.T) { + // Create a pod with prefix "nginx-0-" having a single container. + pod0, err := f.CreatePod(f.CreateDummyPodObjectWithPrefix("nginx-0-", "foo")) + if err != nil { + t.Fatal(err) + } + // Delete the "nginx-0-X" pod after the test finishes. + defer func() { + if err := f.DeletePod(pod0.Namespace, pod0.Name); err != nil && !apierrors.IsNotFound(err) { + t.Error(err) + } + }() + + // Create a pod with prefix "nginx-1-" having a single container. + pod1, err := f.CreatePod(f.CreateDummyPodObjectWithPrefix("nginx-1-", "bar")) + if err != nil { + t.Fatal(err) + } + // Delete the "nginx-1-Y" pod after the test finishes. + defer func() { + if err := f.DeletePod(pod1.Namespace, pod1.Name); err != nil && !apierrors.IsNotFound(err) { + t.Error(err) + } + }() + + // Wait for the "nginx-0-X" pod to be reported as running and ready. + if err := f.WaitUntilPodReady(pod0.Namespace, pod0.Name); err != nil { + t.Fatal(err) + } + // Wait for the "nginx-1-Y" pod to be reported as running and ready. + if err := f.WaitUntilPodReady(pod1.Namespace, pod1.Name); err != nil { + t.Fatal(err) + } + + // Grab the stats from the provider. + stats, err := f.GetStatsSummary() + if err != nil { + t.Fatal(err) + } + + // Make sure the "nginx-0-X" pod exists in the slice of PodStats. + if _, err := findPodInPodStats(stats, pod0); err != nil { + t.Fatal(err) + } + + // Make sure the "nginx-1-Y" pod exists in the slice of PodStats. + if _, err := findPodInPodStats(stats, pod1); err != nil { + t.Fatal(err) + } + + // Delete the "nginx-1" pod. + if err := f.DeletePod(pod1.Namespace, pod1.Name); err != nil { + t.Fatal(err) + } + + // Wait for the "nginx-1-Y" pod to be reported as having been marked for deletion. + if err := f.WaitUntilPodDeleted(pod1.Namespace, pod1.Name); err != nil { + t.Fatal(err) + } + + // Grab the stats from the provider. + stats, err = f.GetStatsSummary() + if err != nil { + t.Fatal(err) + } + + // Make sure the "nginx-1-Y" pod DOES NOT exist in the slice of PodStats anymore. + if _, err := findPodInPodStats(stats, pod1); err == nil { + t.Fatalf("expected to NOT find pod \"%s/%s\" in the slice of pod stats", pod1.Namespace, pod1.Name) + } +} + +// findPodInPodStats returns the index of the specified pod in the .pods field of the specified Summary object. +// It returns an error if the specified pod is not found. +func findPodInPodStats(summary *v1alpha1.Summary, pod *v1.Pod) (int, error) { + for i, p := range summary.Pods { + if p.PodRef.Namespace == pod.Namespace && p.PodRef.Name == pod.Name && string(p.PodRef.UID) == string(pod.UID) { + return i, nil + } + } + return -1, fmt.Errorf("failed to find pod \"%s/%s\" in the slice of pod stats", pod.Namespace, pod.Name) +} diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go new file mode 100644 index 000000000..17f20213c --- /dev/null +++ b/test/e2e/framework/framework.go @@ -0,0 +1,47 @@ +package framework + +import ( + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +// Framework encapsulates the configuration for the current run, and provides helper methods to be used during testing. +type Framework struct { + KubeClient kubernetes.Interface + Namespace string + NodeName string + TaintKey string + TaintValue string + TaintEffect string +} + +// NewTestingFramework returns a new instance of the testing framework. +func NewTestingFramework(kubeconfig, namespace, nodeName, taintKey, taintValue, taintEffect string) *Framework { + return &Framework{ + KubeClient: createKubeClient(kubeconfig), + Namespace: namespace, + NodeName: nodeName, + TaintKey: taintKey, + TaintValue: taintValue, + TaintEffect: taintEffect, + } +} + +// createKubeClient creates a new Kubernetes client based on the specified kubeconfig file. +// If no value for kubeconfig is specified, in-cluster configuration is assumed. +func createKubeClient(kubeconfig string) *kubernetes.Clientset { + var ( + cfg *rest.Config + err error + ) + if kubeconfig == "" { + cfg, err = rest.InClusterConfig() + } else { + cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + } + if err != nil { + panic(err) + } + return kubernetes.NewForConfigOrDie(cfg) +} diff --git a/test/e2e/framework/pod.go b/test/e2e/framework/pod.go new file mode 100644 index 000000000..2eee42ecc --- /dev/null +++ b/test/e2e/framework/pod.go @@ -0,0 +1,108 @@ +package framework + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + watchapi "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/watch" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" +) + +const ( + defaultWatchTimeout = 2 * time.Minute + hostnameNodeSelectorLabel = "kubernetes.io/hostname" +) + +// CreateDummyPodObjectWithPrefix creates a dujmmy pod object using the specified prefix as the value of .metadata.generateName. +// A variable number of strings can be provided. +// For each one of these strings, a container that uses the string as its image will be appended to the pod. +// This method DOES NOT create the pod in the Kubernetes API. +func (f *Framework) CreateDummyPodObjectWithPrefix(prefix string, images ...string) *corev1.Pod { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: prefix, + Namespace: f.Namespace, + }, + Spec: corev1.PodSpec{ + NodeSelector: map[string]string{ + hostnameNodeSelectorLabel: f.NodeName, + }, + Tolerations: []corev1.Toleration{ + { + Key: f.TaintKey, + Value: f.TaintValue, + Effect: corev1.TaintEffect(f.TaintEffect), + }, + }, + }, + } + for idx, img := range images { + pod.Spec.Containers = append(pod.Spec.Containers, corev1.Container{ + Name: fmt.Sprintf("%s%d", prefix, idx), + Image: img, + }) + } + return pod +} + +// CreatePod creates the specified pod in the Kubernetes API. +func (f *Framework) CreatePod(pod *corev1.Pod) (*corev1.Pod, error) { + return f.KubeClient.CoreV1().Pods(f.Namespace).Create(pod) +} + +// DeletePod deletes the pod with the specified name and namespace in the Kubernetes API. +func (f *Framework) DeletePod(namespace, name string) error { + return f.KubeClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{}) +} + +// WaitUntilPodCondition establishes a watch on the pod with the specified name and namespace. +// Then, it waits for the specified condition function to be verified. +func (f *Framework) WaitUntilPodCondition(namespace, name string, fn watch.ConditionFunc) error { + // Create a field selector that matches the specified Pod resource. + fs := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.namespace==%s,metadata.name==%s", namespace, name)) + // Create a ListWatch so we can receive events for the matched Pod resource. + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = fs.String() + return f.KubeClient.CoreV1().Pods(namespace).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watchapi.Interface, error) { + options.FieldSelector = fs.String() + return f.KubeClient.CoreV1().Pods(namespace).Watch(options) + }, + } + // Watch for updates to the Pod resource until fn is satisfied, or until the timeout is reached. + ctx, cfn := context.WithTimeout(context.Background(), defaultWatchTimeout) + defer cfn() + last, err := watch.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, fn) + if err != nil { + return err + } + if last == nil { + return fmt.Errorf("no events received for pod %q", name) + } + return nil +} + +// WaitUntilPodReady blocks until the pod with the specified name and namespace is reported to be running and ready. +func (f *Framework) WaitUntilPodReady(namespace, name string) error { + return f.WaitUntilPodCondition(namespace, name, func(event watchapi.Event) (bool, error) { + pod := event.Object.(*corev1.Pod) + return pod.Status.Phase == corev1.PodRunning && podutil.IsPodReady(pod) && pod.Status.PodIP != "", nil + }) +} + +// WaitUntilPodDeleted blocks until the pod with the specified name and namespace is marked for deletion (or, alternatively, effectively deleted). +func (f *Framework) WaitUntilPodDeleted(namespace, name string) error { + return f.WaitUntilPodCondition(namespace, name, func(event watchapi.Event) (bool, error) { + pod := event.Object.(*corev1.Pod) + return event.Type == watchapi.Deleted || pod.DeletionTimestamp != nil, nil + }) +} diff --git a/test/e2e/framework/stats.go b/test/e2e/framework/stats.go new file mode 100644 index 000000000..3dd7b8e55 --- /dev/null +++ b/test/e2e/framework/stats.go @@ -0,0 +1,31 @@ +package framework + +import ( + "encoding/json" + "strconv" + + "k8s.io/apimachinery/pkg/util/net" + stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" +) + +// GetStatsSummary queries the /stats/summary endpoint of the virtual-kubelet and returns the Summary object obtained as a response. +func (f *Framework) GetStatsSummary() (*stats.Summary, error) { + // Query the /stats/summary endpoint. + b, err := f.KubeClient.CoreV1(). + RESTClient(). + Get(). + Namespace(f.Namespace). + Resource("pods"). + SubResource("proxy"). + Name(net.JoinSchemeNamePort("http", f.NodeName, strconv.Itoa(10255))). + Suffix("/stats/summary").DoRaw() + if err != nil { + return nil, err + } + // Unmarshal the response as a Summary object and return it. + res := &stats.Summary{} + if err := json.Unmarshal(b, res); err != nil { + return nil, err + } + return res, nil +} diff --git a/test/e2e/main_test.go b/test/e2e/main_test.go new file mode 100644 index 000000000..f9403606e --- /dev/null +++ b/test/e2e/main_test.go @@ -0,0 +1,81 @@ +// +build e2e + +package e2e + +import ( + "flag" + "os" + "testing" + + "k8s.io/api/core/v1" + + "github.com/virtual-kubelet/virtual-kubelet/test/e2e/framework" +) + +const ( + defaultNamespace = v1.NamespaceDefault + defaultNodeName = "vkubelet-mock-0" + defaultTaintKey = "virtual-kubelet.io/provider" + defaultTaintValue = "mock" + defaultTaintEffect = string(v1.TaintEffectNoSchedule) +) + +var ( + // f is the testing framework used for running the test suite. + f *framework.Framework + + // kubeconfig is the path to the kubeconfig file to use when running the test suite outside a Kubernetes cluster. + kubeconfig string + // namespace is the name of the Kubernetes namespace to use for running the test suite (i.e. where to create pods). + namespace string + // nodeName is the name of the virtual-kubelet node to test. + nodeName string + // taintKey is the key of the taint that is expected to be associated with the virtual-kubelet node to test. + taintKey string + // taintValue is the value of the taint that is expected to be associated with the virtual-kubelet node to test. + taintValue string + // taintEffect is the effect of the taint that is expected to be associated with the virtual-kubelet node to test. + taintEffect string +) + +func init() { + flag.StringVar(&kubeconfig, "kubeconfig", "", "path to the kubeconfig file to use when running the test suite outside a kubernetes cluster") + flag.StringVar(&namespace, "namespace", defaultNamespace, "the name of the kubernetes namespace to use for running the test suite (i.e. where to create pods)") + flag.StringVar(&nodeName, "node-name", defaultNodeName, "the name of the virtual-kubelet node to test") + flag.StringVar(&taintKey, "taint-key", defaultTaintKey, "the key of the taint that is expected to be associated with the virtual-kubelet node to test") + flag.StringVar(&taintValue, "taint-value", defaultTaintValue, "the value of the taint that is expected to be associated with the virtual-kubelet node to test") + flag.StringVar(&taintEffect, "taint-effect", defaultTaintEffect, "the effect of the taint that is expected to be associated with the virtual-kubelet node to test") + flag.Parse() +} + +func TestMain(m *testing.M) { + // Set sane defaults in case no values (or empty ones) have been provided. + setDefaults() + // Create a new instance of the test framework targeting the specified node. + f = framework.NewTestingFramework(kubeconfig, namespace, nodeName, taintKey, taintValue, taintEffect) + // Wait for the virtual-kubelet pod to be ready. + if err := f.WaitUntilPodReady(namespace, nodeName); err != nil { + panic(err) + } + // Run the test suite. + os.Exit(m.Run()) +} + +// setDefaults sets sane defaults in case no values (or empty ones) have been provided. +func setDefaults() { + if namespace == "" { + namespace = defaultNamespace + } + if nodeName == "" { + nodeName = defaultNodeName + } + if taintKey == "" { + taintKey = defaultTaintKey + } + if taintValue == "" { + taintValue = defaultTaintValue + } + if taintEffect == "" { + taintEffect = defaultTaintEffect + } +} diff --git a/vendor/github.com/vmware/vic/lib/apiservers/engine/network/utils.go b/vendor/github.com/vmware/vic/lib/apiservers/engine/network/utils.go index 2c6de5c50..883fff116 100644 --- a/vendor/github.com/vmware/vic/lib/apiservers/engine/network/utils.go +++ b/vendor/github.com/vmware/vic/lib/apiservers/engine/network/utils.go @@ -52,24 +52,29 @@ var ( cbpLock sync.Mutex ContainerByPort map[string]string // port:containerID + once sync.Once ) func init() { portMapper = portmap.NewPortMapper() btbRules = make(map[string][]string) ContainerByPort = make(map[string]string) +} - l, err := netlink.LinkByName(publicIfaceName) - if l == nil { - l, err = netlink.LinkByAlias(publicIfaceName) - if err != nil { - log.Errorf("interface %s not found", publicIfaceName) - return +func Init() { + once.Do(func() { + l, err := netlink.LinkByName(publicIfaceName) + if l == nil { + l, err = netlink.LinkByAlias(publicIfaceName) + if err != nil { + log.Errorf("interface %s not found", publicIfaceName) + return + } } - } - // don't use interface alias for iptables rules - publicIfaceName = l.Attrs().Name + // don't use interface alias for iptables rules + publicIfaceName = l.Attrs().Name + }) } // requestHostPort finds a free port on the host diff --git a/vendor/k8s.io/client-go/tools/watch/informerwatcher.go b/vendor/k8s.io/client-go/tools/watch/informerwatcher.go new file mode 100644 index 000000000..35a346949 --- /dev/null +++ b/vendor/k8s.io/client-go/tools/watch/informerwatcher.go @@ -0,0 +1,114 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "sync" + "sync/atomic" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" +) + +func newTicketer() *ticketer { + return &ticketer{ + cond: sync.NewCond(&sync.Mutex{}), + } +} + +type ticketer struct { + counter uint64 + + cond *sync.Cond + current uint64 +} + +func (t *ticketer) GetTicket() uint64 { + // -1 to start from 0 + return atomic.AddUint64(&t.counter, 1) - 1 +} + +func (t *ticketer) WaitForTicket(ticket uint64, f func()) { + t.cond.L.Lock() + defer t.cond.L.Unlock() + for ticket != t.current { + t.cond.Wait() + } + + f() + + t.current++ + t.cond.Broadcast() +} + +// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface +// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. +func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface) { + ch := make(chan watch.Event) + w := watch.NewProxyWatcher(ch) + t := newTicketer() + + indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + go t.WaitForTicket(t.GetTicket(), func() { + select { + case ch <- watch.Event{ + Type: watch.Added, + Object: obj.(runtime.Object), + }: + case <-w.StopChan(): + } + }) + }, + UpdateFunc: func(old, new interface{}) { + go t.WaitForTicket(t.GetTicket(), func() { + select { + case ch <- watch.Event{ + Type: watch.Modified, + Object: new.(runtime.Object), + }: + case <-w.StopChan(): + } + }) + }, + DeleteFunc: func(obj interface{}) { + go t.WaitForTicket(t.GetTicket(), func() { + staleObj, stale := obj.(cache.DeletedFinalStateUnknown) + if stale { + // We have no means of passing the additional information down using watch API based on watch.Event + // but the caller can filter such objects by checking if metadata.deletionTimestamp is set + obj = staleObj + } + + select { + case ch <- watch.Event{ + Type: watch.Deleted, + Object: obj.(runtime.Object), + }: + case <-w.StopChan(): + } + }) + }, + }, cache.Indexers{}) + + go func() { + informer.Run(w.StopChan()) + }() + + return indexer, informer, w +} diff --git a/vendor/k8s.io/client-go/tools/watch/until.go b/vendor/k8s.io/client-go/tools/watch/until.go new file mode 100644 index 000000000..933578843 --- /dev/null +++ b/vendor/k8s.io/client-go/tools/watch/until.go @@ -0,0 +1,225 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" +) + +// PreconditionFunc returns true if the condition has been reached, false if it has not been reached yet, +// or an error if the condition failed or detected an error state. +type PreconditionFunc func(store cache.Store) (bool, error) + +// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet, +// or an error if the condition cannot be checked and should terminate. In general, it is better to define +// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed +// from false to true). +type ConditionFunc func(event watch.Event) (bool, error) + +// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry. +var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout") + +// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch +// encountered. The first condition that returns an error terminates the watch (and the event is also returned). +// If no event has been received, the returned event will be nil. +// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition. +// Waits until context deadline or until context is canceled. +// +// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!! +// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error. +// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below, +// Warning: solving such issues. +// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone. +func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) { + ch := watcher.ResultChan() + defer watcher.Stop() + var lastEvent *watch.Event + for _, condition := range conditions { + // check the next condition against the previous event and short circuit waiting for the next watch + if lastEvent != nil { + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + continue + } + } + ConditionSucceeded: + for { + select { + case event, ok := <-ch: + if !ok { + return lastEvent, ErrWatchClosed + } + lastEvent = &event + + done, err := condition(event) + if err != nil { + return lastEvent, err + } + if done { + break ConditionSucceeded + } + + case <-ctx.Done(): + return lastEvent, wait.ErrWaitTimeout + } + } + } + return lastEvent, nil +} + +// UntilWithSync creates an informer from lw, optionally checks precondition when the store is synced, +// and watches the output until each provided condition succeeds, in a way that is identical +// to function UntilWithoutRetry. (See above.) +// UntilWithSync can deal with all errors like API timeout, lost connections and 'Resource version too old'. +// It is the only function that can recover from 'Resource version too old', Until and UntilWithoutRetry will +// just fail in that case. On the other hand it can't provide you with guarantees as strong as using simple +// Watch method with Until. It can skip some intermediate events in case of watch function failing but it will +// re-list to recover and you always get an event, if there has been a change, after recovery. +// Also with the current implementation based on DeltaFIFO, order of the events you receive is guaranteed only for +// particular object, not between more of them even it's the same resource. +// The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like: +// waiting for object reaching a state, "small" controllers, ... +func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) { + indexer, informer, watcher := NewIndexerInformerWatcher(lw, objType) + // Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and + // let UntilWithoutRetry to stop it + defer watcher.Stop() + + if precondition != nil { + if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { + return nil, fmt.Errorf("UntilWithSync: unable to sync caches: %v", ctx.Err()) + } + + done, err := precondition(indexer) + if err != nil { + return nil, err + } + + if done { + return nil, nil + } + } + + return UntilWithoutRetry(ctx, watcher, conditions...) +} + +// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration. +func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + if timeout < 0 { + // This should be handled in validation + glog.Errorf("Timeout for context shall not be negative!") + timeout = 0 + } + + if timeout == 0 { + return context.WithCancel(parent) + } + + return context.WithTimeout(parent, timeout) +} + +// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout +// if timeout is exceeded without all conditions returning true, or an error if an error occurs. +// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until. +// TODO: remove when no longer used +// +// Deprecated: Use UntilWithSync instead. +func ListWatchUntil(timeout time.Duration, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) { + if len(conditions) == 0 { + return nil, nil + } + + list, err := lw.List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + initialItems, err := meta.ExtractList(list) + if err != nil { + return nil, err + } + + // use the initial items as simulated "adds" + var lastEvent *watch.Event + currIndex := 0 + passedConditions := 0 + for _, condition := range conditions { + // check the next condition against the previous event and short circuit waiting for the next watch + if lastEvent != nil { + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + passedConditions = passedConditions + 1 + continue + } + } + + ConditionSucceeded: + for currIndex < len(initialItems) { + lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]} + currIndex++ + + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + passedConditions = passedConditions + 1 + break ConditionSucceeded + } + } + } + if passedConditions == len(conditions) { + return lastEvent, nil + } + remainingConditions := conditions[passedConditions:] + + metaObj, err := meta.ListAccessor(list) + if err != nil { + return nil, err + } + currResourceVersion := metaObj.GetResourceVersion() + + watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion}) + if err != nil { + return nil, err + } + + ctx, cancel := ContextWithOptionalTimeout(context.Background(), timeout) + defer cancel() + evt, err := UntilWithoutRetry(ctx, watchInterface, remainingConditions...) + if err == ErrWatchClosed { + // present a consistent error interface to callers + err = wait.ErrWaitTimeout + } + return evt, err +} diff --git a/vendor/k8s.io/kubernetes/pkg/api/v1/pod/util.go b/vendor/k8s.io/kubernetes/pkg/api/v1/pod/util.go new file mode 100644 index 000000000..416221d52 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/api/v1/pod/util.go @@ -0,0 +1,305 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "fmt" + "time" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +// FindPort locates the container port for the given pod and portName. If the +// targetPort is a number, use that. If the targetPort is a string, look that +// string up in all named ports in all containers in the target pod. If no +// match is found, fail. +func FindPort(pod *v1.Pod, svcPort *v1.ServicePort) (int, error) { + portName := svcPort.TargetPort + switch portName.Type { + case intstr.String: + name := portName.StrVal + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.Name == name && port.Protocol == svcPort.Protocol { + return int(port.ContainerPort), nil + } + } + } + case intstr.Int: + return portName.IntValue(), nil + } + + return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID) +} + +// Visitor is called with each object name, and returns true if visiting should continue +type Visitor func(name string) (shouldContinue bool) + +// VisitPodSecretNames invokes the visitor function with the name of every secret +// referenced by the pod spec. If visitor returns false, visiting is short-circuited. +// Transitive references (e.g. pod -> pvc -> pv -> secret) are not visited. +// Returns true if visiting completed, false if visiting was short-circuited. +func VisitPodSecretNames(pod *v1.Pod, visitor Visitor) bool { + for _, reference := range pod.Spec.ImagePullSecrets { + if !visitor(reference.Name) { + return false + } + } + for i := range pod.Spec.InitContainers { + if !visitContainerSecretNames(&pod.Spec.InitContainers[i], visitor) { + return false + } + } + for i := range pod.Spec.Containers { + if !visitContainerSecretNames(&pod.Spec.Containers[i], visitor) { + return false + } + } + var source *v1.VolumeSource + + for i := range pod.Spec.Volumes { + source = &pod.Spec.Volumes[i].VolumeSource + switch { + case source.AzureFile != nil: + if len(source.AzureFile.SecretName) > 0 && !visitor(source.AzureFile.SecretName) { + return false + } + case source.CephFS != nil: + if source.CephFS.SecretRef != nil && !visitor(source.CephFS.SecretRef.Name) { + return false + } + case source.Cinder != nil: + if source.Cinder.SecretRef != nil && !visitor(source.Cinder.SecretRef.Name) { + return false + } + case source.FlexVolume != nil: + if source.FlexVolume.SecretRef != nil && !visitor(source.FlexVolume.SecretRef.Name) { + return false + } + case source.Projected != nil: + for j := range source.Projected.Sources { + if source.Projected.Sources[j].Secret != nil { + if !visitor(source.Projected.Sources[j].Secret.Name) { + return false + } + } + } + case source.RBD != nil: + if source.RBD.SecretRef != nil && !visitor(source.RBD.SecretRef.Name) { + return false + } + case source.Secret != nil: + if !visitor(source.Secret.SecretName) { + return false + } + case source.ScaleIO != nil: + if source.ScaleIO.SecretRef != nil && !visitor(source.ScaleIO.SecretRef.Name) { + return false + } + case source.ISCSI != nil: + if source.ISCSI.SecretRef != nil && !visitor(source.ISCSI.SecretRef.Name) { + return false + } + case source.StorageOS != nil: + if source.StorageOS.SecretRef != nil && !visitor(source.StorageOS.SecretRef.Name) { + return false + } + } + } + return true +} + +func visitContainerSecretNames(container *v1.Container, visitor Visitor) bool { + for _, env := range container.EnvFrom { + if env.SecretRef != nil { + if !visitor(env.SecretRef.Name) { + return false + } + } + } + for _, envVar := range container.Env { + if envVar.ValueFrom != nil && envVar.ValueFrom.SecretKeyRef != nil { + if !visitor(envVar.ValueFrom.SecretKeyRef.Name) { + return false + } + } + } + return true +} + +// VisitPodConfigmapNames invokes the visitor function with the name of every configmap +// referenced by the pod spec. If visitor returns false, visiting is short-circuited. +// Transitive references (e.g. pod -> pvc -> pv -> secret) are not visited. +// Returns true if visiting completed, false if visiting was short-circuited. +func VisitPodConfigmapNames(pod *v1.Pod, visitor Visitor) bool { + for i := range pod.Spec.InitContainers { + if !visitContainerConfigmapNames(&pod.Spec.InitContainers[i], visitor) { + return false + } + } + for i := range pod.Spec.Containers { + if !visitContainerConfigmapNames(&pod.Spec.Containers[i], visitor) { + return false + } + } + var source *v1.VolumeSource + for i := range pod.Spec.Volumes { + source = &pod.Spec.Volumes[i].VolumeSource + switch { + case source.Projected != nil: + for j := range source.Projected.Sources { + if source.Projected.Sources[j].ConfigMap != nil { + if !visitor(source.Projected.Sources[j].ConfigMap.Name) { + return false + } + } + } + case source.ConfigMap != nil: + if !visitor(source.ConfigMap.Name) { + return false + } + } + } + return true +} + +func visitContainerConfigmapNames(container *v1.Container, visitor Visitor) bool { + for _, env := range container.EnvFrom { + if env.ConfigMapRef != nil { + if !visitor(env.ConfigMapRef.Name) { + return false + } + } + } + for _, envVar := range container.Env { + if envVar.ValueFrom != nil && envVar.ValueFrom.ConfigMapKeyRef != nil { + if !visitor(envVar.ValueFrom.ConfigMapKeyRef.Name) { + return false + } + } + } + return true +} + +// GetContainerStatus extracts the status of container "name" from "statuses". +// It also returns if "name" exists. +func GetContainerStatus(statuses []v1.ContainerStatus, name string) (v1.ContainerStatus, bool) { + for i := range statuses { + if statuses[i].Name == name { + return statuses[i], true + } + } + return v1.ContainerStatus{}, false +} + +// GetExistingContainerStatus extracts the status of container "name" from "statuses", +// It also returns if "name" exists. +func GetExistingContainerStatus(statuses []v1.ContainerStatus, name string) v1.ContainerStatus { + status, _ := GetContainerStatus(statuses, name) + return status +} + +// IsPodAvailable returns true if a pod is available; false otherwise. +// Precondition for an available pod is that it must be ready. On top +// of that, there are two cases when a pod can be considered available: +// 1. minReadySeconds == 0, or +// 2. LastTransitionTime (is set) + minReadySeconds < current time +func IsPodAvailable(pod *v1.Pod, minReadySeconds int32, now metav1.Time) bool { + if !IsPodReady(pod) { + return false + } + + c := GetPodReadyCondition(pod.Status) + minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second + if minReadySeconds == 0 || !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now.Time) { + return true + } + return false +} + +// IsPodReady returns true if a pod is ready; false otherwise. +func IsPodReady(pod *v1.Pod) bool { + return IsPodReadyConditionTrue(pod.Status) +} + +// IsPodReady returns true if a pod is ready; false otherwise. +func IsPodReadyConditionTrue(status v1.PodStatus) bool { + condition := GetPodReadyCondition(status) + return condition != nil && condition.Status == v1.ConditionTrue +} + +// Extracts the pod ready condition from the given status and returns that. +// Returns nil if the condition is not present. +func GetPodReadyCondition(status v1.PodStatus) *v1.PodCondition { + _, condition := GetPodCondition(&status, v1.PodReady) + return condition +} + +// GetPodCondition extracts the provided condition from the given status and returns that. +// Returns nil and -1 if the condition is not present, and the index of the located condition. +func GetPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) { + if status == nil { + return -1, nil + } + return GetPodConditionFromList(status.Conditions, conditionType) +} + +// GetPodConditionFromList extracts the provided condition from the given list of condition and +// returns the index of the condition and the condition. Returns -1 and nil if the condition is not present. +func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) { + if conditions == nil { + return -1, nil + } + for i := range conditions { + if conditions[i].Type == conditionType { + return i, &conditions[i] + } + } + return -1, nil +} + +// Updates existing pod condition or creates a new one. Sets LastTransitionTime to now if the +// status has changed. +// Returns true if pod condition has changed or has been added. +func UpdatePodCondition(status *v1.PodStatus, condition *v1.PodCondition) bool { + condition.LastTransitionTime = metav1.Now() + // Try to find this pod condition. + conditionIndex, oldCondition := GetPodCondition(status, condition.Type) + + if oldCondition == nil { + // We are adding new pod condition. + status.Conditions = append(status.Conditions, *condition) + return true + } else { + // We are updating an existing condition, so we need to check if it has changed. + if condition.Status == oldCondition.Status { + condition.LastTransitionTime = oldCondition.LastTransitionTime + } + + isEqual := condition.Status == oldCondition.Status && + condition.Reason == oldCondition.Reason && + condition.Message == oldCondition.Message && + condition.LastProbeTime.Equal(&oldCondition.LastProbeTime) && + condition.LastTransitionTime.Equal(&oldCondition.LastTransitionTime) + + status.Conditions[conditionIndex] = *condition + // Return true if one of the fields have changed. + return !isEqual + } +}