Use standard logging package (#323)

This commit is contained in:
Brian Goff
2018-08-17 16:50:24 -07:00
committed by Robbie Zhang
parent d7f97b9bfc
commit 1e774a32b3
14 changed files with 287 additions and 130 deletions

View File

@@ -85,6 +85,9 @@ spec:
"--provider", "{{ required "provider is required" .Values.provider }}",
"--namespace", "{{ .Values.monitoredNamespace }}",
"--nodename", "{{ required "nodeName is required" .Values.nodeName }}",
{{- if .Values.logLevel }}
"--log-level", "{{.Values.logLevel}}",
{{- end }}
"--os", "{{ .Values.nodeOsType }}"
]
volumes:

View File

@@ -10,6 +10,7 @@ nodeOsType: "Linux"
monitoredNamespace: ""
apiserverCert:
apiserverKey:
logLevel:
taint:
enabled: true

View File

@@ -15,15 +15,16 @@
package cmd
import (
"fmt"
"log"
"context"
"os"
"path/filepath"
"strings"
"github.com/Sirupsen/logrus"
homedir "github.com/mitchellh/go-homedir"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/providers"
vkubelet "github.com/virtual-kubelet/virtual-kubelet/vkubelet"
corev1 "k8s.io/api/core/v1"
@@ -38,6 +39,7 @@ var provider string
var providerConfig string
var taintKey string
var disableTaint bool
var logLevel string
// RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{
@@ -47,12 +49,13 @@ var RootCmd = &cobra.Command{
backend implementation allowing users to create kubernetes nodes without running the kubelet.
This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Println(kubeConfig)
f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint)
if err != nil {
log.Fatal(err)
log.L.WithError(err).Fatal("Error initializing vritual kubelet")
}
if err := f.Run(context.Background()); err != nil {
log.L.Fatal(err)
}
f.Run()
},
}
@@ -60,8 +63,7 @@ This allows users to schedule kubernetes workloads on nodes that aren't running
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
if err := RootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
log.GetLogger(context.TODO()).WithError(err).Fatal("Error executing root command")
}
}
@@ -87,6 +89,7 @@ func init() {
RootCmd.PersistentFlags().StringVar(&providerConfig, "provider-config", "", "cloud provider configuration file")
RootCmd.PersistentFlags().StringVar(&taintKey, "taint", "", "Set node taint key")
RootCmd.PersistentFlags().MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable")
RootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", `set the log level, e.g. "trace", debug", "info", "warn", "error"`)
// Cobra also supports local flags, which will only run
// when this action is called directly.
@@ -96,15 +99,13 @@ func init() {
// initConfig reads in config file and ENV variables if set.
func initConfig() {
if provider == "" {
fmt.Println("You must supply a cloud provider option: use --provider")
os.Exit(1)
log.G(context.TODO()).Fatal("You must supply a cloud provider option: use --provider")
}
// Find home directory.
home, err := homedir.Dir()
if err != nil {
fmt.Println(err)
os.Exit(1)
log.G(context.TODO()).WithError(err).Fatal("Error reading homedir")
}
if kubeletConfig != "" {
@@ -120,7 +121,7 @@ func initConfig() {
// If a config file is found, read it in.
if err := viper.ReadInConfig(); err == nil {
fmt.Println("Using config file:", viper.ConfigFileUsed())
log.G(context.TODO()).Debugf("Using config file %s", viper.ConfigFileUsed())
}
if kubeConfig == "" {
@@ -135,7 +136,20 @@ func initConfig() {
// Validate operating system.
ok, _ := providers.ValidOperatingSystems[operatingSystem]
if !ok {
fmt.Printf("Operating system '%s' not supported. Valid options are %s\n", operatingSystem, strings.Join(providers.ValidOperatingSystems.Names(), " | "))
os.Exit(1)
log.G(context.TODO()).WithField("OperatingSystem", operatingSystem).Fatalf("Operating system not supported. Valid options are: %s", strings.Join(providers.ValidOperatingSystems.Names(), " | "))
}
level, err := log.ParseLevel(logLevel)
if err != nil {
log.G(context.TODO()).WithField("logLevel", logLevel).Fatal("log level is not supported")
}
logger := log.L.WithFields(logrus.Fields{
"provider": provider,
"operatingSystem": operatingSystem,
"node": nodeName,
"namespace": kubeNamespace,
})
logger.Level = level
log.L = logger
}

90
log/log.go Normal file
View File

@@ -0,0 +1,90 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package log
import (
"context"
"sync/atomic"
"github.com/Sirupsen/logrus"
)
var (
// G is an alias for GetLogger.
//
// We may want to define this locally to a package to get package tagged log
// messages.
G = GetLogger
// L is an alias for the the standard logger.
L = logrus.NewEntry(logrus.StandardLogger())
)
type (
loggerKey struct{}
)
// TraceLevel is the log level for tracing. Trace level is lower than debug level,
// and is usually used to trace detailed behavior of the program.
const TraceLevel = logrus.Level(uint32(logrus.DebugLevel + 1))
// RFC3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to
// ensure the formatted time is always the same number of characters.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
// ParseLevel takes a string level and returns the Logrus log level constant.
// It supports trace level.
func ParseLevel(lvl string) (logrus.Level, error) {
if lvl == "trace" {
return TraceLevel, nil
}
return logrus.ParseLevel(lvl)
}
// WithLogger returns a new context with the provided logger. Use in
// combination with logger.WithField(s) for great effect.
func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context {
return context.WithValue(ctx, loggerKey{}, logger)
}
// GetLogger retrieves the current logger from the context. If no logger is
// available, the default logger is returned.
func GetLogger(ctx context.Context) *logrus.Entry {
logger := ctx.Value(loggerKey{})
if logger == nil {
return L
}
return logger.(*logrus.Entry)
}
// Trace logs a message at level Trace with the log entry passed-in.
func Trace(e *logrus.Entry, args ...interface{}) {
level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level)))
if level >= TraceLevel {
e.Debug(args...)
}
}
// Tracef logs a message at level Trace with the log entry passed-in.
func Tracef(e *logrus.Entry, format string, args ...interface{}) {
level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level)))
if level >= TraceLevel {
e.Debugf(format, args...)
}
}

View File

@@ -1,10 +1,10 @@
package manager
import (
"log"
"sync"
"time"
"github.com/pkg/errors"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
@@ -26,7 +26,7 @@ type ResourceManager struct {
}
// NewResourceManager returns a ResourceManager with the internal maps initialized.
func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager {
func NewResourceManager(k8sClient kubernetes.Interface) (*ResourceManager, error) {
rm := ResourceManager{
pods: make(map[string]*v1.Pod, 0),
deletingPods: make(map[string]*v1.Pod, 0),
@@ -37,8 +37,18 @@ func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager {
k8sClient: k8sClient,
}
go rm.watchConfigMaps()
go rm.watchSecrets()
configW, err := rm.k8sClient.CoreV1().ConfigMaps(v1.NamespaceAll).Watch(metav1.ListOptions{})
if err != nil {
return nil, errors.Wrap(err, "error getting config watch")
}
secretsW, err := rm.k8sClient.CoreV1().Secrets(v1.NamespaceAll).Watch(metav1.ListOptions{})
if err != nil {
return nil, errors.Wrap(err, "error getting secrets watch")
}
go rm.watchConfigMaps(configW)
go rm.watchSecrets(secretsW)
tick := time.Tick(5 * time.Minute)
go func() {
@@ -68,7 +78,7 @@ func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager {
}
}()
return &rm
return &rm, nil
}
// SetPods clears the internal cache and populates it with the supplied pods.
@@ -213,12 +223,7 @@ func (rm *ResourceManager) GetSecret(name, namespace string) (*v1.Secret, error)
// watchConfigMaps monitors the kubernetes API for modifications and deletions of configmaps
// it evicts them from the internal cache
func (rm *ResourceManager) watchConfigMaps() {
var opts metav1.ListOptions
w, err := rm.k8sClient.CoreV1().ConfigMaps(v1.NamespaceAll).Watch(opts)
if err != nil {
log.Fatal(err)
}
func (rm *ResourceManager) watchConfigMaps(w watch.Interface) {
for {
select {
@@ -242,12 +247,7 @@ func (rm *ResourceManager) watchConfigMaps() {
// watchSecretes monitors the kubernetes API for modifications and deletions of secrets
// it evicts them from the internal cache
func (rm *ResourceManager) watchSecrets() {
var opts metav1.ListOptions
w, err := rm.k8sClient.CoreV1().Secrets(v1.NamespaceAll).Watch(opts)
if err != nil {
log.Fatal(err)
}
func (rm *ResourceManager) watchSecrets(w watch.Interface) {
for {
select {

View File

@@ -19,7 +19,10 @@ func init() {
}
func TestResourceManager(t *testing.T) {
pm := NewResourceManager(fakeClient)
pm, err := NewResourceManager(fakeClient)
if err != nil {
t.Fatal(err)
}
pod1Name := "Pod1"
pod1Namespace := "Pod1Namespace"
pod1 := makePod(pod1Namespace, pod1Name)
@@ -36,7 +39,10 @@ func TestResourceManager(t *testing.T) {
}
func TestResourceManagerDeletePod(t *testing.T) {
pm := NewResourceManager(fakeClient)
pm, err := NewResourceManager(fakeClient)
if err != nil {
t.Fatal(err)
}
pod1Name := "Pod1"
pod1Namespace := "Pod1Namespace"
pod1 := makePod(pod1Namespace, pod1Name)
@@ -61,7 +67,10 @@ func makePod(namespace, name string) *v1.Pod {
}
func TestResourceManagerUpdatePod(t *testing.T) {
pm := NewResourceManager(fakeClient)
pm, err := NewResourceManager(fakeClient)
if err != nil {
t.Fatal(err)
}
pod1Name := "Pod1"
pod1Namespace := "Pod1Namespace"
pod1 := makePod(pod1Namespace, pod1Name)

View File

@@ -2,19 +2,21 @@ package azure
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"reflect"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/gorilla/websocket"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
client "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci"
@@ -345,7 +347,7 @@ func (p *ACIProvider) GetContainerLogs(namespace, podName, containerName string,
for i := 0; i < retry; i++ {
cLogs, err := p.aciClient.GetContainerLogs(p.resourceGroup, cg.Name, containerName, tail)
if err != nil {
log.Println(err)
log.G(context.TODO()).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying")
time.Sleep(5000 * time.Millisecond)
} else {
logContent = cLogs.Content
@@ -469,7 +471,11 @@ func (p *ACIProvider) GetPods() ([]*v1.Pod, error) {
p, err := containerGroupToPod(&c)
if err != nil {
log.Println(err)
log.G(context.TODO()).WithFields(logrus.Fields{
"name": c.Name,
"id": c.ID,
}).WithError(err).Error("error converting container group to pod")
continue
}
pods = append(pods, p)
@@ -1105,7 +1111,8 @@ func filterServiceAccountSecretVolume(osType string, containerGroup *aci.Contain
return
}
log.Printf("Ignoring service account secret volumes '%v' for Windows", reflect.ValueOf(serviceAccountSecretVolumeName).MapKeys())
l := log.G(context.TODO()).WithField("containerGroup", containerGroup.Name)
l.Infof("Ignoring service account secret volumes '%v' for Windows", reflect.ValueOf(serviceAccountSecretVolumeName).MapKeys())
volumes := make([]aci.Volume, 0, len(containerGroup.ContainerGroupProperties.Volumes))
for _, volume := range containerGroup.ContainerGroupProperties.Volumes {

View File

@@ -448,7 +448,10 @@ func prepareMocks() (*AADMock, *ACIMock, *ACIProvider, error) {
os.Setenv("ACI_RESOURCE_GROUP", fakeResourceGroup)
clientset := fake.NewSimpleClientset()
rm := manager.NewResourceManager(clientset)
rm, err := manager.NewResourceManager(clientset)
if err != nil {
return nil, nil, nil, err
}
provider, err := NewACIProvider("example.toml", rm, fakeNodeName, "Linux", "0.0.0.0", 10250)
if err != nil {

View File

@@ -1,10 +1,12 @@
package azure
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"github.com/virtual-kubelet/virtual-kubelet/log"
)
// AcsCredential represents the credential file for ACS
@@ -19,12 +21,13 @@ type AcsCredential struct {
}
// NewAcsCredential returns an AcsCredential struct from file path
func NewAcsCredential(filepath string) (*AcsCredential, error) {
log.Printf("Reading ACS credential file %q", filepath)
func NewAcsCredential(p string) (*AcsCredential, error) {
logger := log.G(context.TODO()).WithField("method", "NewAcsCredential").WithField("file", p)
log.Trace(logger, "Reading ACS credential file")
b, err := ioutil.ReadFile(filepath)
b, err := ioutil.ReadFile(p)
if err != nil {
return nil, fmt.Errorf("Reading ACS credential file %q failed: %v", filepath, err)
return nil, fmt.Errorf("Reading ACS credential file %q failed: %v", p, err)
}
// Unmarshal the authentication file.
@@ -33,6 +36,6 @@ func NewAcsCredential(filepath string) (*AcsCredential, error) {
return nil, err
}
log.Printf("Load ACS credential file %q successfully", filepath)
log.Trace(logger, "Load ACS credential file successfully")
return &cred, nil
}

View File

@@ -1,24 +1,36 @@
package vkubelet
import (
"context"
"fmt"
"io"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/gorilla/mux"
"github.com/virtual-kubelet/virtual-kubelet/log"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
)
var p Provider
var r mux.Router
func loggingContext(r *http.Request) context.Context {
ctx := r.Context()
logger := log.G(ctx).WithFields(logrus.Fields{
"uri": r.RequestURI,
"vars": mux.Vars(r),
})
return log.WithLogger(ctx, logger)
}
func NotFound(w http.ResponseWriter, r *http.Request) {
log.Printf("404 request not found. \n %v", mux.Vars(r))
logger := log.G(loggingContext(r))
log.Trace(logger, "404 request not found")
http.Error(w, "404 request not found", http.StatusNotFound)
}
@@ -35,37 +47,45 @@ func ApiserverStart(provider Provider) {
r.NotFoundHandler = http.HandlerFunc(NotFound)
if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil {
log.Println(err)
log.G(context.TODO()).WithError(err).Error("error setting up http server")
}
}
func ApiServerHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
if len(vars) == 3 {
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
tail := 10
q := req.URL.Query()
queryTail := q.Get("tailLines")
if queryTail != "" {
t, err := strconv.Atoi(queryTail)
if err != nil {
log.Println(err)
io.WriteString(w, err.Error())
} else {
tail = t
}
}
podsLogs, err := p.GetContainerLogs(namespace, pod, container, tail)
if err != nil {
log.Println(err)
io.WriteString(w, err.Error())
} else {
io.WriteString(w, podsLogs)
}
} else {
if len(vars) != 3 {
NotFound(w, req)
return
}
ctx := loggingContext(req)
namespace := vars["namespace"]
pod := vars["pod"]
container := vars["container"]
tail := 10
q := req.URL.Query()
if queryTail := q.Get("tailLines"); queryTail != "" {
t, err := strconv.Atoi(queryTail)
if err != nil {
logger := log.G(context.TODO()).WithError(err)
log.Trace(logger, "could not parse tailLines")
http.Error(w, fmt.Sprintf("could not parse \"tailLines\": %v", err), http.StatusBadRequest)
return
}
tail = t
}
podsLogs, err := p.GetContainerLogs(namespace, pod, container, tail)
if err != nil {
log.G(ctx).WithError(err).Error("error getting container logs")
http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError)
return
}
if _, err := io.WriteString(w, podsLogs); err != nil {
log.G(ctx).WithError(err).Warn("error writing response to client")
}
}

View File

@@ -3,8 +3,7 @@
package vkubelet
import (
"fmt"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers/aws"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure"
@@ -56,8 +55,6 @@ func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager
case "vic":
return vic.NewVicProvider(providerConfig, rm, nodeName, operatingSystem)
default:
fmt.Printf("Provider '%s' is not supported\n", provider)
return nil, errors.New("provider not supported")
}
var p Provider
return p, nil
}

View File

@@ -1,8 +1,7 @@
package vkubelet
import (
"fmt"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers/aws"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure"
@@ -43,8 +42,6 @@ func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager
case "sfmesh":
return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
default:
fmt.Printf("Provider '%s' is not supported\n", provider)
return nil, errors.New("provider is not supported")
}
var p Provider
return p, nil
}

View File

@@ -1,8 +1,7 @@
package vkubelet
import (
"fmt"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers/aws"
"github.com/virtual-kubelet/virtual-kubelet/providers/azure"
@@ -43,8 +42,6 @@ func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager
case "sfmesh":
return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort)
default:
fmt.Printf("Provider '%s' is not supported\n", provider)
return nil, errors.New("provider not supported")
}
var p Provider
return p, nil
}

View File

@@ -1,8 +1,8 @@
package vkubelet
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strconv"
@@ -10,6 +10,8 @@ import (
"syscall"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -69,7 +71,10 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
return nil, err
}
rm := manager.NewResourceManager(clientset)
rm, err := manager.NewResourceManager(clientset)
if err != nil {
return nil, pkgerrors.Wrap(err, "error creating resource manager")
}
daemonEndpointPortEnv := os.Getenv("KUBELET_PORT")
if daemonEndpointPortEnv == "" {
@@ -101,7 +106,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
case "PreferNoSchedule":
vkTaintEffect = corev1.TaintEffectPreferNoSchedule
default:
fmt.Printf("Taint effect '%s' is not supported\n", vkTaintEffectEnv)
return nil, pkgerrors.Errorf("taint effect %q is not supported", vkTaintEffectEnv)
}
taint := corev1.Taint{
@@ -125,17 +130,21 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
provider: p,
}
if err = s.registerNode(); err != nil {
ctx := context.TODO()
ctx = log.WithLogger(ctx, log.G(ctx))
if err = s.registerNode(ctx); err != nil {
return s, err
}
go ApiserverStart(p)
tick := time.Tick(5 * time.Second)
go func() {
for range tick {
s.updateNode()
s.updatePodStatuses()
s.updateNode(ctx)
s.updatePodStatuses(ctx)
}
}()
@@ -143,7 +152,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon
}
// registerNode registers this virtual node with the Kubernetes API.
func (s *Server) registerNode() error {
func (s *Server) registerNode(ctx context.Context) error {
taints := make([]corev1.Taint, 0)
if !s.disableTaint {
@@ -182,14 +191,14 @@ func (s *Server) registerNode() error {
return err
}
log.Printf("Node '%s' with OS type '%s' registered\n", node.Name, node.Status.NodeInfo.OperatingSystem)
log.G(ctx).Info("Registered node")
return nil
}
// Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster.
// Run will block until Stop is called or a SIGINT or SIGTERM signal is received.
func (s *Server) Run() error {
func (s *Server) Run(ctx context.Context) error {
shouldStop := false
sig := make(chan os.Signal, 1)
@@ -207,15 +216,15 @@ func (s *Server) Run() error {
pods, err := s.k8sClient.CoreV1().Pods(s.namespace).List(opts)
if err != nil {
log.Fatal("Failed to list pods", err)
return pkgerrors.Wrap(err, "error getting pod list")
}
s.resourceManager.SetPods(pods)
s.reconcile()
s.reconcile(ctx)
opts.ResourceVersion = pods.ResourceVersion
s.podWatcher, err = s.k8sClient.CoreV1().Pods(s.namespace).Watch(opts)
if err != nil {
log.Fatal("Failed to watch pods", err)
return pkgerrors.Wrap(err, "failed to watch pods")
}
loop:
@@ -224,15 +233,15 @@ func (s *Server) Run() error {
case ev, ok := <-s.podWatcher.ResultChan():
if !ok {
if shouldStop {
log.Println("Pod watcher is stopped.")
log.G(ctx).Info("Pod watcher is stopped")
return nil
}
log.Println("Pod watcher connection is closed unexpectedly.")
log.G(ctx).Error("Pod watcher connection is closed unexpectedly")
break loop
}
log.Println("Pod watcher event is received:", ev.Type)
log.G(ctx).WithField("type", ev.Type).Debug("Pod watcher event is received")
reconcile := false
switch ev.Type {
case watch.Added:
@@ -244,7 +253,7 @@ func (s *Server) Run() error {
}
if reconcile {
s.reconcile()
s.reconcile(ctx)
}
}
}
@@ -262,17 +271,17 @@ func (s *Server) Stop() {
}
// updateNode updates the node status within Kubernetes with updated NodeConditions.
func (s *Server) updateNode() {
func (s *Server) updateNode(ctx context.Context) {
opts := metav1.GetOptions{}
n, err := s.k8sClient.CoreV1().Nodes().Get(s.nodeName, opts)
if err != nil && !errors.IsNotFound(err) {
log.Println("Failed to retrieve node:", err)
log.G(ctx).WithError(err).Error("Failed to retrieve node")
return
}
if errors.IsNotFound(err) {
if err = s.registerNode(); err != nil {
log.Println("Failed to register node:", err)
if err = s.registerNode(ctx); err != nil {
log.G(ctx).WithError(err).Error("Failed to register node")
return
}
}
@@ -288,27 +297,31 @@ func (s *Server) updateNode() {
n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n)
if err != nil {
log.Println("Failed to update node:", err)
log.G(ctx).WithError(err).Error("Failed to update node")
return
}
}
// reconcile is the main reconciliation loop that compares differences between Kubernetes and
// the active provider and reconciles the differences.
func (s *Server) reconcile() {
log.Println("Start reconcile.")
func (s *Server) reconcile(ctx context.Context) {
logger := log.G(ctx)
logger.Debug("Start reconcile")
defer logger.Debug("End reconcile")
providerPods, err := s.provider.GetPods()
if err != nil {
log.Println(err)
logger.WithError(err).Error("Error getting pod list from provider")
return
}
for _, pod := range providerPods {
// Delete pods that don't exist in Kubernetes
if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil || p.DeletionTimestamp != nil {
log.Printf("Deleting pod '%s'\n", pod.Name)
if err := s.deletePod(pod); err != nil {
log.Printf("Error deleting pod '%s': %s\n", pod.Name, err)
logger := logger.WithField("pod", pod.Name)
logger.Debug("Deleting pod '%s'\n", pod.Name)
if err := s.deletePod(ctx, pod); err != nil {
logger.WithError(err).Error("Error deleting pod")
continue
}
}
@@ -317,6 +330,7 @@ func (s *Server) reconcile() {
// Create any pods for k8s pods that don't exist in the provider
pods := s.resourceManager.GetPods()
for _, pod := range pods {
logger := logger.WithField("pod", pod.Name)
var providerPod *corev1.Pod
for _, p := range providerPods {
if p.Namespace == pod.Namespace && p.Name == pod.Name {
@@ -326,30 +340,33 @@ func (s *Server) reconcile() {
}
if pod.DeletionTimestamp == nil && pod.Status.Phase != corev1.PodFailed && providerPod == nil {
log.Printf("Creating pod '%s'\n", pod.Name)
if err := s.createPod(pod); err != nil {
log.Printf("Error creating pod '%s': %s\n", pod.Name, err)
logger.Debug("Creating pod")
if err := s.createPod(ctx, pod); err != nil {
logger.WithError(err).Error("Error creating pod")
continue
}
}
// Delete pod if DeletionTimestamp is set
if pod.DeletionTimestamp != nil {
log.Printf("Pod '%s' is pending deletion.\n", pod.Name)
log.Trace(logger, "Pod pending deletion")
var err error
if err = s.deletePod(pod); err != nil {
log.Printf("Error deleting pod '%s': %s\n", pod.Name, err)
if err = s.deletePod(ctx, pod); err != nil {
logger.WithError(err).Error("Error deleting pod")
continue
}
log.Trace(logger, "Pod deletion complete")
}
}
}
func (s *Server) createPod(pod *corev1.Pod) error {
func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
if err := s.populateSecretsAndConfigMapsInEnv(pod); err != nil {
return err
}
logger := log.G(ctx).WithField("pod", pod.Name)
if origErr := s.provider.CreatePod(pod); origErr != nil {
pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error
pod.Status.Phase = corev1.PodFailed
@@ -358,29 +375,29 @@ func (s *Server) createPod(pod *corev1.Pod) error {
_, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
if err != nil {
log.Println("Failed to update pod status:", err)
return origErr
logger.WithError(err).Warn("Failed to update pod status")
}
return origErr
}
log.Printf("Pod '%s' created.\n", pod.Name)
logger.Info("Pod created")
return nil
}
func (s *Server) deletePod(pod *corev1.Pod) error {
func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error {
var delErr error
if delErr = s.provider.DeletePod(pod); delErr != nil && errors.IsNotFound(delErr) {
return delErr
}
logger := log.G(ctx).WithField("pod", pod.Name)
if !errors.IsNotFound(delErr) {
var grace int64
if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) {
if errors.IsNotFound(err) {
log.Printf("Pod '%s' doesn't exist.\n", pod.Name)
logger.Error("Pod doesn't exist")
return nil
}
@@ -388,15 +405,14 @@ func (s *Server) deletePod(pod *corev1.Pod) error {
}
s.resourceManager.DeletePod(pod)
log.Printf("Pod '%s' deleted.\n", pod.Name)
logger.Info("Pod deleted")
}
return nil
}
// updatePodStatuses syncs the providers pod status with the kubernetes pod status.
func (s *Server) updatePodStatuses() {
func (s *Server) updatePodStatuses(ctx context.Context) {
// Update all the pods with the provider status.
pods := s.resourceManager.GetPods()
for _, pod := range pods {
@@ -406,7 +422,7 @@ func (s *Server) updatePodStatuses() {
status, err := s.provider.GetPodStatus(pod.Namespace, pod.Name)
if err != nil {
log.Printf("Error retrieving pod '%s' in namespace '%s' status from provider: %s\n", pod.Name, pod.Namespace, err)
log.G(ctx).WithField("pod", pod.Name).Error("Error retrieving pod status")
return
}