From de32752395b75b20fe29fa5c3beeabd4c28058a2 Mon Sep 17 00:00:00 2001 From: Yash Desai Date: Wed, 17 Apr 2019 11:30:39 -0700 Subject: [PATCH] Set container env var using services. (#573) * Introduce service env vars. --- Gopkg.lock | 8 +- manager/resource_test.go | 4 +- test/e2e/framework/pod.go | 3 + test/util/kubernetes.go | 14 +- .../pkg/apis/core/helper/helpers.go | 539 ++++++++++++++++++ .../pkg/apis/core/v1/helper/helpers.go | 527 +++++++++++++++++ .../kubernetes/pkg/kubelet/envvars/doc.go | 19 + .../kubernetes/pkg/kubelet/envvars/envvars.go | 113 ++++ vkubelet/env.go | 77 +++ vkubelet/env_internal_test.go | 90 +++ 10 files changed, 1389 insertions(+), 5 deletions(-) create mode 100644 vendor/k8s.io/kubernetes/pkg/apis/core/helper/helpers.go create mode 100644 vendor/k8s.io/kubernetes/pkg/apis/core/v1/helper/helpers.go create mode 100644 vendor/k8s.io/kubernetes/pkg/kubelet/envvars/doc.go create mode 100644 vendor/k8s.io/kubernetes/pkg/kubelet/envvars/envvars.go mode change 100644 => 100755 vkubelet/env.go diff --git a/Gopkg.lock b/Gopkg.lock index 8d0b75f7d..a90fed758 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1761,15 +1761,18 @@ revision = "0317810137be915b9cf888946c6e115c1bfac693" [[projects]] - digest = "1:02e74ccd0c045a31940dca52ba2545108fd299d4f66632b1bf643fb92b662de2" + digest = "1:4ff70ea1888545c3a245a76657ba38a308d2c068bd26fa4310f63560ebaf265c" name = "k8s.io/kubernetes" packages = [ "pkg/api/v1/pod", "pkg/apis/core", + "pkg/apis/core/helper", "pkg/apis/core/pods", + "pkg/apis/core/v1/helper", "pkg/fieldpath", "pkg/kubelet/apis/cri/runtime/v1alpha2", "pkg/kubelet/apis/stats/v1alpha1", + "pkg/kubelet/envvars", "pkg/kubelet/server/remotecommand", ] pruneopts = "NUT" @@ -1883,6 +1886,7 @@ "k8s.io/apimachinery/pkg/types", "k8s.io/apimachinery/pkg/util/intstr", "k8s.io/apimachinery/pkg/util/net", + "k8s.io/apimachinery/pkg/util/sets", "k8s.io/apimachinery/pkg/util/strategicpatch", "k8s.io/apimachinery/pkg/util/uuid", "k8s.io/apimachinery/pkg/util/validation", @@ -1906,9 +1910,11 @@ "k8s.io/client-go/util/workqueue", "k8s.io/kubernetes/pkg/api/v1/pod", "k8s.io/kubernetes/pkg/apis/core/pods", + "k8s.io/kubernetes/pkg/apis/core/v1/helper", "k8s.io/kubernetes/pkg/fieldpath", "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2", "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1", + "k8s.io/kubernetes/pkg/kubelet/envvars", "k8s.io/kubernetes/pkg/kubelet/server/remotecommand", ] solver-name = "gps-cdcl" diff --git a/manager/resource_test.go b/manager/resource_test.go index 0c4ae235f..216daf96b 100644 --- a/manager/resource_test.go +++ b/manager/resource_test.go @@ -123,8 +123,8 @@ func TestGetConfigMap(t *testing.T) { func TestListServices(t *testing.T) { var ( lsServices = []*v1.Service{ - testutil.FakeService("namespace-0", "service-0"), - testutil.FakeService("namespace-1", "service-1"), + testutil.FakeService("namespace-0", "service-0", "1.2.3.1", "TCP", 8081), + testutil.FakeService("namespace-1", "service-1", "1.2.3.2", "TCP", 8082), } ) diff --git a/test/e2e/framework/pod.go b/test/e2e/framework/pod.go index fd8676b0a..a10c4738a 100644 --- a/test/e2e/framework/pod.go +++ b/test/e2e/framework/pod.go @@ -25,6 +25,8 @@ const ( // For each one of these strings, a container that uses the string as its image will be appended to the pod. // This method DOES NOT create the pod in the Kubernetes API. 
func (f *Framework) CreateDummyPodObjectWithPrefix(prefix string, images ...string) *corev1.Pod { + enableServiceLink := false + pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ GenerateName: prefix, @@ -41,6 +43,7 @@ func (f *Framework) CreateDummyPodObjectWithPrefix(prefix string, images ...stri Effect: corev1.TaintEffect(f.TaintEffect), }, }, + EnableServiceLinks: &enableServiceLink, }, } for idx, img := range images { diff --git a/test/util/kubernetes.go b/test/util/kubernetes.go index c973f5ead..36e11ddfe 100644 --- a/test/util/kubernetes.go +++ b/test/util/kubernetes.go @@ -24,6 +24,8 @@ func FakeEventRecorder(bufferSize int) *record.FakeRecorder { // FakePodWithSingleContainer returns a pod with the specified namespace and name, and having a single container with the specified image. func FakePodWithSingleContainer(namespace, name, image string) *corev1.Pod { + enableServiceLink := corev1.DefaultEnableServiceLinks + return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, @@ -36,6 +38,7 @@ func FakePodWithSingleContainer(namespace, name, image string) *corev1.Pod { Image: image, }, }, + EnableServiceLinks: &enableServiceLink, }, } } @@ -55,12 +58,19 @@ func FakeSecret(namespace, name string, data map[string]string) *corev1.Secret { return res } -// FakeService returns a service with the specified namespace and name. -func FakeService(namespace, name string) *corev1.Service { +// FakeService returns a service with the specified namespace and name and service info. +func FakeService(namespace, name, clusterIP, protocol string, port int32) *corev1.Service { return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, Name: name, }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{ + Protocol: corev1.Protocol(protocol), + Port: port, + }}, + ClusterIP: clusterIP, + }, } } diff --git a/vendor/k8s.io/kubernetes/pkg/apis/core/helper/helpers.go b/vendor/k8s.io/kubernetes/pkg/apis/core/helper/helpers.go new file mode 100644 index 000000000..10c33f66b --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/apis/core/helper/helpers.go @@ -0,0 +1,539 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package helper + +import ( + "encoding/json" + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/kubernetes/pkg/apis/core" +) + +// IsHugePageResourceName returns true if the resource name has the huge page +// resource prefix. +func IsHugePageResourceName(name core.ResourceName) bool { + return strings.HasPrefix(string(name), core.ResourceHugePagesPrefix) +} + +// IsQuotaHugePageResourceName returns true if the resource name has the quota +// related huge page resource prefix. 
+func IsQuotaHugePageResourceName(name core.ResourceName) bool { + return strings.HasPrefix(string(name), core.ResourceHugePagesPrefix) || strings.HasPrefix(string(name), core.ResourceRequestsHugePagesPrefix) +} + +// HugePageResourceName returns a ResourceName with the canonical hugepage +// prefix prepended for the specified page size. The page size is converted +// to its canonical representation. +func HugePageResourceName(pageSize resource.Quantity) core.ResourceName { + return core.ResourceName(fmt.Sprintf("%s%s", core.ResourceHugePagesPrefix, pageSize.String())) +} + +// HugePageSizeFromResourceName returns the page size for the specified huge page +// resource name. If the specified input is not a valid huge page resource name +// an error is returned. +func HugePageSizeFromResourceName(name core.ResourceName) (resource.Quantity, error) { + if !IsHugePageResourceName(name) { + return resource.Quantity{}, fmt.Errorf("resource name: %s is an invalid hugepage name", name) + } + pageSize := strings.TrimPrefix(string(name), core.ResourceHugePagesPrefix) + return resource.ParseQuantity(pageSize) +} + +// NonConvertibleFields iterates over the provided map and filters out all but +// any keys with the "non-convertible.kubernetes.io" prefix. +func NonConvertibleFields(annotations map[string]string) map[string]string { + nonConvertibleKeys := map[string]string{} + for key, value := range annotations { + if strings.HasPrefix(key, core.NonConvertibleAnnotationPrefix) { + nonConvertibleKeys[key] = value + } + } + return nonConvertibleKeys +} + +// Semantic can do semantic deep equality checks for core objects. +// Example: apiequality.Semantic.DeepEqual(aPod, aPodWithNonNilButEmptyMaps) == true +var Semantic = conversion.EqualitiesOrDie( + func(a, b resource.Quantity) bool { + // Ignore formatting, only care that numeric value stayed the same. + // TODO: if we decide it's important, it should be safe to start comparing the format. + // + // Uninitialized quantities are equivalent to 0 quantities. 
+ return a.Cmp(b) == 0 + }, + func(a, b metav1.MicroTime) bool { + return a.UTC() == b.UTC() + }, + func(a, b metav1.Time) bool { + return a.UTC() == b.UTC() + }, + func(a, b labels.Selector) bool { + return a.String() == b.String() + }, + func(a, b fields.Selector) bool { + return a.String() == b.String() + }, +) + +var standardResourceQuotaScopes = sets.NewString( + string(core.ResourceQuotaScopeTerminating), + string(core.ResourceQuotaScopeNotTerminating), + string(core.ResourceQuotaScopeBestEffort), + string(core.ResourceQuotaScopeNotBestEffort), + string(core.ResourceQuotaScopePriorityClass), +) + +// IsStandardResourceQuotaScope returns true if the scope is a standard value +func IsStandardResourceQuotaScope(str string) bool { + return standardResourceQuotaScopes.Has(str) +} + +var podObjectCountQuotaResources = sets.NewString( + string(core.ResourcePods), +) + +var podComputeQuotaResources = sets.NewString( + string(core.ResourceCPU), + string(core.ResourceMemory), + string(core.ResourceLimitsCPU), + string(core.ResourceLimitsMemory), + string(core.ResourceRequestsCPU), + string(core.ResourceRequestsMemory), +) + +// IsResourceQuotaScopeValidForResource returns true if the resource applies to the specified scope +func IsResourceQuotaScopeValidForResource(scope core.ResourceQuotaScope, resource string) bool { + switch scope { + case core.ResourceQuotaScopeTerminating, core.ResourceQuotaScopeNotTerminating, core.ResourceQuotaScopeNotBestEffort, core.ResourceQuotaScopePriorityClass: + return podObjectCountQuotaResources.Has(resource) || podComputeQuotaResources.Has(resource) + case core.ResourceQuotaScopeBestEffort: + return podObjectCountQuotaResources.Has(resource) + default: + return true + } +} + +var standardContainerResources = sets.NewString( + string(core.ResourceCPU), + string(core.ResourceMemory), + string(core.ResourceEphemeralStorage), +) + +// IsStandardContainerResourceName returns true if the container can make a resource request +// for the specified resource +func IsStandardContainerResourceName(str string) bool { + return standardContainerResources.Has(str) || IsHugePageResourceName(core.ResourceName(str)) +} + +// IsExtendedResourceName returns true if: +// 1. the resource name is not in the default namespace; +// 2. resource name does not have "requests." prefix, +// to avoid confusion with the convention in quota +// 3. it satisfies the rules in IsQualifiedName() after converted into quota resource name +func IsExtendedResourceName(name core.ResourceName) bool { + if IsNativeResource(name) || strings.HasPrefix(string(name), core.DefaultResourceRequestsPrefix) { + return false + } + // Ensure it satisfies the rules in IsQualifiedName() after converted into quota resource name + nameForQuota := fmt.Sprintf("%s%s", core.DefaultResourceRequestsPrefix, string(name)) + if errs := validation.IsQualifiedName(string(nameForQuota)); len(errs) != 0 { + return false + } + return true +} + +// IsNativeResource returns true if the resource name is in the +// *kubernetes.io/ namespace. Partially-qualified (unprefixed) names are +// implicitly in the kubernetes.io/ namespace. +func IsNativeResource(name core.ResourceName) bool { + return !strings.Contains(string(name), "/") || + strings.Contains(string(name), core.ResourceDefaultNamespacePrefix) +} + +// IsOvercommitAllowed returns true if the resource is in the default +// namespace and is not hugepages. 
+func IsOvercommitAllowed(name core.ResourceName) bool { + return IsNativeResource(name) && + !IsHugePageResourceName(name) +} + +var standardLimitRangeTypes = sets.NewString( + string(core.LimitTypePod), + string(core.LimitTypeContainer), + string(core.LimitTypePersistentVolumeClaim), +) + +// IsStandardLimitRangeType returns true if the type is Pod or Container +func IsStandardLimitRangeType(str string) bool { + return standardLimitRangeTypes.Has(str) +} + +var standardQuotaResources = sets.NewString( + string(core.ResourceCPU), + string(core.ResourceMemory), + string(core.ResourceEphemeralStorage), + string(core.ResourceRequestsCPU), + string(core.ResourceRequestsMemory), + string(core.ResourceRequestsStorage), + string(core.ResourceRequestsEphemeralStorage), + string(core.ResourceLimitsCPU), + string(core.ResourceLimitsMemory), + string(core.ResourceLimitsEphemeralStorage), + string(core.ResourcePods), + string(core.ResourceQuotas), + string(core.ResourceServices), + string(core.ResourceReplicationControllers), + string(core.ResourceSecrets), + string(core.ResourcePersistentVolumeClaims), + string(core.ResourceConfigMaps), + string(core.ResourceServicesNodePorts), + string(core.ResourceServicesLoadBalancers), +) + +// IsStandardQuotaResourceName returns true if the resource is known to +// the quota tracking system +func IsStandardQuotaResourceName(str string) bool { + return standardQuotaResources.Has(str) || IsQuotaHugePageResourceName(core.ResourceName(str)) +} + +var standardResources = sets.NewString( + string(core.ResourceCPU), + string(core.ResourceMemory), + string(core.ResourceEphemeralStorage), + string(core.ResourceRequestsCPU), + string(core.ResourceRequestsMemory), + string(core.ResourceRequestsEphemeralStorage), + string(core.ResourceLimitsCPU), + string(core.ResourceLimitsMemory), + string(core.ResourceLimitsEphemeralStorage), + string(core.ResourcePods), + string(core.ResourceQuotas), + string(core.ResourceServices), + string(core.ResourceReplicationControllers), + string(core.ResourceSecrets), + string(core.ResourceConfigMaps), + string(core.ResourcePersistentVolumeClaims), + string(core.ResourceStorage), + string(core.ResourceRequestsStorage), + string(core.ResourceServicesNodePorts), + string(core.ResourceServicesLoadBalancers), +) + +// IsStandardResourceName returns true if the resource is known to the system +func IsStandardResourceName(str string) bool { + return standardResources.Has(str) || IsQuotaHugePageResourceName(core.ResourceName(str)) +} + +var integerResources = sets.NewString( + string(core.ResourcePods), + string(core.ResourceQuotas), + string(core.ResourceServices), + string(core.ResourceReplicationControllers), + string(core.ResourceSecrets), + string(core.ResourceConfigMaps), + string(core.ResourcePersistentVolumeClaims), + string(core.ResourceServicesNodePorts), + string(core.ResourceServicesLoadBalancers), +) + +// IsIntegerResourceName returns true if the resource is measured in integer values +func IsIntegerResourceName(str string) bool { + return integerResources.Has(str) || IsExtendedResourceName(core.ResourceName(str)) +} + +// this function aims to check if the service's ClusterIP is set or not +// the objective is not to perform validation here +func IsServiceIPSet(service *core.Service) bool { + return service.Spec.ClusterIP != core.ClusterIPNone && service.Spec.ClusterIP != "" +} + +var standardFinalizers = sets.NewString( + string(core.FinalizerKubernetes), + metav1.FinalizerOrphanDependents, + metav1.FinalizerDeleteDependents, +) + 
+func IsStandardFinalizerName(str string) bool { + return standardFinalizers.Has(str) +} + +// AddToNodeAddresses appends the NodeAddresses to the passed-by-pointer slice, +// only if they do not already exist +func AddToNodeAddresses(addresses *[]core.NodeAddress, addAddresses ...core.NodeAddress) { + for _, add := range addAddresses { + exists := false + for _, existing := range *addresses { + if existing.Address == add.Address && existing.Type == add.Type { + exists = true + break + } + } + if !exists { + *addresses = append(*addresses, add) + } + } +} + +// TODO: make method on LoadBalancerStatus? +func LoadBalancerStatusEqual(l, r *core.LoadBalancerStatus) bool { + return ingressSliceEqual(l.Ingress, r.Ingress) +} + +func ingressSliceEqual(lhs, rhs []core.LoadBalancerIngress) bool { + if len(lhs) != len(rhs) { + return false + } + for i := range lhs { + if !ingressEqual(&lhs[i], &rhs[i]) { + return false + } + } + return true +} + +func ingressEqual(lhs, rhs *core.LoadBalancerIngress) bool { + if lhs.IP != rhs.IP { + return false + } + if lhs.Hostname != rhs.Hostname { + return false + } + return true +} + +// GetAccessModesAsString returns a string representation of an array of access modes. +// modes, when present, are always in the same order: RWO,ROX,RWX. +func GetAccessModesAsString(modes []core.PersistentVolumeAccessMode) string { + modes = removeDuplicateAccessModes(modes) + modesStr := []string{} + if containsAccessMode(modes, core.ReadWriteOnce) { + modesStr = append(modesStr, "RWO") + } + if containsAccessMode(modes, core.ReadOnlyMany) { + modesStr = append(modesStr, "ROX") + } + if containsAccessMode(modes, core.ReadWriteMany) { + modesStr = append(modesStr, "RWX") + } + return strings.Join(modesStr, ",") +} + +// GetAccessModesAsString returns an array of AccessModes from a string created by GetAccessModesAsString +func GetAccessModesFromString(modes string) []core.PersistentVolumeAccessMode { + strmodes := strings.Split(modes, ",") + accessModes := []core.PersistentVolumeAccessMode{} + for _, s := range strmodes { + s = strings.Trim(s, " ") + switch { + case s == "RWO": + accessModes = append(accessModes, core.ReadWriteOnce) + case s == "ROX": + accessModes = append(accessModes, core.ReadOnlyMany) + case s == "RWX": + accessModes = append(accessModes, core.ReadWriteMany) + } + } + return accessModes +} + +// removeDuplicateAccessModes returns an array of access modes without any duplicates +func removeDuplicateAccessModes(modes []core.PersistentVolumeAccessMode) []core.PersistentVolumeAccessMode { + accessModes := []core.PersistentVolumeAccessMode{} + for _, m := range modes { + if !containsAccessMode(accessModes, m) { + accessModes = append(accessModes, m) + } + } + return accessModes +} + +func containsAccessMode(modes []core.PersistentVolumeAccessMode, mode core.PersistentVolumeAccessMode) bool { + for _, m := range modes { + if m == mode { + return true + } + } + return false +} + +// NodeSelectorRequirementsAsSelector converts the []NodeSelectorRequirement core type into a struct that implements +// labels.Selector. 
+func NodeSelectorRequirementsAsSelector(nsm []core.NodeSelectorRequirement) (labels.Selector, error) { + if len(nsm) == 0 { + return labels.Nothing(), nil + } + selector := labels.NewSelector() + for _, expr := range nsm { + var op selection.Operator + switch expr.Operator { + case core.NodeSelectorOpIn: + op = selection.In + case core.NodeSelectorOpNotIn: + op = selection.NotIn + case core.NodeSelectorOpExists: + op = selection.Exists + case core.NodeSelectorOpDoesNotExist: + op = selection.DoesNotExist + case core.NodeSelectorOpGt: + op = selection.GreaterThan + case core.NodeSelectorOpLt: + op = selection.LessThan + default: + return nil, fmt.Errorf("%q is not a valid node selector operator", expr.Operator) + } + r, err := labels.NewRequirement(expr.Key, op, expr.Values) + if err != nil { + return nil, err + } + selector = selector.Add(*r) + } + return selector, nil +} + +// NodeSelectorRequirementsAsFieldSelector converts the []NodeSelectorRequirement core type into a struct that implements +// fields.Selector. +func NodeSelectorRequirementsAsFieldSelector(nsm []core.NodeSelectorRequirement) (fields.Selector, error) { + if len(nsm) == 0 { + return fields.Nothing(), nil + } + + selectors := []fields.Selector{} + for _, expr := range nsm { + switch expr.Operator { + case core.NodeSelectorOpIn: + if len(expr.Values) != 1 { + return nil, fmt.Errorf("unexpected number of value (%d) for node field selector operator %q", + len(expr.Values), expr.Operator) + } + selectors = append(selectors, fields.OneTermEqualSelector(expr.Key, expr.Values[0])) + + case core.NodeSelectorOpNotIn: + if len(expr.Values) != 1 { + return nil, fmt.Errorf("unexpected number of value (%d) for node field selector operator %q", + len(expr.Values), expr.Operator) + } + selectors = append(selectors, fields.OneTermNotEqualSelector(expr.Key, expr.Values[0])) + + default: + return nil, fmt.Errorf("%q is not a valid node field selector operator", expr.Operator) + } + } + + return fields.AndSelectors(selectors...), nil +} + +// GetTolerationsFromPodAnnotations gets the json serialized tolerations data from Pod.Annotations +// and converts it to the []Toleration type in core. +func GetTolerationsFromPodAnnotations(annotations map[string]string) ([]core.Toleration, error) { + var tolerations []core.Toleration + if len(annotations) > 0 && annotations[core.TolerationsAnnotationKey] != "" { + err := json.Unmarshal([]byte(annotations[core.TolerationsAnnotationKey]), &tolerations) + if err != nil { + return tolerations, err + } + } + return tolerations, nil +} + +// AddOrUpdateTolerationInPod tries to add a toleration to the pod's toleration list. +// Returns true if something was updated, false otherwise. +func AddOrUpdateTolerationInPod(pod *core.Pod, toleration *core.Toleration) bool { + podTolerations := pod.Spec.Tolerations + + var newTolerations []core.Toleration + updated := false + for i := range podTolerations { + if toleration.MatchToleration(&podTolerations[i]) { + if Semantic.DeepEqual(toleration, podTolerations[i]) { + return false + } + newTolerations = append(newTolerations, *toleration) + updated = true + continue + } + + newTolerations = append(newTolerations, podTolerations[i]) + } + + if !updated { + newTolerations = append(newTolerations, *toleration) + } + + pod.Spec.Tolerations = newTolerations + return true +} + +// GetTaintsFromNodeAnnotations gets the json serialized taints data from Pod.Annotations +// and converts it to the []Taint type in core. 
+func GetTaintsFromNodeAnnotations(annotations map[string]string) ([]core.Taint, error) { + var taints []core.Taint + if len(annotations) > 0 && annotations[core.TaintsAnnotationKey] != "" { + err := json.Unmarshal([]byte(annotations[core.TaintsAnnotationKey]), &taints) + if err != nil { + return []core.Taint{}, err + } + } + return taints, nil +} + +// GetPersistentVolumeClass returns StorageClassName. +func GetPersistentVolumeClass(volume *core.PersistentVolume) string { + // Use beta annotation first + if class, found := volume.Annotations[core.BetaStorageClassAnnotation]; found { + return class + } + + return volume.Spec.StorageClassName +} + +// GetPersistentVolumeClaimClass returns StorageClassName. If no storage class was +// requested, it returns "". +func GetPersistentVolumeClaimClass(claim *core.PersistentVolumeClaim) string { + // Use beta annotation first + if class, found := claim.Annotations[core.BetaStorageClassAnnotation]; found { + return class + } + + if claim.Spec.StorageClassName != nil { + return *claim.Spec.StorageClassName + } + + return "" +} + +// PersistentVolumeClaimHasClass returns true if given claim has set StorageClassName field. +func PersistentVolumeClaimHasClass(claim *core.PersistentVolumeClaim) bool { + // Use beta annotation first + if _, found := claim.Annotations[core.BetaStorageClassAnnotation]; found { + return true + } + + if claim.Spec.StorageClassName != nil { + return true + } + + return false +} diff --git a/vendor/k8s.io/kubernetes/pkg/apis/core/v1/helper/helpers.go b/vendor/k8s.io/kubernetes/pkg/apis/core/v1/helper/helpers.go new file mode 100644 index 000000000..fa11a6b36 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/apis/core/v1/helper/helpers.go @@ -0,0 +1,527 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package helper + +import ( + "encoding/json" + "fmt" + "strings" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/kubernetes/pkg/apis/core/helper" +) + +// IsExtendedResourceName returns true if: +// 1. the resource name is not in the default namespace; +// 2. resource name does not have "requests." prefix, +// to avoid confusion with the convention in quota +// 3. it satisfies the rules in IsQualifiedName() after converted into quota resource name +func IsExtendedResourceName(name v1.ResourceName) bool { + if IsNativeResource(name) || strings.HasPrefix(string(name), v1.DefaultResourceRequestsPrefix) { + return false + } + // Ensure it satisfies the rules in IsQualifiedName() after converted into quota resource name + nameForQuota := fmt.Sprintf("%s%s", v1.DefaultResourceRequestsPrefix, string(name)) + if errs := validation.IsQualifiedName(string(nameForQuota)); len(errs) != 0 { + return false + } + return true +} + +// IsPrefixedNativeResource returns true if the resource name is in the +// *kubernetes.io/ namespace. 
+func IsPrefixedNativeResource(name v1.ResourceName) bool { + return strings.Contains(string(name), v1.ResourceDefaultNamespacePrefix) +} + +// IsNativeResource returns true if the resource name is in the +// *kubernetes.io/ namespace. Partially-qualified (unprefixed) names are +// implicitly in the kubernetes.io/ namespace. +func IsNativeResource(name v1.ResourceName) bool { + return !strings.Contains(string(name), "/") || + IsPrefixedNativeResource(name) +} + +// IsHugePageResourceName returns true if the resource name has the huge page +// resource prefix. +func IsHugePageResourceName(name v1.ResourceName) bool { + return strings.HasPrefix(string(name), v1.ResourceHugePagesPrefix) +} + +// HugePageResourceName returns a ResourceName with the canonical hugepage +// prefix prepended for the specified page size. The page size is converted +// to its canonical representation. +func HugePageResourceName(pageSize resource.Quantity) v1.ResourceName { + return v1.ResourceName(fmt.Sprintf("%s%s", v1.ResourceHugePagesPrefix, pageSize.String())) +} + +// HugePageSizeFromResourceName returns the page size for the specified huge page +// resource name. If the specified input is not a valid huge page resource name +// an error is returned. +func HugePageSizeFromResourceName(name v1.ResourceName) (resource.Quantity, error) { + if !IsHugePageResourceName(name) { + return resource.Quantity{}, fmt.Errorf("resource name: %s is an invalid hugepage name", name) + } + pageSize := strings.TrimPrefix(string(name), v1.ResourceHugePagesPrefix) + return resource.ParseQuantity(pageSize) +} + +// IsOvercommitAllowed returns true if the resource is in the default +// namespace and is not hugepages. +func IsOvercommitAllowed(name v1.ResourceName) bool { + return IsNativeResource(name) && + !IsHugePageResourceName(name) +} + +func IsAttachableVolumeResourceName(name v1.ResourceName) bool { + return strings.HasPrefix(string(name), v1.ResourceAttachableVolumesPrefix) +} + +// Extended and Hugepages resources +func IsScalarResourceName(name v1.ResourceName) bool { + return IsExtendedResourceName(name) || IsHugePageResourceName(name) || + IsPrefixedNativeResource(name) || IsAttachableVolumeResourceName(name) +} + +// this function aims to check if the service's ClusterIP is set or not +// the objective is not to perform validation here +func IsServiceIPSet(service *v1.Service) bool { + return service.Spec.ClusterIP != v1.ClusterIPNone && service.Spec.ClusterIP != "" +} + +// AddToNodeAddresses appends the NodeAddresses to the passed-by-pointer slice, +// only if they do not already exist +func AddToNodeAddresses(addresses *[]v1.NodeAddress, addAddresses ...v1.NodeAddress) { + for _, add := range addAddresses { + exists := false + for _, existing := range *addresses { + if existing.Address == add.Address && existing.Type == add.Type { + exists = true + break + } + } + if !exists { + *addresses = append(*addresses, add) + } + } +} + +// TODO: make method on LoadBalancerStatus? 
+func LoadBalancerStatusEqual(l, r *v1.LoadBalancerStatus) bool { + return ingressSliceEqual(l.Ingress, r.Ingress) +} + +func ingressSliceEqual(lhs, rhs []v1.LoadBalancerIngress) bool { + if len(lhs) != len(rhs) { + return false + } + for i := range lhs { + if !ingressEqual(&lhs[i], &rhs[i]) { + return false + } + } + return true +} + +func ingressEqual(lhs, rhs *v1.LoadBalancerIngress) bool { + if lhs.IP != rhs.IP { + return false + } + if lhs.Hostname != rhs.Hostname { + return false + } + return true +} + +// TODO: make method on LoadBalancerStatus? +func LoadBalancerStatusDeepCopy(lb *v1.LoadBalancerStatus) *v1.LoadBalancerStatus { + c := &v1.LoadBalancerStatus{} + c.Ingress = make([]v1.LoadBalancerIngress, len(lb.Ingress)) + for i := range lb.Ingress { + c.Ingress[i] = lb.Ingress[i] + } + return c +} + +// GetAccessModesAsString returns a string representation of an array of access modes. +// modes, when present, are always in the same order: RWO,ROX,RWX. +func GetAccessModesAsString(modes []v1.PersistentVolumeAccessMode) string { + modes = removeDuplicateAccessModes(modes) + modesStr := []string{} + if containsAccessMode(modes, v1.ReadWriteOnce) { + modesStr = append(modesStr, "RWO") + } + if containsAccessMode(modes, v1.ReadOnlyMany) { + modesStr = append(modesStr, "ROX") + } + if containsAccessMode(modes, v1.ReadWriteMany) { + modesStr = append(modesStr, "RWX") + } + return strings.Join(modesStr, ",") +} + +// GetAccessModesAsString returns an array of AccessModes from a string created by GetAccessModesAsString +func GetAccessModesFromString(modes string) []v1.PersistentVolumeAccessMode { + strmodes := strings.Split(modes, ",") + accessModes := []v1.PersistentVolumeAccessMode{} + for _, s := range strmodes { + s = strings.Trim(s, " ") + switch { + case s == "RWO": + accessModes = append(accessModes, v1.ReadWriteOnce) + case s == "ROX": + accessModes = append(accessModes, v1.ReadOnlyMany) + case s == "RWX": + accessModes = append(accessModes, v1.ReadWriteMany) + } + } + return accessModes +} + +// removeDuplicateAccessModes returns an array of access modes without any duplicates +func removeDuplicateAccessModes(modes []v1.PersistentVolumeAccessMode) []v1.PersistentVolumeAccessMode { + accessModes := []v1.PersistentVolumeAccessMode{} + for _, m := range modes { + if !containsAccessMode(accessModes, m) { + accessModes = append(accessModes, m) + } + } + return accessModes +} + +func containsAccessMode(modes []v1.PersistentVolumeAccessMode, mode v1.PersistentVolumeAccessMode) bool { + for _, m := range modes { + if m == mode { + return true + } + } + return false +} + +// NodeSelectorRequirementsAsSelector converts the []NodeSelectorRequirement api type into a struct that implements +// labels.Selector. 
+func NodeSelectorRequirementsAsSelector(nsm []v1.NodeSelectorRequirement) (labels.Selector, error) { + if len(nsm) == 0 { + return labels.Nothing(), nil + } + selector := labels.NewSelector() + for _, expr := range nsm { + var op selection.Operator + switch expr.Operator { + case v1.NodeSelectorOpIn: + op = selection.In + case v1.NodeSelectorOpNotIn: + op = selection.NotIn + case v1.NodeSelectorOpExists: + op = selection.Exists + case v1.NodeSelectorOpDoesNotExist: + op = selection.DoesNotExist + case v1.NodeSelectorOpGt: + op = selection.GreaterThan + case v1.NodeSelectorOpLt: + op = selection.LessThan + default: + return nil, fmt.Errorf("%q is not a valid node selector operator", expr.Operator) + } + r, err := labels.NewRequirement(expr.Key, op, expr.Values) + if err != nil { + return nil, err + } + selector = selector.Add(*r) + } + return selector, nil +} + +// NodeSelectorRequirementsAsFieldSelector converts the []NodeSelectorRequirement core type into a struct that implements +// fields.Selector. +func NodeSelectorRequirementsAsFieldSelector(nsm []v1.NodeSelectorRequirement) (fields.Selector, error) { + if len(nsm) == 0 { + return fields.Nothing(), nil + } + + selectors := []fields.Selector{} + for _, expr := range nsm { + switch expr.Operator { + case v1.NodeSelectorOpIn: + if len(expr.Values) != 1 { + return nil, fmt.Errorf("unexpected number of value (%d) for node field selector operator %q", + len(expr.Values), expr.Operator) + } + selectors = append(selectors, fields.OneTermEqualSelector(expr.Key, expr.Values[0])) + + case v1.NodeSelectorOpNotIn: + if len(expr.Values) != 1 { + return nil, fmt.Errorf("unexpected number of value (%d) for node field selector operator %q", + len(expr.Values), expr.Operator) + } + selectors = append(selectors, fields.OneTermNotEqualSelector(expr.Key, expr.Values[0])) + + default: + return nil, fmt.Errorf("%q is not a valid node field selector operator", expr.Operator) + } + } + + return fields.AndSelectors(selectors...), nil +} + +// NodeSelectorRequirementKeysExistInNodeSelectorTerms checks if a NodeSelectorTerm with key is already specified in terms +func NodeSelectorRequirementKeysExistInNodeSelectorTerms(reqs []v1.NodeSelectorRequirement, terms []v1.NodeSelectorTerm) bool { + for _, req := range reqs { + for _, term := range terms { + for _, r := range term.MatchExpressions { + if r.Key == req.Key { + return true + } + } + } + } + return false +} + +// MatchNodeSelectorTerms checks whether the node labels and fields match node selector terms in ORed; +// nil or empty term matches no objects. +func MatchNodeSelectorTerms( + nodeSelectorTerms []v1.NodeSelectorTerm, + nodeLabels labels.Set, + nodeFields fields.Set, +) bool { + for _, req := range nodeSelectorTerms { + // nil or empty term selects no objects + if len(req.MatchExpressions) == 0 && len(req.MatchFields) == 0 { + continue + } + + if len(req.MatchExpressions) != 0 { + labelSelector, err := NodeSelectorRequirementsAsSelector(req.MatchExpressions) + if err != nil || !labelSelector.Matches(nodeLabels) { + continue + } + } + + if len(req.MatchFields) != 0 { + fieldSelector, err := NodeSelectorRequirementsAsFieldSelector(req.MatchFields) + if err != nil || !fieldSelector.Matches(nodeFields) { + continue + } + } + + return true + } + + return false +} + +// TopologySelectorRequirementsAsSelector converts the []TopologySelectorLabelRequirement api type into a struct +// that implements labels.Selector. 
+func TopologySelectorRequirementsAsSelector(tsm []v1.TopologySelectorLabelRequirement) (labels.Selector, error) { + if len(tsm) == 0 { + return labels.Nothing(), nil + } + + selector := labels.NewSelector() + for _, expr := range tsm { + r, err := labels.NewRequirement(expr.Key, selection.In, expr.Values) + if err != nil { + return nil, err + } + selector = selector.Add(*r) + } + + return selector, nil +} + +// MatchTopologySelectorTerms checks whether given labels match topology selector terms in ORed; +// nil or empty term matches no objects; while empty term list matches all objects. +func MatchTopologySelectorTerms(topologySelectorTerms []v1.TopologySelectorTerm, lbls labels.Set) bool { + if len(topologySelectorTerms) == 0 { + // empty term list matches all objects + return true + } + + for _, req := range topologySelectorTerms { + // nil or empty term selects no objects + if len(req.MatchLabelExpressions) == 0 { + continue + } + + labelSelector, err := TopologySelectorRequirementsAsSelector(req.MatchLabelExpressions) + if err != nil || !labelSelector.Matches(lbls) { + continue + } + + return true + } + + return false +} + +// AddOrUpdateTolerationInPodSpec tries to add a toleration to the toleration list in PodSpec. +// Returns true if something was updated, false otherwise. +func AddOrUpdateTolerationInPodSpec(spec *v1.PodSpec, toleration *v1.Toleration) bool { + podTolerations := spec.Tolerations + + var newTolerations []v1.Toleration + updated := false + for i := range podTolerations { + if toleration.MatchToleration(&podTolerations[i]) { + if helper.Semantic.DeepEqual(toleration, podTolerations[i]) { + return false + } + newTolerations = append(newTolerations, *toleration) + updated = true + continue + } + + newTolerations = append(newTolerations, podTolerations[i]) + } + + if !updated { + newTolerations = append(newTolerations, *toleration) + } + + spec.Tolerations = newTolerations + return true +} + +// AddOrUpdateTolerationInPod tries to add a toleration to the pod's toleration list. +// Returns true if something was updated, false otherwise. +func AddOrUpdateTolerationInPod(pod *v1.Pod, toleration *v1.Toleration) bool { + return AddOrUpdateTolerationInPodSpec(&pod.Spec, toleration) +} + +// TolerationsTolerateTaint checks if taint is tolerated by any of the tolerations. +func TolerationsTolerateTaint(tolerations []v1.Toleration, taint *v1.Taint) bool { + for i := range tolerations { + if tolerations[i].ToleratesTaint(taint) { + return true + } + } + return false +} + +type taintsFilterFunc func(*v1.Taint) bool + +// TolerationsTolerateTaintsWithFilter checks if given tolerations tolerates +// all the taints that apply to the filter in given taint list. +func TolerationsTolerateTaintsWithFilter(tolerations []v1.Toleration, taints []v1.Taint, applyFilter taintsFilterFunc) bool { + if len(taints) == 0 { + return true + } + + for i := range taints { + if applyFilter != nil && !applyFilter(&taints[i]) { + continue + } + + if !TolerationsTolerateTaint(tolerations, &taints[i]) { + return false + } + } + + return true +} + +// Returns true and list of Tolerations matching all Taints if all are tolerated, or false otherwise. 
+func GetMatchingTolerations(taints []v1.Taint, tolerations []v1.Toleration) (bool, []v1.Toleration) { + if len(taints) == 0 { + return true, []v1.Toleration{} + } + if len(tolerations) == 0 && len(taints) > 0 { + return false, []v1.Toleration{} + } + result := []v1.Toleration{} + for i := range taints { + tolerated := false + for j := range tolerations { + if tolerations[j].ToleratesTaint(&taints[i]) { + result = append(result, tolerations[j]) + tolerated = true + break + } + } + if !tolerated { + return false, []v1.Toleration{} + } + } + return true, result +} + +func GetAvoidPodsFromNodeAnnotations(annotations map[string]string) (v1.AvoidPods, error) { + var avoidPods v1.AvoidPods + if len(annotations) > 0 && annotations[v1.PreferAvoidPodsAnnotationKey] != "" { + err := json.Unmarshal([]byte(annotations[v1.PreferAvoidPodsAnnotationKey]), &avoidPods) + if err != nil { + return avoidPods, err + } + } + return avoidPods, nil +} + +// GetPersistentVolumeClass returns StorageClassName. +func GetPersistentVolumeClass(volume *v1.PersistentVolume) string { + // Use beta annotation first + if class, found := volume.Annotations[v1.BetaStorageClassAnnotation]; found { + return class + } + + return volume.Spec.StorageClassName +} + +// GetPersistentVolumeClaimClass returns StorageClassName. If no storage class was +// requested, it returns "". +func GetPersistentVolumeClaimClass(claim *v1.PersistentVolumeClaim) string { + // Use beta annotation first + if class, found := claim.Annotations[v1.BetaStorageClassAnnotation]; found { + return class + } + + if claim.Spec.StorageClassName != nil { + return *claim.Spec.StorageClassName + } + + return "" +} + +// ScopedResourceSelectorRequirementsAsSelector converts the ScopedResourceSelectorRequirement api type into a struct that implements +// labels.Selector. +func ScopedResourceSelectorRequirementsAsSelector(ssr v1.ScopedResourceSelectorRequirement) (labels.Selector, error) { + selector := labels.NewSelector() + var op selection.Operator + switch ssr.Operator { + case v1.ScopeSelectorOpIn: + op = selection.In + case v1.ScopeSelectorOpNotIn: + op = selection.NotIn + case v1.ScopeSelectorOpExists: + op = selection.Exists + case v1.ScopeSelectorOpDoesNotExist: + op = selection.DoesNotExist + default: + return nil, fmt.Errorf("%q is not a valid scope selector operator", ssr.Operator) + } + r, err := labels.NewRequirement(string(ssr.ScopeName), op, ssr.Values) + if err != nil { + return nil, err + } + selector = selector.Add(*r) + return selector, nil +} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/envvars/doc.go b/vendor/k8s.io/kubernetes/pkg/kubelet/envvars/doc.go new file mode 100644 index 000000000..d95193771 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/envvars/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package envvars is the package that build the environment variables that kubernetes provides +// to the containers run by it. 
+package envvars // import "k8s.io/kubernetes/pkg/kubelet/envvars" diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/envvars/envvars.go b/vendor/k8s.io/kubernetes/pkg/kubelet/envvars/envvars.go new file mode 100644 index 000000000..789c20820 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/envvars/envvars.go @@ -0,0 +1,113 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package envvars + +import ( + "fmt" + "net" + "strconv" + "strings" + + "k8s.io/api/core/v1" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" +) + +// FromServices builds environment variables that a container is started with, +// which tell the container where to find the services it may need, which are +// provided as an argument. +func FromServices(services []*v1.Service) []v1.EnvVar { + var result []v1.EnvVar + for i := range services { + service := services[i] + + // ignore services where ClusterIP is "None" or empty + // the services passed to this method should be pre-filtered + // only services that have the cluster IP set should be included here + if !v1helper.IsServiceIPSet(service) { + continue + } + + // Host + name := makeEnvVariableName(service.Name) + "_SERVICE_HOST" + result = append(result, v1.EnvVar{Name: name, Value: service.Spec.ClusterIP}) + // First port - give it the backwards-compatible name + name = makeEnvVariableName(service.Name) + "_SERVICE_PORT" + result = append(result, v1.EnvVar{Name: name, Value: strconv.Itoa(int(service.Spec.Ports[0].Port))}) + // All named ports (only the first may be unnamed, checked in validation) + for i := range service.Spec.Ports { + sp := &service.Spec.Ports[i] + if sp.Name != "" { + pn := name + "_" + makeEnvVariableName(sp.Name) + result = append(result, v1.EnvVar{Name: pn, Value: strconv.Itoa(int(sp.Port))}) + } + } + // Docker-compatible vars. + result = append(result, makeLinkVariables(service)...) + } + return result +} + +func makeEnvVariableName(str string) string { + // TODO: If we simplify to "all names are DNS1123Subdomains" this + // will need two tweaks: + // 1) Handle leading digits + // 2) Handle dots + return strings.ToUpper(strings.Replace(str, "-", "_", -1)) +} + +func makeLinkVariables(service *v1.Service) []v1.EnvVar { + prefix := makeEnvVariableName(service.Name) + all := []v1.EnvVar{} + for i := range service.Spec.Ports { + sp := &service.Spec.Ports[i] + + protocol := string(v1.ProtocolTCP) + if sp.Protocol != "" { + protocol = string(sp.Protocol) + } + + hostPort := net.JoinHostPort(service.Spec.ClusterIP, strconv.Itoa(int(sp.Port))) + + if i == 0 { + // Docker special-cases the first port. 
+ all = append(all, v1.EnvVar{ + Name: prefix + "_PORT", + Value: fmt.Sprintf("%s://%s", strings.ToLower(protocol), hostPort), + }) + } + portPrefix := fmt.Sprintf("%s_PORT_%d_%s", prefix, sp.Port, strings.ToUpper(protocol)) + all = append(all, []v1.EnvVar{ + { + Name: portPrefix, + Value: fmt.Sprintf("%s://%s", strings.ToLower(protocol), hostPort), + }, + { + Name: portPrefix + "_PROTO", + Value: strings.ToLower(protocol), + }, + { + Name: portPrefix + "_PORT", + Value: strconv.Itoa(int(sp.Port)), + }, + { + Name: portPrefix + "_ADDR", + Value: service.Spec.ClusterIP, + }, + }...) + } + return all +} diff --git a/vkubelet/env.go b/vkubelet/env.go old mode 100644 new mode 100755 index 40431ae59..140720ec0 --- a/vkubelet/env.go +++ b/vkubelet/env.go @@ -8,10 +8,14 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" apivalidation "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/tools/record" podshelper "k8s.io/kubernetes/pkg/apis/core/pods" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" fieldpath "k8s.io/kubernetes/pkg/fieldpath" + "k8s.io/kubernetes/pkg/kubelet/envvars" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" @@ -50,9 +54,12 @@ const ( ReasonInvalidEnvironmentVariableNames = "InvalidEnvironmentVariableNames" ) +var masterServices = sets.NewString("kubernetes") + // populateEnvironmentVariables populates the environment of each container (and init container) in the specified pod. // TODO Make this the single exported function of a "pkg/environment" package in the future. func populateEnvironmentVariables(ctx context.Context, pod *corev1.Pod, rm *manager.ResourceManager, recorder record.EventRecorder) error { + // Populate each init container's environment. for idx := range pod.Spec.InitContainers { if err := populateContainerEnvironment(ctx, pod, &pod.Spec.InitContainers[idx], rm, recorder); err != nil { @@ -89,6 +96,53 @@ func populateContainerEnvironment(ctx context.Context, pod *corev1.Pod, containe return nil } +// getServiceEnvVarMap makes a map[string]string of env vars for services a +// pod in namespace ns should see. +// Based on getServiceEnvVarMap in kubelet_pods.go. +func getServiceEnvVarMap(rm *manager.ResourceManager, ns string, enableServiceLinks bool) (map[string]string, error) { + var ( + serviceMap = make(map[string]*corev1.Service) + m = make(map[string]string) + ) + + services, err := rm.ListServices() + if err != nil { + return nil, err + } + + // project the services in namespace ns onto the master services + for i := range services { + service := services[i] + // ignore services where ClusterIP is "None" or empty + if !v1helper.IsServiceIPSet(service) { + continue + } + serviceName := service.Name + + // We always want to add environment variables for master kubernetes service + // from the default namespace, even if enableServiceLinks is false. + // We also add environment variables for other services in the same + // namespace, if enableServiceLinks is true. 
+		if service.Namespace == metav1.NamespaceDefault && masterServices.Has(serviceName) {
+			if _, exists := serviceMap[serviceName]; !exists {
+				serviceMap[serviceName] = service
+			}
+		} else if service.Namespace == ns && enableServiceLinks {
+			serviceMap[serviceName] = service
+		}
+	}
+
+	mappedServices := make([]*corev1.Service, 0, len(serviceMap))
+	for key := range serviceMap {
+		mappedServices = append(mappedServices, serviceMap[key])
+	}
+
+	for _, e := range envvars.FromServices(mappedServices) {
+		m[e.Name] = e.Value
+	}
+	return m, nil
+}
+
 // makeEnvironmentMapBasedOnEnvFrom returns a map representing the resolved environment of the specified container after being populated from the entries in the ".envFrom" field.
 func makeEnvironmentMapBasedOnEnvFrom(ctx context.Context, pod *corev1.Pod, container *corev1.Container, rm *manager.ResourceManager, recorder record.EventRecorder) (map[string]string, error) {
 	// Create a map to hold the resulting environment.
@@ -347,6 +401,29 @@ loop:
 			continue loop
 		}
 	}
+
+	// TODO If pod.Spec.EnableServiceLinks is nil then fail as per the 1.14 kubelet.
+	enableServiceLinks := corev1.DefaultEnableServiceLinks
+	if pod.Spec.EnableServiceLinks != nil {
+		enableServiceLinks = *pod.Spec.EnableServiceLinks
+	}
+
+	// Note that there is a race between the kubelet seeing the pod and the kubelet seeing the service.
+	// To avoid this, users can: (1) wait between starting a service and starting any pods that require it;
+	// or (2) detect a missing service env var, exit, and be restarted; or (3) use DNS instead of env vars
+	// and keep trying to resolve the DNS name of the service (recommended).
+	svcEnv, err := getServiceEnvVarMap(rm, pod.Namespace, enableServiceLinks)
+	if err != nil {
+		return nil, err
+	}
+
+	// Append service env vars.
+	for k, v := range svcEnv {
+		if _, present := res[k]; !present {
+			res[k] = v
+		}
+	}
+	// Return the populated environment.
return res, nil } diff --git a/vkubelet/env_internal_test.go b/vkubelet/env_internal_test.go index cf11fe122..ff71d65af 100644 --- a/vkubelet/env_internal_test.go +++ b/vkubelet/env_internal_test.go @@ -192,6 +192,7 @@ func TestPopulatePodWithInitContainersUsingEnv(t *testing.T) { }, }, }, + EnableServiceLinks: &bFalse, }, } @@ -351,6 +352,7 @@ func TestPopulatePodWithInitContainersUsingEnvWithFieldRef(t *testing.T) { }, }, }, + EnableServiceLinks: &bFalse, }, } @@ -466,6 +468,7 @@ func TestPopulatePodWithInitContainersUsingEnvFrom(t *testing.T) { }, }, }, + EnableServiceLinks: &bFalse, }, } @@ -544,6 +547,7 @@ func TestEnvFromTwoConfigMapsAndOneSecret(t *testing.T) { }, }, }, + EnableServiceLinks: &bFalse, }, } @@ -605,6 +609,7 @@ func TestEnvFromConfigMapAndSecretWithInvalidKeys(t *testing.T) { }, }, }, + EnableServiceLinks: &bFalse, }, } @@ -675,6 +680,7 @@ func TestEnvOverridesEnvFrom(t *testing.T) { }, }, }, + EnableServiceLinks: &bFalse, }, } @@ -751,6 +757,7 @@ func TestEnvFromInexistentConfigMaps(t *testing.T) { }, }, }, + EnableServiceLinks: &bFalse, }, } @@ -807,6 +814,7 @@ func TestEnvFromInexistentSecrets(t *testing.T) { }, }, }, + EnableServiceLinks: &bFalse, }, } @@ -859,6 +867,7 @@ func TestEnvReferencingInexistentConfigMapKey(t *testing.T) { }, }, }, + EnableServiceLinks: &bFalse, }, } @@ -908,6 +917,7 @@ func TestEnvReferencingInexistentSecretKey(t *testing.T) { }, }, }, + EnableServiceLinks: &bFalse, }, } @@ -921,3 +931,83 @@ func TestEnvReferencingInexistentSecretKey(t *testing.T) { assert.Check(t, is.Contains(event1, ReasonMandatorySecretNotFound)) assert.Check(t, is.Contains(event1, missingSecretName)) } + +// TestServiceEnvVar tries to populate the environment of a container using services with ServiceLinks enabled and disabled. +func TestServiceEnvVar(t *testing.T) { + namespace2 := "namespace-02" + + service1 := testutil.FakeService(metav1.NamespaceDefault, "kubernetes", "1.2.3.1", "TCP", 8081) + service2 := testutil.FakeService(namespace, "test", "1.2.3.3", "TCP", 8083) + // unused svc to show it isn't populated within a different namespace. 
+ service3 := testutil.FakeService(namespace2, "unused", "1.2.3.4", "TCP", 8084) + + rm := testutil.FakeResourceManager(service1, service2, service3) + er := testutil.FakeEventRecorder(defaultEventRecorderBufferSize) + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "test-pod-name", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Env: []corev1.EnvVar{ + {Name: envVarName1, Value: envVarValue1}, + }, + }, + }, + }, + } + + testCases := []struct { + name string // the name of the test case + enableServiceLinks *bool // enabling service links + expectedEnvs []corev1.EnvVar // a set of expected environment vars + }{ + { + name: "ServiceLinks disabled", + enableServiceLinks: &bFalse, + expectedEnvs: []corev1.EnvVar{ + {Name: envVarName1, Value: envVarValue1}, + {Name: "KUBERNETES_SERVICE_PORT", Value: "8081"}, + {Name: "KUBERNETES_SERVICE_HOST", Value: "1.2.3.1"}, + {Name: "KUBERNETES_PORT", Value: "tcp://1.2.3.1:8081"}, + {Name: "KUBERNETES_PORT_8081_TCP", Value: "tcp://1.2.3.1:8081"}, + {Name: "KUBERNETES_PORT_8081_TCP_PROTO", Value: "tcp"}, + {Name: "KUBERNETES_PORT_8081_TCP_PORT", Value: "8081"}, + {Name: "KUBERNETES_PORT_8081_TCP_ADDR", Value: "1.2.3.1"}, + }, + }, + { + name: "ServiceLinks enabled", + enableServiceLinks: &bTrue, + expectedEnvs: []corev1.EnvVar{ + {Name: envVarName1, Value: envVarValue1}, + {Name: "TEST_SERVICE_HOST", Value: "1.2.3.3"}, + {Name: "TEST_SERVICE_PORT", Value: "8083"}, + {Name: "TEST_PORT", Value: "tcp://1.2.3.3:8083"}, + {Name: "TEST_PORT_8083_TCP", Value: "tcp://1.2.3.3:8083"}, + {Name: "TEST_PORT_8083_TCP_PROTO", Value: "tcp"}, + {Name: "TEST_PORT_8083_TCP_PORT", Value: "8083"}, + {Name: "TEST_PORT_8083_TCP_ADDR", Value: "1.2.3.3"}, + {Name: "KUBERNETES_SERVICE_PORT", Value: "8081"}, + {Name: "KUBERNETES_SERVICE_HOST", Value: "1.2.3.1"}, + {Name: "KUBERNETES_PORT", Value: "tcp://1.2.3.1:8081"}, + {Name: "KUBERNETES_PORT_8081_TCP", Value: "tcp://1.2.3.1:8081"}, + {Name: "KUBERNETES_PORT_8081_TCP_PROTO", Value: "tcp"}, + {Name: "KUBERNETES_PORT_8081_TCP_PORT", Value: "8081"}, + {Name: "KUBERNETES_PORT_8081_TCP_ADDR", Value: "1.2.3.1"}, + }, + }, + } + + for _, tc := range testCases { + pod.Spec.EnableServiceLinks = tc.enableServiceLinks + + err := populateEnvironmentVariables(context.Background(), pod, rm, er) + assert.NilError(t, err, "[%s]", tc.name) + assert.Check(t, is.DeepEqual(pod.Spec.Containers[0].Env, tc.expectedEnvs, sortOpt)) + } + +}
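
For reference, a minimal sketch of how a container workload might consume the service environment variables populated by this change. It is illustrative only and not part of the patch: the KUBERNETES_SERVICE_HOST/KUBERNETES_SERVICE_PORT names assume the default "kubernetes" master service in the default namespace, which is injected regardless of enableServiceLinks; per-service variables (e.g. TEST_SERVICE_HOST for a service named "test") only appear when the pod's enableServiceLinks is true.

package main

import (
	"fmt"
	"net"
	"os"
)

func main() {
	// Read the env vars injected for the "kubernetes" master service.
	host := os.Getenv("KUBERNETES_SERVICE_HOST")
	port := os.Getenv("KUBERNETES_SERVICE_PORT")
	if host == "" || port == "" {
		// Either the pod predates the service or the virtual-kubelet in use does not inject service env vars.
		fmt.Println("service environment variables are not set")
		return
	}
	fmt.Println("API server address:", net.JoinHostPort(host, port))
}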