Merge pull request #21 from ritazh/feat-logs

Enable kubectl logs
This commit is contained in:
Erik St. Martin
2018-01-04 10:56:35 -05:00
committed by GitHub
14 changed files with 207 additions and 24 deletions

2
Gopkg.lock generated
View File

@@ -403,6 +403,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "e476a3cf3fc7c556db93b5f202d0f578cdca9cda551563756ea30261aff2bd9b"
inputs-digest = "db2f3e11453c573c43ded9e17f9e23e76111609d4936482d19a7d18930e1dcce"
solver-name = "gps-cdcl"
solver-version = 1

View File

@@ -48,3 +48,7 @@
[[constraint]]
name = "github.com/xeipuuv/gojsonschema"
revision = "0c8571ac0ce161a5feb57375a9cdf148c98c0f70"
[[constraint]]
name = "github.com/gorilla/mux"
version = "1.6.0"

View File

@@ -100,8 +100,11 @@ If you want to run the connector from the Azure command-line check out this.
RELEASE_NAME=virtual-kubelet
CHART_URL=https://github.com/virtual-kubelet/virtual-kubelet/raw/master/charts/virtual-kubelet-0.1.0.tgz
curl https://raw.githubusercontent.com/virtual-kubelet/virtual-kubelet/master/scripts/createCertAndKey.sh > createCertAndKey.sh
. createCertAndKey.sh
helm install "$CHART_URL" --name "$RELEASE_NAME" \
--set env.azureClientId=<YOUR-AZURECLIENTID-HERE>,env.azureClientKey=<YOUR-AZURECLIENTKEY-HERE>,env.azureTenantId=<YOUR-AZURETENANTID-HERE>,env.azureSubscriptionId=<YOUR-AZURESUBSCRIPTIONID-HERE>,env.aciResourceGroup=<YOUR-ACIRESOURCEGROUP-HERE>,env.nodeName=<YOUR-NODE-NAME>, env.nodeOsType=<Linux|Windows>,env.nodeTaint=<YOUR-NODE-TAINT>
--set env.azureClientId=<YOUR-AZURECLIENTID-HERE>,env.azureClientKey=<YOUR-AZURECLIENTKEY-HERE>,env.azureTenantId=<YOUR-AZURETENANTID-HERE>,env.azureSubscriptionId=<YOUR-AZURESUBSCRIPTIONID-HERE>,env.aciResourceGroup=<YOUR-ACIRESOURCEGROUP-HERE>,env.nodeName=<YOUR-NODE-NAME>,env.nodeOsType=<Linux|Windows>,env.nodeTaint=<YOUR-NODE-TAINT>,env.apiserverCert=$cert,env.apiserverKey=$key
```
## Providers

View File

@@ -14,19 +14,28 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
env:
- name: KUBELET_PORT
value: "10250"
- name: AZURE_AUTH_LOCATION
value: /etc/virtual-kubelet/credentials.json
- name: ACI_RESOURCE_GROUP
value: {{ .Values.env.aciResourceGroup }}
- name: ACI_REGION
value: {{ default "westus" .Values.env.aciRegion }}
- name: APISERVER_CERT_LOCATION
value: /etc/virtual-kubelet/cert.pem
- name: APISERVER_KEY_LOCATION
value: /etc/virtual-kubelet/key.pem
- name: VKUBELET_POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: credentials
mountPath: "/etc/virtual-kubelet"
readOnly: true
command: ["virtual-kubelet"]
args: ["--provider", "azure", "--nodename", {{ default "aci-conn" .Values.env.nodeName | quote }} , "--os", {{ default "Linux" .Values.env.nodeOsType | quote }}, "--taint", {{ default "azure.com/aci" .Values.env.nodeTaint | quote }}]
args: ["--provider", "azure", "--namespace", "default", "--nodename", {{ default "virtual-kubelet" .Values.env.nodeName | quote }} , "--os", {{ default "Linux" .Values.env.nodeOsType | quote }}, "--taint", {{ default "azure.com/aci" .Values.env.nodeTaint | quote }}]
volumes:
- name: credentials
secret:
secretName: {{ template "fullname" . }}
secretName: {{ template "fullname" . }}

View File

@@ -4,4 +4,6 @@ metadata:
name: {{ template "fullname" . }}
type: Opaque
data:
credentials.json: {{ printf "{ \"clientId\": \"%s\", \"clientSecret\": \"%s\", \"subscriptionId\": \"%s\", \"tenantId\": \"%s\", \"activeDirectoryEndpointUrl\": \"https://login.microsoftonline.com/\", \"resourceManagerEndpointUrl\": \"https://management.azure.com/\", \"activeDirectoryGraphResourceId\": \"https://graph.windows.net/\", \"sqlManagementEndpointUrl\": \"database.windows.net\", \"galleryEndpointUrl\": \"https://gallery.azure.com/\", \"managementEndpointUrl\": \"https://management.core.windows.net/\" }" (default "MISSING" .Values.env.azureClientId) (default "MISSING" .Values.env.azureClientKey) (default "MISSING" .Values.env.azureSubscriptionId) (default "MISSING" .Values.env.azureTenantId) | b64enc | quote }}
credentials.json: {{ printf "{ \"clientId\": \"%s\", \"clientSecret\": \"%s\", \"subscriptionId\": \"%s\", \"tenantId\": \"%s\", \"activeDirectoryEndpointUrl\": \"https://login.microsoftonline.com/\", \"resourceManagerEndpointUrl\": \"https://management.azure.com/\", \"activeDirectoryGraphResourceId\": \"https://graph.windows.net/\", \"sqlManagementEndpointUrl\": \"database.windows.net\", \"galleryEndpointUrl\": \"https://gallery.azure.com/\", \"managementEndpointUrl\": \"https://management.core.windows.net/\" }" (default "MISSING" .Values.env.azureClientId) (default "MISSING" .Values.env.azureClientKey) (default "MISSING" .Values.env.azureSubscriptionId) (default "MISSING" .Values.env.azureTenantId) | b64enc | quote }}
cert.pem: {{ (default "MISSING" .Values.env.apiserverCert) | quote }}
key.pem: {{ (default "MISSING" .Values.env.apiserverKey) | quote }}

View File

@@ -11,4 +11,6 @@ env:
aciRegion:
nodeName:
nodeTaint:
nodeOsType:
nodeOsType:
apiserverCert:
apiserverKey:

View File

@@ -7,8 +7,8 @@ import (
"errors"
"fmt"
"log"
"net/http"
"os"
"strings"
"time"
"github.com/virtual-kubelet/virtual-kubelet/manager"
@@ -31,6 +31,8 @@ type ACIProvider struct {
cpu string
memory string
pods string
internalIP string
daemonEndpointPort int32
}
// AuthConfig is the secret returned from an ImageRegistryCredential
@@ -45,7 +47,7 @@ type AuthConfig struct {
}
// NewACIProvider creates a new ACIProvider.
func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operatingSystem string) (*ACIProvider, error) {
func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32) (*ACIProvider, error) {
var p ACIProvider
var err error
@@ -89,6 +91,8 @@ func NewACIProvider(config string, rm *manager.ResourceManager, nodeName, operat
p.operatingSystem = operatingSystem
p.nodeName = nodeName
p.internalIP = internalIP
p.daemonEndpointPort = daemonEndpointPort
return &p, err
}
@@ -177,10 +181,9 @@ func (p *ACIProvider) DeletePod(pod *v1.Pod) error {
// GetPod returns a pod by name that is running inside ACI
// returns nil if a pod by that name is not found.
func (p *ACIProvider) GetPod(namespace, name string) (*v1.Pod, error) {
cg, err := p.aciClient.GetContainerGroup(p.resourceGroup, fmt.Sprintf("%s-%s", namespace, name))
cg, err, status := p.aciClient.GetContainerGroup(p.resourceGroup, fmt.Sprintf("%s-%s", namespace, name))
if err != nil {
// Trap error for 404 and return gracefully
if strings.Contains(err.Error(), "ResourceNotFound") {
if *status == http.StatusNotFound {
return nil, nil
}
return nil, err
@@ -193,6 +196,33 @@ func (p *ACIProvider) GetPod(namespace, name string) (*v1.Pod, error) {
return containerGroupToPod(cg)
}
// GetPodLogs returns the logs of a pod by name that is running inside ACI.
func (p *ACIProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
logContent := ""
cg, err, _ := p.aciClient.GetContainerGroup(p.resourceGroup, fmt.Sprintf("%s-%s", namespace, podName))
if err != nil {
return logContent, err
}
if cg.Tags["NodeName"] != p.nodeName {
return logContent, nil
}
// get logs from cg
retry := 10
for i := 0; i < retry; i++ {
cLogs, err := p.aciClient.GetContainerLogs(p.resourceGroup, cg.Name, containerName, tail)
if err != nil {
log.Println(err)
time.Sleep(5000 * time.Millisecond)
} else {
logContent = cLogs.Content
break
}
}
return logContent, err
}
// GetPodStatus returns the status of a pod by name that is running inside ACI
// returns nil if a pod by that name is not found.
func (p *ACIProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
@@ -290,6 +320,28 @@ func (p *ACIProvider) NodeConditions() []v1.NodeCondition {
}
}
// NodeAddresses returns a list of addresses for the node status
// within Kuberentes.
func (p *ACIProvider) NodeAddresses() []v1.NodeAddress {
// TODO: Make these dynamic and augment with custom ACI specific conditions of interest
return []v1.NodeAddress{
{
Type: "InternalIP",
Address: p.internalIP,
},
}
}
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kuberentes.
func (p *ACIProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
return &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{
Port: p.daemonEndpointPort,
},
}
}
// OperatingSystem returns the operating system that was provided by the config.
func (p *ACIProvider) OperatingSystem() string {
return p.operatingSystem

View File

@@ -163,7 +163,7 @@ func TestCreateContainerGroup(t *testing.T) {
}
func TestGetContainerGroup(t *testing.T) {
cg, err := client.GetContainerGroup(resourceGroup, containerGroup)
cg, err, _ := client.GetContainerGroup(resourceGroup, containerGroup)
if err != nil {
t.Fatal(err)
}

View File

@@ -13,7 +13,7 @@ import (
// GetContainerGroup gets an Azure Container Instance in the provided
// resource group with the given container group name.
// From: https://docs.microsoft.com/en-us/rest/api/container-instances/containergroups/get
func (c *Client) GetContainerGroup(resourceGroup, containerGroupName string) (*ContainerGroup, error) {
func (c *Client) GetContainerGroup(resourceGroup, containerGroupName string) (*ContainerGroup, error, *int) {
urlParams := url.Values{
"api-version": []string{apiVersion},
}
@@ -25,7 +25,7 @@ func (c *Client) GetContainerGroup(resourceGroup, containerGroupName string) (*C
// Create the request.
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, fmt.Errorf("Creating get container group uri request failed: %v", err)
return nil, fmt.Errorf("Creating get container group uri request failed: %v", err), nil
}
// Add the parameters to the url.
@@ -34,29 +34,29 @@ func (c *Client) GetContainerGroup(resourceGroup, containerGroupName string) (*C
"resourceGroup": resourceGroup,
"containerGroupName": containerGroupName,
}); err != nil {
return nil, fmt.Errorf("Expanding URL with parameters failed: %v", err)
return nil, fmt.Errorf("Expanding URL with parameters failed: %v", err), nil
}
// Send the request.
resp, err := c.hc.Do(req)
if err != nil {
return nil, fmt.Errorf("Sending get container group request failed: %v", err)
return nil, fmt.Errorf("Sending get container group request failed: %v", err), &resp.StatusCode
}
defer resp.Body.Close()
// 200 (OK) is a success response.
if err := api.CheckResponse(resp); err != nil {
return nil, err
return nil, err, &resp.StatusCode
}
// Decode the body from the response.
if resp.Body == nil {
return nil, errors.New("Create container group returned an empty body in the response")
return nil, errors.New("Create container group returned an empty body in the response"), &resp.StatusCode
}
var cg ContainerGroup
if err := json.NewDecoder(resp.Body).Decode(&cg); err != nil {
return nil, fmt.Errorf("Decoding get container group response body failed: %v", err)
return nil, fmt.Errorf("Decoding get container group response body failed: %v", err), &resp.StatusCode
}
return &cg, nil
return &cg, nil, &resp.StatusCode
}

View File

@@ -313,6 +313,11 @@ func (p *HyperProvider) GetPod(namespace, name string) (pod *v1.Pod, err error)
}
}
// GetContainerLogs retrieves the logs of a container by name from the provider.
func (p *HyperProvider) GetContainerLogs(namespace, podName, containerName string, tail int) (string, error) {
return "", nil
}
// GetPodStatus returns the status of a pod by name that is running inside hyper.sh
// returns nil if a pod by that name is not found.
func (p *HyperProvider) GetPodStatus(namespace, name string) (*v1.PodStatus, error) {
@@ -411,6 +416,18 @@ func (p *HyperProvider) NodeConditions() []v1.NodeCondition {
}
// NodeAddresses returns a list of addresses for the node status
// within Kuberentes.
func (p *HyperProvider) NodeAddresses() []v1.NodeAddress {
return nil
}
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kuberentes.
func (p *HyperProvider) NodeDaemonEndpoints() *v1.NodeDaemonEndpoints {
return nil
}
// OperatingSystem returns the operating system for this provider.
// This is a noop to default to Linux for now.
func (p *HyperProvider) OperatingSystem() string {

View File

@@ -0,0 +1,5 @@
#!/bin/bash
# Generate cert and key for chart
openssl req -newkey rsa:4096 -new -nodes -x509 -days 3650 -keyout key.pem -out cert.pem -subj "/C=US/ST=CA/L=virtualkubelet/O=virtualkubelet/OU=virtualkubelet/CN=virtualkubelet"
cert=$(base64 cert.pem)
key=$(base64 key.pem)

66
vkubelet/apiserver.go Normal file
View File

@@ -0,0 +1,66 @@
package vkubelet
import (
"fmt"
"io"
"log"
"net/http"
"os"
"strconv"
"github.com/gorilla/mux"
)
var p Provider
var r mux.Router
func NotFound(w http.ResponseWriter, r *http.Request) {
log.Println("404 request not found")
http.Error(w, "404 request not found", http.StatusNotFound)
}
func ApiserverStart(provider Provider) {
p = provider
certFilePath := os.Getenv("APISERVER_CERT_LOCATION")
keyFilePath := os.Getenv("APISERVER_KEY_LOCATION")
port := os.Getenv("KUBELET_PORT")
addr := fmt.Sprintf(":%s", port)
r := mux.NewRouter()
r.HandleFunc("/containerLogs/{namespace}/{pod}/{container}", ApiServerHandler).Methods("GET")
r.NotFoundHandler = http.HandlerFunc(NotFound)
err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r)
if err != nil {
log.Println(err)
}
}
func ApiServerHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
if len(vars) == 3 {
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
tail := 10
q := req.URL.Query()
queryTail := q.Get("tailLines")
if queryTail != "" {
t, err := strconv.Atoi(queryTail)
if err != nil {
log.Println(err)
io.WriteString(w, err.Error())
} else {
tail = t
}
}
podsLogs, err := p.GetContainerLogs(namespace, pod, container, tail)
if err != nil {
log.Println(err)
io.WriteString(w, err.Error())
} else {
io.WriteString(w, podsLogs)
}
} else {
NotFound(w, req)
}
}

View File

@@ -24,7 +24,10 @@ type Provider interface {
// GetPod retrieves a pod by name from the provider (can be cached).
GetPod(namespace, name string) (*v1.Pod, error)
// GetPodStatus retrievesthe status of a pod by name from the provider.
// GetContainerLogs retrieves the logs of a container by name from the provider.
GetContainerLogs(namespace, podName, containerName string, tail int) (string, error)
// GetPodStatus retrieves the status of a pod by name from the provider.
GetPodStatus(namespace, name string) (*v1.PodStatus, error)
// GetPods retrieves a list of all pods running on the provider (can be cached).
@@ -37,6 +40,14 @@ type Provider interface {
// within Kuberentes.
NodeConditions() []v1.NodeCondition
// NodeAddresses returns a list of addresses for the node status
// within Kuberentes.
NodeAddresses() []v1.NodeAddress
// NodeDaemonEndpoints returns NodeDaemonEndpoints for the node status
// within Kuberentes.
NodeDaemonEndpoints() *v1.NodeDaemonEndpoints
// OperatingSystem returns the operating system the provider is for.
OperatingSystem() string
}

View File

@@ -1,11 +1,12 @@
package vkubelet
import (
"strings"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
@@ -64,7 +65,14 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, taint, provider, prov
var p Provider
switch provider {
case "azure":
p, err = azure.NewACIProvider(providerConfig, rm, nodeName, operatingSystem)
internalIP := os.Getenv("VKUBELET_POD_IP")
daemonEndpointPortEnv := os.Getenv("KUBELET_PORT")
i64value, err := strconv.ParseInt(daemonEndpointPortEnv, 10, 32)
daemonEndpointPort := int32(i64value)
if err != nil {
return nil, err
}
p, err = azure.NewACIProvider(providerConfig, rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
if err != nil {
return nil, err
}
@@ -90,6 +98,8 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, taint, provider, prov
return s, err
}
go ApiserverStart(p)
tick := time.Tick(5 * time.Second)
go func() {
for range tick {
@@ -136,6 +146,8 @@ func (s *Server) registerNode() error {
Capacity: s.provider.Capacity(),
Allocatable: s.provider.Capacity(),
Conditions: s.provider.NodeConditions(),
Addresses: s.provider.NodeAddresses(),
DaemonEndpoints: *s.provider.NodeDaemonEndpoints(),
},
}