Merge pull request #285 from yaron2/sfmesh-branch
Added Service Fabric Mesh provider
This commit is contained in:
14
README.md
14
README.md
@@ -23,6 +23,7 @@ The best description is "Kubernetes API on top, programmable back."
|
||||
+ [Azure Batch GPU Provider](./providers/azurebatch/README.md)
|
||||
+ [AWS Fargate Provider](#aws-fargate-provider)
|
||||
+ [Hyper.sh Provider](#hypersh-provider)
|
||||
+ [Service Fabric Mesh Provider](#service-fabric-mesh-provider)
|
||||
+ [Adding a New Provider via the Provider Interface](#adding-a-new-provider-via-the-provider-interface)
|
||||
* [Testing](#testing)
|
||||
+ [Testing the Azure Provider Client](#testing-the-azure-provider-client)
|
||||
@@ -143,6 +144,19 @@ Kubernetes cluster.
|
||||
./bin/virtual-kubelet --provider hyper
|
||||
```
|
||||
|
||||
### Service Fabric Mesh Provider
|
||||
|
||||
The Service Fabric Mesh Provider allows you to deploy pods to Azure [Service Fabric Mesh](https://docs.microsoft.com/en-us/azure/service-fabric-mesh/service-fabric-mesh-overview).
|
||||
|
||||
Service Fabric Mesh is a fully managed service that lets developers deploy microservices without managing the underlying infrastructure.
|
||||
Pods deployed to Service Fabric Mesh will be assigned Public IPs from the Service Fabric Mesh network.
|
||||
|
||||
```
|
||||
./bin/virtual-kubelet --provider sfmesh --taint azure.com/sfmesh
|
||||
```
|
||||
|
||||
More detailed instructions can be found [here](providers/sfmesh/README.md).
|
||||
|
||||
### Adding a New Provider via the Provider Interface
|
||||
|
||||
The structure we chose allows you to have all the power of the Kubernetes API
|
||||
|
||||
64
providers/sfmesh/README.md
Normal file
64
providers/sfmesh/README.md
Normal file
@@ -0,0 +1,64 @@
|
||||
# Kubernetes Virtual Kubelet with Service Fabric Mesh
|
||||
|
||||
[Service Fabric Mesh](https://docs.microsoft.com/en-us/azure/service-fabric-mesh/service-fabric-mesh-overview) is a fully managed service that enables developers to deploy microservices applications without managing virtual machines, storage, or networking. Applications hosted on Service Fabric Mesh run and scale without you worrying about the infrastructure powering it.
|
||||
|
||||
The Virtual kubelet integration allows you to use the Kubernetes API to burst out compute to a Service Fabric Mesh cluster and schedule pods as Mesh Applications.
|
||||
|
||||
## Status: Experimental
|
||||
|
||||
This provider is currently in the exterimental stages. Contributions welcome!
|
||||
|
||||
## Setup
|
||||
|
||||
The provider expects the following environment variables to be configured:
|
||||
|
||||
- AZURE_CLIENT_ID
|
||||
- AZURE_CLIENT_SECRET
|
||||
- AZURE_SUBSCRIPTION_ID
|
||||
- AZURE_TENANT_ID
|
||||
- RESOURCE_GROUP
|
||||
- REGION
|
||||
|
||||
## Quick Start
|
||||
|
||||
#### Run the Virtual Kubelet
|
||||
|
||||
```
|
||||
./virtual-kubelet --provider=sfmesh --taint azure.com/sfmesh
|
||||
```
|
||||
|
||||
#### Create pod yaml:
|
||||
|
||||
```
|
||||
$ cat pod-nginx
|
||||
apiVersion: v1
|
||||
kind: Pod
|
||||
metadata:
|
||||
name: nginx
|
||||
spec:
|
||||
nodeName: virtual-kubelet
|
||||
containers:
|
||||
- name: nginx
|
||||
image: nginx:latest
|
||||
ports:
|
||||
- containerPort: 80
|
||||
tolerations:
|
||||
- key: azure.com/sfmesh
|
||||
effect: NoSchedule
|
||||
```
|
||||
|
||||
#### create pod
|
||||
|
||||
```
|
||||
$ kubectl create -f pod-nginx
|
||||
```
|
||||
|
||||
#### list containers on Service Fabric Mesh
|
||||
|
||||
```
|
||||
$ az mesh app list -o table
|
||||
|
||||
Name ResourceGroup ProvisioningState Location
|
||||
------ --------------- ------------------- ----------
|
||||
nginx myResourceGroup Succeeded eastus
|
||||
```
|
||||
851
providers/sfmesh/sfmesh.go
Normal file
851
providers/sfmesh/sfmesh.go
Normal file
@@ -0,0 +1,851 @@
|
||||
package sfmesh
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/profiles/preview/preview/servicefabricmesh/mgmt/servicefabricmesh"
|
||||
"github.com/Azure/go-autorest/autorest/azure/auth"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/manager"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/tools/remotecommand"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultCPUCapacity = "60"
|
||||
defaultMemoryCapacity = "48Gi"
|
||||
defaultPodCapacity = "5"
|
||||
defaultCPURequests = 1.0
|
||||
defaultMemoryRequests = 1.0
|
||||
defaultCPULimit = 4.0
|
||||
defaultMemoryLimit = 16.0
|
||||
)
|
||||
|
||||
// SFMeshProvider implements the Virtual Kubelet provider interface
|
||||
type SFMeshProvider struct {
|
||||
nodeName string
|
||||
operatingSystem string
|
||||
internalIP string
|
||||
daemonEndpointPort int32
|
||||
appClient *servicefabricmesh.ApplicationClient
|
||||
networkClient *servicefabricmesh.NetworkClient
|
||||
serviceClient *servicefabricmesh.ServiceClient
|
||||
region string
|
||||
resourceGroup string
|
||||
subscriptionID string
|
||||
resourceManager *manager.ResourceManager
|
||||
}
|
||||
|
||||
// AuthConfig is the secret returned from an ImageRegistryCredential
|
||||
type AuthConfig struct {
|
||||
Username string `json:"username,omitempty"`
|
||||
Password string `json:"password,omitempty"`
|
||||
Auth string `json:"auth,omitempty"`
|
||||
Email string `json:"email,omitempty"`
|
||||
ServerAddress string `json:"serveraddress,omitempty"`
|
||||
IdentityToken string `json:"identitytoken,omitempty"`
|
||||
RegistryToken string `json:"registrytoken,omitempty"`
|
||||
}
|
||||
|
||||
// NewSFMeshProvider creates a new SFMeshProvider
|
||||
func NewSFMeshProvider(rm *manager.ResourceManager, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*SFMeshProvider, error) {
|
||||
azureSubscriptionID := os.Getenv("AZURE_SUBSCRIPTION_ID")
|
||||
azureTenantID := os.Getenv("AZURE_TENANT_ID")
|
||||
azureClientID := os.Getenv("AZURE_CLIENT_ID")
|
||||
azureClientSecret := os.Getenv("AZURE_CLIENT_SECRET")
|
||||
region := os.Getenv("REGION")
|
||||
resourceGroup := os.Getenv("RESOURCE_GROUP")
|
||||
|
||||
if azureSubscriptionID == "" {
|
||||
return nil, errors.New("Subscription ID cannot be empty, please set AZURE_SUBSCRIPTION_ID")
|
||||
}
|
||||
if azureTenantID == "" {
|
||||
return nil, errors.New("Tenant ID cannot be empty, please set AZURE_TENANT_ID")
|
||||
}
|
||||
if azureClientID == "" {
|
||||
return nil, errors.New("Client ID cannot be empty, please set AZURE_CLIENT_ID ")
|
||||
}
|
||||
if azureClientSecret == "" {
|
||||
return nil, errors.New("Client Secret cannot be empty, please set AZURE_CLIENT_SECRET ")
|
||||
}
|
||||
if region == "" {
|
||||
return nil, errors.New("Region cannot be empty, please set REGION ")
|
||||
}
|
||||
if resourceGroup == "" {
|
||||
return nil, errors.New("Resource Group cannot be empty, please set RESOURCE_GROUP ")
|
||||
}
|
||||
|
||||
client := servicefabricmesh.NewApplicationClient(azureSubscriptionID)
|
||||
|
||||
auth, err := auth.NewAuthorizerFromEnvironment()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client.Authorizer = auth
|
||||
|
||||
networkClient := servicefabricmesh.NewNetworkClient(azureSubscriptionID)
|
||||
networkClient.Authorizer = auth
|
||||
|
||||
serviceClient := servicefabricmesh.NewServiceClient(azureSubscriptionID)
|
||||
serviceClient.Authorizer = auth
|
||||
|
||||
provider := SFMeshProvider{
|
||||
nodeName: nodeName,
|
||||
operatingSystem: operatingSystem,
|
||||
internalIP: internalIP,
|
||||
daemonEndpointPort: daemonEndpointPort,
|
||||
appClient: &client,
|
||||
networkClient: &networkClient,
|
||||
serviceClient: &serviceClient,
|
||||
region: region,
|
||||
resourceGroup: resourceGroup,
|
||||
resourceManager: rm,
|
||||
subscriptionID: azureSubscriptionID,
|
||||
}
|
||||
return &provider, nil
|
||||
}
|
||||
|
||||
func readDockerCfgSecret(secret *v1.Secret, ips []servicefabricmesh.ImageRegistryCredential) ([]servicefabricmesh.ImageRegistryCredential, error) {
|
||||
var err error
|
||||
var authConfigs map[string]AuthConfig
|
||||
repoData, ok := secret.Data[string(v1.DockerConfigKey)]
|
||||
|
||||
if !ok {
|
||||
return ips, fmt.Errorf("no dockercfg present in secret")
|
||||
}
|
||||
|
||||
err = json.Unmarshal(repoData, &authConfigs)
|
||||
if err != nil {
|
||||
return ips, err
|
||||
}
|
||||
|
||||
for server, authConfig := range authConfigs {
|
||||
ips = append(ips, servicefabricmesh.ImageRegistryCredential{
|
||||
Password: &authConfig.Password,
|
||||
Server: &server,
|
||||
Username: &authConfig.Username,
|
||||
})
|
||||
}
|
||||
|
||||
return ips, err
|
||||
}
|
||||
|
||||
func readDockerConfigJSONSecret(secret *v1.Secret, ips []servicefabricmesh.ImageRegistryCredential) ([]servicefabricmesh.ImageRegistryCredential, error) {
|
||||
var err error
|
||||
repoData, ok := secret.Data[string(v1.DockerConfigJsonKey)]
|
||||
|
||||
if !ok {
|
||||
return ips, fmt.Errorf("no dockerconfigjson present in secret")
|
||||
}
|
||||
|
||||
var authConfigs map[string]map[string]AuthConfig
|
||||
|
||||
err = json.Unmarshal(repoData, &authConfigs)
|
||||
if err != nil {
|
||||
return ips, err
|
||||
}
|
||||
|
||||
auths, ok := authConfigs["auths"]
|
||||
|
||||
if !ok {
|
||||
return ips, fmt.Errorf("malformed dockerconfigjson in secret")
|
||||
}
|
||||
|
||||
for server, authConfig := range auths {
|
||||
ips = append(ips, servicefabricmesh.ImageRegistryCredential{
|
||||
Password: &authConfig.Password,
|
||||
Server: &server,
|
||||
Username: &authConfig.Username,
|
||||
})
|
||||
}
|
||||
|
||||
return ips, err
|
||||
}
|
||||
|
||||
func (p *SFMeshProvider) getImagePullSecrets(pod *v1.Pod) ([]servicefabricmesh.ImageRegistryCredential, error) {
|
||||
ips := make([]servicefabricmesh.ImageRegistryCredential, 0, len(pod.Spec.ImagePullSecrets))
|
||||
for _, ref := range pod.Spec.ImagePullSecrets {
|
||||
secret, err := p.resourceManager.GetSecret(ref.Name, pod.Namespace)
|
||||
if err != nil {
|
||||
return ips, err
|
||||
}
|
||||
if secret == nil {
|
||||
return nil, fmt.Errorf("error getting image pull secret")
|
||||
}
|
||||
|
||||
switch secret.Type {
|
||||
case v1.SecretTypeDockercfg:
|
||||
ips, err = readDockerCfgSecret(secret, ips)
|
||||
case v1.SecretTypeDockerConfigJson:
|
||||
ips, err = readDockerConfigJSONSecret(secret, ips)
|
||||
default:
|
||||
return nil, fmt.Errorf("image pull secret type is not one of kubernetes.io/dockercfg or kubernetes.io/dockerconfigjson")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return ips, err
|
||||
}
|
||||
|
||||
}
|
||||
return ips, nil
|
||||
}
|
||||
|
||||
func (p *SFMeshProvider) getMeshApplication(pod *v1.Pod) (servicefabricmesh.ApplicationResourceDescription, error) {
|
||||
meshApp := servicefabricmesh.ApplicationResourceDescription{}
|
||||
meshApp.Name = &pod.Name
|
||||
meshApp.Location = &p.region
|
||||
|
||||
podUID := string(pod.UID)
|
||||
podCreationTimestamp := pod.CreationTimestamp.String()
|
||||
|
||||
tags := map[string]*string{
|
||||
"PodName": &pod.Name,
|
||||
"ClusterName": &pod.ClusterName,
|
||||
"NodeName": &pod.Spec.NodeName,
|
||||
"Namespace": &pod.Namespace,
|
||||
"UID": &podUID,
|
||||
"CreationTimestamp": &podCreationTimestamp,
|
||||
}
|
||||
|
||||
meshApp.Tags = tags
|
||||
|
||||
properties := servicefabricmesh.ApplicationResourceProperties{}
|
||||
meshApp.ApplicationResourceProperties = &properties
|
||||
|
||||
services := []servicefabricmesh.ServiceResourceDescription{}
|
||||
service := servicefabricmesh.ServiceResourceDescription{}
|
||||
serviceName := *meshApp.Name + "-service"
|
||||
service.Name = &serviceName
|
||||
serviceType := "Microsoft.ServiceFabricMesh/services"
|
||||
service.Type = &serviceType
|
||||
|
||||
creds, err := p.getImagePullSecrets(pod)
|
||||
if err != nil {
|
||||
return meshApp, err
|
||||
}
|
||||
|
||||
codePackages := []servicefabricmesh.ContainerCodePackageProperties{}
|
||||
|
||||
for _, container := range pod.Spec.Containers {
|
||||
codePackage := servicefabricmesh.ContainerCodePackageProperties{}
|
||||
codePackage.Image = &container.Image
|
||||
codePackage.Name = &container.Name
|
||||
|
||||
if creds != nil {
|
||||
if len(creds) > 0 {
|
||||
// Mesh ImageRegistryCredential supports only a single credential
|
||||
codePackage.ImageRegistryCredential = &creds[0]
|
||||
}
|
||||
}
|
||||
|
||||
requirements := servicefabricmesh.ResourceRequirements{}
|
||||
requests := servicefabricmesh.ResourceRequests{}
|
||||
|
||||
cpuRequest := defaultCPURequests
|
||||
memoryRequest := defaultMemoryRequests
|
||||
|
||||
if container.Resources.Requests != nil {
|
||||
if _, ok := container.Resources.Requests[v1.ResourceCPU]; ok {
|
||||
containerCPURequest := float64(container.Resources.Requests.Cpu().MilliValue()/10.00) / 100.00
|
||||
if containerCPURequest > 1 && containerCPURequest <= 4 {
|
||||
cpuRequest = containerCPURequest
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := container.Resources.Requests[v1.ResourceMemory]; ok {
|
||||
containerMemoryRequest := float64(container.Resources.Requests.Memory().Value()/100000000.00) / 10.00
|
||||
if containerMemoryRequest < 0.10 {
|
||||
containerMemoryRequest = 0.10
|
||||
}
|
||||
|
||||
memoryRequest = containerMemoryRequest
|
||||
}
|
||||
}
|
||||
|
||||
requests.CPU = &cpuRequest
|
||||
requests.MemoryInGB = &memoryRequest
|
||||
|
||||
requirements.Requests = &requests
|
||||
|
||||
if container.Resources.Limits != nil {
|
||||
cpuLimit := defaultCPULimit
|
||||
memoryLimit := defaultMemoryLimit
|
||||
|
||||
limits := servicefabricmesh.ResourceLimits{}
|
||||
limits.CPU = &cpuLimit
|
||||
limits.MemoryInGB = &memoryLimit
|
||||
|
||||
if _, ok := container.Resources.Limits[v1.ResourceCPU]; ok {
|
||||
containerCPULimit := float64(container.Resources.Limits.Cpu().MilliValue()) / 1000.00
|
||||
if containerCPULimit > 1 {
|
||||
limits.CPU = &containerCPULimit
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := container.Resources.Limits[v1.ResourceMemory]; ok {
|
||||
containerMemoryLimit := float64(container.Resources.Limits.Memory().Value()) / 1000000000.00
|
||||
if containerMemoryLimit < 0.10 {
|
||||
containerMemoryLimit = 0.10
|
||||
}
|
||||
|
||||
limits.MemoryInGB = &containerMemoryLimit
|
||||
}
|
||||
|
||||
requirements.Limits = &limits
|
||||
}
|
||||
|
||||
codePackage.Resources = &requirements
|
||||
|
||||
if len(container.Command) > 0 {
|
||||
codePackage.Commands = &container.Command
|
||||
}
|
||||
|
||||
if len(container.Env) > 0 {
|
||||
envVars := []servicefabricmesh.EnvironmentVariable{}
|
||||
|
||||
for _, envVar := range container.Env {
|
||||
env := servicefabricmesh.EnvironmentVariable{}
|
||||
env.Name = &envVar.Name
|
||||
env.Value = &envVar.Value
|
||||
|
||||
envVars = append(envVars, env)
|
||||
}
|
||||
|
||||
codePackage.EnvironmentVariables = &envVars
|
||||
}
|
||||
|
||||
endpoints := []servicefabricmesh.EndpointProperties{}
|
||||
|
||||
for _, port := range container.Ports {
|
||||
endpoint := p.getEndpointFromContainerPort(port)
|
||||
endpoints = append(endpoints, endpoint)
|
||||
}
|
||||
|
||||
if len(endpoints) > 0 {
|
||||
codePackage.Endpoints = &endpoints
|
||||
}
|
||||
|
||||
codePackages = append(codePackages, codePackage)
|
||||
}
|
||||
|
||||
serviceProperties := servicefabricmesh.ServiceResourceProperties{}
|
||||
serviceProperties.OsType = servicefabricmesh.Linux
|
||||
replicaCount := int32(1)
|
||||
serviceProperties.ReplicaCount = &replicaCount
|
||||
serviceProperties.CodePackages = &codePackages
|
||||
service.ServiceResourceProperties = &serviceProperties
|
||||
services = append(services, service)
|
||||
properties.Services = &services
|
||||
|
||||
return meshApp, nil
|
||||
}
|
||||
|
||||
func (p *SFMeshProvider) getMeshNetwork(pod *v1.Pod, meshApp servicefabricmesh.ApplicationResourceDescription, location string) servicefabricmesh.NetworkResourceDescription {
|
||||
network := servicefabricmesh.NetworkResourceDescription{}
|
||||
network.Name = meshApp.Name
|
||||
network.Location = &location
|
||||
|
||||
networkProperties := servicefabricmesh.NetworkResourceProperties{}
|
||||
addressPrefix := "10.0.0.4/22"
|
||||
networkProperties.AddressPrefix = &addressPrefix
|
||||
|
||||
layers := []servicefabricmesh.Layer4IngressConfig{}
|
||||
|
||||
service := (*meshApp.Services)[0]
|
||||
|
||||
for _, codePackage := range *service.CodePackages {
|
||||
for _, endpoint := range *codePackage.Endpoints {
|
||||
layer := p.getLayer(&endpoint, *meshApp.Name, *service.Name)
|
||||
layers = append(layers, layer)
|
||||
}
|
||||
}
|
||||
|
||||
ingressConfig := servicefabricmesh.IngressConfig{}
|
||||
ingressConfig.Layer4 = &layers
|
||||
|
||||
networkProperties.IngressConfig = &ingressConfig
|
||||
network.NetworkResourceProperties = &networkProperties
|
||||
|
||||
return network
|
||||
}
|
||||
|
||||
func (p *SFMeshProvider) getLayer(endpoint *servicefabricmesh.EndpointProperties, appName string, serviceName string) servicefabricmesh.Layer4IngressConfig {
|
||||
layer := servicefabricmesh.Layer4IngressConfig{}
|
||||
name := *endpoint.Name + "Ingress"
|
||||
layerName := &name
|
||||
layer.Name = layerName
|
||||
layer.PublicPort = endpoint.Port
|
||||
layer.EndpointName = endpoint.Name
|
||||
layer.ApplicationName = &appName
|
||||
layer.ServiceName = &serviceName
|
||||
|
||||
return layer
|
||||
}
|
||||
|
||||
func (p *SFMeshProvider) getEndpointFromContainerPort(port v1.ContainerPort) servicefabricmesh.EndpointProperties {
|
||||
endpoint := servicefabricmesh.EndpointProperties{}
|
||||
endpointName := strconv.Itoa(int(port.ContainerPort)) + "Listener"
|
||||
endpoint.Name = &endpointName
|
||||
endpoint.Port = &port.ContainerPort
|
||||
|
||||
return endpoint
|
||||
}
|
||||
|
||||
// CreatePod accepts a Pod definition and creates a SF Mesh App.
|
||||
func (p *SFMeshProvider) CreatePod(pod *v1.Pod) error {
|
||||
log.Printf("receive CreatePod %q\n", pod.Name)
|
||||
|
||||
meshApp, err := p.getMeshApplication(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
meshNetwork := p.getMeshNetwork(pod, meshApp, p.region)
|
||||
_, err = p.networkClient.Create(context.Background(), p.resourceGroup, *meshNetwork.Name, meshNetwork)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
networkName := *meshNetwork.Name
|
||||
resourceID := "/subscriptions/" + p.subscriptionID + "/resourceGroups/" + p.resourceGroup + "/providers/Microsoft.ServiceFabricMesh/networks/" + networkName
|
||||
|
||||
service := (*meshApp.Services)[0]
|
||||
|
||||
networkRef := servicefabricmesh.NetworkRef{}
|
||||
networkRef.Name = &resourceID
|
||||
|
||||
networkRefs := []servicefabricmesh.NetworkRef{}
|
||||
networkRefs = append(networkRefs, networkRef)
|
||||
|
||||
service.NetworkRefs = &networkRefs
|
||||
|
||||
_, err = p.appClient.Create(context.Background(), p.resourceGroup, pod.Name, meshApp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdatePod updates the pod running inside SF Mesh.
|
||||
func (p *SFMeshProvider) UpdatePod(pod *v1.Pod) error {
|
||||
log.Printf("receive UpdatePod %q\n", pod.Name)
|
||||
|
||||
app, err := p.getMeshApplication(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = p.appClient.Create(context.Background(), p.resourceGroup, pod.Name, app)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeletePod deletes the specified pod out of SF Mesh.
|
||||
func (p *SFMeshProvider) DeletePod(pod *v1.Pod) (err error) {
|
||||
log.Printf("receive DeletePod %q\n", pod.Name)
|
||||
|
||||
_, err = p.appClient.Delete(context.Background(), p.resourceGroup, pod.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetPod returns a pod by name that is running inside SF Mesh.
|
||||
// returns nil if a pod by that name is not found.
|
||||
func (p *SFMeshProvider) GetPod(namespace, name string) (pod *v1.Pod, err error) {
|
||||
log.Printf("receive GetPod %q\n", name)
|
||||
|
||||
resp, err := p.appClient.Get(context.Background(), p.resourceGroup, name)
|
||||
httpResponse := resp.Response.Response
|
||||
|
||||
if err != nil {
|
||||
if httpResponse.StatusCode == 404 {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.Tags == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
val, present := resp.Tags["NodeName"]
|
||||
|
||||
if !present {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if *val != p.nodeName {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
pod, err = p.applicationDescriptionToPod(resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pod, nil
|
||||
}
|
||||
|
||||
func (p *SFMeshProvider) appStateToPodPhase(state string) v1.PodPhase {
|
||||
switch state {
|
||||
case "Succeeded":
|
||||
return v1.PodRunning
|
||||
case "Failed":
|
||||
return v1.PodFailed
|
||||
case "Canceled":
|
||||
return v1.PodFailed
|
||||
case "Creating":
|
||||
return v1.PodPending
|
||||
case "Updating":
|
||||
return v1.PodPending
|
||||
}
|
||||
|
||||
return v1.PodUnknown
|
||||
}
|
||||
|
||||
func (p *SFMeshProvider) appStateToPodConditions(state string, transitiontime metav1.Time) []v1.PodCondition {
|
||||
switch state {
|
||||
case "Succeeded":
|
||||
return []v1.PodCondition{
|
||||
v1.PodCondition{
|
||||
Type: v1.PodReady,
|
||||
Status: v1.ConditionTrue,
|
||||
LastTransitionTime: transitiontime,
|
||||
}, v1.PodCondition{
|
||||
Type: v1.PodInitialized,
|
||||
Status: v1.ConditionTrue,
|
||||
LastTransitionTime: transitiontime,
|
||||
}, v1.PodCondition{
|
||||
Type: v1.PodScheduled,
|
||||
Status: v1.ConditionTrue,
|
||||
LastTransitionTime: transitiontime,
|
||||
},
|
||||
}
|
||||
}
|
||||
return []v1.PodCondition{}
|
||||
}
|
||||
|
||||
func (p *SFMeshProvider) getMeshService(appName string, serviceName string) (servicefabricmesh.ServiceResourceDescription, error) {
|
||||
svc, err := p.serviceClient.Get(context.Background(), p.resourceGroup, appName, serviceName)
|
||||
if err != nil {
|
||||
return servicefabricmesh.ServiceResourceDescription{}, err
|
||||
}
|
||||
|
||||
return svc, err
|
||||
}
|
||||
|
||||
func appStateToContainerState(state string, appStartTime metav1.Time) v1.ContainerState {
|
||||
if state == "Succeeded" {
|
||||
return v1.ContainerState{
|
||||
Running: &v1.ContainerStateRunning{
|
||||
StartedAt: appStartTime,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if state == "Failed" || state == "Canceled" {
|
||||
return v1.ContainerState{
|
||||
Terminated: &v1.ContainerStateTerminated{
|
||||
ExitCode: 1,
|
||||
Reason: "",
|
||||
Message: "",
|
||||
StartedAt: appStartTime,
|
||||
FinishedAt: metav1.NewTime(time.Now()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return v1.ContainerState{
|
||||
Waiting: &v1.ContainerStateWaiting{
|
||||
Reason: "",
|
||||
Message: "",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (p *SFMeshProvider) getMeshNetworkPublicIP(networkName string) (*string, error) {
|
||||
network, err := p.networkClient.Get(context.Background(), p.resourceGroup, networkName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ipAddress := network.IngressConfig.PublicIPAddress
|
||||
return ipAddress, nil
|
||||
}
|
||||
|
||||
func (p *SFMeshProvider) applicationDescriptionToPod(app servicefabricmesh.ApplicationResourceDescription) (*v1.Pod, error) {
|
||||
var podCreationTimestamp metav1.Time
|
||||
|
||||
if *app.Tags["CreationTimestamp"] != "" {
|
||||
t, err := time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", *app.Tags["CreationTimestamp"])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
podCreationTimestamp = metav1.NewTime(t)
|
||||
}
|
||||
|
||||
containerStartTime := podCreationTimestamp
|
||||
|
||||
appState := app.ProvisioningState
|
||||
podPhase := p.appStateToPodPhase(*appState)
|
||||
podConditions := p.appStateToPodConditions(*appState, podCreationTimestamp)
|
||||
|
||||
service, err := p.getMeshService(*app.Name, (*app.ServiceNames)[0])
|
||||
|
||||
containers := []v1.Container{}
|
||||
containerStatuses := []v1.ContainerStatus{}
|
||||
|
||||
for _, codePkg := range *service.CodePackages {
|
||||
container := v1.Container{}
|
||||
container.Name = *codePkg.Name
|
||||
container.Image = *codePkg.Image
|
||||
|
||||
if codePkg.Commands != nil {
|
||||
container.Command = *codePkg.Commands
|
||||
}
|
||||
|
||||
container.Resources = v1.ResourceRequirements{
|
||||
Requests: v1.ResourceList{
|
||||
v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%g", *codePkg.Resources.Requests.CPU)),
|
||||
v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%gG", *codePkg.Resources.Requests.MemoryInGB)),
|
||||
},
|
||||
}
|
||||
|
||||
if codePkg.Resources.Limits != nil {
|
||||
container.Resources.Limits = v1.ResourceList{
|
||||
v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%g", *codePkg.Resources.Limits.CPU)),
|
||||
v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%gG", *codePkg.Resources.Limits.MemoryInGB)),
|
||||
}
|
||||
}
|
||||
|
||||
containerStatus := v1.ContainerStatus{
|
||||
Name: *codePkg.Name,
|
||||
State: appStateToContainerState(*appState, podCreationTimestamp),
|
||||
Ready: podPhase == v1.PodRunning,
|
||||
Image: container.Image,
|
||||
ImageID: "",
|
||||
ContainerID: "",
|
||||
}
|
||||
|
||||
containerStatuses = append(containerStatuses, containerStatus)
|
||||
containers = append(containers, container)
|
||||
}
|
||||
|
||||
appName := app.Name
|
||||
ipAddress := ""
|
||||
meshIP, err := p.getMeshNetworkPublicIP(*appName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if meshIP != nil {
|
||||
ipAddress = *meshIP
|
||||
}
|
||||
|
||||
pod := v1.Pod{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "Pod",
|
||||
APIVersion: "v1",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: *app.Tags["PodName"],
|
||||
Namespace: *app.Tags["Namespace"],
|
||||
ClusterName: *app.Tags["ClusterName"],
|
||||
UID: types.UID(*app.Tags["UID"]),
|
||||
CreationTimestamp: podCreationTimestamp,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: *app.Tags["NodeName"],
|
||||
Volumes: []v1.Volume{},
|
||||
Containers: containers,
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: podPhase,
|
||||
Conditions: podConditions,
|
||||
Message: "",
|
||||
Reason: "",
|
||||
HostIP: "",
|
||||
PodIP: ipAddress,
|
||||
StartTime: &containerStartTime,
|
||||
ContainerStatuses: containerStatuses,
|
||||
},
|
||||
}
|
||||
|
||||
return &pod, nil
|
||||
}
|
||||
|
||||
// GetContainerLogs retrieves the logs of a container by name.
|
||||
func (p *SFMeshProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
|
||||
log.Printf("receive GetContainerLogs %q\n", podName)
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// GetPodFullName gets the full pod name as defined in the provider context
|
||||
func (p *SFMeshProvider) GetPodFullName(namespace string, pod string) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// ExecInContainer executes a command in a container in the pod, copying data
|
||||
// between in/out/err and the container's stdin/stdout/stderr.
|
||||
func (p *SFMeshProvider) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
|
||||
log.Printf("receive ExecInContainer %q\n", container)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetPodStatus returns the status of a pod by name that is "running".
|
||||
// returns nil if a pod by that name is not found.
|
||||
func (p *SFMeshProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
|
||||
pod, err := p.GetPod(namespace, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if pod == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return &pod.Status, nil
|
||||
}
|
||||
|
||||
// GetPods returns a list of all pods known to be running within SF Mesh.
|
||||
func (p *SFMeshProvider) GetPods() ([]*v1.Pod, error) {
|
||||
log.Printf("receive GetPods\n")
|
||||
|
||||
var pods []*v1.Pod
|
||||
|
||||
list, err := p.appClient.ListByResourceGroup(context.Background(), p.resourceGroup)
|
||||
if err != nil {
|
||||
return pods, err
|
||||
}
|
||||
|
||||
apps := list.Values()
|
||||
|
||||
for _, app := range apps {
|
||||
if app.Tags == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
val, present := app.Tags["NodeName"]
|
||||
|
||||
if !present {
|
||||
continue
|
||||
}
|
||||
|
||||
if *val != p.nodeName {
|
||||
continue
|
||||
}
|
||||
|
||||
pod, err := p.applicationDescriptionToPod(app)
|
||||
if err != nil {
|
||||
return pods, err
|
||||
}
|
||||
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
|
||||
return pods, nil
|
||||
}
|
||||
|
||||
// Capacity returns a resource list containing the capacity limits set for SF Mesh.
|
||||
func (p *SFMeshProvider) Capacity() v1.ResourceList {
|
||||
return v1.ResourceList{
|
||||
"cpu": resource.MustParse(defaultCPUCapacity),
|
||||
"memory": resource.MustParse(defaultMemoryCapacity),
|
||||
"pods": resource.MustParse(defaultPodCapacity),
|
||||
}
|
||||
}
|
||||
|
||||
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
|
||||
// within Kubernetes.
|
||||
func (p *SFMeshProvider) NodeConditions() []v1.NodeCondition {
|
||||
// TODO: Make this configurable
|
||||
return []v1.NodeCondition{
|
||||
{
|
||||
Type: "Ready",
|
||||
Status: v1.ConditionTrue,
|
||||
LastHeartbeatTime: metav1.Now(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: "KubeletReady",
|
||||
Message: "kubelet is ready.",
|
||||
},
|
||||
{
|
||||
Type: "OutOfDisk",
|
||||
Status: v1.ConditionFalse,
|
||||
LastHeartbeatTime: metav1.Now(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: "KubeletHasSufficientDisk",
|
||||
Message: "kubelet has sufficient disk space available",
|
||||
},
|
||||
{
|
||||
Type: "MemoryPressure",
|
||||
Status: v1.ConditionFalse,
|
||||
LastHeartbeatTime: metav1.Now(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: "KubeletHasSufficientMemory",
|
||||
Message: "kubelet has sufficient memory available",
|
||||
},
|
||||
{
|
||||
Type: "DiskPressure",
|
||||
Status: v1.ConditionFalse,
|
||||
LastHeartbeatTime: metav1.Now(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: "KubeletHasNoDiskPressure",
|
||||
Message: "kubelet has no disk pressure",
|
||||
},
|
||||
{
|
||||
Type: "NetworkUnavailable",
|
||||
Status: v1.ConditionFalse,
|
||||
LastHeartbeatTime: metav1.Now(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: "RouteCreated",
|
||||
Message: "RouteController created a route",
|
||||
},
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// NodeAddresses returns a list of addresses for the node status
|
||||
// within Kubernetes.
|
||||
func (p *SFMeshProvider) NodeAddresses() []v1.NodeAddress {
|
||||
return []v1.NodeAddress{
|
||||
{
|
||||
Type: "InternalIP",
|
||||
Address: p.internalIP,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
|
||||
// within Kubernetes.
|
||||
func (p *SFMeshProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
|
||||
return &v1.NodeDaemonEndpoints{
|
||||
KubeletEndpoint: v1.DaemonEndpoint{
|
||||
Port: p.daemonEndpointPort,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// OperatingSystem returns the operating system for this provider.
|
||||
// This is a noop to default to Linux for now.
|
||||
func (p *SFMeshProvider) OperatingSystem() string {
|
||||
return providers.OperatingSystemLinux
|
||||
}
|
||||
68
providers/sfmesh/sfmesh_test.go
Normal file
68
providers/sfmesh/sfmesh_test.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package sfmesh
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func setEnvVars() {
|
||||
os.Setenv("AZURE_SUBSCRIPTION_ID", "fake")
|
||||
os.Setenv("AZURE_TENANT_ID", "fake")
|
||||
os.Setenv("AZURE_CLIENT_ID", "fake")
|
||||
os.Setenv("AZURE_CLIENT_SECRET", "fake")
|
||||
os.Setenv("REGION", "fake")
|
||||
os.Setenv("RESOURCE_GROUP", "fake")
|
||||
}
|
||||
|
||||
func Test_podToMeshApp(t *testing.T) {
|
||||
setEnvVars()
|
||||
|
||||
pod := &v1.Pod{}
|
||||
pod.ObjectMeta = metav1.ObjectMeta{
|
||||
Name: "test-pod",
|
||||
}
|
||||
pod.Spec = v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "testcontainer",
|
||||
Image: "nginx",
|
||||
Ports: []v1.ContainerPort{
|
||||
{
|
||||
Name: "http",
|
||||
ContainerPort: 80,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
provider, err := NewSFMeshProvider(nil, "testnode", "Linux", "6.6.6.6", 80)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
|
||||
_, err = provider.getMeshApplication(pod)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func Test_meshStateToPodCondition(t *testing.T) {
|
||||
setEnvVars()
|
||||
|
||||
meshStateSucceeded := "Succeeded"
|
||||
|
||||
provider, err := NewSFMeshProvider(nil, "testnode", "Linux", "6.6.6.6", 80)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
|
||||
phase := provider.appStateToPodPhase(meshStateSucceeded)
|
||||
if phase != v1.PodRunning {
|
||||
t.Error(errors.New("PodRunning phase expected"))
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/huawei"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/hypersh"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/mock"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/sfmesh"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/vic"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/web"
|
||||
"k8s.io/api/core/v1"
|
||||
@@ -28,6 +29,7 @@ var _ Provider = (*mock.MockProvider)(nil)
|
||||
var _ Provider = (*huawei.CCIProvider)(nil)
|
||||
var _ Provider = (*azurebatch.Provider)(nil)
|
||||
var _ Provider = (*cri.CRIProvider)(nil)
|
||||
var _ Provider = (*sfmesh.SFMeshProvider)(nil)
|
||||
|
||||
// Provider contains the methods required to implement a virtual-kubelet provider.
|
||||
type Provider interface {
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/huawei"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/hypersh"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/mock"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/sfmesh"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/vic"
|
||||
"github.com/virtual-kubelet/virtual-kubelet/providers/web"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@@ -31,7 +32,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
PodStatusReason_ProviderFailed = "ProviderFailed"
|
||||
PodStatusReason_ProviderFailed = "ProviderFailed"
|
||||
)
|
||||
|
||||
// Server masquarades itself as a kubelet and allows for the virtual node to be backed by non-vm/node providers.
|
||||
@@ -127,6 +128,11 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, taint, provider, prov
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case "sfmesh":
|
||||
p, err = sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
default:
|
||||
fmt.Printf("Provider '%s' is not supported\n", provider)
|
||||
}
|
||||
@@ -236,7 +242,7 @@ func (s *Server) Run() error {
|
||||
log.Fatal("Failed to watch pods", err)
|
||||
}
|
||||
|
||||
loop:
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case ev, ok := <-s.podWatcher.ResultChan():
|
||||
@@ -289,10 +295,10 @@ func (s *Server) updateNode() {
|
||||
}
|
||||
|
||||
if errors.IsNotFound(err) {
|
||||
if err = s.registerNode(); err != nil {
|
||||
if err = s.registerNode(); err != nil {
|
||||
log.Println("Failed to register node:", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
n.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
|
||||
@@ -339,7 +345,7 @@ func (s *Server) reconcile() {
|
||||
for _, p := range providerPods {
|
||||
if p.Namespace == pod.Namespace && p.Name == pod.Name {
|
||||
providerPod = p
|
||||
break;
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user