Initial commit
This commit is contained in:
376
vendor/github.com/hyperhq/hypercli/daemon/logger/awslogs/cloudwatchlogs.go
generated
vendored
Normal file
376
vendor/github.com/hyperhq/hypercli/daemon/logger/awslogs/cloudwatchlogs.go
generated
vendored
Normal file
@@ -0,0 +1,376 @@
|
||||
// Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
|
||||
package awslogs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/defaults"
|
||||
"github.com/aws/aws-sdk-go/aws/ec2metadata"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
|
||||
"github.com/hyperhq/hypercli/daemon/logger"
|
||||
"github.com/hyperhq/hypercli/dockerversion"
|
||||
)
|
||||
|
||||
const (
|
||||
name = "awslogs"
|
||||
regionKey = "awslogs-region"
|
||||
regionEnvKey = "AWS_REGION"
|
||||
logGroupKey = "awslogs-group"
|
||||
logStreamKey = "awslogs-stream"
|
||||
batchPublishFrequency = 5 * time.Second
|
||||
|
||||
// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
|
||||
perEventBytes = 26
|
||||
maximumBytesPerPut = 1048576
|
||||
maximumLogEventsPerPut = 10000
|
||||
|
||||
// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
|
||||
maximumBytesPerEvent = 262144 - perEventBytes
|
||||
|
||||
resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
|
||||
dataAlreadyAcceptedCode = "DataAlreadyAcceptedException"
|
||||
invalidSequenceTokenCode = "InvalidSequenceTokenException"
|
||||
|
||||
userAgentHeader = "User-Agent"
|
||||
)
|
||||
|
||||
type logStream struct {
|
||||
logStreamName string
|
||||
logGroupName string
|
||||
client api
|
||||
messages chan *logger.Message
|
||||
lock sync.RWMutex
|
||||
closed bool
|
||||
sequenceToken *string
|
||||
}
|
||||
|
||||
type api interface {
|
||||
CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
|
||||
PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
|
||||
}
|
||||
|
||||
type regionFinder interface {
|
||||
Region() (string, error)
|
||||
}
|
||||
|
||||
type byTimestamp []*cloudwatchlogs.InputLogEvent
|
||||
|
||||
// init registers the awslogs driver and sets the default region, if provided
|
||||
func init() {
|
||||
if os.Getenv(regionEnvKey) != "" {
|
||||
defaults.DefaultConfig.Region = aws.String(os.Getenv(regionEnvKey))
|
||||
}
|
||||
if err := logger.RegisterLogDriver(name, New); err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// New creates an awslogs logger using the configuration passed in on the
|
||||
// context. Supported context configuration variables are awslogs-region,
|
||||
// awslogs-group, and awslogs-stream. When available, configuration is
|
||||
// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
|
||||
// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
|
||||
// the EC2 Instance Metadata Service.
|
||||
func New(ctx logger.Context) (logger.Logger, error) {
|
||||
logGroupName := ctx.Config[logGroupKey]
|
||||
logStreamName := ctx.ContainerID
|
||||
if ctx.Config[logStreamKey] != "" {
|
||||
logStreamName = ctx.Config[logStreamKey]
|
||||
}
|
||||
client, err := newAWSLogsClient(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
containerStream := &logStream{
|
||||
logStreamName: logStreamName,
|
||||
logGroupName: logGroupName,
|
||||
client: client,
|
||||
messages: make(chan *logger.Message, 4096),
|
||||
}
|
||||
err = containerStream.create()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go containerStream.collectBatch()
|
||||
|
||||
return containerStream, nil
|
||||
}
|
||||
|
||||
// newRegionFinder is a variable such that the implementation
|
||||
// can be swapped out for unit tests.
|
||||
var newRegionFinder = func() regionFinder {
|
||||
return ec2metadata.New(nil)
|
||||
}
|
||||
|
||||
// newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
|
||||
// Customizations to the default client from the SDK include a Docker-specific
|
||||
// User-Agent string and automatic region detection using the EC2 Instance
|
||||
// Metadata Service when region is otherwise unspecified.
|
||||
func newAWSLogsClient(ctx logger.Context) (api, error) {
|
||||
config := defaults.DefaultConfig
|
||||
if ctx.Config[regionKey] != "" {
|
||||
config = defaults.DefaultConfig.Merge(&aws.Config{
|
||||
Region: aws.String(ctx.Config[regionKey]),
|
||||
})
|
||||
}
|
||||
if config.Region == nil || *config.Region == "" {
|
||||
logrus.Info("Trying to get region from EC2 Metadata")
|
||||
ec2MetadataClient := newRegionFinder()
|
||||
region, err := ec2MetadataClient.Region()
|
||||
if err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"error": err,
|
||||
}).Error("Could not get region from EC2 metadata, environment, or log option")
|
||||
return nil, errors.New("Cannot determine region for awslogs driver")
|
||||
}
|
||||
config.Region = ®ion
|
||||
}
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"region": *config.Region,
|
||||
}).Debug("Created awslogs client")
|
||||
client := cloudwatchlogs.New(config)
|
||||
|
||||
client.Handlers.Build.PushBackNamed(request.NamedHandler{
|
||||
Name: "DockerUserAgentHandler",
|
||||
Fn: func(r *request.Request) {
|
||||
currentAgent := r.HTTPRequest.Header.Get(userAgentHeader)
|
||||
r.HTTPRequest.Header.Set(userAgentHeader,
|
||||
fmt.Sprintf("Docker %s (%s) %s",
|
||||
dockerversion.Version, runtime.GOOS, currentAgent))
|
||||
},
|
||||
})
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// Name returns the name of the awslogs logging driver
|
||||
func (l *logStream) Name() string {
|
||||
return name
|
||||
}
|
||||
|
||||
// Log submits messages for logging by an instance of the awslogs logging driver
|
||||
func (l *logStream) Log(msg *logger.Message) error {
|
||||
l.lock.RLock()
|
||||
defer l.lock.RUnlock()
|
||||
if !l.closed {
|
||||
l.messages <- msg
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the instance of the awslogs logging driver
|
||||
func (l *logStream) Close() error {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
if !l.closed {
|
||||
close(l.messages)
|
||||
}
|
||||
l.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// create creates a log stream for the instance of the awslogs logging driver
|
||||
func (l *logStream) create() error {
|
||||
input := &cloudwatchlogs.CreateLogStreamInput{
|
||||
LogGroupName: aws.String(l.logGroupName),
|
||||
LogStreamName: aws.String(l.logStreamName),
|
||||
}
|
||||
|
||||
_, err := l.client.CreateLogStream(input)
|
||||
|
||||
if err != nil {
|
||||
if awsErr, ok := err.(awserr.Error); ok {
|
||||
fields := logrus.Fields{
|
||||
"errorCode": awsErr.Code(),
|
||||
"message": awsErr.Message(),
|
||||
"origError": awsErr.OrigErr(),
|
||||
"logGroupName": l.logGroupName,
|
||||
"logStreamName": l.logStreamName,
|
||||
}
|
||||
if awsErr.Code() == resourceAlreadyExistsCode {
|
||||
// Allow creation to succeed
|
||||
logrus.WithFields(fields).Info("Log stream already exists")
|
||||
return nil
|
||||
}
|
||||
logrus.WithFields(fields).Error("Failed to create log stream")
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// newTicker is used for time-based batching. newTicker is a variable such
|
||||
// that the implementation can be swapped out for unit tests.
|
||||
var newTicker = func(freq time.Duration) *time.Ticker {
|
||||
return time.NewTicker(freq)
|
||||
}
|
||||
|
||||
// collectBatch executes as a goroutine to perform batching of log events for
|
||||
// submission to the log stream. Batching is performed on time- and size-
|
||||
// bases. Time-based batching occurs at a 5 second interval (defined in the
|
||||
// batchPublishFrequency const). Size-based batching is performed on the
|
||||
// maximum number of events per batch (defined in maximumLogEventsPerPut) and
|
||||
// the maximum number of total bytes in a batch (defined in
|
||||
// maximumBytesPerPut). Log messages are split by the maximum bytes per event
|
||||
// (defined in maximumBytesPerEvent). There is a fixed per-event byte overhead
|
||||
// (defined in perEventBytes) which is accounted for in split- and batch-
|
||||
// calculations.
|
||||
func (l *logStream) collectBatch() {
|
||||
timer := newTicker(batchPublishFrequency)
|
||||
var events []*cloudwatchlogs.InputLogEvent
|
||||
bytes := 0
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
l.publishBatch(events)
|
||||
events = events[:0]
|
||||
bytes = 0
|
||||
case msg, more := <-l.messages:
|
||||
if !more {
|
||||
l.publishBatch(events)
|
||||
return
|
||||
}
|
||||
unprocessedLine := msg.Line
|
||||
for len(unprocessedLine) > 0 {
|
||||
// Split line length so it does not exceed the maximum
|
||||
lineBytes := len(unprocessedLine)
|
||||
if lineBytes > maximumBytesPerEvent {
|
||||
lineBytes = maximumBytesPerEvent
|
||||
}
|
||||
line := unprocessedLine[:lineBytes]
|
||||
unprocessedLine = unprocessedLine[lineBytes:]
|
||||
if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
|
||||
// Publish an existing batch if it's already over the maximum number of events or if adding this
|
||||
// event would push it over the maximum number of total bytes.
|
||||
l.publishBatch(events)
|
||||
events = events[:0]
|
||||
bytes = 0
|
||||
}
|
||||
events = append(events, &cloudwatchlogs.InputLogEvent{
|
||||
Message: aws.String(string(line)),
|
||||
Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
|
||||
})
|
||||
bytes += (lineBytes + perEventBytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// publishBatch calls PutLogEvents for a given set of InputLogEvents,
|
||||
// accounting for sequencing requirements (each request must reference the
|
||||
// sequence token returned by the previous request).
|
||||
func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
|
||||
if len(events) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
sort.Sort(byTimestamp(events))
|
||||
|
||||
nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken)
|
||||
|
||||
if err != nil {
|
||||
if awsErr, ok := err.(awserr.Error); ok {
|
||||
if awsErr.Code() == dataAlreadyAcceptedCode {
|
||||
// already submitted, just grab the correct sequence token
|
||||
parts := strings.Split(awsErr.Message(), " ")
|
||||
nextSequenceToken = &parts[len(parts)-1]
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"errorCode": awsErr.Code(),
|
||||
"message": awsErr.Message(),
|
||||
"logGroupName": l.logGroupName,
|
||||
"logStreamName": l.logStreamName,
|
||||
}).Info("Data already accepted, ignoring error")
|
||||
err = nil
|
||||
} else if awsErr.Code() == invalidSequenceTokenCode {
|
||||
// sequence code is bad, grab the correct one and retry
|
||||
parts := strings.Split(awsErr.Message(), " ")
|
||||
token := parts[len(parts)-1]
|
||||
nextSequenceToken, err = l.putLogEvents(events, &token)
|
||||
}
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
} else {
|
||||
l.sequenceToken = nextSequenceToken
|
||||
}
|
||||
}
|
||||
|
||||
// putLogEvents wraps the PutLogEvents API
|
||||
func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
|
||||
input := &cloudwatchlogs.PutLogEventsInput{
|
||||
LogEvents: events,
|
||||
SequenceToken: sequenceToken,
|
||||
LogGroupName: aws.String(l.logGroupName),
|
||||
LogStreamName: aws.String(l.logStreamName),
|
||||
}
|
||||
resp, err := l.client.PutLogEvents(input)
|
||||
if err != nil {
|
||||
if awsErr, ok := err.(awserr.Error); ok {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"errorCode": awsErr.Code(),
|
||||
"message": awsErr.Message(),
|
||||
"origError": awsErr.OrigErr(),
|
||||
"logGroupName": l.logGroupName,
|
||||
"logStreamName": l.logStreamName,
|
||||
}).Error("Failed to put log events")
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return resp.NextSequenceToken, nil
|
||||
}
|
||||
|
||||
// ValidateLogOpt looks for awslogs-specific log options awslogs-region,
|
||||
// awslogs-group, and awslogs-stream
|
||||
func ValidateLogOpt(cfg map[string]string) error {
|
||||
for key := range cfg {
|
||||
switch key {
|
||||
case logGroupKey:
|
||||
case logStreamKey:
|
||||
case regionKey:
|
||||
default:
|
||||
return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
|
||||
}
|
||||
}
|
||||
if cfg[logGroupKey] == "" {
|
||||
return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Len returns the length of a byTimestamp slice. Len is required by the
|
||||
// sort.Interface interface.
|
||||
func (slice byTimestamp) Len() int {
|
||||
return len(slice)
|
||||
}
|
||||
|
||||
// Less compares two values in a byTimestamp slice by Timestamp. Less is
|
||||
// required by the sort.Interface interface.
|
||||
func (slice byTimestamp) Less(i, j int) bool {
|
||||
iTimestamp, jTimestamp := int64(0), int64(0)
|
||||
if slice != nil && slice[i].Timestamp != nil {
|
||||
iTimestamp = *slice[i].Timestamp
|
||||
}
|
||||
if slice != nil && slice[j].Timestamp != nil {
|
||||
jTimestamp = *slice[j].Timestamp
|
||||
}
|
||||
return iTimestamp < jTimestamp
|
||||
}
|
||||
|
||||
// Swap swaps two values in a byTimestamp slice with each other. Swap is
|
||||
// required by the sort.Interface interface.
|
||||
func (slice byTimestamp) Swap(i, j int) {
|
||||
slice[i], slice[j] = slice[j], slice[i]
|
||||
}
|
||||
627
vendor/github.com/hyperhq/hypercli/daemon/logger/awslogs/cloudwatchlogs_test.go
generated
vendored
Normal file
627
vendor/github.com/hyperhq/hypercli/daemon/logger/awslogs/cloudwatchlogs_test.go
generated
vendored
Normal file
@@ -0,0 +1,627 @@
|
||||
package awslogs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
|
||||
"github.com/hyperhq/hypercli/daemon/logger"
|
||||
"github.com/hyperhq/hypercli/dockerversion"
|
||||
)
|
||||
|
||||
const (
|
||||
groupName = "groupName"
|
||||
streamName = "streamName"
|
||||
sequenceToken = "sequenceToken"
|
||||
nextSequenceToken = "nextSequenceToken"
|
||||
logline = "this is a log line"
|
||||
)
|
||||
|
||||
func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
|
||||
ctx := logger.Context{
|
||||
Config: map[string]string{
|
||||
regionKey: "us-east-1",
|
||||
},
|
||||
}
|
||||
|
||||
client, err := newAWSLogsClient(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs)
|
||||
if !ok {
|
||||
t.Fatal("Could not cast client to cloudwatchlogs.CloudWatchLogs")
|
||||
}
|
||||
buildHandlerList := realClient.Handlers.Build
|
||||
request := &request.Request{
|
||||
HTTPRequest: &http.Request{
|
||||
Header: http.Header{},
|
||||
},
|
||||
}
|
||||
buildHandlerList.Run(request)
|
||||
expectedUserAgentString := fmt.Sprintf("Docker %s (%s) %s/%s",
|
||||
dockerversion.Version, runtime.GOOS, aws.SDKName, aws.SDKVersion)
|
||||
userAgent := request.HTTPRequest.Header.Get("User-Agent")
|
||||
if userAgent != expectedUserAgentString {
|
||||
t.Errorf("Wrong User-Agent string, expected \"%s\" but was \"%s\"",
|
||||
expectedUserAgentString, userAgent)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewAWSLogsClientRegionDetect(t *testing.T) {
|
||||
ctx := logger.Context{
|
||||
Config: map[string]string{},
|
||||
}
|
||||
|
||||
mockMetadata := newMockMetadataClient()
|
||||
newRegionFinder = func() regionFinder {
|
||||
return mockMetadata
|
||||
}
|
||||
mockMetadata.regionResult <- ®ionResult{
|
||||
successResult: "us-east-1",
|
||||
}
|
||||
|
||||
_, err := newAWSLogsClient(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateSuccess(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
}
|
||||
mockClient.createLogStreamResult <- &createLogStreamResult{}
|
||||
|
||||
err := stream.create()
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Received unexpected err: %v\n", err)
|
||||
}
|
||||
argument := <-mockClient.createLogStreamArgument
|
||||
if argument.LogGroupName == nil {
|
||||
t.Fatal("Expected non-nil LogGroupName")
|
||||
}
|
||||
if *argument.LogGroupName != groupName {
|
||||
t.Errorf("Expected LogGroupName to be %s", groupName)
|
||||
}
|
||||
if argument.LogStreamName == nil {
|
||||
t.Fatal("Expected non-nil LogGroupName")
|
||||
}
|
||||
if *argument.LogStreamName != streamName {
|
||||
t.Errorf("Expected LogStreamName to be %s", streamName)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateError(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
}
|
||||
mockClient.createLogStreamResult <- &createLogStreamResult{
|
||||
errorResult: errors.New("Error!"),
|
||||
}
|
||||
|
||||
err := stream.create()
|
||||
|
||||
if err == nil {
|
||||
t.Fatal("Expected non-nil err")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateAlreadyExists(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
}
|
||||
mockClient.createLogStreamResult <- &createLogStreamResult{
|
||||
errorResult: awserr.New(resourceAlreadyExistsCode, "", nil),
|
||||
}
|
||||
|
||||
err := stream.create()
|
||||
|
||||
if err != nil {
|
||||
t.Fatal("Expected nil err")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublishBatchSuccess(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
successResult: &cloudwatchlogs.PutLogEventsOutput{
|
||||
NextSequenceToken: aws.String(nextSequenceToken),
|
||||
},
|
||||
}
|
||||
|
||||
events := []*cloudwatchlogs.InputLogEvent{
|
||||
{
|
||||
Message: aws.String(logline),
|
||||
},
|
||||
}
|
||||
|
||||
stream.publishBatch(events)
|
||||
if stream.sequenceToken == nil {
|
||||
t.Fatal("Expected non-nil sequenceToken")
|
||||
}
|
||||
if *stream.sequenceToken != nextSequenceToken {
|
||||
t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
|
||||
}
|
||||
argument := <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if argument.SequenceToken == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
|
||||
}
|
||||
if *argument.SequenceToken != sequenceToken {
|
||||
t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
|
||||
}
|
||||
if len(argument.LogEvents) != 1 {
|
||||
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
if argument.LogEvents[0] != events[0] {
|
||||
t.Error("Expected event to equal input")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublishBatchError(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
errorResult: errors.New("Error!"),
|
||||
}
|
||||
|
||||
events := []*cloudwatchlogs.InputLogEvent{
|
||||
{
|
||||
Message: aws.String(logline),
|
||||
},
|
||||
}
|
||||
|
||||
stream.publishBatch(events)
|
||||
if stream.sequenceToken == nil {
|
||||
t.Fatal("Expected non-nil sequenceToken")
|
||||
}
|
||||
if *stream.sequenceToken != sequenceToken {
|
||||
t.Errorf("Expected sequenceToken to be %s, but was %s", sequenceToken, *stream.sequenceToken)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
|
||||
mockClient := newMockClientBuffered(2)
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
successResult: &cloudwatchlogs.PutLogEventsOutput{
|
||||
NextSequenceToken: aws.String(nextSequenceToken),
|
||||
},
|
||||
}
|
||||
|
||||
events := []*cloudwatchlogs.InputLogEvent{
|
||||
{
|
||||
Message: aws.String(logline),
|
||||
},
|
||||
}
|
||||
|
||||
stream.publishBatch(events)
|
||||
if stream.sequenceToken == nil {
|
||||
t.Fatal("Expected non-nil sequenceToken")
|
||||
}
|
||||
if *stream.sequenceToken != nextSequenceToken {
|
||||
t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
|
||||
}
|
||||
|
||||
argument := <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if argument.SequenceToken == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
|
||||
}
|
||||
if *argument.SequenceToken != sequenceToken {
|
||||
t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
|
||||
}
|
||||
if len(argument.LogEvents) != 1 {
|
||||
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
if argument.LogEvents[0] != events[0] {
|
||||
t.Error("Expected event to equal input")
|
||||
}
|
||||
|
||||
argument = <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if argument.SequenceToken == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
|
||||
}
|
||||
if *argument.SequenceToken != "token" {
|
||||
t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", "token", *argument.SequenceToken)
|
||||
}
|
||||
if len(argument.LogEvents) != 1 {
|
||||
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
if argument.LogEvents[0] != events[0] {
|
||||
t.Error("Expected event to equal input")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublishBatchAlreadyAccepted(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
|
||||
}
|
||||
|
||||
events := []*cloudwatchlogs.InputLogEvent{
|
||||
{
|
||||
Message: aws.String(logline),
|
||||
},
|
||||
}
|
||||
|
||||
stream.publishBatch(events)
|
||||
if stream.sequenceToken == nil {
|
||||
t.Fatal("Expected non-nil sequenceToken")
|
||||
}
|
||||
if *stream.sequenceToken != "token" {
|
||||
t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken)
|
||||
}
|
||||
|
||||
argument := <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if argument.SequenceToken == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
|
||||
}
|
||||
if *argument.SequenceToken != sequenceToken {
|
||||
t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
|
||||
}
|
||||
if len(argument.LogEvents) != 1 {
|
||||
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
if argument.LogEvents[0] != events[0] {
|
||||
t.Error("Expected event to equal input")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectBatchSimple(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
messages: make(chan *logger.Message),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
successResult: &cloudwatchlogs.PutLogEventsOutput{
|
||||
NextSequenceToken: aws.String(nextSequenceToken),
|
||||
},
|
||||
}
|
||||
ticks := make(chan time.Time)
|
||||
newTicker = func(_ time.Duration) *time.Ticker {
|
||||
return &time.Ticker{
|
||||
C: ticks,
|
||||
}
|
||||
}
|
||||
|
||||
go stream.collectBatch()
|
||||
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(logline),
|
||||
Timestamp: time.Time{},
|
||||
})
|
||||
|
||||
ticks <- time.Time{}
|
||||
stream.Close()
|
||||
|
||||
argument := <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if len(argument.LogEvents) != 1 {
|
||||
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
if *argument.LogEvents[0].Message != logline {
|
||||
t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectBatchTicker(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
messages: make(chan *logger.Message),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
successResult: &cloudwatchlogs.PutLogEventsOutput{
|
||||
NextSequenceToken: aws.String(nextSequenceToken),
|
||||
},
|
||||
}
|
||||
ticks := make(chan time.Time)
|
||||
newTicker = func(_ time.Duration) *time.Ticker {
|
||||
return &time.Ticker{
|
||||
C: ticks,
|
||||
}
|
||||
}
|
||||
|
||||
go stream.collectBatch()
|
||||
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(logline + " 1"),
|
||||
Timestamp: time.Time{},
|
||||
})
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(logline + " 2"),
|
||||
Timestamp: time.Time{},
|
||||
})
|
||||
|
||||
ticks <- time.Time{}
|
||||
|
||||
// Verify first batch
|
||||
argument := <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if len(argument.LogEvents) != 2 {
|
||||
t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
if *argument.LogEvents[0].Message != logline+" 1" {
|
||||
t.Errorf("Expected message to be %s but was %s", logline+" 1", *argument.LogEvents[0].Message)
|
||||
}
|
||||
if *argument.LogEvents[1].Message != logline+" 2" {
|
||||
t.Errorf("Expected message to be %s but was %s", logline+" 2", *argument.LogEvents[0].Message)
|
||||
}
|
||||
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(logline + " 3"),
|
||||
Timestamp: time.Time{},
|
||||
})
|
||||
|
||||
ticks <- time.Time{}
|
||||
argument = <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if len(argument.LogEvents) != 1 {
|
||||
t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
if *argument.LogEvents[0].Message != logline+" 3" {
|
||||
t.Errorf("Expected message to be %s but was %s", logline+" 3", *argument.LogEvents[0].Message)
|
||||
}
|
||||
|
||||
stream.Close()
|
||||
|
||||
}
|
||||
|
||||
func TestCollectBatchClose(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
messages: make(chan *logger.Message),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
successResult: &cloudwatchlogs.PutLogEventsOutput{
|
||||
NextSequenceToken: aws.String(nextSequenceToken),
|
||||
},
|
||||
}
|
||||
var ticks = make(chan time.Time)
|
||||
newTicker = func(_ time.Duration) *time.Ticker {
|
||||
return &time.Ticker{
|
||||
C: ticks,
|
||||
}
|
||||
}
|
||||
|
||||
go stream.collectBatch()
|
||||
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(logline),
|
||||
Timestamp: time.Time{},
|
||||
})
|
||||
|
||||
// no ticks
|
||||
stream.Close()
|
||||
|
||||
argument := <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if len(argument.LogEvents) != 1 {
|
||||
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
if *argument.LogEvents[0].Message != logline {
|
||||
t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectBatchLineSplit(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
messages: make(chan *logger.Message),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
successResult: &cloudwatchlogs.PutLogEventsOutput{
|
||||
NextSequenceToken: aws.String(nextSequenceToken),
|
||||
},
|
||||
}
|
||||
var ticks = make(chan time.Time)
|
||||
newTicker = func(_ time.Duration) *time.Ticker {
|
||||
return &time.Ticker{
|
||||
C: ticks,
|
||||
}
|
||||
}
|
||||
|
||||
go stream.collectBatch()
|
||||
|
||||
longline := strings.Repeat("A", maximumBytesPerEvent)
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(longline + "B"),
|
||||
Timestamp: time.Time{},
|
||||
})
|
||||
|
||||
// no ticks
|
||||
stream.Close()
|
||||
|
||||
argument := <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if len(argument.LogEvents) != 2 {
|
||||
t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
if *argument.LogEvents[0].Message != longline {
|
||||
t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
|
||||
}
|
||||
if *argument.LogEvents[1].Message != "B" {
|
||||
t.Errorf("Expected message to be %s but was %s", "B", *argument.LogEvents[1].Message)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectBatchMaxEvents(t *testing.T) {
|
||||
mockClient := newMockClientBuffered(1)
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
messages: make(chan *logger.Message),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
successResult: &cloudwatchlogs.PutLogEventsOutput{
|
||||
NextSequenceToken: aws.String(nextSequenceToken),
|
||||
},
|
||||
}
|
||||
var ticks = make(chan time.Time)
|
||||
newTicker = func(_ time.Duration) *time.Ticker {
|
||||
return &time.Ticker{
|
||||
C: ticks,
|
||||
}
|
||||
}
|
||||
|
||||
go stream.collectBatch()
|
||||
|
||||
line := "A"
|
||||
for i := 0; i <= maximumLogEventsPerPut; i++ {
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(line),
|
||||
Timestamp: time.Time{},
|
||||
})
|
||||
}
|
||||
|
||||
// no ticks
|
||||
stream.Close()
|
||||
|
||||
argument := <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if len(argument.LogEvents) != maximumLogEventsPerPut {
|
||||
t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents))
|
||||
}
|
||||
|
||||
argument = <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if len(argument.LogEvents) != 1 {
|
||||
t.Errorf("Expected LogEvents to contain %d elements, but contains %d", 1, len(argument.LogEvents))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectBatchMaxTotalBytes(t *testing.T) {
|
||||
mockClient := newMockClientBuffered(1)
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
messages: make(chan *logger.Message),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
successResult: &cloudwatchlogs.PutLogEventsOutput{
|
||||
NextSequenceToken: aws.String(nextSequenceToken),
|
||||
},
|
||||
}
|
||||
var ticks = make(chan time.Time)
|
||||
newTicker = func(_ time.Duration) *time.Ticker {
|
||||
return &time.Ticker{
|
||||
C: ticks,
|
||||
}
|
||||
}
|
||||
|
||||
go stream.collectBatch()
|
||||
|
||||
longline := strings.Repeat("A", maximumBytesPerPut)
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(longline + "B"),
|
||||
Timestamp: time.Time{},
|
||||
})
|
||||
|
||||
// no ticks
|
||||
stream.Close()
|
||||
|
||||
argument := <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
bytes := 0
|
||||
for _, event := range argument.LogEvents {
|
||||
bytes += len(*event.Message)
|
||||
}
|
||||
if bytes > maximumBytesPerPut {
|
||||
t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes)
|
||||
}
|
||||
|
||||
argument = <-mockClient.putLogEventsArgument
|
||||
if len(argument.LogEvents) != 1 {
|
||||
t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
message := *argument.LogEvents[0].Message
|
||||
if message[len(message)-1:] != "B" {
|
||||
t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
|
||||
}
|
||||
}
|
||||
76
vendor/github.com/hyperhq/hypercli/daemon/logger/awslogs/cwlogsiface_mock_test.go
generated
vendored
Normal file
76
vendor/github.com/hyperhq/hypercli/daemon/logger/awslogs/cwlogsiface_mock_test.go
generated
vendored
Normal file
@@ -0,0 +1,76 @@
|
||||
package awslogs
|
||||
|
||||
import "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
|
||||
|
||||
type mockcwlogsclient struct {
|
||||
createLogStreamArgument chan *cloudwatchlogs.CreateLogStreamInput
|
||||
createLogStreamResult chan *createLogStreamResult
|
||||
putLogEventsArgument chan *cloudwatchlogs.PutLogEventsInput
|
||||
putLogEventsResult chan *putLogEventsResult
|
||||
}
|
||||
|
||||
type createLogStreamResult struct {
|
||||
successResult *cloudwatchlogs.CreateLogStreamOutput
|
||||
errorResult error
|
||||
}
|
||||
|
||||
type putLogEventsResult struct {
|
||||
successResult *cloudwatchlogs.PutLogEventsOutput
|
||||
errorResult error
|
||||
}
|
||||
|
||||
func newMockClient() *mockcwlogsclient {
|
||||
return &mockcwlogsclient{
|
||||
createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, 1),
|
||||
createLogStreamResult: make(chan *createLogStreamResult, 1),
|
||||
putLogEventsArgument: make(chan *cloudwatchlogs.PutLogEventsInput, 1),
|
||||
putLogEventsResult: make(chan *putLogEventsResult, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func newMockClientBuffered(buflen int) *mockcwlogsclient {
|
||||
return &mockcwlogsclient{
|
||||
createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, buflen),
|
||||
createLogStreamResult: make(chan *createLogStreamResult, buflen),
|
||||
putLogEventsArgument: make(chan *cloudwatchlogs.PutLogEventsInput, buflen),
|
||||
putLogEventsResult: make(chan *putLogEventsResult, buflen),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockcwlogsclient) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
|
||||
m.createLogStreamArgument <- input
|
||||
output := <-m.createLogStreamResult
|
||||
return output.successResult, output.errorResult
|
||||
}
|
||||
|
||||
func (m *mockcwlogsclient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
|
||||
m.putLogEventsArgument <- input
|
||||
output := <-m.putLogEventsResult
|
||||
return output.successResult, output.errorResult
|
||||
}
|
||||
|
||||
type mockmetadataclient struct {
|
||||
regionResult chan *regionResult
|
||||
}
|
||||
|
||||
type regionResult struct {
|
||||
successResult string
|
||||
errorResult error
|
||||
}
|
||||
|
||||
func newMockMetadataClient() *mockmetadataclient {
|
||||
return &mockmetadataclient{
|
||||
regionResult: make(chan *regionResult, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockmetadataclient) Region() (string, error) {
|
||||
output := <-m.regionResult
|
||||
return output.successResult, output.errorResult
|
||||
}
|
||||
|
||||
func test() {
|
||||
_ = &logStream{
|
||||
client: newMockClient(),
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user