diff --git a/charts/virtual-kubelet/templates/deployment.yaml b/charts/virtual-kubelet/templates/deployment.yaml index 533883576..fb0fc1861 100644 --- a/charts/virtual-kubelet/templates/deployment.yaml +++ b/charts/virtual-kubelet/templates/deployment.yaml @@ -85,6 +85,9 @@ spec: "--provider", "{{ required "provider is required" .Values.provider }}", "--namespace", "{{ .Values.monitoredNamespace }}", "--nodename", "{{ required "nodeName is required" .Values.nodeName }}", + {{- if .Values.logLevel }} + "--log-level", "{{.Values.logLevel}}", + {{- end }} "--os", "{{ .Values.nodeOsType }}" ] volumes: diff --git a/charts/virtual-kubelet/values.yaml b/charts/virtual-kubelet/values.yaml index 6009bd4fe..8039061d6 100644 --- a/charts/virtual-kubelet/values.yaml +++ b/charts/virtual-kubelet/values.yaml @@ -10,6 +10,7 @@ nodeOsType: "Linux" monitoredNamespace: "" apiserverCert: apiserverKey: +logLevel: taint: enabled: true diff --git a/cmd/root.go b/cmd/root.go index 4013337be..827948545 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -15,15 +15,16 @@ package cmd import ( - "fmt" - "log" + "context" "os" "path/filepath" "strings" + "github.com/Sirupsen/logrus" homedir "github.com/mitchellh/go-homedir" "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/providers" vkubelet "github.com/virtual-kubelet/virtual-kubelet/vkubelet" corev1 "k8s.io/api/core/v1" @@ -38,6 +39,7 @@ var provider string var providerConfig string var taintKey string var disableTaint bool +var logLevel string // RootCmd represents the base command when called without any subcommands var RootCmd = &cobra.Command{ @@ -47,12 +49,13 @@ var RootCmd = &cobra.Command{ backend implementation allowing users to create kubernetes nodes without running the kubelet. 
This allows users to schedule kubernetes workloads on nodes that aren't running Kubernetes.`, Run: func(cmd *cobra.Command, args []string) { - fmt.Println(kubeConfig) f, err := vkubelet.New(nodeName, operatingSystem, kubeNamespace, kubeConfig, provider, providerConfig, taintKey, disableTaint) if err != nil { - log.Fatal(err) + log.L.WithError(err).Fatal("Error initializing virtual kubelet") + } + if err := f.Run(context.Background()); err != nil { + log.L.Fatal(err) } - f.Run() }, } @@ -60,8 +63,7 @@ This allows users to schedule kubernetes workloads on nodes that aren't running // This is called by main.main(). It only needs to happen once to the rootCmd. func Execute() { if err := RootCmd.Execute(); err != nil { - fmt.Println(err) - os.Exit(1) + log.GetLogger(context.TODO()).WithError(err).Fatal("Error executing root command") } } @@ -87,6 +89,7 @@ func init() { RootCmd.PersistentFlags().StringVar(&providerConfig, "provider-config", "", "cloud provider configuration file") RootCmd.PersistentFlags().StringVar(&taintKey, "taint", "", "Set node taint key") RootCmd.PersistentFlags().MarkDeprecated("taint", "Taint key should now be configured using the VK_TAINT_KEY environment variable") + RootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", `set the log level, e.g. "trace", "debug", "info", "warn", "error"`) // Cobra also supports local flags, which will only run // when this action is called directly. @@ -96,15 +99,13 @@ func init() { // initConfig reads in config file and ENV variables if set. func initConfig() { if provider == "" { - fmt.Println("You must supply a cloud provider option: use --provider") - os.Exit(1) + log.G(context.TODO()).Fatal("You must supply a cloud provider option: use --provider") } // Find home directory. 
home, err := homedir.Dir() if err != nil { - fmt.Println(err) - os.Exit(1) + log.G(context.TODO()).WithError(err).Fatal("Error reading homedir") } if kubeletConfig != "" { @@ -120,7 +121,7 @@ func initConfig() { // If a config file is found, read it in. if err := viper.ReadInConfig(); err == nil { - fmt.Println("Using config file:", viper.ConfigFileUsed()) + log.G(context.TODO()).Debugf("Using config file %s", viper.ConfigFileUsed()) } if kubeConfig == "" { @@ -135,7 +136,20 @@ func initConfig() { // Validate operating system. ok, _ := providers.ValidOperatingSystems[operatingSystem] if !ok { - fmt.Printf("Operating system '%s' not supported. Valid options are %s\n", operatingSystem, strings.Join(providers.ValidOperatingSystems.Names(), " | ")) - os.Exit(1) + log.G(context.TODO()).WithField("OperatingSystem", operatingSystem).Fatalf("Operating system not supported. Valid options are: %s", strings.Join(providers.ValidOperatingSystems.Names(), " | ")) } + + level, err := log.ParseLevel(logLevel) + if err != nil { + log.G(context.TODO()).WithField("logLevel", logLevel).Fatal("log level is not supported") + } + + logger := log.L.WithFields(logrus.Fields{ + "provider": provider, + "operatingSystem": operatingSystem, + "node": nodeName, + "namespace": kubeNamespace, + }) + logger.Logger.Level = level + log.L = logger } diff --git a/log/log.go b/log/log.go new file mode 100644 index 000000000..2bb026bd0 --- /dev/null +++ b/log/log.go @@ -0,0 +1,90 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +*/ + +package log + +import ( + "context" + "sync/atomic" + + "github.com/Sirupsen/logrus" +) + +var ( + // G is an alias for GetLogger. + // + // We may want to define this locally to a package to get package tagged log + // messages. + G = GetLogger + + // L is an alias for the the standard logger. + L = logrus.NewEntry(logrus.StandardLogger()) +) + +type ( + loggerKey struct{} +) + +// TraceLevel is the log level for tracing. Trace level is lower than debug level, +// and is usually used to trace detailed behavior of the program. +const TraceLevel = logrus.Level(uint32(logrus.DebugLevel + 1)) + +// RFC3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to +// ensure the formatted time is always the same number of characters. +const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" + +// ParseLevel takes a string level and returns the Logrus log level constant. +// It supports trace level. +func ParseLevel(lvl string) (logrus.Level, error) { + if lvl == "trace" { + return TraceLevel, nil + } + return logrus.ParseLevel(lvl) +} + +// WithLogger returns a new context with the provided logger. Use in +// combination with logger.WithField(s) for great effect. +func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context { + return context.WithValue(ctx, loggerKey{}, logger) +} + +// GetLogger retrieves the current logger from the context. If no logger is +// available, the default logger is returned. +func GetLogger(ctx context.Context) *logrus.Entry { + logger := ctx.Value(loggerKey{}) + + if logger == nil { + return L + } + + return logger.(*logrus.Entry) +} + +// Trace logs a message at level Trace with the log entry passed-in. +func Trace(e *logrus.Entry, args ...interface{}) { + level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level))) + if level >= TraceLevel { + e.Debug(args...) 
+ } +} + +// Tracef logs a message at level Trace with the log entry passed-in. +func Tracef(e *logrus.Entry, format string, args ...interface{}) { + level := logrus.Level(atomic.LoadUint32((*uint32)(&e.Logger.Level))) + if level >= TraceLevel { + e.Debugf(format, args...) + } +} diff --git a/manager/resource.go b/manager/resource.go index 41fb1d9fa..0725d1938 100644 --- a/manager/resource.go +++ b/manager/resource.go @@ -1,10 +1,10 @@ package manager import ( - "log" "sync" "time" + "github.com/pkg/errors" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" @@ -26,7 +26,7 @@ type ResourceManager struct { } // NewResourceManager returns a ResourceManager with the internal maps initialized. -func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager { +func NewResourceManager(k8sClient kubernetes.Interface) (*ResourceManager, error) { rm := ResourceManager{ pods: make(map[string]*v1.Pod, 0), deletingPods: make(map[string]*v1.Pod, 0), @@ -37,8 +37,18 @@ func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager { k8sClient: k8sClient, } - go rm.watchConfigMaps() - go rm.watchSecrets() + configW, err := rm.k8sClient.CoreV1().ConfigMaps(v1.NamespaceAll).Watch(metav1.ListOptions{}) + if err != nil { + return nil, errors.Wrap(err, "error getting config watch") + } + + secretsW, err := rm.k8sClient.CoreV1().Secrets(v1.NamespaceAll).Watch(metav1.ListOptions{}) + if err != nil { + return nil, errors.Wrap(err, "error getting secrets watch") + } + + go rm.watchConfigMaps(configW) + go rm.watchSecrets(secretsW) tick := time.Tick(5 * time.Minute) go func() { @@ -68,7 +78,7 @@ func NewResourceManager(k8sClient kubernetes.Interface) *ResourceManager { } }() - return &rm + return &rm, nil } // SetPods clears the internal cache and populates it with the supplied pods. 
@@ -213,12 +223,7 @@ func (rm *ResourceManager) GetSecret(name, namespace string) (*v1.Secret, error) // watchConfigMaps monitors the kubernetes API for modifications and deletions of configmaps // it evicts them from the internal cache -func (rm *ResourceManager) watchConfigMaps() { - var opts metav1.ListOptions - w, err := rm.k8sClient.CoreV1().ConfigMaps(v1.NamespaceAll).Watch(opts) - if err != nil { - log.Fatal(err) - } +func (rm *ResourceManager) watchConfigMaps(w watch.Interface) { for { select { @@ -242,12 +247,7 @@ func (rm *ResourceManager) watchConfigMaps() { // watchSecretes monitors the kubernetes API for modifications and deletions of secrets // it evicts them from the internal cache -func (rm *ResourceManager) watchSecrets() { - var opts metav1.ListOptions - w, err := rm.k8sClient.CoreV1().Secrets(v1.NamespaceAll).Watch(opts) - if err != nil { - log.Fatal(err) - } +func (rm *ResourceManager) watchSecrets(w watch.Interface) { for { select { diff --git a/manager/resource_test.go b/manager/resource_test.go index 111d6fc90..0fd19162d 100644 --- a/manager/resource_test.go +++ b/manager/resource_test.go @@ -19,7 +19,10 @@ func init() { } func TestResourceManager(t *testing.T) { - pm := NewResourceManager(fakeClient) + pm, err := NewResourceManager(fakeClient) + if err != nil { + t.Fatal(err) + } pod1Name := "Pod1" pod1Namespace := "Pod1Namespace" pod1 := makePod(pod1Namespace, pod1Name) @@ -36,7 +39,10 @@ func TestResourceManager(t *testing.T) { } func TestResourceManagerDeletePod(t *testing.T) { - pm := NewResourceManager(fakeClient) + pm, err := NewResourceManager(fakeClient) + if err != nil { + t.Fatal(err) + } pod1Name := "Pod1" pod1Namespace := "Pod1Namespace" pod1 := makePod(pod1Namespace, pod1Name) @@ -61,7 +67,10 @@ func makePod(namespace, name string) *v1.Pod { } func TestResourceManagerUpdatePod(t *testing.T) { - pm := NewResourceManager(fakeClient) + pm, err := NewResourceManager(fakeClient) + if err != nil { + t.Fatal(err) + } pod1Name := "Pod1" 
pod1Namespace := "Pod1Namespace" pod1 := makePod(pod1Namespace, pod1Name) diff --git a/providers/azure/aci.go b/providers/azure/aci.go index ebaed3e9e..5b812557e 100644 --- a/providers/azure/aci.go +++ b/providers/azure/aci.go @@ -2,19 +2,21 @@ package azure import ( "bytes" + "context" "encoding/base64" "encoding/json" "errors" "fmt" "io" - "log" "net/http" "os" "reflect" "strings" "time" + "github.com/Sirupsen/logrus" "github.com/gorilla/websocket" + "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" client "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client" "github.com/virtual-kubelet/virtual-kubelet/providers/azure/client/aci" @@ -345,7 +347,7 @@ func (p *ACIProvider) GetContainerLogs(namespace, podName, containerName string, for i := 0; i < retry; i++ { cLogs, err := p.aciClient.GetContainerLogs(p.resourceGroup, cg.Name, containerName, tail) if err != nil { - log.Println(err) + log.G(context.TODO()).WithField("method", "GetContainerLogs").WithError(err).Debug("Error getting container logs, retrying") time.Sleep(5000 * time.Millisecond) } else { logContent = cLogs.Content @@ -469,7 +471,11 @@ func (p *ACIProvider) GetPods() ([]*v1.Pod, error) { p, err := containerGroupToPod(&c) if err != nil { - log.Println(err) + log.G(context.TODO()).WithFields(logrus.Fields{ + "name": c.Name, + "id": c.ID, + }).WithError(err).Error("error converting container group to pod") + continue } pods = append(pods, p) @@ -1105,7 +1111,8 @@ func filterServiceAccountSecretVolume(osType string, containerGroup *aci.Contain return } - log.Printf("Ignoring service account secret volumes '%v' for Windows", reflect.ValueOf(serviceAccountSecretVolumeName).MapKeys()) + l := log.G(context.TODO()).WithField("containerGroup", containerGroup.Name) + l.Infof("Ignoring service account secret volumes '%v' for Windows", reflect.ValueOf(serviceAccountSecretVolumeName).MapKeys()) volumes := make([]aci.Volume, 0, 
len(containerGroup.ContainerGroupProperties.Volumes)) for _, volume := range containerGroup.ContainerGroupProperties.Volumes { diff --git a/providers/azure/aci_test.go b/providers/azure/aci_test.go index 6a030c219..ae67bc112 100644 --- a/providers/azure/aci_test.go +++ b/providers/azure/aci_test.go @@ -448,7 +448,10 @@ func prepareMocks() (*AADMock, *ACIMock, *ACIProvider, error) { os.Setenv("ACI_RESOURCE_GROUP", fakeResourceGroup) clientset := fake.NewSimpleClientset() - rm := manager.NewResourceManager(clientset) + rm, err := manager.NewResourceManager(clientset) + if err != nil { + return nil, nil, nil, err + } provider, err := NewACIProvider("example.toml", rm, fakeNodeName, "Linux", "0.0.0.0", 10250) if err != nil { diff --git a/providers/azure/acsCredential.go b/providers/azure/acsCredential.go index 482aa3aeb..2671ac443 100644 --- a/providers/azure/acsCredential.go +++ b/providers/azure/acsCredential.go @@ -1,10 +1,12 @@ package azure import ( + "context" "encoding/json" "fmt" "io/ioutil" - "log" + + "github.com/virtual-kubelet/virtual-kubelet/log" ) // AcsCredential represents the credential file for ACS @@ -19,12 +21,13 @@ type AcsCredential struct { } // NewAcsCredential returns an AcsCredential struct from file path -func NewAcsCredential(filepath string) (*AcsCredential, error) { - log.Printf("Reading ACS credential file %q", filepath) +func NewAcsCredential(p string) (*AcsCredential, error) { + logger := log.G(context.TODO()).WithField("method", "NewAcsCredential").WithField("file", p) + log.Trace(logger, "Reading ACS credential file") - b, err := ioutil.ReadFile(filepath) + b, err := ioutil.ReadFile(p) if err != nil { - return nil, fmt.Errorf("Reading ACS credential file %q failed: %v", filepath, err) + return nil, fmt.Errorf("Reading ACS credential file %q failed: %v", p, err) } // Unmarshal the authentication file. 
@@ -33,6 +36,6 @@ func NewAcsCredential(filepath string) (*AcsCredential, error) { return nil, err } - log.Printf("Load ACS credential file %q successfully", filepath) + log.Trace(logger, "Load ACS credential file successfully") return &cred, nil } diff --git a/vkubelet/apiserver.go b/vkubelet/apiserver.go index 81cca6b0c..9612207e4 100644 --- a/vkubelet/apiserver.go +++ b/vkubelet/apiserver.go @@ -1,24 +1,36 @@ package vkubelet import ( + "context" "fmt" "io" - "log" "net/http" "os" "strconv" "strings" "time" + "github.com/Sirupsen/logrus" "github.com/gorilla/mux" + "github.com/virtual-kubelet/virtual-kubelet/log" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" ) var p Provider var r mux.Router +func loggingContext(r *http.Request) context.Context { + ctx := r.Context() + logger := log.G(ctx).WithFields(logrus.Fields{ + "uri": r.RequestURI, + "vars": mux.Vars(r), + }) + return log.WithLogger(ctx, logger) +} + func NotFound(w http.ResponseWriter, r *http.Request) { - log.Printf("404 request not found. 
\n %v", mux.Vars(r)) + logger := log.G(loggingContext(r)) + log.Trace(logger, "404 request not found") http.Error(w, "404 request not found", http.StatusNotFound) } @@ -35,37 +47,45 @@ func ApiserverStart(provider Provider) { r.NotFoundHandler = http.HandlerFunc(NotFound) if err := http.ListenAndServeTLS(addr, certFilePath, keyFilePath, r); err != nil { - log.Println(err) + log.G(context.TODO()).WithError(err).Error("error setting up http server") } } func ApiServerHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) - if len(vars) == 3 { - namespace := vars["namespace"] - pod := vars["pod"] - container := vars["container"] - tail := 10 - q := req.URL.Query() - queryTail := q.Get("tailLines") - if queryTail != "" { - t, err := strconv.Atoi(queryTail) - if err != nil { - log.Println(err) - io.WriteString(w, err.Error()) - } else { - tail = t - } - } - podsLogs, err := p.GetContainerLogs(namespace, pod, container, tail) - if err != nil { - log.Println(err) - io.WriteString(w, err.Error()) - } else { - io.WriteString(w, podsLogs) - } - } else { + if len(vars) != 3 { NotFound(w, req) + return + } + + ctx := loggingContext(req) + + namespace := vars["namespace"] + pod := vars["pod"] + container := vars["container"] + tail := 10 + q := req.URL.Query() + + if queryTail := q.Get("tailLines"); queryTail != "" { + t, err := strconv.Atoi(queryTail) + if err != nil { + logger := log.G(context.TODO()).WithError(err) + log.Trace(logger, "could not parse tailLines") + http.Error(w, fmt.Sprintf("could not parse \"tailLines\": %v", err), http.StatusBadRequest) + return + } + tail = t + } + + podsLogs, err := p.GetContainerLogs(namespace, pod, container, tail) + if err != nil { + log.G(ctx).WithError(err).Error("error getting container logs") + http.Error(w, fmt.Sprintf("error while getting container logs: %v", err), http.StatusInternalServerError) + return + } + + if _, err := io.WriteString(w, podsLogs); err != nil { + log.G(ctx).WithError(err).Warn("error 
writing response to client") } } diff --git a/vkubelet/lookup.go b/vkubelet/lookup.go index 8550edd75..89ec9b682 100644 --- a/vkubelet/lookup.go +++ b/vkubelet/lookup.go @@ -3,8 +3,7 @@ package vkubelet import ( - "fmt" - + "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers/aws" "github.com/virtual-kubelet/virtual-kubelet/providers/azure" @@ -56,8 +55,6 @@ func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager case "vic": return vic.NewVicProvider(providerConfig, rm, nodeName, operatingSystem) default: - fmt.Printf("Provider '%s' is not supported\n", provider) + return nil, errors.New("provider not supported") } - var p Provider - return p, nil } diff --git a/vkubelet/lookup_darwin.go b/vkubelet/lookup_darwin.go index 55d34e18a..a45f3d929 100644 --- a/vkubelet/lookup_darwin.go +++ b/vkubelet/lookup_darwin.go @@ -1,8 +1,7 @@ package vkubelet import ( - "fmt" - + "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers/aws" "github.com/virtual-kubelet/virtual-kubelet/providers/azure" @@ -43,8 +42,6 @@ func lookupProvider(provider, providerConfig string, rm *manager.ResourceManager case "sfmesh": return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) default: - fmt.Printf("Provider '%s' is not supported\n", provider) + return nil, errors.New("provider is not supported") } - var p Provider - return p, nil } diff --git a/vkubelet/lookup_windows.go b/vkubelet/lookup_windows.go index c37918e89..1a09aec54 100644 --- a/vkubelet/lookup_windows.go +++ b/vkubelet/lookup_windows.go @@ -1,8 +1,7 @@ package vkubelet import ( - "fmt" - + "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/manager" "github.com/virtual-kubelet/virtual-kubelet/providers/aws" "github.com/virtual-kubelet/virtual-kubelet/providers/azure" @@ -43,8 +42,6 @@ func 
lookupProvider(provider, providerConfig string, rm *manager.ResourceManager case "sfmesh": return sfmesh.NewSFMeshProvider(rm, nodeName, operatingSystem, internalIP, daemonEndpointPort) default: - fmt.Printf("Provider '%s' is not supported\n", provider) + return nil, errors.New("provider not supported") } - var p Provider - return p, nil } diff --git a/vkubelet/vkubelet.go b/vkubelet/vkubelet.go index fb1f2731b..e4b3cd159 100644 --- a/vkubelet/vkubelet.go +++ b/vkubelet/vkubelet.go @@ -1,8 +1,8 @@ package vkubelet import ( + "context" "fmt" - "log" "os" "os/signal" "strconv" @@ -10,6 +10,8 @@ import ( "syscall" "time" + pkgerrors "github.com/pkg/errors" + "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/manager" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -69,7 +71,10 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon return nil, err } - rm := manager.NewResourceManager(clientset) + rm, err := manager.NewResourceManager(clientset) + if err != nil { + return nil, pkgerrors.Wrap(err, "error creating resource manager") + } daemonEndpointPortEnv := os.Getenv("KUBELET_PORT") if daemonEndpointPortEnv == "" { @@ -101,7 +106,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon case "PreferNoSchedule": vkTaintEffect = corev1.TaintEffectPreferNoSchedule default: - fmt.Printf("Taint effect '%s' is not supported\n", vkTaintEffectEnv) + return nil, pkgerrors.Errorf("taint effect %q is not supported", vkTaintEffectEnv) } taint := corev1.Taint{ @@ -125,17 +130,21 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon provider: p, } - if err = s.registerNode(); err != nil { + ctx := context.TODO() + ctx = log.WithLogger(ctx, log.G(ctx)) + + if err = s.registerNode(ctx); err != nil { return s, err } go ApiserverStart(p) tick := time.Tick(5 * time.Second) + go func() { for range tick { - s.updateNode() - 
s.updatePodStatuses() + s.updateNode(ctx) + s.updatePodStatuses(ctx) } }() @@ -143,7 +152,7 @@ func New(nodeName, operatingSystem, namespace, kubeConfig, provider, providerCon } // registerNode registers this virtual node with the Kubernetes API. -func (s *Server) registerNode() error { +func (s *Server) registerNode(ctx context.Context) error { taints := make([]corev1.Taint, 0) if !s.disableTaint { @@ -182,14 +191,14 @@ func (s *Server) registerNode() error { return err } - log.Printf("Node '%s' with OS type '%s' registered\n", node.Name, node.Status.NodeInfo.OperatingSystem) + log.G(ctx).Info("Registered node") return nil } // Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster. // Run will block until Stop is called or a SIGINT or SIGTERM signal is received. -func (s *Server) Run() error { +func (s *Server) Run(ctx context.Context) error { shouldStop := false sig := make(chan os.Signal, 1) @@ -207,15 +216,15 @@ func (s *Server) Run() error { pods, err := s.k8sClient.CoreV1().Pods(s.namespace).List(opts) if err != nil { - log.Fatal("Failed to list pods", err) + return pkgerrors.Wrap(err, "error getting pod list") } s.resourceManager.SetPods(pods) - s.reconcile() + s.reconcile(ctx) opts.ResourceVersion = pods.ResourceVersion s.podWatcher, err = s.k8sClient.CoreV1().Pods(s.namespace).Watch(opts) if err != nil { - log.Fatal("Failed to watch pods", err) + return pkgerrors.Wrap(err, "failed to watch pods") } loop: @@ -224,15 +233,15 @@ func (s *Server) Run() error { case ev, ok := <-s.podWatcher.ResultChan(): if !ok { if shouldStop { - log.Println("Pod watcher is stopped.") + log.G(ctx).Info("Pod watcher is stopped") return nil } - log.Println("Pod watcher connection is closed unexpectedly.") + log.G(ctx).Error("Pod watcher connection is closed unexpectedly") break loop } - log.Println("Pod watcher event is received:", ev.Type) + log.G(ctx).WithField("type", ev.Type).Debug("Pod watcher event is received") reconcile := false 
switch ev.Type { case watch.Added: @@ -244,7 +253,7 @@ func (s *Server) Run() error { } if reconcile { - s.reconcile() + s.reconcile(ctx) } } } @@ -262,17 +271,17 @@ func (s *Server) Stop() { } // updateNode updates the node status within Kubernetes with updated NodeConditions. -func (s *Server) updateNode() { +func (s *Server) updateNode(ctx context.Context) { opts := metav1.GetOptions{} n, err := s.k8sClient.CoreV1().Nodes().Get(s.nodeName, opts) if err != nil && !errors.IsNotFound(err) { - log.Println("Failed to retrieve node:", err) + log.G(ctx).WithError(err).Error("Failed to retrieve node") return } if errors.IsNotFound(err) { - if err = s.registerNode(); err != nil { - log.Println("Failed to register node:", err) + if err = s.registerNode(ctx); err != nil { + log.G(ctx).WithError(err).Error("Failed to register node") return } } @@ -288,27 +297,31 @@ func (s *Server) updateNode() { n, err = s.k8sClient.CoreV1().Nodes().UpdateStatus(n) if err != nil { - log.Println("Failed to update node:", err) + log.G(ctx).WithError(err).Error("Failed to update node") return } } // reconcile is the main reconciliation loop that compares differences between Kubernetes and // the active provider and reconciles the differences. 
-func (s *Server) reconcile() { - log.Println("Start reconcile.") +func (s *Server) reconcile(ctx context.Context) { + logger := log.G(ctx) + logger.Debug("Start reconcile") + defer logger.Debug("End reconcile") + providerPods, err := s.provider.GetPods() if err != nil { - log.Println(err) + logger.WithError(err).Error("Error getting pod list from provider") return } for _, pod := range providerPods { // Delete pods that don't exist in Kubernetes if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil || p.DeletionTimestamp != nil { - log.Printf("Deleting pod '%s'\n", pod.Name) - if err := s.deletePod(pod); err != nil { - log.Printf("Error deleting pod '%s': %s\n", pod.Name, err) + logger := logger.WithField("pod", pod.Name) + logger.Debug("Deleting pod") + if err := s.deletePod(ctx, pod); err != nil { + logger.WithError(err).Error("Error deleting pod") continue } } @@ -317,6 +330,7 @@ func (s *Server) reconcile() { // Create any pods for k8s pods that don't exist in the provider pods := s.resourceManager.GetPods() for _, pod := range pods { + logger := logger.WithField("pod", pod.Name) var providerPod *corev1.Pod for _, p := range providerPods { if p.Namespace == pod.Namespace && p.Name == pod.Name { @@ -326,30 +340,33 @@ } if pod.DeletionTimestamp == nil && pod.Status.Phase != corev1.PodFailed && providerPod == nil { - log.Printf("Creating pod '%s'\n", pod.Name) - if err := s.createPod(pod); err != nil { - log.Printf("Error creating pod '%s': %s\n", pod.Name, err) + logger.Debug("Creating pod") + if err := s.createPod(ctx, pod); err != nil { + logger.WithError(err).Error("Error creating pod") continue } } // Delete pod if DeletionTimestamp is set if pod.DeletionTimestamp != nil { - log.Printf("Pod '%s' is pending deletion.\n", pod.Name) + log.Trace(logger, "Pod pending deletion") var err error - if err = s.deletePod(pod); err != nil { - log.Printf("Error deleting pod '%s': %s\n", pod.Name, err) + if err 
= s.deletePod(ctx, pod); err != nil { + logger.WithError(err).Error("Error deleting pod") continue } + log.Trace(logger, "Pod deletion complete") } } } -func (s *Server) createPod(pod *corev1.Pod) error { +func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error { if err := s.populateSecretsAndConfigMapsInEnv(pod); err != nil { return err } + logger := log.G(ctx).WithField("pod", pod.Name) + if origErr := s.provider.CreatePod(pod); origErr != nil { pod.ResourceVersion = "" // Blank out resource version to prevent object has been modified error pod.Status.Phase = corev1.PodFailed @@ -358,29 +375,29 @@ func (s *Server) createPod(pod *corev1.Pod) error { _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) if err != nil { - log.Println("Failed to update pod status:", err) - return origErr + logger.WithError(err).Warn("Failed to update pod status") } return origErr } - log.Printf("Pod '%s' created.\n", pod.Name) + logger.Info("Pod created") return nil } -func (s *Server) deletePod(pod *corev1.Pod) error { +func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error { var delErr error if delErr = s.provider.DeletePod(pod); delErr != nil && errors.IsNotFound(delErr) { return delErr } + logger := log.G(ctx).WithField("pod", pod.Name) if !errors.IsNotFound(delErr) { var grace int64 if err := s.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil && errors.IsNotFound(err) { if errors.IsNotFound(err) { - log.Printf("Pod '%s' doesn't exist.\n", pod.Name) + logger.Error("Pod doesn't exist") return nil } @@ -388,15 +405,14 @@ func (s *Server) deletePod(pod *corev1.Pod) error { } s.resourceManager.DeletePod(pod) - - log.Printf("Pod '%s' deleted.\n", pod.Name) + logger.Info("Pod deleted") } return nil } // updatePodStatuses syncs the providers pod status with the kubernetes pod status. 
-func (s *Server) updatePodStatuses() { +func (s *Server) updatePodStatuses(ctx context.Context) { // Update all the pods with the provider status. pods := s.resourceManager.GetPods() for _, pod := range pods { @@ -406,7 +422,7 @@ status, err := s.provider.GetPodStatus(pod.Namespace, pod.Name) if err != nil { - log.Printf("Error retrieving pod '%s' in namespace '%s' status from provider: %s\n", pod.Name, pod.Namespace, err) + log.G(ctx).WithField("pod", pod.Name).WithError(err).Error("Error retrieving pod status") return }