Add HashiCorp Nomad provider (#483)
* provider: adding Nomad provider
* updating CONTRIBUTING.md with Nomad provider
* updated README.md by adding the Nomad provider
* fix typo
* adding nomad/api and nomad/testutil deps
* adding Nomad binary dependency for provider tests
* fixed the nomad binary download command step and added tolerations to the nomad provider
* adding nomad provider demo gif
* adding my name to authors
* adding two missing go-rootcerts files after dep ensure
* delete pod comment
Committed by Robbie Zhang. Commit a46e1dd2ce, parent 5796be449b.
providers/nomad/README.md (new file, 119 lines)
# HashiCorp Nomad Provider for Virtual Kubelet

The HashiCorp Nomad provider for Virtual Kubelet connects your Kubernetes cluster
with a Nomad cluster by exposing the Nomad cluster as a node in Kubernetes. By
using the provider, pods that are scheduled on the virtual Nomad node
registered in Kubernetes will run as jobs on Nomad clients as they
would on a Kubernetes node.

**This is an experimental project and is not production ready.**

## Demo



## Prerequisites

This guide assumes the following:

* A Nomad cluster up and running.
* A Kubernetes cluster up and running.
* The Nomad API is accessible from the Kubernetes cluster.
* [kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/#install-kubectl) installed.

## Usage

The Nomad provider accepts the following two environment variables:

* `NOMAD_ADDR` - The Nomad API address. Set to `127.0.0.1:4646` by default.
* `NOMAD_REGION` - The Nomad region. Set to `global` by default.

```bash
export NOMAD_ADDR="127.0.0.1:4646"
export NOMAD_REGION="global"
```

### Run Virtual Kubelet with Nomad Provider

```bash
VK_TAINT_KEY="hashicorp.com/nomad" ./virtual-kubelet --provider="nomad"
```

`VK_TAINT_KEY` sets the taint key on the virtual node; it matches the
`hashicorp.com/nomad` toleration used in the pod specs below.

Validate that the virtual kubelet node is registered.

```bash
kubectl get nodes
```

Expected output:

```bash
NAME              STATUS    ROLES     AGE       VERSION
minikube          Ready     master    55d       v1.10.0
virtual-kubelet   Ready     agent     1m        v1.13.1-vk-N/A
```

### Create a Pod in Kubernetes

```bash
kubectl apply -f pods/nginx-pod.yaml
```

Validate the pod.

```bash
kubectl get pods
```

Expected output:

```bash
NAME      READY     STATUS    RESTARTS   AGE
nginx     1/1       Running   0          1m
```

Validate the Nomad job.

```bash
nomad status
```

Expected output:

```bash
ID                            Type     Priority  Status   Submit Date
nomad-virtual-kubelet-nginx   service  100       running  2018-12-31T16:52:52+05:30
```

### Configuration Options

The Nomad provider supports annotations to define Nomad [datacenters](https://www.nomadproject.io/docs/job-specification/job.html#datacenters).

Here is an example usage of the Nomad datacenter annotation in a pod spec.

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: nginx
  annotations:
    "nomad.hashicorp.com/datacenters": "us-east1,us-west1"
spec:
  containers:
  - image: nginx
    imagePullPolicy: Always
    name: nginx
    ports:
    - containerPort: 80
    - containerPort: 443
  dnsPolicy: ClusterFirst
  nodeSelector:
    kubernetes.io/role: agent
    beta.kubernetes.io/os: linux
    type: virtual-kubelet
  tolerations:
  - key: virtual-kubelet.io/provider
    operator: Exists
  - key: hashicorp.com/nomad
    effect: NoSchedule
```
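To confirm that the annotation took effect, you can inspect the job the provider registered on the Nomad side. A minimal check, assuming the `nomad` CLI points at the same cluster and the job follows the `nomad-virtual-kubelet-<pod name>` naming shown above:

```bash
# Summary view; the Datacenters field should list the values from the annotation.
nomad job status nomad-virtual-kubelet-nginx

# Full job specification as JSON, including the "Datacenters" array.
nomad job inspect nomad-virtual-kubelet-nginx
```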
providers/nomad/helpers.go (new file, 297 lines)
package nomad

import (
	"fmt"
	"strconv"
	"strings"
	"time"

	nomad "github.com/hashicorp/nomad/api"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// createNomadTasks takes the containers in a kubernetes pod and creates
// a list of Nomad tasks from them.
func (p *Provider) createNomadTasks(pod *v1.Pod) []*nomad.Task {
	nomadTasks := make([]*nomad.Task, 0, len(pod.Spec.Containers))

	for _, ctr := range pod.Spec.Containers {
		portMap, networkResources := createPortMap(ctr.Ports)
		image := ctr.Image
		labels := pod.Labels
		command := ctr.Command
		args := ctr.Args
		resources := createResources(ctr.Resources.Limits, networkResources)
		envVars := createEnvVars(ctr.Env)
		task := nomad.Task{
			Name:   ctr.Name,
			Driver: "docker",
			Config: map[string]interface{}{
				"image":    image,
				"port_map": portMap,
				"labels":   labels,
				// TODO: Add volumes support
				"command": strings.Join(command, " "),
				"args":    args,
			},
			Resources: resources,
			Env:       envVars,
		}
		nomadTasks = append(nomadTasks, &task)
	}

	return nomadTasks
}

func createPortMap(ports []v1.ContainerPort) ([]map[string]interface{}, []*nomad.NetworkResource) {
	var portMap []map[string]interface{}
	var dynamicPorts []nomad.Port
	var networkResources []*nomad.NetworkResource

	for i, port := range ports {
		portName := fmt.Sprintf("port_%s", strconv.Itoa(i+1))
		if port.Name != "" {
			portName = port.Name
		}
		portMap = append(portMap, map[string]interface{}{portName: port.ContainerPort})
		dynamicPorts = append(dynamicPorts, nomad.Port{Label: portName})
	}

	return portMap, append(networkResources, &nomad.NetworkResource{DynamicPorts: dynamicPorts})
}

func createResources(limits v1.ResourceList, networkResources []*nomad.NetworkResource) *nomad.Resources {
	taskMemory := int(limits.Memory().Value())
	taskCPU := int(limits.Cpu().Value())

	if taskMemory == 0 {
		taskMemory = 128
	}

	if taskCPU == 0 {
		taskCPU = 100
	}

	return &nomad.Resources{
		Networks: networkResources,
		MemoryMB: &taskMemory,
		CPU:      &taskCPU,
	}
}

func createEnvVars(podEnvVars []v1.EnvVar) map[string]string {
	envVars := map[string]string{}

	for _, v := range podEnvVars {
		envVars[v.Name] = v.Value
	}
	return envVars
}

func (p *Provider) createTaskGroups(name string, tasks []*nomad.Task) []*nomad.TaskGroup {
	count := 1
	restartDelay := 1 * time.Second
	restartMode := "delay"
	restartAttempts := 25

	return []*nomad.TaskGroup{
		&nomad.TaskGroup{
			Name:  &name,
			Count: &count,
			RestartPolicy: &nomad.RestartPolicy{
				Delay:    &restartDelay,
				Mode:     &restartMode,
				Attempts: &restartAttempts,
			},
			Tasks: tasks,
		},
	}
}

func (p *Provider) createJob(name string, datacenters []string, taskGroups []*nomad.TaskGroup) *nomad.Job {
	jobName := fmt.Sprintf("%s-%s", jobNamePrefix, name)

	// Create a new nomad job
	job := nomad.NewServiceJob(jobName, jobName, p.nomadRegion, 100)

	job.Datacenters = datacenters
	job.TaskGroups = taskGroups

	return job
}

func (p *Provider) jobToPod(job *nomad.Job, allocs []*nomad.AllocationListStub) (*v1.Pod, error) {
	containers := []v1.Container{}
	containerStatuses := []v1.ContainerStatus{}
	jobStatus := *job.Status
	jobCreatedAt := *job.SubmitTime
	podCondition := convertJobStatusToPodCondition(jobStatus)
	containerStatusesMap := allocToContainerStatuses(allocs)

	// containerPorts are specified per task in a task group
	var containerPorts []v1.ContainerPort
	for _, tg := range job.TaskGroups {
		for _, task := range tg.Tasks {
			for _, taskNetwork := range task.Resources.Networks {
				for _, dynamicPort := range taskNetwork.DynamicPorts {
					// TODO: Dynamic ports aren't being reported via the
					// Nomad `/jobs` endpoint.
					containerPorts = append(containerPorts, v1.ContainerPort{
						Name:     dynamicPort.Label,
						HostPort: int32(dynamicPort.Value),
						HostIP:   taskNetwork.IP,
					})
				}
			}

			containers = append(containers, v1.Container{
				Name:    task.Name,
				Image:   fmt.Sprintf("%s", task.Config["image"]),
				Command: strings.Split(fmt.Sprintf("%s", task.Config["command"]), " "),
				Args:    strings.Split(fmt.Sprintf("%s", task.Config["args"]), " "),
				Ports:   containerPorts,
			})

			containerStatus := containerStatusesMap[task.Name]
			containerStatus.Image = fmt.Sprintf("%s", task.Config["image"])
			containerStatus.ImageID = fmt.Sprintf("%s", task.Config["image"])
			containerStatuses = append(containerStatuses, containerStatus)
		}
	}

	pod := v1.Pod{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:              *job.Name,
			Namespace:         "default",
			CreationTimestamp: metav1.NewTime(time.Unix(jobCreatedAt, 0)),
		},
		Spec: v1.PodSpec{
			NodeName:   p.nodeName,
			Volumes:    []v1.Volume{},
			Containers: containers,
		},
		Status: v1.PodStatus{
			Phase:             jobStatusToPodPhase(jobStatus),
			Conditions:        []v1.PodCondition{podCondition},
			Message:           "",
			Reason:            "",
			HostIP:            "", // TODO: find out the HostIP
			PodIP:             "", // TODO: find out the equivalent of PodIP
			ContainerStatuses: containerStatuses,
		},
	}

	return &pod, nil
}

func allocToContainerStatuses(allocs []*nomad.AllocationListStub) map[string]v1.ContainerStatus {
	containerStatusesMap := map[string]v1.ContainerStatus{}

	for _, alloc := range allocs {
		for name, taskState := range alloc.TaskStates {
			containerState, readyFlag := convertTaskStateToContainerState(taskState.State,
				taskState.StartedAt,
				taskState.FinishedAt,
			)
			containerStatusesMap[name] = v1.ContainerStatus{
				Name:         name,
				RestartCount: int32(taskState.Restarts),
				Ready:        readyFlag,
				State:        containerState,
			}
		}
	}

	return containerStatusesMap
}

func jobStatusToPodPhase(status string) v1.PodPhase {
	switch status {
	case "pending":
		return v1.PodPending
	case "running":
		return v1.PodRunning
	// TODO: Make sure we take PodFailed into account.
	case "dead":
		return v1.PodFailed
	}
	return v1.PodUnknown
}

func convertJobStatusToPodCondition(jobStatus string) v1.PodCondition {
	podCondition := v1.PodCondition{}

	switch jobStatus {
	case "pending":
		podCondition = v1.PodCondition{
			Type:   v1.PodInitialized,
			Status: v1.ConditionFalse,
		}
	case "running":
		podCondition = v1.PodCondition{
			Type:   v1.PodReady,
			Status: v1.ConditionTrue,
		}
	case "dead":
		podCondition = v1.PodCondition{
			Type:   v1.PodReasonUnschedulable,
			Status: v1.ConditionFalse,
		}
	default:
		podCondition = v1.PodCondition{
			Type:   v1.PodReasonUnschedulable,
			Status: v1.ConditionUnknown,
		}
	}

	return podCondition
}

func convertTaskStateToContainerState(taskState string, startedAt time.Time, finishedAt time.Time) (v1.ContainerState, bool) {
	containerState := v1.ContainerState{}
	readyFlag := false

	switch taskState {
	case "pending":
		containerState = v1.ContainerState{
			Waiting: &v1.ContainerStateWaiting{},
		}
	case "running":
		containerState = v1.ContainerState{
			Running: &v1.ContainerStateRunning{
				StartedAt: metav1.NewTime(startedAt),
			},
		}
		readyFlag = true
	// TODO: Make sure containers that are exiting with non-zero status codes
	// are accounted for using events or something similar?
	//case v1.PodSucceeded:
	//	podCondition = v1.PodCondition{
	//		Type:   v1.PodReasonUnschedulable,
	//		Status: v1.ConditionFalse,
	//	}
	//	containerState = v1.ContainerState{
	//		Terminated: &v1.ContainerStateTerminated{
	//			ExitCode:   int32(container.State.ExitCode),
	//			FinishedAt: metav1.NewTime(finishedAt),
	//		},
	//	}
	case "dead":
		containerState = v1.ContainerState{
			Terminated: &v1.ContainerStateTerminated{
				ExitCode:   0,
				FinishedAt: metav1.NewTime(finishedAt),
			},
		}
	default:
		containerState = v1.ContainerState{}
	}

	return containerState, readyFlag
}
providers/nomad/nomad.gif (binary file, 1.7 MiB; demo gif referenced in the README, not shown)
providers/nomad/nomad.go (new file, 292 lines)
package nomad

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"strings"
	"time"

	"github.com/virtual-kubelet/virtual-kubelet/manager"
	"github.com/virtual-kubelet/virtual-kubelet/providers"

	nomad "github.com/hashicorp/nomad/api"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	apitypes "k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/tools/remotecommand"
)

// Nomad provider constants
const (
	jobNamePrefix              = "nomad-virtual-kubelet"
	nomadDatacentersAnnotation = "nomad.hashicorp.com/datacenters"
	defaultNomadAddress        = "127.0.0.1:4646"
	defaultNomadDatacenter     = "dc1"
	defaultNomadRegion         = "global"
)

// Provider implements the virtual-kubelet provider interface and communicates with the Nomad API.
type Provider struct {
	nomadClient     *nomad.Client
	resourceManager *manager.ResourceManager
	nodeName        string
	operatingSystem string
	nomadAddress    string
	nomadRegion     string
	cpu             string
	memory          string
	pods            string
}

// NewProvider creates a new Provider
func NewProvider(rm *manager.ResourceManager, nodeName, operatingSystem string) (*Provider, error) {
	p := Provider{}
	p.resourceManager = rm
	p.nodeName = nodeName
	p.operatingSystem = operatingSystem
	p.nomadAddress = os.Getenv("NOMAD_ADDR")
	p.nomadRegion = os.Getenv("NOMAD_REGION")

	if p.nomadAddress == "" {
		p.nomadAddress = defaultNomadAddress
	}

	if p.nomadRegion == "" {
		p.nomadRegion = defaultNomadRegion
	}

	c := nomad.DefaultConfig()
	log.Printf("nomad client address: %s", p.nomadAddress)
	nomadClient, err := nomad.NewClient(c.ClientConfig(p.nomadRegion, p.nomadAddress, false))
	if err != nil {
		log.Printf("Unable to create nomad client: %s", err)
		return nil, err
	}

	p.nomadClient = nomadClient

	return &p, nil
}

// CreatePod accepts a Pod definition and creates a Nomad job.
func (p *Provider) CreatePod(ctx context.Context, pod *v1.Pod) error {
	log.Printf("CreatePod %q\n", pod.Name)

	// Ignore daemonSet Pod
	if pod != nil && pod.OwnerReferences != nil && len(pod.OwnerReferences) != 0 && pod.OwnerReferences[0].Kind == "DaemonSet" {
		log.Printf("Skipping creation of DaemonSet pod %q\n", pod.Name)
		return nil
	}

	// Default datacenter name
	datacenters := []string{defaultNomadDatacenter}
	nomadDatacenters := pod.Annotations[nomadDatacentersAnnotation]
	if nomadDatacenters != "" {
		datacenters = strings.Split(nomadDatacenters, ",")
	}

	// Create a list of nomad tasks
	nomadTasks := p.createNomadTasks(pod)
	taskGroups := p.createTaskGroups(pod.Name, nomadTasks)
	job := p.createJob(pod.Name, datacenters, taskGroups)

	// Register nomad job
	_, _, err := p.nomadClient.Jobs().Register(job, nil)
	if err != nil {
		return fmt.Errorf("couldn't start nomad job: %q", err)
	}

	return nil
}

// UpdatePod is a noop, nomad does not support live updates of a pod.
func (p *Provider) UpdatePod(ctx context.Context, pod *v1.Pod) error {
	log.Println("Pod Update called: No-op as not implemented")
	return nil
}

// DeletePod accepts a Pod definition and deletes a Nomad job.
func (p *Provider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) {
	// Deregister job
	response, _, err := p.nomadClient.Jobs().Deregister(pod.Name, true, nil)
	if err != nil {
		return fmt.Errorf("couldn't stop or deregister nomad job: %s: %s", response, err)
	}

	log.Printf("deregistered nomad job %q response %q\n", pod.Name, response)

	return nil
}

// GetPod returns the pod running in the Nomad cluster. Returns nil
// if the pod is not found.
func (p *Provider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) {
	jobID := fmt.Sprintf("%s-%s", jobNamePrefix, name)

	// Get nomad job
	job, _, err := p.nomadClient.Jobs().Info(jobID, nil)
	if err != nil {
		return nil, fmt.Errorf("couldn't retrieve nomad job: %s", err)
	}

	// Get nomad job allocations to get individual task statuses
	jobAllocs, _, err := p.nomadClient.Jobs().Allocations(jobID, false, nil)
	if err != nil {
		return nil, fmt.Errorf("couldn't retrieve nomad job allocations: %s", err)
	}

	// Change a nomad job into a kubernetes pod
	pod, err = p.jobToPod(job, jobAllocs)
	if err != nil {
		return nil, fmt.Errorf("couldn't convert a nomad job into a pod: %s", err)
	}

	return pod, nil
}

// GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, tail int) (string, error) {
	return "", nil
}

// GetPodFullName as defined in the provider context
func (p *Provider) GetPodFullName(ctx context.Context, namespace string, pod string) string {
	return fmt.Sprintf("%s-%s", jobNamePrefix, pod)
}

// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
// TODO: Implementation
func (p *Provider) ExecInContainer(name string, uid apitypes.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
	log.Printf("ExecInContainer %q\n", container)
	return nil
}

// GetPodStatus returns the status of a pod by name that is running as a job
// in the Nomad cluster. Returns nil if a pod by that name is not found.
func (p *Provider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) {
	pod, err := p.GetPod(ctx, namespace, name)
	if err != nil {
		return nil, err
	}
	return &pod.Status, nil
}

// GetPods returns a list of all pods known to be running in Nomad nodes.
func (p *Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
	log.Printf("GetPods\n")
	jobsList, _, err := p.nomadClient.Jobs().PrefixList(jobNamePrefix)
	if err != nil {
		return nil, fmt.Errorf("couldn't get job list from nomad: %s", err)
	}

	var pods = []*v1.Pod{}
	for _, job := range jobsList {
		// Get nomad job
		j, _, err := p.nomadClient.Jobs().Info(job.ID, nil)
		if err != nil {
			return nil, fmt.Errorf("couldn't retrieve nomad job: %s", err)
		}

		// Get nomad job allocations to get individual task statuses
		jobAllocs, _, err := p.nomadClient.Jobs().Allocations(job.ID, false, nil)
		if err != nil {
			return nil, fmt.Errorf("couldn't retrieve nomad job allocations: %s", err)
		}

		// Change a nomad job into a kubernetes pod
		pod, err := p.jobToPod(j, jobAllocs)
		if err != nil {
			return nil, fmt.Errorf("couldn't convert a nomad job into a pod: %s", err)
		}

		pods = append(pods, pod)
	}

	return pods, nil
}

// Capacity returns a resource list containing the capacity limits set for Nomad.
func (p *Provider) Capacity(ctx context.Context) v1.ResourceList {
	// TODO: Use nomad /nodes api to get a list of nodes in the cluster
	// and then use the read node /node/:node_id endpoint to calculate
	// the total resources of the cluster to report back to kubernetes.
	return v1.ResourceList{
		"cpu":    resource.MustParse("20"),
		"memory": resource.MustParse("100Gi"),
		"pods":   resource.MustParse("20"),
	}
}

// NodeConditions returns a list of conditions (Ready, OutOfDisk, etc), for updates to the node status
// within Kubernetes.
func (p *Provider) NodeConditions(ctx context.Context) []v1.NodeCondition {
	// TODO: Make these dynamic.
	return []v1.NodeCondition{
		{
			Type:               "Ready",
			Status:             v1.ConditionTrue,
			LastHeartbeatTime:  metav1.Now(),
			LastTransitionTime: metav1.Now(),
			Reason:             "KubeletReady",
			Message:            "kubelet is ready.",
		},
		{
			Type:               "OutOfDisk",
			Status:             v1.ConditionFalse,
			LastHeartbeatTime:  metav1.Now(),
			LastTransitionTime: metav1.Now(),
			Reason:             "KubeletHasSufficientDisk",
			Message:            "kubelet has sufficient disk space available",
		},
		{
			Type:               "MemoryPressure",
			Status:             v1.ConditionFalse,
			LastHeartbeatTime:  metav1.Now(),
			LastTransitionTime: metav1.Now(),
			Reason:             "KubeletHasSufficientMemory",
			Message:            "kubelet has sufficient memory available",
		},
		{
			Type:               "DiskPressure",
			Status:             v1.ConditionFalse,
			LastHeartbeatTime:  metav1.Now(),
			LastTransitionTime: metav1.Now(),
			Reason:             "KubeletHasNoDiskPressure",
			Message:            "kubelet has no disk pressure",
		},
		{
			Type:               "NetworkUnavailable",
			Status:             v1.ConditionFalse,
			LastHeartbeatTime:  metav1.Now(),
			LastTransitionTime: metav1.Now(),
			Reason:             "RouteCreated",
			Message:            "RouteController created a route",
		},
	}
}

// NodeAddresses returns a list of addresses for the node status
// within Kubernetes.
func (p *Provider) NodeAddresses(ctx context.Context) []v1.NodeAddress {
	// TODO: Use nomad api to get a list of node addresses.
	return nil
}

// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kubernetes.
func (p *Provider) NodeDaemonEndpoints(ctx context.Context) *v1.NodeDaemonEndpoints {
	return &v1.NodeDaemonEndpoints{}
}

// OperatingSystem returns the operating system for this provider.
// This is a noop to default to Linux for now.
func (p *Provider) OperatingSystem() string {
	return providers.OperatingSystemLinux
}
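The `Capacity` TODO above hints at aggregating real node resources instead of returning fixed values. The following is a rough, hypothetical sketch of that idea, not part of this commit: it would live alongside `Capacity` in `nomad.go` (reusing its imports), and it assumes the vendored Nomad Go API's `Nodes().List` and `Nodes().Info` methods plus a `Node.Resources` field with pointer `CPU`/`MemoryMB` members like the `nomad.Resources` struct used in `helpers.go`; field names may differ in other Nomad API versions.

```go
// capacityFromNomad is a hypothetical helper showing how cluster-wide capacity
// could be derived from the Nomad node APIs: list all nodes, read each node,
// and sum the advertised CPU (MHz) and memory (MB).
func (p *Provider) capacityFromNomad() (v1.ResourceList, error) {
	stubs, _, err := p.nomadClient.Nodes().List(nil)
	if err != nil {
		return nil, fmt.Errorf("couldn't list nomad nodes: %s", err)
	}

	totalCPUMHz, totalMemoryMB := 0, 0
	for _, stub := range stubs {
		node, _, err := p.nomadClient.Nodes().Info(stub.ID, nil)
		if err != nil {
			return nil, fmt.Errorf("couldn't read nomad node %q: %s", stub.ID, err)
		}
		// Assumption: Node.Resources carries *int CPU and MemoryMB fields,
		// as the nomad.Resources struct used elsewhere in this provider does.
		if node.Resources != nil {
			if node.Resources.CPU != nil {
				totalCPUMHz += *node.Resources.CPU
			}
			if node.Resources.MemoryMB != nil {
				totalMemoryMB += *node.Resources.MemoryMB
			}
		}
	}

	return v1.ResourceList{
		// Very rough conversion: treat 1000 MHz as one Kubernetes CPU.
		"cpu":    resource.MustParse(fmt.Sprintf("%d", totalCPUMHz/1000)),
		"memory": resource.MustParse(fmt.Sprintf("%dMi", totalMemoryMB)),
		"pods":   resource.MustParse("20"), // still a fixed placeholder
	}, nil
}
```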
providers/nomad/nomad_test.go (new file, 125 lines)
package nomad

import (
	"context"
	"fmt"
	"os"
	"testing"

	"github.com/google/uuid"
	nomad "github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/testutil"
	"github.com/stretchr/testify/assert"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// Client provides a client to the Nomad API
type Client struct {
	config nomad.Config
}

func TestCreateGetDeletePod(t *testing.T) {
	provider, err := makeProvider(t)
	if err != nil {
		t.Fatal("unable to create mock provider", err)
	}

	nomadClient, nomadServer := makeClient(t, nil)
	defer nomadServer.Stop()

	provider.nomadClient = nomadClient
	provider.nomadAddress = nomadServer.HTTPAddr

	podName := "pod-" + uuid.New().String()
	podNamespace := "ns-" + uuid.New().String()

	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      podName,
			Namespace: podNamespace,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				v1.Container{
					Name:  "nginx",
					Image: "nginx",
					LivenessProbe: &v1.Probe{
						Handler: v1.Handler{
							HTTPGet: &v1.HTTPGetAction{
								Port: intstr.FromString("8080"),
								Path: "/",
							},
						},
						InitialDelaySeconds: 10,
						PeriodSeconds:       5,
						TimeoutSeconds:      60,
						SuccessThreshold:    3,
						FailureThreshold:    5,
					},
				},
			},
		},
	}

	// Create pod
	err = provider.CreatePod(context.Background(), pod)
	if err != nil {
		t.Fatal("failed to create pod", err)
	}

	// Get pod
	pod, err = provider.GetPod(context.Background(), podNamespace, podName)
	if err != nil {
		t.Fatal("failed to get pod", err)
	}

	// Get pod tests
	// Validate pod spec
	assert.NotNil(t, pod, "pod cannot be nil")
	assert.NotNil(t, pod.Spec.Containers, "containers cannot be nil")
	assert.Nil(t, pod.Annotations, "pod annotations should be nil")
	assert.Equal(t, pod.Name, fmt.Sprintf("%s-%s", jobNamePrefix, podName), "pod name should be equal")

	// Get pods
	pods, err := provider.GetPods(context.Background())
	if err != nil {
		t.Fatal("failed to get pods", err)
	}

	// TODO: finish adding a few more assertions
	assert.Len(t, pods, 1, "number of pods should be 1")

	// Delete pod
	err = provider.DeletePod(context.Background(), pod)
	if err != nil {
		t.Fatal("failed to delete pod", err)
	}
}

func makeClient(t *testing.T, cb testutil.ServerConfigCallback) (*nomad.Client, *testutil.TestServer) {
	// Make client config
	conf := nomad.DefaultConfig()
	// Create server
	server := testutil.NewTestServer(t, cb)
	conf.Address = "http://" + server.HTTPAddr
	// Create client
	client, err := nomad.NewClient(conf)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	return client, server
}

func makeProvider(t *testing.T) (*Provider, error) {
	// Set default region
	os.Setenv("NOMAD_REGION", "global")

	provider, err := NewProvider(nil, "fakeNomadNode", "linux")
	if err != nil {
		return nil, err
	}

	return provider, nil
}
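The test above starts a real Nomad test server via `testutil.NewTestServer`, which expects a `nomad` binary to be available on the `PATH` (the commit message notes adding that binary as a test dependency). A likely invocation, assuming the repository layout above:

```bash
# Requires the nomad binary on PATH for testutil.NewTestServer.
go test ./providers/nomad/ -run TestCreateGetDeletePod -v
```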
providers/nomad/pods/nginx-pod.yaml (new file, 22 lines)
apiVersion: v1
kind: Pod
metadata:
  name: nginx
spec:
  containers:
  - image: nginx
    imagePullPolicy: Always
    name: nginx
    ports:
    - containerPort: 80
    - containerPort: 443
  dnsPolicy: ClusterFirst
  nodeSelector:
    kubernetes.io/role: agent
    beta.kubernetes.io/os: linux
    type: virtual-kubelet
  tolerations:
  - key: virtual-kubelet.io/provider
    operator: Exists
  - key: hashicorp.com/nomad
    effect: NoSchedule