Add skeleton Fargate provider

This commit is contained in:
Onur Filiz
2018-03-16 14:15:29 -07:00
committed by Robbie Zhang
parent 71ec511ba0
commit feb2366e60
3 changed files with 137 additions and 8 deletions

121
providers/aws/provider.go Normal file
View File

@@ -0,0 +1,121 @@
package aws
import (
"fmt"
"log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
corev1 "k8s.io/api/core/v1"
)
// FargateProvider implements the virtual-kubelet provider interface.
type FargateProvider struct {
resourceManager *manager.ResourceManager
nodeName string
operatingSystem string
internalIP string
daemonEndpointPort int32
}
var (
errNotImplemented = fmt.Errorf("Not implemented by Fargate provider")
)
// NewFargateProvider creates a new Fargate provider.
func NewFargateProvider(
config string,
rm *manager.ResourceManager,
nodeName string,
operatingSystem string,
internalIP string,
daemonEndpointPort int32) (*FargateProvider, error) {
// Create the Fargate provider.
log.Println("Creating Fargate provider.")
p := FargateProvider{
resourceManager: rm,
nodeName: nodeName,
operatingSystem: operatingSystem,
internalIP: internalIP,
daemonEndpointPort: daemonEndpointPort,
}
log.Printf("Created Fargate provider: %+v.", p)
return &p, nil
}
// CreatePod takes a Kubernetes Pod and deploys it within the Fargate provider.
func (p *FargateProvider) CreatePod(pod *corev1.Pod) error {
log.Printf("Received CreatePod request for %+v.\n", pod)
return errNotImplemented
}
// UpdatePod takes a Kubernetes Pod and updates it within the provider.
func (p *FargateProvider) UpdatePod(pod *corev1.Pod) error {
log.Printf("Received UpdatePod request for %+v.\n", pod)
return errNotImplemented
}
// DeletePod takes a Kubernetes Pod and deletes it from the provider.
func (p *FargateProvider) DeletePod(pod *corev1.Pod) error {
log.Printf("Received DeletePod request for %+v.\n", pod)
return errNotImplemented
}
// GetPod retrieves a pod by name from the provider (can be cached).
func (p *FargateProvider) GetPod(namespace, name string) (*corev1.Pod, error) {
log.Printf("Received GetPod request for %s::%s.\n", namespace, name)
return nil, errNotImplemented
}
// GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *FargateProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
log.Printf("Received GetContainerLogs request for %s::%s::%s.\n", namespace, podName, containerName)
return "", errNotImplemented
}
// GetPodStatus retrieves the status of a pod by name from the provider.
func (p *FargateProvider) GetPodStatus(namespace, name string) (*corev1.PodStatus, error) {
log.Printf("Received GetPodStatus request for %s::%s.\n", namespace, name)
return nil, errNotImplemented
}
// GetPods retrieves a list of all pods running on the provider (can be cached).
func (p *FargateProvider) GetPods() ([]*corev1.Pod, error) {
log.Println("Received GetPods request.")
return nil, errNotImplemented
}
// Capacity returns a resource list with the capacity constraints of the provider.
func (p *FargateProvider) Capacity() corev1.ResourceList {
log.Println("Received Capacity request.")
return nil
}
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), which is polled
// periodically to update the node status within Kubernetes.
func (p *FargateProvider) NodeConditions() []corev1.NodeCondition {
log.Println("Received NodeConditions request.")
return nil
}
// NodeAddresses returns a list of addresses for the node status within Kubernetes.
func (p *FargateProvider) NodeAddresses() []corev1.NodeAddress {
log.Println("Received NodeAddresses request.")
return nil
}
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status within Kubernetes.
func (p *FargateProvider) NodeDaemonEndpoints() *corev1.NodeDaemonEndpoints {
log.Println("Received NodeDaemonEndpoints request.")
return nil
}
// OperatingSystem returns the operating system the provider is for.
func (p *FargateProvider) OperatingSystem() string {
log.Println("Received OperatingSystem request.")
return p.operatingSystem
}

View File

@@ -1,6 +1,7 @@
package vkubelet
import (
"github.com/virtual-kubelet/virtual-kubelet/providers/aws"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure"
"github.com/virtual-kubelet/virtual-kubelet/providers/hypersh"
"github.com/virtual-kubelet/virtual-kubelet/providers/web"
@@ -8,6 +9,7 @@ import (
)
// Compile time proof that our implementations meet the Provider interface.
var _ Provider = (*aws.FargateProvider)(nil)
var _ Provider = (*azure.ACIProvider)(nil)
var _ Provider = (*hypersh.HyperProvider)(nil)
var _ Provider = (*web.BrokerProvider)(nil)
@@ -38,8 +40,8 @@ type Provider interface {
// Capacity returns a resource list with the capacity constraints of the provider.
Capacity() v1.ResourceList
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), which is polled periodically to update the node status
// within Kubernetes.
// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), which is
// polled periodically to update the node status within Kubernetes.
NodeConditions() []v1.NodeCondition
// NodeAddresses returns a list of addresses for the node status

View File

@@ -11,6 +11,7 @@ import (
"time"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers/aws"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure"
"github.com/virtual-kubelet/virtual-kubelet/providers/hypersh"
"github.com/virtual-kubelet/virtual-kubelet/providers/mock"
@@ -38,10 +39,8 @@ type Server struct {
// New creates a new virtual-kubelet server.
func New(nodeName, operatingSystem, namespace, kubeConfig, taint, provider, providerConfig string) (*Server, error) {
var (
config *rest.Config
err error
)
var config *rest.Config
// Check if the kubeConfig file exists.
if _, err := os.Stat(kubeConfig); !os.IsNotExist(err) {
// Get the kubeconfig from the filepath.
@@ -75,6 +74,11 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, taint, provider, prov
var p Provider
switch provider {
case "aws":
p, err = aws.NewFargateProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
if err != nil {
return nil, err
}
case "azure":
p, err = azure.NewACIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
if err != nil {
@@ -143,6 +147,7 @@ func (s *Server) registerNode() error {
"type": "virtual-kubelet",
"kubernetes.io/role": "agent",
"beta.kubernetes.io/os": strings.ToLower(s.provider.OperatingSystem()),
"alpha.service-controller.kubernetes.io/exclude-balancer": "true",
},
},
@@ -245,7 +250,8 @@ func (s *Server) updateNode() {
}
}
// reconcile is the main reconiliation loop that compares differences between Kubernetes and the active provider and reconciles the differences.
// reconcile is the main reconciliation loop that compares differences between Kubernetes and
// the active provider and reconciles the differences.
func (s *Server) reconcile() {
providerPods, err := s.provider.GetPods()
if err != nil {
@@ -320,7 +326,7 @@ func (s *Server) deletePod(pod *corev1.Pod) error {
}
if !errors.IsNotFound(delErr) {
var grace int64 = 0
var grace int64
if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) {
if errors.IsNotFound(err) {
return nil