Merge branch 'master' into ci_validate_vendor

Jeremy Rickard, 2018-08-03 16:55:53 -06:00 (committed by GitHub)
46 changed files with 1907 additions and 107 deletions

3 .gitignore vendored
View File

@@ -26,6 +26,9 @@ bin/
# Test credentials file
credentials.json
# Test loganalytics file
loganalytics.json
# VS Code files
.vscode/

View File

@@ -23,6 +23,7 @@ The best description is "Kubernetes API on top, programmable back."
+ [Azure Batch GPU Provider](./providers/azurebatch/README.md)
+ [AWS Fargate Provider](#aws-fargate-provider)
+ [Hyper.sh Provider](#hypersh-provider)
+ [Service Fabric Mesh Provider](#service-fabric-mesh-provider)
+ [Adding a New Provider via the Provider Interface](#adding-a-new-provider-via-the-provider-interface)
* [Testing](#testing)
+ [Testing the Azure Provider Client](#testing-the-azure-provider-client)
@@ -143,6 +144,19 @@ Kubernetes cluster.
./bin/virtual-kubelet --provider hyper
```
### Service Fabric Mesh Provider
The Service Fabric Mesh Provider allows you to deploy pods to Azure [Service Fabric Mesh](https://docs.microsoft.com/en-us/azure/service-fabric-mesh/service-fabric-mesh-overview).
Service Fabric Mesh is a fully managed service that lets developers deploy microservices without managing the underlying infrastructure.
Pods deployed to Service Fabric Mesh will be assigned Public IPs from the Service Fabric Mesh network.
```
./bin/virtual-kubelet --provider sfmesh --taint azure.com/sfmesh
```
More detailed instructions can be found [here](providers/sfmesh/README.md).
### Adding a New Provider via the Provider Interface
The structure we chose allows you to have all the power of the Kubernetes API

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,5 +1,5 @@
name: virtual-kubelet-for-aks
version: 0.1.4
version: 0.1.6
description: a Helm chart to install virtual kubelet in an AKS or ACS cluster.
sources:
- https://github.com/virtual-kubelet/virtual-kubelet

View File

@@ -2,4 +2,11 @@ The virtual kubelet is getting deployed on your cluster.
To verify that virtual kubelet has started, run:
kubectl --namespace={{ .Release.Namespace }} get pods -l "app={{ template "fullname" . }}"
{{- if and (not .Values.env.apiserverCert) (not .Values.env.apiserverKey) }}
Note:
TLS key pair not provided for VK HTTP listener. A key pair was generated for you. This generated key pair is not suitable for production use.
{{- end }}
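For production installs, supply your own base64-encoded PEM pair instead of relying on the generated one; a minimal sketch (chart path, release name, and file names are placeholders, GNU base64 shown):

``` bash
helm install charts/virtual-kubelet-for-aks --name my-vk \
  --set env.apiserverCert=$(base64 -w0 cert.pem) \
  --set env.apiserverKey=$(base64 -w0 key.pem)
```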

View File

@@ -0,0 +1,14 @@
{{ if .Values.rbac.install }}
apiVersion: "rbac.authorization.k8s.io/{{ .Values.rbac.apiVersion }}"
kind: ClusterRoleBinding
metadata:
name: {{ template "fullname" . }}
subjects:
- kind: ServiceAccount
name: {{ template "fullname" . }}
namespace: {{ .Release.Namespace }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: {{ .Values.rbac.roleRef }}
{{ end }}

View File

@@ -56,5 +56,8 @@ spec:
hostPath:
path: /etc/kubernetes/azure.json
type: File
{{ if .Values.rbac.install }}
serviceAccountName: {{ template "fullname" . }}
{{ end }}
nodeSelector:
beta.kubernetes.io/os: linux

View File

@@ -4,6 +4,16 @@ metadata:
name: {{ template "fullname" . }}
type: Opaque
data:
cert.pem: {{ (default "TUlTU0lORw==" .Values.env.apiserverCert) | quote }}
key.pem: {{ (default "TUlTU0lORw==" .Values.env.apiserverKey) | quote }}
clientSecret: {{ default "" .Values.env.azureClientKey | b64enc | quote }}
{{- if and (not .Values.env.apiserverCert) (not .Values.env.apiserverKey) }}
{{- $ca := genCA "virtual-kubelet-ca" 3650 }}
{{- $cn := printf "%s-virtual-kubelet-apiserver" .Release.Name }}
{{- $altName1 := printf "%s-virtual-kubelet-apiserver.%s" .Release.Name .Release.Namespace }}
{{- $altName2 := printf "%s-virtual-kubelet-apiserver.%s.svc" .Release.Name .Release.Namespace }}
{{- $cert := genSignedCert $cn nil (list $altName1 $altName2) 3650 $ca }}
cert.pem: {{ b64enc $cert.Cert }}
key.pem: {{ b64enc $cert.Key }}
{{ else }}
cert.pem: {{ quote .Values.env.apiserverCert }}
key.pem: {{ quote .Values.env.apiserverKey }}
{{ end }}
clientSecret: {{ default "" .Values.env.azureClientKey | b64enc | quote }}

View File

@@ -0,0 +1,6 @@
{{ if .Values.rbac.install }}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ template "fullname" . }}
{{ end }}

View File

@@ -15,3 +15,11 @@ env:
apiserverCert:
apiserverKey:
monitoredNamespace:
# Install Default RBAC roles and bindings
rbac:
install: true
## RBAC api version
apiVersion: v1beta1
# Cluster role reference
roleRef: cluster-admin
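For example, to bind the connector to a narrower ClusterRole instead of cluster-admin (chart path and role name are placeholders):

``` bash
helm install charts/virtual-kubelet-for-aks \
  --set rbac.install=true,rbac.apiVersion=v1beta1,rbac.roleRef=my-restricted-clusterrole
```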

Binary file not shown.

View File

@@ -1,5 +1,5 @@
name: virtual-kubelet
version: 0.1.2
version: 0.1.3
description: a Helm chart to install virtual kubelet inside a Kubernetes cluster.
sources:
- https://github.com/virtual-kubelet/virtual-kubelet

View File

@@ -19,3 +19,10 @@ To update, run:
--set env.azureClientId=<YOUR-AZURECLIENTID-HERE>,env.azureClientKey=<YOUR-AZURECLIENTKEY-HERE>,env.azureTenantId=<YOUR-AZURETENANTID-HERE>,env.azureSubscriptionId=<YOUR-AZURESUBSCRIPTIONID-HERE>,env.aciResourceGroup=<YOUR-ACIRESOURCEGROUP-HERE>,ev.aciOsType=<Linux|Windows>,rbac.install=<false|true>
{{- end }}
{{- if and (not .Values.env.apiserverCert) (not .Values.env.apiserverKey) }}
Note:
TLS key pair not provided for VK HTTP listener. A key pair was generated for you. This generated key pair is not suitable for production use.
{{- end }}

View File

@@ -26,6 +26,10 @@ spec:
value: /etc/virtual-kubelet/cert.pem
- name: APISERVER_KEY_LOCATION
value: /etc/virtual-kubelet/key.pem
{{ if .Values.loganalytics.enabled }}
- name: LOG_ANALYTICS_AUTH_LOCATION
value: /etc/virtual-kubelet/loganalytics.json
{{ end }}
- name: VKUBELET_POD_IP
valueFrom:
fieldRef:

View File

@@ -5,5 +5,18 @@ metadata:
type: Opaque
data:
credentials.json: {{ printf "{ \"clientId\": \"%s\", \"clientSecret\": \"%s\", \"subscriptionId\": \"%s\", \"tenantId\": \"%s\", \"activeDirectoryEndpointUrl\": \"https://login.microsoftonline.com/\", \"resourceManagerEndpointUrl\": \"https://management.azure.com/\", \"activeDirectoryGraphResourceId\": \"https://graph.windows.net/\", \"sqlManagementEndpointUrl\": \"database.windows.net\", \"galleryEndpointUrl\": \"https://gallery.azure.com/\", \"managementEndpointUrl\": \"https://management.core.windows.net/\" }" (default "MISSING" .Values.env.azureClientId) (default "MISSING" .Values.env.azureClientKey) (default "MISSING" .Values.env.azureSubscriptionId) (default "MISSING" .Values.env.azureTenantId) | b64enc | quote }}
cert.pem: {{ (default "TUlTU0lORw==" .Values.env.apiserverCert) | quote }}
key.pem: {{ (default "TUlTU0lORw==" .Values.env.apiserverKey) | quote }}
{{- if and (not .Values.env.apiserverCert) (not .Values.env.apiserverKey) }}
{{- $ca := genCA "virtual-kubelet-ca" 3650 }}
{{- $cn := printf "%s-virtual-kubelet-apiserver" .Release.Name }}
{{- $altName1 := printf "%s-virtual-kubelet-apiserver.%s" .Release.Name .Release.Namespace }}
{{- $altName2 := printf "%s-virtual-kubelet-apiserver.%s.svc" .Release.Name .Release.Namespace }}
{{- $cert := genSignedCert $cn nil (list $altName1 $altName2) 3650 $ca }}
cert.pem: {{ b64enc $cert.Cert }}
key.pem: {{ b64enc $cert.Key }}
{{ else }}
cert.pem: {{ quote .Values.env.apiserverCert }}
key.pem: {{ quote .Values.env.apiserverKey }}
{{ end }}
{{ if .Values.loganalytics.enabled }}
loganalytics.json: {{ printf "{\"workspaceID\": \"%s\",\"workspaceKey\": \"%s\"}" (required "workspaceID is required for loganalytics" .Values.loganalytics.workspaceID ) (required "workspaceKey is required for loganalytics" .Values.loganalytics.workspaceKey ) | b64enc }}
{{ end }}

View File

@@ -14,7 +14,11 @@ env:
nodeOsType:
apiserverCert:
apiserverKey:
monitoredNamespace:
loganalytics:
enabled: false
workspaceID:
workspaceKey:
# Install Default RBAC roles and bindings
rbac:

View File

@@ -67,6 +67,11 @@ func Execute() {
func init() {
cobra.OnInitialize(initConfig)
// read default node name from environment variable.
// it can be overwritten by cli flags if specified.
if os.Getenv("DEFAULT_NODE_NAME") != "" {
nodeName = os.Getenv("DEFAULT_NODE_NAME")
}
// Here you will define your flags and configuration settings.
// Cobra supports persistent flags, which, if defined here,
// will be global for your application.
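In practice this lets a deployment manifest seed the node name while the existing `--nodename` flag still takes precedence; a minimal sketch:

``` bash
export DEFAULT_NODE_NAME=my-virtual-node
./bin/virtual-kubelet --provider aci                  # registers as my-virtual-node
./bin/virtual-kubelet --provider aci --nodename other # explicit flag overrides the env var
```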

View File

@@ -18,6 +18,7 @@ type ResourceManager struct {
k8sClient kubernetes.Interface
pods map[string]*v1.Pod
deletingPods map[string]*v1.Pod
configMapRef map[string]int64
configMaps map[string]*v1.ConfigMap
secretRef map[string]int64
@@ -28,6 +29,7 @@ type ResourceManager struct {
func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager {
rm := ResourceManager{
pods: make(map[string]*v1.Pod, 0),
deletingPods: make(map[string]*v1.Pod, 0),
configMapRef: make(map[string]int64, 0),
secretRef: make(map[string]int64, 0),
configMaps: make(map[string]*v1.ConfigMap, 0),
@@ -81,53 +83,52 @@ func (rm *ResourceManager) SetPods(pods *v1.PodList) {
rm.secrets = make(map[string]*v1.Secret, len(pods.Items))
for k, p := range pods.Items {
if p.Status.Phase == v1.PodSucceeded {
continue
}
rm.pods[rm.getStoreKey(p.Namespace, p.Name)] = &pods.Items[k]
rm.incrementRefCounters(&p)
}
}
// AddPod adds a pod to the internal cache.
func (rm *ResourceManager) AddPod(p *v1.Pod) {
rm.Lock()
defer rm.Unlock()
if p.Status.Phase == v1.PodSucceeded {
return
}
podKey := rm.getStoreKey(p.Namespace, p.Name)
if _, ok := rm.pods[podKey]; ok {
rm.UpdatePod(p)
return
}
rm.pods[podKey] = p
rm.incrementRefCounters(p)
}
// UpdatePod updates the supplied pod in the cache.
func (rm *ResourceManager) UpdatePod(p *v1.Pod) bool {
	rm.Lock()
	defer rm.Unlock()
	podKey := rm.getStoreKey(p.Namespace, p.Name)
	if p.DeletionTimestamp != nil {
		if old, ok := rm.pods[podKey]; ok {
			rm.deletingPods[podKey] = p
			rm.decrementRefCounters(old)
			delete(rm.pods, podKey)
			return true
		}
		if _, ok := rm.deletingPods[podKey]; ok {
			return false
		}
		return false
	}
	if old, ok := rm.pods[podKey]; ok {
		rm.decrementRefCounters(old)
		rm.pods[podKey] = p
		rm.incrementRefCounters(p)
		// NOTE(junjiez): no reconcile as we don't support update pod.
		return false
	}
	rm.pods[podKey] = p
	rm.incrementRefCounters(p)
	return true
}
// DeletePod removes the pod from the cache.
func (rm *ResourceManager) DeletePod(p *v1.Pod) {
func (rm *ResourceManager) DeletePod(p *v1.Pod) bool {
rm.Lock()
defer rm.Unlock()
@@ -135,7 +136,14 @@ func (rm *ResourceManager) DeletePod(p *v1.Pod) {
if old, ok := rm.pods[podKey]; ok {
rm.decrementRefCounters(old)
delete(rm.pods, podKey)
return true
}
if _, ok := rm.deletingPods[podKey]; ok {
delete(rm.deletingPods, podKey)
}
return false
}
// GetPod retrieves the specified pod from the cache. It returns nil if a pod is not found.
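The new boolean results let callers decide whether an event needs provider-side reconciliation. A sketch of a consumer, assuming hypothetical onPodEvent/reconcile helpers that are not part of this commit:

```go
package main

import (
	"github.com/virtual-kubelet/virtual-kubelet/manager"
	"k8s.io/api/core/v1"
)

// reconcile stands in for provider-side work (create/delete in the backend).
func reconcile(p *v1.Pod) {}

// onPodEvent reconciles only when the cache reports that the event
// actually changed tracked state.
func onPodEvent(rm *manager.ResourceManager, pod *v1.Pod, deleted bool) {
	if deleted {
		if rm.DeletePod(pod) { // true: pod was tracked and is now removed
			reconcile(pod)
		}
		return
	}
	if rm.UpdatePod(pod) { // true: brand-new pod, or a deletion just began
		reconcile(pod)
	}
}

func main() {}
```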

View File

@@ -23,7 +23,7 @@ func TestResourceManager(t *testing.T) {
pod1Name := "Pod1"
pod1Namespace := "Pod1Namespace"
pod1 := makePod(pod1Namespace, pod1Name)
pm.AddPod(pod1)
pm.UpdatePod(pod1)
pods := pm.GetPods()
if len(pods) != 1 {
@@ -40,7 +40,7 @@ func TestResourceManagerDeletePod(t *testing.T) {
pod1Name := "Pod1"
pod1Namespace := "Pod1Namespace"
pod1 := makePod(pod1Namespace, pod1Name)
pm.AddPod(pod1)
pm.UpdatePod(pod1)
pods := pm.GetPods()
if len(pods) != 1 {
t.Errorf("Got %d, expected 1 pod", len(pods))
@@ -65,7 +65,7 @@ func TestResourceManagerUpdatePod(t *testing.T) {
pod1Name := "Pod1"
pod1Namespace := "Pod1Namespace"
pod1 := makePod(pod1Namespace, pod1Name)
pm.AddPod(pod1)
pm.UpdatePod(pod1)
pods := pm.GetPods()
if len(pods) != 1 {

View File

@@ -63,7 +63,7 @@ func (p *FargateProvider) loadConfigFile(filePath string) error {
return err
}
// loadConfigStream loads the given Fargate provider TOML configuration stream.
// loadConfig loads the given Fargate provider TOML configuration stream.
func (p *FargateProvider) loadConfig(r io.Reader) error {
var config providerConfig
var q resource.Quantity

View File

@@ -52,6 +52,18 @@ func newContainer(spec *corev1.Container) (*container, error) {
cntr.definition.WorkingDirectory = aws.String(spec.WorkingDir)
}
// Add environment variables.
if spec.Env != nil {
for _, env := range spec.Env {
cntr.definition.Environment = append(
cntr.definition.Environment,
&ecs.KeyValuePair{
Name: aws.String(env.Name),
Value: aws.String(env.Value),
})
}
}
// Translate the Kubernetes container resource requirements to Fargate units.
cntr.setResourceRequirements(&spec.Resources)

View File

@@ -32,6 +32,10 @@ var (
Command: []string{"anyCmd"},
Args: []string{"anyArg1", "anyArg2"},
WorkingDir: "/any/working/dir",
Env: []corev1.EnvVar{
{Name: "anyEnvName1", Value: "anyEnvValue1"},
{Name: "anyEnvName2", Value: "anyEnvValue2"},
},
}
)
@@ -46,8 +50,17 @@ func TestContainerDefinition(t *testing.T) {
assert.Equal(t, cntrSpec.Name, *cntr.definition.Name, "incorrect name")
assert.Equal(t, cntrSpec.Image, *cntr.definition.Image, "incorrect image")
assert.Equal(t, cntrSpec.Command[0], *cntr.definition.EntryPoint[0], "incorrect command")
assert.Equal(t, cntrSpec.Args[0], *cntr.definition.Command[0], "incorrect args")
for i, arg := range cntrSpec.Args {
assert.Equal(t, arg, *cntr.definition.Command[i], "incorrect args")
}
assert.Equal(t, cntrSpec.WorkingDir, *cntr.definition.WorkingDirectory, "incorrect working dir")
for i, env := range cntrSpec.Env {
assert.Equal(t, env.Name, *cntr.definition.Environment[i].Name, "incorrect env name")
assert.Equal(t, env.Value, *cntr.definition.Environment[i].Value, "incorrect env value")
}
}
// TestContainerResourceRequirementsDefaults verifies whether the container gets default CPU

View File

@@ -178,17 +178,18 @@ func (p *FargateProvider) GetContainerLogs(namespace, podName, containerName str
return p.cluster.GetContainerLogs(namespace, podName, containerName, tail)
}
// Get full pod name as defined in the provider context
// GetPodFullName retrieves the full pod name as defined in the provider context.
func (p *FargateProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *FargateProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
func (p *FargateProvider) ExecInContainer(
name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser,
tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("Received ExecInContainer request for %s.\n", container)
return errNotImplemented
}
// GetPodStatus retrieves the status of a pod by name from the provider.

View File

@@ -237,7 +237,13 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) {
"/bin/sh",
},
Args: []string{
"-c", "echo \"Started\"; while true; do sleep 1; done",
"-c",
"echo \"Started\";" +
"echo \"TEST_ENV=$TEST_ENV\";" +
"while true; do sleep 1; done",
},
Env: []v1.EnvVar{
{Name: "TEST_ENV", Value: "AnyValue"},
},
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
@@ -281,8 +287,18 @@ func TestAWSFargateProviderPodLifecycle(t *testing.T) {
t.Error(err)
}
if logs != "Started\n" {
t.Errorf("Expected logs to be \"Started\\n\", but received \"%v\"", logs)
// Test log output.
receivedLogs := strings.Split(logs, "\n")
expectedLogs := []string{
"Started",
pod.Spec.Containers[0].Env[0].Name + "=" + pod.Spec.Containers[0].Env[0].Value,
}
for i, line := range receivedLogs {
fmt.Printf("Log[#%d]: %v\n", i, line)
if len(expectedLogs) > i && receivedLogs[i] != expectedLogs[i] {
t.Errorf("Expected log line %d to be %q, but received %q", i, line, receivedLogs[i])
}
}
// Delete the pod.

View File

@@ -120,7 +120,7 @@ that you've created an [AKS cluster](https://docs.microsoft.com/en-us/azure/aks/
To install the ACI Connector, use the az CLI and the aks namespace. Make sure to use the resource group and name of the AKS cluster you've created. The connector name can be anything. Choose a command below to install the Linux connector, the Windows connector, or both.
Note: You need to specify the --aci-resource-group, due to a bug in the az cli. The resource groupis the auto-generated. To find the name navigate to the Azure Portal resource groups, scroll down and find the name that matches MC_aks cluster name_aks rg_location.
Note: You need to specify the --aci-resource-group due to a bug in the az CLI. The resource group name is auto-generated. To find it, navigate to resource groups in the Azure Portal, scroll down, and find the name that matches MC_<aks cluster name>_<aks rg>_<location>.
1. Install the Linux ACI Connector
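For example, the Linux connector install looked like this with the az CLI of that era (illustrative; verify flag names with `az aks install-connector --help`):

``` bash
az aks install-connector \
  --resource-group myResourceGroup \
  --name myAKSCluster \
  --connector-name myaciconnector \
  --os-type Linux \
  --aci-resource-group MC_myResourceGroup_myAKSCluster_eastus
```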

View File

@@ -44,6 +44,7 @@ type ACIProvider struct {
pods string
internalIP string
daemonEndpointPort int32
diagnostics *aci.ContainerGroupDiagnostics
}
// AuthConfig is the secret returned from an ImageRegistryCredential
@@ -155,6 +156,25 @@ func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operat
return nil, err
}
// If the log analytics file has been specified, load workspace credentials from the file
if logAnalyticsAuthFile := os.Getenv("LOG_ANALYTICS_AUTH_LOCATION"); logAnalyticsAuthFile != "" {
p.diagnostics, err = aci.NewContainerGroupDiagnosticsFromFile(logAnalyticsAuthFile)
if err != nil {
return nil, err
}
}
// If we have both the log analytics workspace id and key, add them to the provider
// Environment variables overwrite the values provided in the file
if logAnalyticsID := os.Getenv("LOG_ANALYTICS_ID"); logAnalyticsID != "" {
if logAnalyticsKey := os.Getenv("LOG_ANALYTICS_KEY"); logAnalyticsKey != "" {
p.diagnostics, err = aci.NewContainerGroupDiagnostics(logAnalyticsID, logAnalyticsKey)
if err != nil {
return nil, err
}
}
}
if rg := os.Getenv("ACI_RESOURCE_GROUP"); rg != "" {
p.resourceGroup = rg
}
@@ -227,6 +247,7 @@ func (p *ACIProvider) CreatePod(pod *v1.Pod) error {
containerGroup.ContainerGroupProperties.Containers = containers
containerGroup.ContainerGroupProperties.Volumes = volumes
containerGroup.ContainerGroupProperties.ImageRegistryCredentials = creds
containerGroup.ContainerGroupProperties.Diagnostics = p.diagnostics
filterServiceAccountSecretVolume(p.operatingSystem, &containerGroup)
@@ -655,10 +676,8 @@ func (p *ACIProvider) getContainers(pod *v1.Pod) ([]aci.Container, error) {
c.EnvironmentVariables = make([]aci.EnvironmentVariable, 0, len(container.Env))
for _, e := range container.Env {
c.EnvironmentVariables = append(c.EnvironmentVariables, aci.EnvironmentVariable{
Name: e.Name,
Value: e.Value,
})
envVar := getACIEnvVar(e)
c.EnvironmentVariables = append(c.EnvironmentVariables, envVar)
}
// NOTE(robbiezhang): ACI CPU request must be times of 10m
@@ -703,11 +722,67 @@ func (p *ACIProvider) getContainers(pod *v1.Pod) ([]aci.Container, error) {
}
}
if container.LivenessProbe != nil {
probe, err := getProbe(container.LivenessProbe)
if err != nil {
return nil, err
}
c.LivenessProbe = probe
}
if container.ReadinessProbe != nil {
probe, err := getProbe(container.ReadinessProbe)
if err != nil {
return nil, err
}
c.ReadinessProbe = probe
}
containers = append(containers, c)
}
return containers, nil
}
func getProbe(probe *v1.Probe) (*aci.ContainerProbe, error) {
if probe.Handler.Exec != nil && probe.Handler.HTTPGet != nil {
return nil, fmt.Errorf("probe may not specify more than one of \"exec\" and \"httpGet\"")
}
if probe.Handler.Exec == nil && probe.Handler.HTTPGet == nil {
return nil, fmt.Errorf("probe must specify one of \"exec\" and \"httpGet\"")
}
// Probes can have an Exec or an HTTP GET handler.
// Create whichever is present, then add it to the
// ContainerProbe struct.
var exec *aci.ContainerExecProbe
if probe.Handler.Exec != nil {
exec = &aci.ContainerExecProbe{
Command: probe.Handler.Exec.Command,
}
}
var httpGET *aci.ContainerHTTPGetProbe
if probe.Handler.HTTPGet != nil {
httpGET = &aci.ContainerHTTPGetProbe{
Port: probe.Handler.HTTPGet.Port.IntValue(),
Path: probe.Handler.HTTPGet.Path,
Scheme: string(probe.Handler.HTTPGet.Scheme),
}
}
return &aci.ContainerProbe{
Exec: exec,
HTTPGet: httpGET,
InitialDelaySeconds: probe.InitialDelaySeconds,
Period: probe.PeriodSeconds,
FailureThreshold: probe.FailureThreshold,
SuccessThreshold: probe.SuccessThreshold,
TimeoutSeconds: probe.TimeoutSeconds,
}, nil
}
func (p *ACIProvider) getVolumes(pod *v1.Pod) ([]aci.Volume, error) {
volumes := make([]aci.Volume, 0, len(pod.Spec.Volumes))
for _, v := range pod.Spec.Volumes {
@@ -1037,3 +1112,20 @@ func filterServiceAccountSecretVolume(osType string, containerGroup *aci.Contain
containerGroup.ContainerGroupProperties.Volumes = volumes
}
}
func getACIEnvVar(e v1.EnvVar) aci.EnvironmentVariable {
var envVar aci.EnvironmentVariable
// If the variable references a secret, use SecureValue; guard against a nil ValueFrom for plain values.
if e.ValueFrom != nil && e.ValueFrom.SecretKeyRef != nil {
envVar = aci.EnvironmentVariable{
Name: e.Name,
SecureValue: e.Value,
}
} else {
envVar = aci.EnvironmentVariable{
Name: e.Name,
Value: e.Value,
}
}
return envVar
}
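For orientation, this is the pod spec shape that makes SecretKeyRef non-nil and therefore routes through SecureValue (sketch; the secret name and key are hypothetical, and getACIEnvVar reads whatever has been resolved into e.Value):

```yaml
env:
- name: DB_PASSWORD
  valueFrom:
    secretKeyRef:
      name: db-secret   # hypothetical secret
      key: password
```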

View File

@@ -20,6 +20,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/fake"
)
@@ -363,6 +364,58 @@ func TestGetPodWithoutResourceRequestsLimits(t *testing.T) {
"Containers[0].Resources.Requests.Memory doesn't match")
}
func TestPodToACISecretEnvVar(t *testing.T) {
testKey := "testVar"
testVal := "testVal"
e := v1.EnvVar{
Name: testKey,
Value: testVal,
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: &v1.SecretKeySelector{},
},
}
aciEnvVar := getACIEnvVar(e)
if aciEnvVar.Value != "" {
t.Fatalf("ACI Env Variable Value should be empty for a secret")
}
if aciEnvVar.Name != testKey {
t.Fatalf("ACI Env Variable Name does not match expected Name")
}
if aciEnvVar.SecureValue != testVal {
t.Fatalf("ACI Env Variable Secure Value does not match expected value")
}
}
func TestPodToACIEnvVar(t *testing.T) {
testKey := "testVar"
testVal := "testVal"
e := v1.EnvVar{
Name: testKey,
Value: testVal,
ValueFrom: &v1.EnvVarSource{},
}
aciEnvVar := getACIEnvVar(e)
if aciEnvVar.SecureValue != "" {
t.Fatalf("ACI Env Variable Secure Value should be empty for non-secret variables")
}
if aciEnvVar.Name != testKey {
t.Fatalf("ACI Env Variable Name does not match expected Name")
}
if aciEnvVar.Value != testVal {
t.Fatalf("ACI Env Variable Value does not match expected value")
}
}
func prepareMocks() (*AADMock, *ACIMock, *ACIProvider, error) {
aadServerMocker := NewAADMock()
aciServerMocker := NewACIMock()
@@ -408,3 +461,127 @@ func prepareMocks() (*AADMock, *ACIMock, *ACIProvider, error) {
func ptrQuantity(q resource.Quantity) *resource.Quantity {
return &q
}
func TestCreatePodWithLivenessProbe(t *testing.T) {
_, aciServerMocker, provider, err := prepareMocks()
if err != nil {
t.Fatal("Unable to prepare the mocks", err)
}
podName := "pod-" + uuid.New().String()
podNamespace := "ns-" + uuid.New().String()
aciServerMocker.OnCreate = func(subscription, resourceGroup, containerGroup string, cg *aci.ContainerGroup) (int, interface{}) {
assert.Equal(t, fakeSubscription, subscription, "Subscription doesn't match")
assert.Equal(t, fakeResourceGroup, resourceGroup, "Resource group doesn't match")
assert.NotNil(t, cg, "Container group is nil")
assert.Equal(t, podNamespace+"-"+podName, containerGroup, "Container group name is not expected")
assert.NotNil(t, cg.ContainerGroupProperties, "Container group properties should not be nil")
assert.NotNil(t, cg.ContainerGroupProperties.Containers, "Containers should not be nil")
assert.Equal(t, 1, len(cg.ContainerGroupProperties.Containers), "1 Container is expected")
assert.Equal(t, "nginx", cg.ContainerGroupProperties.Containers[0].Name, "Container nginx is expected")
assert.NotNil(t, cg.Containers[0].LivenessProbe, "Liveness probe expected")
assert.Equal(t, cg.Containers[0].LivenessProbe.InitialDelaySeconds, 10, "Initial Probe Delay doesn't match")
assert.Equal(t, cg.Containers[0].LivenessProbe.Period, 5, "Probe Period doesn't match")
assert.Equal(t, cg.Containers[0].LivenessProbe.TimeoutSeconds, 60, "Probe Timeout doesn't match")
assert.Equal(t, cg.Containers[0].LivenessProbe.SuccessThreshold, 3, "Probe Success Threshold doesn't match")
assert.Equal(t, cg.Containers[0].LivenessProbe.FailureThreshold, 5, "Probe Failure Threshold doesn't match")
assert.NotNil(t, cg.Containers[0].LivenessProbe.HTTPGet, "Expected an HTTP Get Probe")
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "nginx",
LivenessProbe: &v1.Probe{
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Port: intstr.FromString("8080"),
Path: "/",
},
},
InitialDelaySeconds: 10,
PeriodSeconds: 5,
TimeoutSeconds: 60,
SuccessThreshold: 3,
FailureThreshold: 5,
},
},
},
},
}
if err := provider.CreatePod(pod); err != nil {
t.Fatal("Failed to create pod", err)
}
return http.StatusOK, cg
}
}
func TestCreatePodWithReadinessProbe(t *testing.T) {
_, aciServerMocker, provider, err := prepareMocks()
if err != nil {
t.Fatal("Unable to prepare the mocks", err)
}
podName := "pod-" + uuid.New().String()
podNamespace := "ns-" + uuid.New().String()
aciServerMocker.OnCreate = func(subscription, resourceGroup, containerGroup string, cg *aci.ContainerGroup) (int, interface{}) {
assert.Equal(t, fakeSubscription, subscription, "Subscription doesn't match")
assert.Equal(t, fakeResourceGroup, resourceGroup, "Resource group doesn't match")
assert.NotNil(t, cg, "Container group is nil")
assert.Equal(t, podNamespace+"-"+podName, containerGroup, "Container group name is not expected")
assert.NotNil(t, cg.ContainerGroupProperties, "Container group properties should not be nil")
assert.NotNil(t, cg.ContainerGroupProperties.Containers, "Containers should not be nil")
assert.Equal(t, 1, len(cg.ContainerGroupProperties.Containers), "1 Container is expected")
assert.Equal(t, "nginx", cg.ContainerGroupProperties.Containers[0].Name, "Container nginx is expected")
assert.NotNil(t, cg.Containers[0].ReadinessProbe, "Readiness probe expected")
assert.Equal(t, cg.Containers[0].ReadinessProbe.InitialDelaySeconds, 10, "Initial Probe Delay doesn't match")
assert.Equal(t, cg.Containers[0].ReadinessProbe.Period, 5, "Probe Period doesn't match")
assert.Equal(t, cg.Containers[0].ReadinessProbe.TimeoutSeconds, 60, "Probe Timeout doesn't match")
assert.Equal(t, cg.Containers[0].ReadinessProbe.SuccessThreshold, 3, "Probe Success Threshold doesn't match")
assert.Equal(t, cg.Containers[0].ReadinessProbe.FailureThreshold, 5, "Probe Failure Threshold doesn't match")
assert.NotNil(t, cg.Containers[0].ReadinessProbe.HTTPGet, "Expected an HTTP Get Probe")
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "nginx",
ReadinessProbe: &v1.Probe{
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Port: intstr.FromString("8080"),
Path: "/",
},
},
InitialDelaySeconds: 10,
PeriodSeconds: 5,
TimeoutSeconds: 60,
SuccessThreshold: 3,
FailureThreshold: 5,
},
},
},
},
}
if err := provider.CreatePod(pod); err != nil {
t.Fatal("Failed to create pod", err)
}
return http.StatusOK, cg
}
}

View File

@@ -38,3 +38,29 @@ The file looks like this, in case you want to create it yourself:
"managementEndpointUrl": "https://management.core.windows.net/"
}
```
## Log Analytics support
Log Analytics is supported through environment variables:
- `LOG_ANALYTICS_KEY`
- `LOG_ANALYTICS_ID`
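For example:

``` bash
export LOG_ANALYTICS_ID=<YOUR_LOG_ANALYTICS_WORKSPACE_ID>
export LOG_ANALYTICS_KEY=<YOUR_LOG_ANALYTICS_WORKSPACE_KEY>
```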
You can also specify a file with these values and specify the path to it in the `LOG_ANALYTICS_AUTH_LOCATION`:
``` bash
export LOG_ANALYTICS_AUTH_LOCATION=/secure/location/loganalytics.json
```
``` powershell
$env:LOG_ANALYTICS_AUTH_LOCATION= "/secure/location/loganalytics.json"
```
The file should look like this:
``` json
{
"workspaceID": "<YOUR_LOG_ANALYTICS_WORKSPACE_ID>",
"workspaceKey": "<YOUR_LOG_ANALYTICS_WORKSPACE_KEY>"
}
```

View File

@@ -0,0 +1,39 @@
package aci
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
)
func NewContainerGroupDiagnostics(logAnalyticsID, logAnalyticsKey string) (*ContainerGroupDiagnostics, error) {
if logAnalyticsID == "" || logAnalyticsKey == "" {
return nil, errors.New("Log Analytics configuration requires both the workspace ID and Key")
}
return &ContainerGroupDiagnostics{
LogAnalytics: &LogAnalyticsWorkspace{
WorkspaceID: logAnalyticsID,
WorkspaceKey: logAnalyticsKey,
},
}, nil
}
func NewContainerGroupDiagnosticsFromFile(filepath string) (*ContainerGroupDiagnostics, error) {
analyticsdata, err := ioutil.ReadFile(filepath)
if err != nil {
return nil, fmt.Errorf("Reading Log Analytics Auth file %q failed: %v", filepath, err)
}
// Unmarshal the log analytics file.
var law LogAnalyticsWorkspace
if err := json.Unmarshal(analyticsdata, &law); err != nil {
return nil, err
}
return &ContainerGroupDiagnostics{
LogAnalytics: &law,
}, nil
}
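Taken together, a minimal sketch of wiring a workspace into a container group before creation (import path assumed from this repo's layout):

```go
package main

import (
	"log"

	"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
)

func main() {
	// Load workspace credentials from disk, as the ACI provider does when
	// LOG_ANALYTICS_AUTH_LOCATION is set.
	diag, err := aci.NewContainerGroupDiagnosticsFromFile("/secure/location/loganalytics.json")
	if err != nil {
		log.Fatal(err)
	}
	var cg aci.ContainerGroup
	cg.ContainerGroupProperties.Diagnostics = diag // sent with the create request
	_ = cg
}
```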

View File

@@ -0,0 +1,38 @@
package aci
import (
"io/ioutil"
"os"
"testing"
)
func TestLogAnalyticsFileParsingSuccess(t *testing.T) {
diagnostics, err := NewContainerGroupDiagnosticsFromFile("../../../../loganalytics.json")
if err != nil {
t.Fatal(err)
}
if diagnostics == nil || diagnostics.LogAnalytics == nil {
t.Fatalf("Unexpected nil diagnostics. Log Analytics file not parsed correctly")
}
if diagnostics.LogAnalytics.WorkspaceID == "" || diagnostics.LogAnalytics.WorkspaceKey == "" {
t.Fatalf("Unexpected empty analytics authentication credentials. Log Analytics file not parsed correctly")
}
}
func TestLogAnalyticsFileParsingFailure(t *testing.T) {
tempFile, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
_, err = NewContainerGroupDiagnosticsFromFile(tempFile.Name())
// Cleanup
tempFile.Close()
os.Remove(tempFile.Name())
if err == nil {
t.Fatalf("Expected parsing an empty Log Analytics auth file to fail, but there were no errors")
}
}

View File

@@ -10,8 +10,8 @@ import (
const (
// BaseURI is the default URI used for compute services.
baseURI = "https://management.azure.com"
userAgent = "virtual-kubelet/azure-arm-aci/2018-02-01"
apiVersion = "2018-02-01-preview"
userAgent = "virtual-kubelet/azure-arm-aci/2018-06-01"
apiVersion = "2018-06-01"
containerGroupURLPath = "subscriptions/{{.subscriptionId}}/resourceGroups/{{.resourceGroup}}/providers/Microsoft.ContainerInstance/containerGroups/{{.containerGroupName}}"
containerGroupListURLPath = "subscriptions/{{.subscriptionId}}/providers/Microsoft.ContainerInstance/containerGroups"

View File

@@ -217,6 +217,239 @@ func TestListContainerGroup(t *testing.T) {
}
}
func TestCreateContainerGroupWithLivenessProbe(t *testing.T) {
uid := uuid.New()
containerGroupName := containerGroup + "-" + uid.String()[0:6]
cg, err := client.CreateContainerGroup(resourceGroup, containerGroupName, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
Containers: []Container{
{
Name: "nginx",
ContainerProperties: ContainerProperties{
Image: "nginx",
Command: []string{"nginx", "-g", "daemon off;"},
Ports: []ContainerPort{
{
Protocol: ContainerNetworkProtocolTCP,
Port: 80,
},
},
Resources: ResourceRequirements{
Requests: &ResourceRequests{
CPU: 1,
MemoryInGB: 1,
},
Limits: &ResourceLimits{
CPU: 1,
MemoryInGB: 1,
},
},
LivenessProbe: &ContainerProbe{
HTTPGet: &ContainerHTTPGetProbe{
Port: 80,
},
},
},
},
},
},
})
if err != nil {
t.Fatal(err)
}
if cg.Name != containerGroupName {
t.Fatalf("container group name is %s, expected %s", cg.Name, containerGroupName)
}
}
func TestCreateContainerGroupFailsWithLivenessProbeMissingPort(t *testing.T) {
uid := uuid.New()
containerGroupName := containerGroup + "-" + uid.String()[0:6]
_, err := client.CreateContainerGroup(resourceGroup, containerGroupName, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
Containers: []Container{
{
Name: "nginx",
ContainerProperties: ContainerProperties{
Image: "nginx",
Command: []string{"nginx", "-g", "daemon off;"},
Ports: []ContainerPort{
{
Protocol: ContainerNetworkProtocolTCP,
Port: 80,
},
},
Resources: ResourceRequirements{
Requests: &ResourceRequests{
CPU: 1,
MemoryInGB: 1,
},
Limits: &ResourceLimits{
CPU: 1,
MemoryInGB: 1,
},
},
LivenessProbe: &ContainerProbe{
HTTPGet: &ContainerHTTPGetProbe{
Path: "/",
},
},
},
},
},
},
})
if err == nil {
t.Fatal("expected failure")
}
}
func TestCreateContainerGroupWithReadinessProbe(t *testing.T) {
uid := uuid.New()
containerGroupName := containerGroup + "-" + uid.String()[0:6]
cg, err := client.CreateContainerGroup(resourceGroup, containerGroupName, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
Containers: []Container{
{
Name: "nginx",
ContainerProperties: ContainerProperties{
Image: "nginx",
Command: []string{"nginx", "-g", "daemon off;"},
Ports: []ContainerPort{
{
Protocol: ContainerNetworkProtocolTCP,
Port: 80,
},
},
Resources: ResourceRequirements{
Requests: &ResourceRequests{
CPU: 1,
MemoryInGB: 1,
},
Limits: &ResourceLimits{
CPU: 1,
MemoryInGB: 1,
},
},
ReadinessProbe: &ContainerProbe{
HTTPGet: &ContainerHTTPGetProbe{
Port: 80,
Path: "/",
},
InitialDelaySeconds: 5,
SuccessThreshold: 3,
FailureThreshold: 5,
TimeoutSeconds: 120,
},
},
},
},
},
})
if err != nil {
t.Fatal(err)
}
if cg.Name != containerGroupName {
t.Fatalf("container group name is %s, expected %s", cg.Name, containerGroupName)
}
}
func TestCreateContainerGroupWithLogAnalytics(t *testing.T) {
diagnostics, err := NewContainerGroupDiagnosticsFromFile("../../../../loganalytics.json")
if err != nil {
t.Fatal(err)
}
cgname := "cgla"
cg, err := client.CreateContainerGroup(resourceGroup, cgname, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
Containers: []Container{
{
Name: "nginx",
ContainerProperties: ContainerProperties{
Image: "nginx",
Command: []string{"nginx", "-g", "daemon off;"},
Ports: []ContainerPort{
{
Protocol: ContainerNetworkProtocolTCP,
Port: 80,
},
},
Resources: ResourceRequirements{
Requests: &ResourceRequests{
CPU: 1,
MemoryInGB: 1,
},
Limits: &ResourceLimits{
CPU: 1,
MemoryInGB: 1,
},
},
},
},
},
Diagnostics: diagnostics,
},
})
if err != nil {
t.Fatal(err)
}
if cg.Name != cgname {
t.Fatalf("resource group name is %s, expected %s", cg.Name, cgname)
}
if err := client.DeleteContainerGroup(resourceGroup, cgname); err != nil {
t.Fatalf("Delete Container Group failed: %s", err.Error())
}
}
func TestCreateContainerGroupWithInvalidLogAnalytics(t *testing.T) {
law := &LogAnalyticsWorkspace{}
_, err := client.CreateContainerGroup(resourceGroup, containerGroup, ContainerGroup{
Location: location,
ContainerGroupProperties: ContainerGroupProperties{
OsType: Linux,
Containers: []Container{
{
Name: "nginx",
ContainerProperties: ContainerProperties{
Image: "nginx",
Command: []string{"nginx", "-g", "daemon off;"},
Ports: []ContainerPort{
{
Protocol: ContainerNetworkProtocolTCP,
Port: 80,
},
},
Resources: ResourceRequirements{
Requests: &ResourceRequests{
CPU: 1,
MemoryInGB: 1,
},
Limits: &ResourceLimits{
CPU: 1,
MemoryInGB: 1,
},
},
},
},
},
Diagnostics: &ContainerGroupDiagnostics{
LogAnalytics: law,
},
},
})
if err == nil {
t.Fatal("TestCreateContainerGroupWithInvalidLogAnalytics should fail but encountered no errors")
}
}
func TestDeleteContainerGroup(t *testing.T) {
err := client.DeleteContainerGroup(resourceGroup, containerGroup)
if err != nil {

View File

@@ -46,10 +46,5 @@ func (c *Client) DeleteContainerGroup(resourceGroup, containerGroupName string)
return err
}
// 204 No Content means the specified container group was not found.
if resp.StatusCode == http.StatusNoContent {
return fmt.Errorf("Container group with name %q was not found", containerGroupName)
}
return nil
}

View File

@@ -91,6 +91,7 @@ type ContainerGroupProperties struct {
OsType OperatingSystemTypes `json:"osType,omitempty"`
Volumes []Volume `json:"volumes,omitempty"`
InstanceView ContainerGroupPropertiesInstanceView `json:"instanceView,omitempty"`
Diagnostics *ContainerGroupDiagnostics `json:"diagnostics,omitempty"`
}
// ContainerGroupPropertiesInstanceView is the instance view of the container group. Only valid in response.
@@ -121,6 +122,8 @@ type ContainerProperties struct {
InstanceView ContainerPropertiesInstanceView `json:"instanceView,omitempty"`
Resources ResourceRequirements `json:"resources,omitempty"`
VolumeMounts []VolumeMount `json:"volumeMounts,omitempty"`
LivenessProbe *ContainerProbe `json:"livenessProbe,omitempty"`
ReadinessProbe *ContainerProbe `json:"readinessProbe,omitempty"`
}
// ContainerPropertiesInstanceView is the instance view of the container instance. Only valid in response.
@@ -142,8 +145,9 @@ type ContainerState struct {
// EnvironmentVariable is the environment variable to set within the container instance.
type EnvironmentVariable struct {
Name string `json:"name,omitempty"`
Value string `json:"value,omitempty"`
Name string `json:"name,omitempty"`
Value string `json:"value,omitempty"`
SecureValue string `json:"secureValue,omitempty"`
}
// Event is a container group or container instance event.
@@ -293,3 +297,38 @@ type ExecResponse struct {
WebSocketUri string `json:"webSocketUri,omitempty"`
Password string `json:"password,omitempty"`
}
// ContainerProbe is a probe definition that can be used for Liveness
// or Readiness checks.
type ContainerProbe struct {
Exec *ContainerExecProbe `json:"exec,omitempty"`
HTTPGet *ContainerHTTPGetProbe `json:"httpGet,omitempty"`
InitialDelaySeconds int32 `json:"initialDelaySeconds,omitempty"`
Period int32 `json:"periodSeconds,omitempty"`
FailureThreshold int32 `json:"failureThreshold,omitempty"`
SuccessThreshold int32 `json:"successThreshold,omitempty"`
TimeoutSeconds int32 `json:"timeoutSeconds,omitempty"`
}
// ContainerExecProbe defines a command based probe
type ContainerExecProbe struct {
Command []string `json:"command,omitempty"`
}
// ContainerHTTPGetProbe defines an HTTP probe
type ContainerHTTPGetProbe struct {
Port int `json:"port"`
Path string `json:"path,omitempty"`
Scheme string `json:"scheme,omitempty"`
}
// ContainerGroupDiagnostics contains an instance of LogAnalyticsWorkspace
type ContainerGroupDiagnostics struct {
LogAnalytics *LogAnalyticsWorkspace `json:"loganalytics,omitempty"`
}
// LogAnalyticsWorkspace defines details for a Log Analytics workspace
type LogAnalyticsWorkspace struct {
WorkspaceID string `json:"workspaceID,omitempty"`
WorkspaceKey string `json:"workspaceKey,omitempty"`
}
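For reference, a ContainerProbe with the tags above marshals to the JSON shape ACI expects (values are illustrative):

``` json
{
  "httpGet": { "port": 8080, "path": "/", "scheme": "HTTP" },
  "initialDelaySeconds": 10,
  "periodSeconds": 5,
  "failureThreshold": 5,
  "successThreshold": 3,
  "timeoutSeconds": 60
}
```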

View File

@@ -1,6 +1,6 @@
# Kubernetes Virtual Kubelet with Azure Batch
[Azure Batch](https://docs.microsoft.com/en-us/azure/batch/) provides a HPC Computing environment in Azure for distributed tasks. Azure Batch handles scheduling decrete jobs and tasks accross pools of VM's. It is commonly used for batch processing tasks such as rendering.
[Azure Batch](https://docs.microsoft.com/en-us/azure/batch/) provides an HPC computing environment in Azure for distributed tasks. Azure Batch handles scheduling of discrete jobs and tasks across pools of VMs. It is commonly used for batch processing tasks such as rendering.
The Virtual Kubelet integration allows you to take advantage of this from within Kubernetes. The primary use case for the provider is to make it easy to run GPU-based workloads from normal Kubernetes clusters, for example, creating Kubernetes Jobs which train or execute ML models using NVIDIA GPUs or FFmpeg.
@@ -76,4 +76,4 @@ The provider expects the following environment variables to be configured:
The provider will assign pods to machines in the Azure Batch pool. Each machine can, by default, process only one pod at a time; running more than one pod per machine isn't currently supported and will result in errors.
Azure Batch queues tasks when no machines are available, so pods will sit in `podPending` state while waiting for a VM to become available.

View File

@@ -42,7 +42,7 @@ type MockConfig struct {
// NewMockProvider creates a new MockProvider
func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*MockProvider, error) {
config, err := loadConfig(providerConfig)
config, err := loadConfig(providerConfig, nodeName)
if err != nil {
return nil, err
}
@@ -59,25 +59,28 @@ func NewMockProvider(providerConfig, nodeName, operatingSystem string, internalI
}
// loadConfig loads the given json configuration files.
func loadConfig(providerConfig, nodeName string) (config MockConfig, err error) {
	data, err := ioutil.ReadFile(providerConfig)
	if err != nil {
		return config, err
	}
	configMap := map[string]MockConfig{}
	err = json.Unmarshal(data, &configMap)
	if err != nil {
		return config, err
	}
	if _, exist := configMap[nodeName]; exist {
		config = configMap[nodeName]
		if config.CPU == "" {
			config.CPU = defaultCPUCapacity
		}
		if config.Memory == "" {
			config.Memory = defaultMemoryCapacity
		}
		if config.Pods == "" {
			config.Pods = defaultPodCapacity
		}
	}
if _, err = resource.ParseQuantity(config.CPU); err != nil {
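With this change the mock provider's config file maps node names to capacities. A plausible file, assuming the MockConfig json tags are the lowercase field names (the tags are not shown in this hunk):

``` json
{
  "virtual-kubelet": {
    "cpu": "2",
    "memory": "32Gi",
    "pods": "128"
  }
}
```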

View File

@@ -0,0 +1,64 @@
# Kubernetes Virtual Kubelet with Service Fabric Mesh
[Service Fabric Mesh](https://docs.microsoft.com/en-us/azure/service-fabric-mesh/service-fabric-mesh-overview) is a fully managed service that enables developers to deploy microservices applications without managing virtual machines, storage, or networking. Applications hosted on Service Fabric Mesh run and scale without you worrying about the infrastructure powering them.
The Virtual kubelet integration allows you to use the Kubernetes API to burst out compute to a Service Fabric Mesh cluster and schedule pods as Mesh Applications.
## Status: Experimental
This provider is currently in the experimental stages. Contributions are welcome!
## Setup
The provider expects the following environment variables to be configured:
- AZURE_CLIENT_ID
- AZURE_CLIENT_SECRET
- AZURE_SUBSCRIPTION_ID
- AZURE_TENANT_ID
- RESOURCE_GROUP
- REGION
## Quick Start
#### Run the Virtual Kubelet
```
./virtual-kubelet --provider=sfmesh --taint azure.com/sfmesh
```
#### Create pod yaml:
```
$ cat pod-nginx
apiVersion: v1
kind: Pod
metadata:
name: nginx
spec:
nodeName: virtual-kubelet
containers:
- name: nginx
image: nginx:latest
ports:
- containerPort: 80
tolerations:
- key: azure.com/sfmesh
effect: NoSchedule
```
#### Create pod
```
$ kubectl create -f pod-nginx
```
#### List containers on Service Fabric Mesh
```
$ az mesh app list -o table
Name ResourceGroup ProvisioningState Location
------ --------------- ------------------- ----------
nginx myResourceGroup Succeeded eastus
```

851 providers/sfmesh/sfmesh.go Normal file
View File

@@ -0,0 +1,851 @@
package sfmesh
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
"strconv"
"time"
"github.com/Azure/azure-sdk-for-go/profiles/preview/preview/servicefabricmesh/mgmt/servicefabricmesh"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
)
const (
defaultCPUCapacity = "60"
defaultMemoryCapacity = "48Gi"
defaultPodCapacity = "5"
defaultCPURequests = 1.0
defaultMemoryRequests = 1.0
defaultCPULimit = 4.0
defaultMemoryLimit = 16.0
)
// SFMeshProvider implements the Virtual Kubelet provider interface
type SFMeshProvider struct {
nodeName string
operatingSystem string
internalIP string
daemonEndpointPort int32
appClient *servicefabricmesh.ApplicationClient
networkClient *servicefabricmesh.NetworkClient
serviceClient *servicefabricmesh.ServiceClient
region string
resourceGroup string
subscriptionID string
resourceManager *manager.ResourceManager
}
// AuthConfig is the secret returned from an ImageRegistryCredential
type AuthConfig struct {
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
Auth string `json:"auth,omitempty"`
Email string `json:"email,omitempty"`
ServerAddress string `json:"serveraddress,omitempty"`
IdentityToken string `json:"identitytoken,omitempty"`
RegistryToken string `json:"registrytoken,omitempty"`
}
// NewSFMeshProvider creates a new SFMeshProvider
func NewSFMeshProvider(rm *manager.ResourceManager, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*SFMeshProvider, error) {
azureSubscriptionID := os.Getenv("AZURE_SUBSCRIPTION_ID")
azureTenantID := os.Getenv("AZURE_TENANT_ID")
azureClientID := os.Getenv("AZURE_CLIENT_ID")
azureClientSecret := os.Getenv("AZURE_CLIENT_SECRET")
region := os.Getenv("REGION")
resourceGroup := os.Getenv("RESOURCE_GROUP")
if azureSubscriptionID == "" {
return nil, errors.New("Subscription ID cannot be empty, please set AZURE_SUBSCRIPTION_ID")
}
if azureTenantID == "" {
return nil, errors.New("Tenant ID cannot be empty, please set AZURE_TENANT_ID")
}
if azureClientID == "" {
return nil, errors.New("Client ID cannot be empty, please set AZURE_CLIENT_ID ")
}
if azureClientSecret == "" {
return nil, errors.New("Client Secret cannot be empty, please set AZURE_CLIENT_SECRET ")
}
if region == "" {
return nil, errors.New("Region cannot be empty, please set REGION ")
}
if resourceGroup == "" {
return nil, errors.New("Resource Group cannot be empty, please set RESOURCE_GROUP ")
}
client := servicefabricmesh.NewApplicationClient(azureSubscriptionID)
auth, err := auth.NewAuthorizerFromEnvironment()
if err != nil {
return nil, err
}
client.Authorizer = auth
networkClient := servicefabricmesh.NewNetworkClient(azureSubscriptionID)
networkClient.Authorizer = auth
serviceClient := servicefabricmesh.NewServiceClient(azureSubscriptionID)
serviceClient.Authorizer = auth
provider := SFMeshProvider{
nodeName: nodeName,
operatingSystem: operatingSystem,
internalIP: internalIP,
daemonEndpointPort: daemonEndpointPort,
appClient: &client,
networkClient: &networkClient,
serviceClient: &serviceClient,
region: region,
resourceGroup: resourceGroup,
resourceManager: rm,
subscriptionID: azureSubscriptionID,
}
return &provider, nil
}
func readDockerCfgSecret(secret *v1.Secret, ips []servicefabricmesh.ImageRegistryCredential) ([]servicefabricmesh.ImageRegistryCredential, error) {
var err error
var authConfigs map[string]AuthConfig
repoData, ok := secret.Data[string(v1.DockerConfigKey)]
if !ok {
return ips, fmt.Errorf("no dockercfg present in secret")
}
err = json.Unmarshal(repoData, &authConfigs)
if err != nil {
return ips, err
}
for server, authConfig := range authConfigs {
ips = append(ips, servicefabricmesh.ImageRegistryCredential{
Password: &authConfig.Password,
Server: &server,
Username: &authConfig.Username,
})
}
return ips, err
}
func readDockerConfigJSONSecret(secret *v1.Secret, ips []servicefabricmesh.ImageRegistryCredential) ([]servicefabricmesh.ImageRegistryCredential, error) {
var err error
repoData, ok := secret.Data[string(v1.DockerConfigJsonKey)]
if !ok {
return ips, fmt.Errorf("no dockerconfigjson present in secret")
}
var authConfigs map[string]map[string]AuthConfig
err = json.Unmarshal(repoData, &authConfigs)
if err != nil {
return ips, err
}
auths, ok := authConfigs["auths"]
if !ok {
return ips, fmt.Errorf("malformed dockerconfigjson in secret")
}
for server, authConfig := range auths {
ips = append(ips, servicefabricmesh.ImageRegistryCredential{
Password: &authConfig.Password,
Server: &server,
Username: &authConfig.Username,
})
}
return ips, err
}
func (p *SFMeshProvider) getImagePullSecrets(pod *v1.Pod) ([]servicefabricmesh.ImageRegistryCredential, error) {
ips := make([]servicefabricmesh.ImageRegistryCredential, 0, len(pod.Spec.ImagePullSecrets))
for _, ref := range pod.Spec.ImagePullSecrets {
secret, err := p.resourceManager.GetSecret(ref.Name, pod.Namespace)
if err != nil {
return ips, err
}
if secret == nil {
return nil, fmt.Errorf("error getting image pull secret")
}
switch secret.Type {
case v1.SecretTypeDockercfg:
ips, err = readDockerCfgSecret(secret, ips)
case v1.SecretTypeDockerConfigJson:
ips, err = readDockerConfigJSONSecret(secret, ips)
default:
return nil, fmt.Errorf("image pull secret type is not one of kubernetes.io/dockercfg or kubernetes.io/dockerconfigjson")
}
if err != nil {
return ips, err
}
}
return ips, nil
}
func (p *SFMeshProvider) getMeshApplication(pod *v1.Pod) (servicefabricmesh.ApplicationResourceDescription, error) {
meshApp := servicefabricmesh.ApplicationResourceDescription{}
meshApp.Name = &pod.Name
meshApp.Location = &p.region
podUID := string(pod.UID)
podCreationTimestamp := pod.CreationTimestamp.String()
tags := map[string]*string{
"PodName": &pod.Name,
"ClusterName": &pod.ClusterName,
"NodeName": &pod.Spec.NodeName,
"Namespace": &pod.Namespace,
"UID": &podUID,
"CreationTimestamp": &podCreationTimestamp,
}
meshApp.Tags = tags
properties := servicefabricmesh.ApplicationResourceProperties{}
meshApp.ApplicationResourceProperties = &properties
services := []servicefabricmesh.ServiceResourceDescription{}
service := servicefabricmesh.ServiceResourceDescription{}
serviceName := *meshApp.Name + "-service"
service.Name = &serviceName
serviceType := "Microsoft.ServiceFabricMesh/services"
service.Type = &serviceType
creds, err := p.getImagePullSecrets(pod)
if err != nil {
return meshApp, err
}
codePackages := []servicefabricmesh.ContainerCodePackageProperties{}
for _, container := range pod.Spec.Containers {
codePackage := servicefabricmesh.ContainerCodePackageProperties{}
codePackage.Image = &container.Image
codePackage.Name = &container.Name
if creds != nil {
if len(creds) > 0 {
// Mesh ImageRegistryCredential supports only a single credential
codePackage.ImageRegistryCredential = &creds[0]
}
}
requirements := servicefabricmesh.ResourceRequirements{}
requests := servicefabricmesh.ResourceRequests{}
cpuRequest := defaultCPURequests
memoryRequest := defaultMemoryRequests
if container.Resources.Requests != nil {
if _, ok := container.Resources.Requests[v1.ResourceCPU]; ok {
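// Kubernetes CPU requests are in millicores; convert to fractional Mesh cores (e.g. 1500m -> 1.5).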
containerCPURequest := float64(container.Resources.Requests.Cpu().MilliValue()/10.00) / 100.00
if containerCPURequest > 1 && containerCPURequest <= 4 {
cpuRequest = containerCPURequest
}
}
if _, ok := container.Resources.Requests[v1.ResourceMemory]; ok {
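// Convert bytes to GB for Mesh, clamping to its 0.1 GB minimum.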
containerMemoryRequest := float64(container.Resources.Requests.Memory().Value()/100000000.00) / 10.00
if containerMemoryRequest < 0.10 {
containerMemoryRequest = 0.10
}
memoryRequest = containerMemoryRequest
}
}
requests.CPU = &cpuRequest
requests.MemoryInGB = &memoryRequest
requirements.Requests = &requests
if container.Resources.Limits != nil {
cpuLimit := defaultCPULimit
memoryLimit := defaultMemoryLimit
limits := servicefabricmesh.ResourceLimits{}
limits.CPU = &cpuLimit
limits.MemoryInGB = &memoryLimit
if _, ok := container.Resources.Limits[v1.ResourceCPU]; ok {
containerCPULimit := float64(container.Resources.Limits.Cpu().MilliValue()) / 1000.00
if containerCPULimit > 1 {
limits.CPU = &containerCPULimit
}
}
if _, ok := container.Resources.Limits[v1.ResourceMemory]; ok {
containerMemoryLimit := float64(container.Resources.Limits.Memory().Value()) / 1000000000.00
if containerMemoryLimit < 0.10 {
containerMemoryLimit = 0.10
}
limits.MemoryInGB = &containerMemoryLimit
}
requirements.Limits = &limits
}
codePackage.Resources = &requirements
if len(container.Command) > 0 {
codePackage.Commands = &container.Command
}
if len(container.Env) > 0 {
envVars := []servicefabricmesh.EnvironmentVariable{}
for _, envVar := range container.Env {
env := servicefabricmesh.EnvironmentVariable{}
env.Name = &envVar.Name
env.Value = &envVar.Value
envVars = append(envVars, env)
}
codePackage.EnvironmentVariables = &envVars
}
endpoints := []servicefabricmesh.EndpointProperties{}
for _, port := range container.Ports {
endpoint := p.getEndpointFromContainerPort(port)
endpoints = append(endpoints, endpoint)
}
if len(endpoints) > 0 {
codePackage.Endpoints = &endpoints
}
codePackages = append(codePackages, codePackage)
}
serviceProperties := servicefabricmesh.ServiceResourceProperties{}
serviceProperties.OsType = servicefabricmesh.Linux
replicaCount := int32(1)
serviceProperties.ReplicaCount = &replicaCount
serviceProperties.CodePackages = &codePackages
service.ServiceResourceProperties = &serviceProperties
services = append(services, service)
properties.Services = &services
return meshApp, nil
}
func (p *SFMeshProvider) getMeshNetwork(pod *v1.Pod, meshApp servicefabricmesh.ApplicationResourceDescription, location string) servicefabricmesh.NetworkResourceDescription {
network := servicefabricmesh.NetworkResourceDescription{}
network.Name = meshApp.Name
network.Location = &location
networkProperties := servicefabricmesh.NetworkResourceProperties{}
addressPrefix := "10.0.0.4/22"
networkProperties.AddressPrefix = &addressPrefix
layers := []servicefabricmesh.Layer4IngressConfig{}
service := (*meshApp.Services)[0]
for _, codePackage := range *service.CodePackages {
for _, endpoint := range *codePackage.Endpoints {
layer := p.getLayer(&endpoint, *meshApp.Name, *service.Name)
layers = append(layers, layer)
}
}
ingressConfig := servicefabricmesh.IngressConfig{}
ingressConfig.Layer4 = &layers
networkProperties.IngressConfig = &ingressConfig
network.NetworkResourceProperties = &networkProperties
return network
}
func (p *SFMeshProvider) getLayer(endpoint *servicefabricmesh.EndpointProperties, appName string, serviceName string) servicefabricmesh.Layer4IngressConfig {
layer := servicefabricmesh.Layer4IngressConfig{}
name := *endpoint.Name + "Ingress"
layerName := &name
layer.Name = layerName
layer.PublicPort = endpoint.Port
layer.EndpointName = endpoint.Name
layer.ApplicationName = &appName
layer.ServiceName = &serviceName
return layer
}
func (p *SFMeshProvider) getEndpointFromContainerPort(port v1.ContainerPort) servicefabricmesh.EndpointProperties {
endpoint := servicefabricmesh.EndpointProperties{}
endpointName := strconv.Itoa(int(port.ContainerPort)) + "Listener"
endpoint.Name = &endpointName
endpoint.Port = &port.ContainerPort
return endpoint
}
// CreatePod accepts a Pod definition and creates a SF Mesh App.
func (p *SFMeshProvider) CreatePod(pod *v1.Pod) error {
log.Printf("receive CreatePod %q\n", pod.Name)
meshApp, err := p.getMeshApplication(pod)
if err != nil {
return err
}
meshNetwork := p.getMeshNetwork(pod, meshApp, p.region)
_, err = p.networkClient.Create(context.Background(), p.resourceGroup, *meshNetwork.Name, meshNetwork)
if err != nil {
return err
}
networkName := *meshNetwork.Name
resourceID := "/subscriptions/" + p.subscriptionID + "/resourceGroups/" + p.resourceGroup + "/providers/Microsoft.ServiceFabricMesh/networks/" + networkName
service := (*meshApp.Services)[0]
networkRef := servicefabricmesh.NetworkRef{}
networkRef.Name = &resourceID
networkRefs := []servicefabricmesh.NetworkRef{}
networkRefs = append(networkRefs, networkRef)
service.NetworkRefs = &networkRefs
_, err = p.appClient.Create(context.Background(), p.resourceGroup, pod.Name, meshApp)
if err != nil {
return err
}
return nil
}
// UpdatePod updates the pod running inside SF Mesh.
func (p *SFMeshProvider) UpdatePod(pod *v1.Pod) error {
log.Printf("receive UpdatePod %q\n", pod.Name)
app, err := p.getMeshApplication(pod)
if err != nil {
return err
}
_, err = p.appClient.Create(context.Background(), p.resourceGroup, pod.Name, app)
if err != nil {
return err
}
return nil
}
// DeletePod deletes the specified pod out of SF Mesh.
func (p *SFMeshProvider) DeletePod(pod *v1.Pod) (err error) {
log.Printf("receive DeletePod %q\n", pod.Name)
_, err = p.appClient.Delete(context.Background(), p.resourceGroup, pod.Name)
if err != nil {
return err
}
return nil
}
// GetPod returns a pod by name that is running inside SF Mesh.
// returns nil if a pod by that name is not found.
func (p *SFMeshProvider) GetPod(namespace, name string) (pod *v1.Pod, err error) {
log.Printf("receive GetPod %q\n", name)
resp, err := p.appClient.Get(context.Background(), p.resourceGroup, name)
httpResponse := resp.Response.Response
if err != nil {
if httpResponse.StatusCode == 404 {
return nil, nil
}
return nil, err
}
if resp.Tags == nil {
return nil, nil
}
val, present := resp.Tags["NodeName"]
if !present {
return nil, nil
}
if *val != p.nodeName {
return nil, nil
}
pod, err = p.applicationDescriptionToPod(resp)
if err != nil {
return nil, err
}
return pod, nil
}
func (p *SFMeshProvider) appStateToPodPhase(state string) v1.PodPhase {
switch state {
case "Succeeded":
return v1.PodRunning
case "Failed", "Canceled":
return v1.PodFailed
case "Creating", "Updating":
return v1.PodPending
}
return v1.PodUnknown
}
func (p *SFMeshProvider) appStateToPodConditions(state string, transitionTime metav1.Time) []v1.PodCondition {
switch state {
case "Succeeded":
return []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
LastTransitionTime: transitionTime,
}, {
Type: v1.PodInitialized,
Status: v1.ConditionTrue,
LastTransitionTime: transitionTime,
}, {
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
LastTransitionTime: transitionTime,
},
}
}
return []v1.PodCondition{}
}
func (p *SFMeshProvider) getMeshService(appName string, serviceName string) (servicefabricmesh.ServiceResourceDescription, error) {
svc, err := p.serviceClient.Get(context.Background(), p.resourceGroup, appName, serviceName)
if err != nil {
return servicefabricmesh.ServiceResourceDescription{}, err
}
return svc, err
}
func appStateToContainerState(state string, appStartTime metav1.Time) v1.ContainerState {
if state == "Succeeded" {
return v1.ContainerState{
Running: &v1.ContainerStateRunning{
StartedAt: appStartTime,
},
}
}
if state == "Failed" || state == "Canceled" {
return v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 1,
Reason: "",
Message: "",
StartedAt: appStartTime,
FinishedAt: metav1.NewTime(time.Now()),
},
}
}
return v1.ContainerState{
Waiting: &v1.ContainerStateWaiting{
Reason: "",
Message: "",
},
}
}
func (p *SFMeshProvider) getMeshNetworkPublicIP(networkName string) (*string, error) {
network, err := p.networkClient.Get(context.Background(), p.resourceGroup, networkName)
if err != nil {
return nil, err
}
if network.NetworkResourceProperties == nil || network.IngressConfig == nil {
return nil, nil
}
ipAddress := network.IngressConfig.PublicIPAddress
return ipAddress, nil
}
func (p *SFMeshProvider) applicationDescriptionToPod(app servicefabricmesh.ApplicationResourceDescription) (*v1.Pod, error) {
var podCreationTimestamp metav1.Time
if *app.Tags["CreationTimestamp"] != "" {
t, err := time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", *app.Tags["CreationTimestamp"])
if err != nil {
return nil, err
}
podCreationTimestamp = metav1.NewTime(t)
}
containerStartTime := podCreationTimestamp
appState := app.ProvisioningState
podPhase := p.appStateToPodPhase(*appState)
podConditions := p.appStateToPodConditions(*appState, podCreationTimestamp)
service, err := p.getMeshService(*app.Name, (*app.ServiceNames)[0])
if err != nil {
return nil, err
}
containers := []v1.Container{}
containerStatuses := []v1.ContainerStatus{}
for _, codePkg := range *service.CodePackages {
container := v1.Container{}
container.Name = *codePkg.Name
container.Image = *codePkg.Image
if codePkg.Commands != nil {
container.Command = *codePkg.Commands
}
container.Resources = v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%g", *codePkg.Resources.Requests.CPU)),
v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%gG", *codePkg.Resources.Requests.MemoryInGB)),
},
}
if codePkg.Resources.Limits != nil {
container.Resources.Limits = v1.ResourceList{
v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%g", *codePkg.Resources.Limits.CPU)),
v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%gG", *codePkg.Resources.Limits.MemoryInGB)),
}
}
containerStatus := v1.ContainerStatus{
Name: *codePkg.Name,
State: appStateToContainerState(*appState, podCreationTimestamp),
Ready: podPhase == v1.PodRunning,
Image: container.Image,
ImageID: "",
ContainerID: "",
}
containerStatuses = append(containerStatuses, containerStatus)
containers = append(containers, container)
}
appName := app.Name
ipAddress := ""
meshIP, err := p.getMeshNetworkPublicIP(*appName)
if err != nil {
return nil, err
}
if meshIP != nil {
ipAddress = *meshIP
}
pod := v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: *app.Tags["PodName"],
Namespace: *app.Tags["Namespace"],
ClusterName: *app.Tags["ClusterName"],
UID: types.UID(*app.Tags["UID"]),
CreationTimestamp: podCreationTimestamp,
},
Spec: v1.PodSpec{
NodeName: *app.Tags["NodeName"],
Volumes: []v1.Volume{},
Containers: containers,
},
Status: v1.PodStatus{
Phase: podPhase,
Conditions: podConditions,
Message: "",
Reason: "",
HostIP: "",
PodIP: ipAddress,
StartTime: &containerStartTime,
ContainerStatuses: containerStatuses,
},
}
return &pod, nil
}
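
applicationDescriptionToPod depends on a tag contract: each mesh application carries the pod metadata (presumably stamped by getMeshApplication) needed to reconstruct a v1.Pod later. A sketch of that tag set (keys taken from the lookups above; values illustrative), using the time layout the CreationTimestamp parse expects, which is Go's default time.Time.String() format:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Tag keys are the ones applicationDescriptionToPod reads; values are examples.
	tags := map[string]string{
		"PodName":           "nginx",
		"Namespace":         "default",
		"ClusterName":       "",
		"UID":               "8f2f0f0e-0000-0000-0000-000000000000",
		"NodeName":          "virtual-kubelet",
		"CreationTimestamp": time.Now().Format("2006-01-02 15:04:05.999999999 -0700 MST"),
	}
	fmt.Println(tags)
}
```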
// GetContainerLogs retrieves the logs of a container by name.
func (p *SFMeshProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
log.Printf("receive GetContainerLogs %q\n", podName)
return "", nil
}
// GetPodFullName gets the full pod name as defined in the provider context
func (p *SFMeshProvider) GetPodFullName(namespace string, pod string) string {
return ""
}
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
func (p *SFMeshProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
log.Printf("receive ExecInContainer %q\n", container)
return nil
}
// GetPodStatus returns the status of a pod by name that is running inside SF Mesh.
// It returns nil if a pod with that name is not found.
func (p *SFMeshProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
pod, err := p.GetPod(namespace, name)
if err != nil {
return nil, err
}
if pod == nil {
return nil, nil
}
return &pod.Status, nil
}
// GetPods returns a list of all pods known to be running within SF Mesh.
func (p *SFMeshProvider) GetPods() ([]*v1.Pod, error) {
log.Printf("receive GetPods\n")
var pods []*v1.Pod
list, err := p.appClient.ListByResourceGroup(context.Background(), p.resourceGroup)
if err != nil {
return pods, err
}
apps := list.Values()
for _, app := range apps {
if app.Tags == nil {
continue
}
val, present := app.Tags["NodeName"]
if !present {
continue
}
if *val != p.nodeName {
continue
}
pod, err := p.applicationDescriptionToPod(app)
if err != nil {
return pods, err
}
pods = append(pods, pod)
}
return pods, nil
}
// Capacity returns a resource list containing the capacity limits set for SF Mesh.
func (p *SFMeshProvider) Capacity() v1.ResourceList {
return v1.ResourceList{
"cpu": resource.MustParse(defaultCPUCapacity),
"memory": resource.MustParse(defaultMemoryCapacity),
"pods": resource.MustParse(defaultPodCapacity),
}
}
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
// within Kubernetes.
func (p *SFMeshProvider) NodeConditions() []v1.NodeCondition {
// TODO: Make this configurable
return []v1.NodeCondition{
{
Type: "Ready",
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: "KubeletReady",
Message: "kubelet is ready.",
},
{
Type: "OutOfDisk",
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: "KubeletHasSufficientDisk",
Message: "kubelet has sufficient disk space available",
},
{
Type: "MemoryPressure",
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: "KubeletHasSufficientMemory",
Message: "kubelet has sufficient memory available",
},
{
Type: "DiskPressure",
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: "KubeletHasNoDiskPressure",
Message: "kubelet has no disk pressure",
},
{
Type: "NetworkUnavailable",
Status: v1.ConditionFalse,
LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: "RouteCreated",
Message: "RouteController created a route",
},
}
}
// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *SFMeshProvider) NodeAddresses() []v1.NodeAddress {
return []v1.NodeAddress{
{
Type: "InternalIP",
Address: p.internalIP,
},
}
}
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *SFMeshProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
return &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{
Port: p.daemonEndpointPort,
},
}
}
// OperatingSystem returns the operating system for this provider.
// For now it is hard-coded to Linux.
func (p *SFMeshProvider) OperatingSystem() string {
return providers.OperatingSystemLinux
}


@@ -0,0 +1,68 @@
package sfmesh
import (
"errors"
"os"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func setEnvVars() {
os.Setenv("AZURE_SUBSCRIPTION_ID", "fake")
os.Setenv("AZURE_TENANT_ID", "fake")
os.Setenv("AZURE_CLIENT_ID", "fake")
os.Setenv("AZURE_CLIENT_SECRET", "fake")
os.Setenv("REGION", "fake")
os.Setenv("RESOURCE_GROUP", "fake")
}
func Test_podToMeshApp(t *testing.T) {
setEnvVars()
pod := &v1.Pod{}
pod.ObjectMeta = metav1.ObjectMeta{
Name: "test-pod",
}
pod.Spec = v1.PodSpec{
Containers: []v1.Container{
{
Name: "testcontainer",
Image: "nginx",
Ports: []v1.ContainerPort{
{
Name: "http",
ContainerPort: 80,
},
},
},
},
}
provider, err := NewSFMeshProvider(nil, "testnode", "Linux", "6.6.6.6", 80)
if err != nil {
t.Fatal(err)
}
_, err = provider.getMeshApplication(pod)
if err != nil {
t.Error(err.Error())
}
}
func Test_meshStateToPodCondition(t *testing.T) {
setEnvVars()
meshStateSucceeded := "Succeeded"
provider, err := NewSFMeshProvider(nil, "testnode", "Linux", "6.6.6.6", 80)
if err != nil {
t.Fatal(err)
}
phase := provider.appStateToPodPhase(meshStateSucceeded)
if phase != v1.PodRunning {
t.Error(errors.New("PodRunning phase expected"))
}
}
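
Both tests exercise only the pod-to-application translation and state mapping, so the fake credentials set by setEnvVars are sufficient; assuming the package path used in the provider registry imports further down (github.com/virtual-kubelet/virtual-kubelet/providers/sfmesh), they should run offline with `go test ./providers/sfmesh`.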


@@ -15,6 +15,14 @@ cat <<EOF > ${outputPathCredsfile}
}
EOF
+# This will build the log analytics credentials during CI
+cat <<EOF > ${outputPathLogAnalyticsFile}
+{
+"workspaceID": "$omsworkspaceID",
+"workspaceKey": "$omsworkspaceKey"
+}
+EOF
# This will build the kubeConfig during the CI
cat <<EOF > ${outputPathKubeConfigFile}
---


@@ -79,7 +79,7 @@ func ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
supportedStreamProtocols := strings.Split(req.Header.Get("X-Stream-Protocol-Version"), ",")
q := req.URL.Query()
command := q.Get("command")
command := q["command"]
// streamOpts := &remotecommand.Options{
// Stdin: (q.Get("input") == "1"),
@@ -99,5 +99,5 @@ func ApiServerHandlerExec(w http.ResponseWriter, req *http.Request) {
idleTimeout := time.Second * 30
streamCreationTimeout := time.Second * 30
-remotecommand.ServeExec(w, req, p, fmt.Sprintf("%s-%s", namespace, pod), "", container, []string{command}, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
+remotecommand.ServeExec(w, req, p, fmt.Sprintf("%s-%s", namespace, pod), "", container, command, streamOpts, idleTimeout, streamCreationTimeout, supportedStreamProtocols)
}
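
The change above switches from q.Get("command"), which returns only the first value, to q["command"], which returns all of them, so multi-argument exec commands survive: the client sends each argv element as its own command query parameter. A small standard-library illustration with a made-up request URL:

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Hypothetical exec URL; each argv element is its own "command" parameter.
	u, err := url.Parse("http://localhost:10250/exec/default/mypod/ctr?command=sh&command=-c&command=date")
	if err != nil {
		panic(err)
	}
	q := u.Query()
	fmt.Println(q.Get("command")) // sh            (first value only)
	fmt.Println(q["command"])     // [sh -c date]  (the full argv)
}
```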


@@ -11,6 +11,7 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/providers/huawei"
"github.com/virtual-kubelet/virtual-kubelet/providers/hypersh"
"github.com/virtual-kubelet/virtual-kubelet/providers/mock"
"github.com/virtual-kubelet/virtual-kubelet/providers/sfmesh"
"github.com/virtual-kubelet/virtual-kubelet/providers/vic"
"github.com/virtual-kubelet/virtual-kubelet/providers/web"
"k8s.io/api/core/v1"
@@ -28,6 +29,7 @@ var _ Provider = (*mock.MockProvider)(nil)
var _ Provider = (*huawei.CCIProvider)(nil)
var _ Provider = (*azurebatch.Provider)(nil)
var _ Provider = (*cri.CRIProvider)(nil)
+var _ Provider = (*sfmesh.SFMeshProvider)(nil)
// Provider contains the methods required to implement a virtual-kubelet provider.
type Provider interface {


@@ -18,6 +18,7 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/providers/huawei"
"github.com/virtual-kubelet/virtual-kubelet/providers/hypersh"
"github.com/virtual-kubelet/virtual-kubelet/providers/mock"
"github.com/virtual-kubelet/virtual-kubelet/providers/sfmesh"
"github.com/virtual-kubelet/virtual-kubelet/providers/vic"
"github.com/virtual-kubelet/virtual-kubelet/providers/web"
corev1 "k8s.io/api/core/v1"
@@ -31,7 +32,7 @@ import (
)
const (
PodStatusReason_ProviderFailed = "ProviderFailed"
)
// Server masquerades itself as a kubelet and allows for the virtual node to be backed by non-vm/node providers.
@@ -127,6 +128,11 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, taint, provider, prov
if err != nil {
return nil, err
}
case "sfmesh":
p, err = sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
if err != nil {
return nil, err
}
default:
fmt.Printf("Provider '%s' is not supported\n", provider)
}
@@ -236,7 +242,7 @@ func (s *Server) Run() error {
log.Fatal("Failed to watch pods", err)
}
loop:
for {
select {
case ev, ok := <-s.podWatcher.ResultChan():
@@ -251,15 +257,19 @@ func (s *Server) Run() error {
}
log.Println("Pod watcher event is received:", ev.Type)
+reconcile := false
switch ev.Type {
case watch.Added:
-s.resourceManager.AddPod(ev.Object.(*corev1.Pod))
+reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
case watch.Modified:
-s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
+reconcile = s.resourceManager.UpdatePod(ev.Object.(*corev1.Pod))
case watch.Deleted:
-s.resourceManager.DeletePod(ev.Object.(*corev1.Pod))
+reconcile = s.resourceManager.DeletePod(ev.Object.(*corev1.Pod))
}
-s.reconcile()
+if reconcile {
+s.reconcile()
+}
}
}
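
The watch-loop change above makes reconciliation conditional: the resource manager's mutators now report whether anything actually changed, and reconcile() runs only when they do. A toy sketch of that contract (hypothetical and heavily simplified; the real resource manager tracks *corev1.Pod objects):

```go
package main

import "fmt"

// resourceManager is a stand-in whose mutators return true only when state
// actually changed, letting a watch loop skip needless reconcile passes.
type resourceManager struct {
	pods map[string]string // "namespace/name" -> resourceVersion
}

func (rm *resourceManager) UpdatePod(key, version string) bool {
	if rm.pods[key] == version {
		return false // nothing changed; no reconcile needed
	}
	rm.pods[key] = version
	return true
}

func (rm *resourceManager) DeletePod(key string) bool {
	if _, ok := rm.pods[key]; !ok {
		return false
	}
	delete(rm.pods, key)
	return true
}

func main() {
	rm := &resourceManager{pods: map[string]string{}}
	fmt.Println(rm.UpdatePod("default/web", "1")) // true: new pod
	fmt.Println(rm.UpdatePod("default/web", "1")) // false: no change
	fmt.Println(rm.DeletePod("default/web"))      // true: removed
}
```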
@@ -285,10 +295,10 @@ func (s *Server) updateNode() {
}
if errors.IsNotFound(err) {
if err = s.registerNode(); err != nil {
log.Println("Failed to register node:", err)
return
}
return
}
}
n.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
@@ -310,6 +320,7 @@ func (s *Server) updateNode() {
// reconcile is the main reconciliation loop that compares differences between Kubernetes and
// the active provider and reconciles the differences.
func (s *Server) reconcile() {
log.Println("Start reconcile.")
providerPods, err := s.provider.GetPods()
if err != nil {
log.Println(err)
@@ -318,7 +329,8 @@ func (s *Server) reconcile() {
for _, pod := range providerPods {
// Delete pods that don't exist in Kubernetes
-if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil {
+if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil || p.DeletionTimestamp != nil {
+log.Printf("Deleting pod '%s'\n", pod.Name)
if err := s.deletePod(pod); err != nil {
log.Printf("Error deleting pod '%s': %s\n", pod.Name, err)
continue
@@ -329,21 +341,25 @@ func (s *Server) reconcile() {
// Create any pods for k8s pods that don't exist in the provider
pods := s.resourceManager.GetPods()
for _, pod := range pods {
-p, err := s.provider.GetPod(pod.Namespace, pod.Name)
-if err != nil {
-log.Printf("Error retrieving pod '%s' from provider: %s\n", pod.Name, err)
-}
+var providerPod *corev1.Pod
+for _, p := range providerPods {
+if p.Namespace == pod.Namespace && p.Name == pod.Name {
+providerPod = p
+break
+}
+}
-if pod.DeletionTimestamp == nil && pod.Status.Phase != corev1.PodFailed && p == nil {
+if pod.DeletionTimestamp == nil && pod.Status.Phase != corev1.PodFailed && providerPod == nil {
log.Printf("Creating pod '%s'\n", pod.Name)
if err := s.createPod(pod); err != nil {
log.Printf("Error creating pod '%s': %s\n", pod.Name, err)
continue
}
log.Printf("Pod '%s' created.\n", pod.Name)
}
-// Delete pod if DeletionTimestamp set
+// Delete pod if DeletionTimestamp is set
if pod.DeletionTimestamp != nil {
log.Printf("Pod '%s' is pending deletion.\n", pod.Name)
var err error
if err = s.deletePod(pod); err != nil {
log.Printf("Error deleting pod '%s': %s\n", pod.Name, err)
@@ -373,25 +389,30 @@ func (s *Server) createPod(pod *corev1.Pod) error {
return origErr
}
log.Printf("Pod '%s' created.\n", pod.Name)
return nil
}
func (s *Server) deletePod(pod *corev1.Pod) error {
var delErr error
if delErr = s.provider.DeletePod(pod); delErr != nil && errors.IsNotFound(delErr) {
return fmt.Errorf("Error deleting pod '%s': %s", pod.Name, delErr)
return delErr
}
if !errors.IsNotFound(delErr) {
var grace int64
if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) {
if errors.IsNotFound(err) {
log.Printf("Pod '%s' doesn't exist.\n", pod.Name)
return nil
}
return fmt.Errorf("Failed to delete kubernetes pod: %s", err)
}
s.resourceManager.DeletePod(pod)
log.Printf("Pod '%s' deleted.\n", pod.Name)
}
@@ -403,17 +424,13 @@ func (s *Server) updatePodStatuses() {
// Update all the pods with the provider status.
pods := s.resourceManager.GetPods()
for _, pod := range pods {
-if pod.DeletionTimestamp != nil && pod.Status.Phase == corev1.PodSucceeded {
-continue
-}
-if pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == PodStatusReason_ProviderFailed {
+if pod.Status.Phase == corev1.PodSucceeded || (pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == PodStatusReason_ProviderFailed) {
continue
}
status, err := s.provider.GetPodStatus(pod.Namespace, pod.Name)
if err != nil {
log.Printf("Error retrieving pod '%s' status from provider: %s\n", pod.Name, err)
log.Printf("Error retrieving pod '%s' in namespace '%s' status from provider: %s\n", pod.Name, pod.Namespace, err)
return
}