From ff446c10cfec202ef02a4d832841ad64354b0993 Mon Sep 17 00:00:00 2001
From: Pedro Silva
Date: Sun, 10 Dec 2017 14:44:13 +0000
Subject: [PATCH 1/2] Added Support to upload to AWS S3

---
 cmd/logshare-cli/main.go | 108 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 105 insertions(+), 3 deletions(-)

diff --git a/cmd/logshare-cli/main.go b/cmd/logshare-cli/main.go
index ec91d1d..cd54bf8 100644
--- a/cmd/logshare-cli/main.go
+++ b/cmd/logshare-cli/main.go
@@ -14,6 +14,14 @@ import (
 	"github.com/pkg/errors"
 	"github.com/urfave/cli"
 	"golang.org/x/net/context"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+
+	"compress/gzip"
+
+	"sync"
 )
 
 // Rev is set on build time and should contain the git commit logshare-cli
@@ -31,11 +39,12 @@ func main() {
 	app.Flags = flags
 	app.Version = Rev
 
-	conf := &config{}
-	app.Action = run(conf)
+	runCtx := RunContext{}
+	app.Action = run(&runCtx)
 	if err := app.Run(os.Args); err != nil {
 		log.Println(err)
 	}
+	runCtx.wg.Wait()
 }
 
 func setupGoogleStr(projectId string, bucketName string, filename string) (*gcs.Writer, error) {
@@ -59,7 +68,48 @@ func setupGoogleStr(projectId string, bucketName string, filename string) (*gcs.
 	return obj.NewWriter(gCtx), error
 }
 
-func run(conf *config) func(c *cli.Context) error {
+func setupAWSStr(runCtx *RunContext, filename string) (*io.PipeWriter, error) {
+	conf := &runCtx.conf
+
+	reader, writer := io.Pipe()
+
+	sess, err := session.NewSession(&aws.Config{
+		Region: aws.String(conf.awsS3Region)},
+	)
+
+	if conf.awsS3Compress {
+		filename += ".gz"
+	}
+
+	if err != nil {
+		log.Fatalln("Failed to create session", err)
+	}
+
+	key := filename
+	if conf.awsS3BucketPath != "" {
+		key = conf.awsS3BucketPath + "/" + filename
+	}
+
+	runCtx.wg.Add(1)
+	go func() {
+		uploader := s3manager.NewUploader(sess)
+		_, err := uploader.Upload(&s3manager.UploadInput{
+			Body:   reader,
+			Bucket: aws.String(conf.awsS3Bucket),
+			Key:    aws.String(key),
+		})
+
+		if err != nil {
+			log.Fatalln("Failed to upload", err)
+		}
+		runCtx.wg.Done()
+	}()
+
+	return writer, err
+}
+
+func run(runCtx *RunContext) func(c *cli.Context) error {
+	conf := &runCtx.conf
 	return func(c *cli.Context) error {
 		if err := parseFlags(conf, c); err != nil {
 			cli.ShowAppHelp(c)
@@ -83,13 +133,34 @@ func run(conf *config) func(c *cli.Context) error {
 			fileName := "cloudflare_els_" + conf.zoneID + "_" + strconv.Itoa(int(time.Now().Unix())) + ".json"
 
 			gcsWriter, err := setupGoogleStr(conf.googleProjectId, conf.googleStorageBucket, fileName)
+
 			if err != nil {
 				return err
 			}
+
 			defer gcsWriter.Close()
 			outputWriter = gcsWriter
 		}
 
+		if conf.awsS3Bucket != "" {
+			fileName := "cloudflare_els_" + conf.zoneID + "_" + strconv.Itoa(int(time.Now().Unix())) + ".json"
+
+			writer, err := setupAWSStr(runCtx, fileName)
+
+			if err != nil {
+				return err
+			}
+
+			defer writer.Close()
+
+			if conf.awsS3Compress {
+				outputWriter = gzip.NewWriter(writer)
+			} else {
+				outputWriter = writer
+			}
+
+		}
+
 		client, err := logshare.New(
 			conf.apiKey,
 			conf.apiEmail,
@@ -149,10 +220,19 @@ func parseFlags(conf *config, c *cli.Context) error {
 	conf.listFields = c.Bool("list-fields")
 	conf.googleStorageBucket = c.String("google-storage-bucket")
 	conf.googleProjectId = c.String("google-project-id")
+	conf.awsS3Bucket = c.String("aws-s3-bucket")
+	conf.awsS3BucketPath = c.String("aws-s3-path")
+	conf.awsS3Region = c.String("aws-s3-region")
+	conf.awsS3Compress = c.Bool("aws-s3-compress")
 
 	return conf.Validate()
 }
 
+type RunContext struct {
+	conf config
+	wg   sync.WaitGroup
+}
+
 type config struct {
 	apiKey   string
 	apiEmail string
@@ -168,6 +248,10 @@ type config struct {
 	listFields          bool
 	googleStorageBucket string
 	googleProjectId     string
+	awsS3Bucket         string
+	awsS3BucketPath     string
+	awsS3Region         string
+	awsS3Compress       bool
 }
 
 func (conf *config) Validate() error {
@@ -255,4 +339,22 @@ var flags = []cli.Flag{
 		Name:  "google-project-id",
 		Usage: "Project ID of the Google Cloud Storage Bucket to upload logs to",
 	},
+	cli.StringFlag{
+		Name:  "aws-s3-bucket",
+		Usage: "Name of the AWS S3 Bucket to upload logs to",
+	},
+	cli.StringFlag{
+		Name:  "aws-s3-path",
+		Value: "",
+		Usage: "AWS S3 Path inside bucket to upload logs to",
+	},
+	cli.StringFlag{
+		Name:  "aws-s3-region",
+		Value: "us-west-2",
+		Usage: "AWS Region of the AWS S3 Bucket to upload logs to (default is us-west-2)",
+	},
+	cli.BoolFlag{
+		Name:  "aws-s3-compress",
+		Usage: "Upload compressed logs to AWS S3. Default false",
+	},
 }

From 373d2ab53b6dccbb10ab85185738d0ddfdca4b01 Mon Sep 17 00:00:00 2001
From: Pedro Silva
Date: Mon, 18 Dec 2017 09:24:27 +0000
Subject: [PATCH 2/2] Fix - ensure we close the writers in the correct order

When using aws-s3-compress we need to ensure we close the GzWriter before
the PipeWriter, to avoid having "corrupted" JSON in the logs; this was
observed when trying to use AWS Athena to query the logs.
---
 cmd/logshare-cli/main.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/cmd/logshare-cli/main.go b/cmd/logshare-cli/main.go
index cd54bf8..9a4beb1 100644
--- a/cmd/logshare-cli/main.go
+++ b/cmd/logshare-cli/main.go
@@ -152,13 +152,13 @@ func run(runCtx *RunContext) func(c *cli.Context) error {
 			}
 
 			defer writer.Close()
-
 			if conf.awsS3Compress {
-				outputWriter = gzip.NewWriter(writer)
+				gzWriter := gzip.NewWriter(writer)
+				defer gzWriter.Close()
+				outputWriter = gzWriter
 			} else {
 				outputWriter = writer
 			}
-
 		}
 
 		client, err := logshare.New(
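
A note on the streaming pattern used in setupAWSStr above: io.Pipe connects the
CLI's log output directly to the S3 uploader, so logs are streamed to S3 as they
are written rather than buffered to disk first, and main's runCtx.wg.Wait()
keeps the process alive until the background upload finishes. Below is a
minimal standalone sketch of the same idea, not the patch's code itself: the
streamToS3 name and the bucket/key/region values are illustrative, and it
propagates an upload failure through the pipe with CloseWithError instead of
calling log.Fatalln inside the goroutine (which would exit the process and skip
wg.Done and any deferred cleanup).

package main

import (
	"io"
	"log"
	"strings"
	"sync"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// streamToS3 returns a writer; bytes written to it are streamed to S3
// by a background goroutine, mirroring setupAWSStr in the patch.
func streamToS3(wg *sync.WaitGroup, region, bucket, key string) (*io.PipeWriter, error) {
	sess, err := session.NewSession(&aws.Config{Region: aws.String(region)})
	if err != nil {
		return nil, err
	}
	reader, writer := io.Pipe()
	wg.Add(1)
	go func() {
		defer wg.Done()
		_, err := s3manager.NewUploader(sess).Upload(&s3manager.UploadInput{
			Body:   reader,
			Bucket: aws.String(bucket),
			Key:    aws.String(key),
		})
		// Closing the reader with the error unblocks any pending writes
		// on the other end instead of aborting the whole process.
		reader.CloseWithError(err)
	}()
	return writer, nil
}

func main() {
	var wg sync.WaitGroup
	// Placeholder region, bucket, and key.
	w, err := streamToS3(&wg, "us-west-2", "my-bucket", "logs/example.json")
	if err != nil {
		log.Fatalln(err)
	}
	io.Copy(w, strings.NewReader(`{"hello":"world"}`))
	w.Close() // signal EOF so the upload can complete
	wg.Wait() // wait for the background upload, as main does via runCtx.wg.Wait()
}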
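On the ordering fix in the second commit: gzip.Writer buffers its input and
only emits the final block and the gzip footer on Close, and deferred calls run
last-in-first-out, so registering defer gzWriter.Close() after defer
writer.Close() guarantees the gzip stream is finalized before the pipe signals
EOF to the uploader. Reversing the order truncates the stream, which is the
"corrupted" JSON the commit message describes. A small self-contained
illustration, with an ioutil.ReadAll goroutine standing in for the S3 uploader:

package main

import (
	"compress/gzip"
	"io"
	"io/ioutil"
	"log"
)

func main() {
	pr, pw := io.Pipe()
	done := make(chan int)
	go func() {
		b, _ := ioutil.ReadAll(pr) // stands in for s3manager reading the pipe
		done <- len(b)
	}()

	gz := gzip.NewWriter(pw)
	io.WriteString(gz, `{"ok":true}`)

	// Close order matters: the gzip writer first, so its buffered data and
	// footer are flushed into the pipe; then the pipe writer, to signal EOF
	// to the consumer. Swapping these two calls truncates the gzip stream.
	gz.Close()
	pw.Close()

	log.Printf("consumer received %d gzipped bytes", <-done)
}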