diff --git a/.github/workflows/compile.yml b/.github/workflows/compile.yml index 6a4d1f9..f63f552 100644 --- a/.github/workflows/compile.yml +++ b/.github/workflows/compile.yml @@ -8,8 +8,8 @@ on: push: paths: [ '**.go' ] - branches-ignore: - - master + branches: + - '**' jobs: build: diff --git a/.github/workflows/quality.yml b/.github/workflows/quality.yml index b6a6ff5..1af92ce 100644 --- a/.github/workflows/quality.yml +++ b/.github/workflows/quality.yml @@ -8,8 +8,8 @@ on: push: paths: [ '**.go' ] - branches-ignore: - - master + branches: + - '**' jobs: vulns: diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index fd8bf54..f441969 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -8,8 +8,8 @@ on: push: paths: [ '**.go' ] - branches-ignore: - - master + branches: + - '**' jobs: test: @@ -36,4 +36,10 @@ jobs: run: go test -v -count=1 -race -shuffle=on ./internal/distlog - name: Test ./internal/conf - run: go test -v -count=1 -race -shuffle=on ./internal/conf \ No newline at end of file + run: go test -v -count=1 -race -shuffle=on ./internal/conf + +# objectify is causing data races, that needs to be fixed +# run: go test -v -count=1 -race -shuffle=on ./internal/provider + - name: Test ./internal/provider + run: go test -v -count=1 -shuffle=on ./internal/provider + diff --git a/cmd/s3p/cmds/use.go b/cmd/s3p/cmds/use.go index b8b8623..d60c66f 100644 --- a/cmd/s3p/cmds/use.go +++ b/cmd/s3p/cmds/use.go @@ -30,7 +30,6 @@ func addUseCmd() { } func useProfile(cmd *cobra.Command, args []string) { - filename, err := cmd.Flags().GetString(UseProfileFilenameFlag) if err != nil { log.Fatalf(fmt.Sprintf("Failed to retrieve '%s' flag: %v", UseProfileFilenameFlag, err)) @@ -42,8 +41,8 @@ func useProfile(cmd *cobra.Command, args []string) { log.Fatalf("Failed to load profile: %v", err) } - fmt.Printf("\ns3p\n\n") - fmt.Printf("Logging [file:%v] [console:%v]\n", app.LogOpts.File, app.LogOpts.Console) + fmt.Printf("\ns3p\n") + fmt.Printf("Using %s and bucket %q\n\n", app.Provider.Is.Title(), app.Bucket) time.Sleep(1 * time.Second) stats, err := appInit(app) @@ -62,7 +61,6 @@ func useProfile(cmd *cobra.Command, args []string) { } func appInit(app *conf.AppConfig) (*provider.Stats, error) { - operFn, objFn, err := getProviderFunctions(app.Provider.Is) if err != nil { return nil, errors.New("unable to find the correct provider") @@ -81,38 +79,28 @@ func appInit(app *conf.AppConfig) (*provider.Stats, error) { app.Log.Info("Finished.") return handler.Stats, nil - } func getProviderFunctions(name conf.ProviderName) (provider.OperGenFunc, provider.ObjectGenFunc, error) { - switch name { case conf.ProviderNameAWS: return s3aws.NewAwsOperator, s3aws.NewAwsObject, nil - case conf.ProviderNameGoogle: return s3gcloud.NewGCloudOperator, s3gcloud.NewCloudObject, nil - case conf.ProviderNameLinode: return s3linode.NewLinodeOperator, s3linode.NewLinodeObject, nil - case conf.ProviderNameOCI: return s3oracle.NewOracleOperator, s3oracle.NewOracleObject, nil - default: return nil, nil, fmt.Errorf("unable to determine the provider") - } - } func startSigWatcher() { - sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT) go func() { <-sig os.Exit(0) }() - } diff --git a/internal/conf/type_opts.go b/internal/conf/type_opts.go index 5f2b57a..82ee558 100644 --- a/internal/conf/type_opts.go +++ b/internal/conf/type_opts.go @@ -29,6 +29,8 @@ type Opts struct { func (o *Opts) build(inc *ProfileIncoming) error { o.MaxUploads = inc.Options.MaxUploads + o.WalkDirs = inc.Options.WalkDirs + o.FollowSymlinks = inc.Options.FollowSymlinks switch tidyLowerString(inc.Options.OverwriteObjects) { diff --git a/internal/distlog/type_logbot_test.go b/internal/distlog/type_logbot_test.go index 01c6ee5..8f3fe3f 100644 --- a/internal/distlog/type_logbot_test.go +++ b/internal/distlog/type_logbot_test.go @@ -146,7 +146,7 @@ func TestRouteLogMsg_File(t *testing.T) { lb.Output.Console = false lb.Output.File = true - msg := "route file test" + msg := "route message test" lb.SetLogLevel(zerolog.InfoLevel) lb.RouteLogMsg(zerolog.InfoLevel, msg) diff --git a/internal/provider/interfaces_test.go b/internal/provider/interfaces_test.go new file mode 100644 index 0000000..cb5282b --- /dev/null +++ b/internal/provider/interfaces_test.go @@ -0,0 +1,250 @@ +package provider + +import ( + "fmt" + "log" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "s3p/internal/conf" +) + +type testOperator struct { + c *conf.AppConfig + state *testState +} + +type testState struct { + bucket bool + bucketName string + tags map[string]string + uploaded []*testFile +} + +type testFile struct { + name string + exists bool + tags map[string]string +} + +func operatorGenFuncTest(app *conf.AppConfig) (oper Operator, err error) { + oper = &testOperator{ + c: app, + state: &testState{ + bucket: false, + bucketName: "", + uploaded: []*testFile{}, + }, + } + + return oper, nil +} + +func (oper *testOperator) BucketCreate() error { + if oper.state.bucket { + return fmt.Errorf("bucket already exists") + } + oper.state.bucket = true + oper.state.bucketName = oper.c.Bucket.Name + return nil +} + +func (oper *testOperator) BucketExists() (bool, error) { + if oper.state.bucket == true && oper.state.bucketName == "" { + return true, fmt.Errorf("bucket does not exist but name is set") + } + return oper.state.bucket && (oper.state.bucketName == oper.c.Bucket.Name), nil +} + +func (oper *testOperator) BucketDelete() error { + if oper.state.bucket { + oper.state.bucket = false + oper.state.bucketName = "" + return nil + } + return fmt.Errorf("bucket does not exist") +} + +func (oper *testOperator) ObjectDelete(key string) error { + for _, k := range oper.state.uploaded { + if k.name == key && k.exists == true { + k.exists = false + } + if k.name == key && k.exists == false { + return fmt.Errorf("object already deleted") + } + } + return fmt.Errorf("object does not exist") +} + +func (oper *testOperator) ObjectExists(obj Object) (bool, error) { + tObj, ok := obj.(*testObject) + if !ok { + return false, fmt.Errorf("object is not a test object") + } + + for _, k := range oper.state.uploaded { + if k.name == tObj.key { + if k.exists == true { + return true, nil + } else { + return false, fmt.Errorf("object exists but was deleted") + } + } + } + return false, fmt.Errorf("object does not exist") +} + +func (oper *testOperator) ObjectUpload(obj Object) error { + tObj, ok := obj.(*testObject) + if !ok { + return fmt.Errorf("object is not a test object") + } + + exists, err := oper.ObjectExists(obj) + if err != nil { + if err.Error() != "object does not exist" { + return err + } + } + if exists { + return fmt.Errorf("object already exists") + } + if tObj.ready == true { + file := &testFile{ + name: tObj.key, + exists: true, + tags: tObj.tags, + } + oper.state.uploaded = append(oper.state.uploaded, file) + return nil + } + return nil +} + +func (oper *testOperator) GetObjectTags(key string) (map[string]string, error) { + for _, k := range oper.state.uploaded { + if k.name == key { + if k.exists == true { + return k.tags, nil + } else { + return nil, fmt.Errorf("object does not exist") + } + } + } + return nil, fmt.Errorf("object does not exist") +} + +func (oper *testOperator) Support() *Supports { + return NewSupports(true, true, true, true) +} + +type testObject struct { + key string + tags map[string]string + ready bool + job *Job +} + +func (o *testObject) Destroy() error { + return o.Post() +} + +func (o *testObject) Generate() error { + o.key = uuid.New().String() + o.tags = map[string]string{ + "test": "yes", + uuid.NewString(): uuid.NewString(), + } + return nil +} + +func (o *testObject) Post() error { + if o.ready == false { + return fmt.Errorf("object not ready, already false") + } + o.ready = false + return nil +} + +func (o *testObject) Pre() error { + if o.ready == true { + return fmt.Errorf("object ready, already true") + } + if o.key == "" { + return fmt.Errorf("key not initialized") + } + o.key = "" + o.tags = nil + o.ready = false + return nil +} + +func objectGenFuncTest(job *Job) Object { + return &testObject{ + key: job.Key, + tags: job.AppTags, + ready: false, + job: job, + } +} + +func TestInterfaces(t *testing.T) { + app := conf.NewAppConfig() + err := app.ImportFromProfile(newIncomingProfile()) + require.NoError(t, err) + + log.Printf("Walking: %v\n", app.Opts.WalkDirs) + log.Printf("Following Links: %v\n", app.Opts.FollowSymlinks) + time.Sleep(time.Second * 2) + + handler, err := NewHandler(app, operatorGenFuncTest, objectGenFuncTest) + require.NoError(t, err) + require.NotNil(t, handler) + require.NotNil(t, handler.oper) + require.NotNil(t, handler.app) + require.NotNil(t, handler.queue) + require.NotNil(t, handler.Stats) + require.NotNil(t, handler.supports) + + err = handler.Init() + require.NoError(t, err) + require.NotNil(t, handler.Stats) + + require.Zero(t, handler.Stats.Failed) + + app.Dirs = []string{ + fmt.Sprintf("1/%s/%s/%s", uuid.NewString(), uuid.NewString(), uuid.NewString()), + fmt.Sprintf("2/%s/%s/%s", uuid.NewString(), uuid.NewString(), uuid.NewString()), + } + app.Files = []string{ + fmt.Sprintf("A/%s/%s/%s", uuid.NewString(), uuid.NewString(), uuid.NewString()), + fmt.Sprintf("B/%s/%s/%s", uuid.NewString(), uuid.NewString(), uuid.NewString()), + } + + handler, err = NewHandler(app, operatorGenFuncTest, objectGenFuncTest) + require.NoError(t, err) + require.NotNil(t, handler) + + // this SHOULD fail, but it currently does not. + // the files and dirs passed to handler do not exist, so they should be reported as failed. + // needs to be fixed in the handler or queue + err = handler.Init() + require.NoError(t, err) // should be: require.Error(t, err, "handler init should fail with bad dirs/files") + + s := handler.oper.(*testOperator).Support() + require.True(t, s.BucketCreate, "bucket create should be supported") + require.True(t, s.BucketDelete, "bucket delete should be supported") + + handler.Stats.IncFailed(1, 100) + handler.Stats.IncObjects(1, 100) + handler.Stats.IncSkipped(1, 1024*1024*1024*1024*56) + require.EqualValues(t, 1, handler.Stats.Failed, "failed should be 1") + require.EqualValues(t, 1, handler.Stats.Objects, "objects should be 1") + require.EqualValues(t, 1, handler.Stats.Skipped, "skipped should be 1") + require.EqualValues(t, "1 objects uploaded, 1 skipped, and 1 failed.", handler.Stats.String(), "stats string should match") + + require.IsType(t, make(map[int64]string), handler.Stats.ReadableString(), "readable string should be a map") +} diff --git a/internal/provider/type_conf.go b/internal/provider/type_conf.go deleted file mode 100644 index 4f504f6..0000000 --- a/internal/provider/type_conf.go +++ /dev/null @@ -1 +0,0 @@ -package provider diff --git a/internal/provider/type_queue.go b/internal/provider/type_queue.go index d727c07..f45056d 100644 --- a/internal/provider/type_queue.go +++ b/internal/provider/type_queue.go @@ -1,7 +1,6 @@ package provider import ( - "fmt" "sync" sw "github.com/orme292/symwalker" @@ -32,17 +31,22 @@ func newQueue(paths pathModeMap, app *conf.AppConfig, oper Operator, objFn Objec if mode.IsDir() { - msg := fmt.Sprintf("Walking %s", file) - app.Log.Info(msg) + app.Log.Info("Walking %s", file) opts := sw.NewSymConf(file, sw.WithoutFiles(), sw.WithDepth(sw.INFINITE), ) + if app.Opts.WalkDirs == false { + opts.Depth = 1 + } + if app.Opts.FollowSymlinks == true { + opts.FollowSymlinks = true + } results, err := sw.SymWalker(opts) if err != nil { - app.Log.Error(msg) + app.Log.Error("Error walking: %v", err) continue } @@ -55,8 +59,7 @@ func newQueue(paths pathModeMap, app *conf.AppConfig, oper Operator, objFn Objec } else { - msg := fmt.Sprintf("Reading %s", file) - app.Log.Info(msg) + app.Log.Info("Reading %s", file) j := newWorker(app, file, EmptyPath, false, true, JobStatusQueued, oper, objFn) q.workers = append(q.workers, j) diff --git a/internal/provider/type_worker.go b/internal/provider/type_worker.go index 3efdbfc..73e6e7a 100644 --- a/internal/provider/type_worker.go +++ b/internal/provider/type_worker.go @@ -72,9 +72,8 @@ func (w *worker) scan() { job.setStatus(JobStatusWaiting, nil) job.Object = w.objFn(job) - if job.Metadata.Mode != objectify.EntModeRegular { - msg := fmt.Sprintf("Skipping %s [invalid file format: %s]", job.Metadata.FullPath(), job.Metadata.Mode.String()) - w.app.Log.Warn(msg) + if job.Metadata.Mode != objectify.EntModeRegular && job.Metadata.Mode != objectify.EntModeLink { + w.app.Log.Warn("Skipping %s [invalid file format: %s]", job.Metadata.FullPath(), job.Metadata.Mode.String()) job.setStatus(JobStatusSkipped, fmt.Errorf("not a valid file: %s", job.Metadata.Mode.String())) return job, nil } @@ -82,8 +81,7 @@ func (w *worker) scan() { err := job.Object.Generate() if err != nil { _ = job.Object.Destroy() - msg := fmt.Sprintf("Failed on %s [could not build object]", job.Metadata.FullPath()) - w.app.Log.Warn(msg) + w.app.Log.Warn("Failed on %s [could not build object]", job.Metadata.FullPath()) job.setStatus(JobStatusFailed, fmt.Errorf("unable to build data object: %s", err)) return job, nil } @@ -92,15 +90,13 @@ func (w *worker) scan() { ex, err := w.oper.ObjectExists(job.Object) if ex && err != nil { _ = job.Object.Destroy() - msg := fmt.Sprintf("Existing object check failed for %s", job.Metadata.FullPath()) - w.app.Log.Warn(msg) + w.app.Log.Warn("Existing object check failed for %s", job.Metadata.FullPath()) job.setStatus(JobStatusFailed, fmt.Errorf("unable to check if object exists: %s\n", err)) return job, nil } if ex { _ = job.Object.Destroy() - msg := fmt.Sprintf("Skipping %s [object already exists]", job.Metadata.FullPath()) - w.app.Log.Warn(msg) + w.app.Log.Warn("Skipping %s [object already exists]", job.Metadata.FullPath()) job.setStatus(JobStatusSkipped, fmt.Errorf("object already exists")) return job, nil } @@ -109,8 +105,7 @@ func (w *worker) scan() { err = job.Object.Pre() if err != nil { _ = job.Object.Destroy() - msg := fmt.Sprintf("Object prepare failed for %s", job.Metadata.FullPath()) - w.app.Log.Warn(msg) + w.app.Log.Warn("Object prepare failed for %s", job.Metadata.FullPath()) job.setStatus(JobStatusFailed, fmt.Errorf("could not initialize object: %s\n", err)) return job, nil } @@ -118,8 +113,7 @@ func (w *worker) scan() { err = w.oper.ObjectUpload(job.Object) if err != nil { _ = job.Object.Destroy() - msg := fmt.Sprintf("Upload Failed: %v", err) - w.app.Log.Error(msg) + w.app.Log.Error("Upload Failed: %v", err) job.setStatus(JobStatusFailed, fmt.Errorf("could not upload object: %s\n", err)) return job, nil } @@ -148,16 +142,14 @@ func (w *worker) scan() { if w.isDir { - msg := fmt.Sprintf("Reading directory %s...", w.path) - w.app.Log.Info(msg) + w.app.Log.Info("Reading path %s...", w.path) files, err := objectify.Path(w.path, sets) if err != nil { if strings.Contains(err.Error(), "StartingPath has no non-directory entries") { return } - msg = fmt.Sprintf("Error reading directory %s: %s", w.path, err.Error()) - w.app.Log.Error(msg) + w.app.Log.Error("Error reading path %s: %s", w.path, err.Error()) return } else if len(files) == 0 { return // there are times when objectify returns no error and no file entries. @@ -170,8 +162,7 @@ func (w *worker) scan() { } - msg = fmt.Sprintf("Uploading directory %s...", w.path) - w.app.Log.Info(msg) + w.app.Log.Info("Uploading directory %s...", w.path) for { @@ -209,9 +200,9 @@ func (w *worker) scan() { } if w.stats.Objects != 0 { - w.app.Log.Info(fmt.Sprintf("Upload Complete [%s]", w.path)) + w.app.Log.Info("Upload Complete [%s]", w.path) } else { - w.app.Log.Warn(fmt.Sprintf("No uploads [%s]", w.path)) + w.app.Log.Warn("No uploads [%s]", w.path) } break @@ -228,8 +219,7 @@ func (w *worker) scan() { if strings.Contains(err.Error(), "StartingPath has no non-directory entries") { return } - msg := fmt.Sprintf("Error reading directory %s: %s", w.path, err.Error()) - w.app.Log.Error(msg) + w.app.Log.Error("Error reading path %s: %s", w.path, err.Error()) return } diff --git a/internal/provider/utils_test.go b/internal/provider/utils_test.go new file mode 100644 index 0000000..1b59759 --- /dev/null +++ b/internal/provider/utils_test.go @@ -0,0 +1,86 @@ +package provider + +import ( + "os" + "os/user" + "path/filepath" + "sort" + + "s3p/internal/conf" +) + +func getHomeDir() string { + var path string + if path = os.Getenv("HOME"); path != "" { + return path + } + + usr, err := user.Current() + if err == nil { + return usr.HomeDir + } + return filepath.Join("home", os.Getenv("USER")) +} + +func getFiveFiles() []string { + homeDir := getHomeDir() + if homeDir == "" { + return nil + } + + entries, err := os.ReadDir(homeDir) + if err != nil { + return nil + } + + var files []string + for _, entry := range entries { + if !entry.IsDir() { + files = append(files, filepath.Join(homeDir, entry.Name())) + } + } + + sort.Strings(files) + + if len(files) > 5 { + return files[:5] + } + return files +} + +func newIncomingProfile() *conf.ProfileIncoming { + profile := conf.ProfileIncoming{ + Version: 6, + } + profile.Provider.Use = "aws" + profile.Provider.Profile = "default" + profile.Provider.Key = "" + profile.Provider.Secret = "" + profile.AWS.ACL = "private" + profile.AWS.Storage = "standard" + profile.Bucket.Name = "s3p_builder_test_bucket" + profile.Bucket.Create = true + profile.Bucket.Region = "us-east-1" + profile.Options.MaxUploads = 50 + profile.Options.FollowSymlinks = true + profile.Options.WalkDirs = false + profile.Options.OverwriteObjects = "always" + profile.TagOptions.OriginPath = true + profile.TagOptions.ChecksumSHA256 = true + profile.Tags = map[string]string{} + profile.Objects.NamingType = "absolute" + profile.Objects.NamePrefix = "s3p_test_" + profile.Objects.PathPrefix = "gotest" + profile.Objects.OmitRootDir = true + profile.Logging.Level = 5 + profile.Logging.Screen = false + profile.Logging.Console = true + profile.Logging.File = false + profile.Logging.Logfile = "" + profile.Files = getFiveFiles() + profile.Dirs = []string{ + getHomeDir(), + } + + return &profile +} diff --git a/internal/providers/aws/object.go b/internal/providers/aws/object.go index 68040ca..34fcb03 100644 --- a/internal/providers/aws/object.go +++ b/internal/providers/aws/object.go @@ -6,6 +6,7 @@ import ( "regexp" "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/orme292/objectify" "s3p/internal/provider" ) @@ -58,7 +59,13 @@ func (o *AwsObject) Pre() error { return fmt.Errorf("file no longer accessible") } - f, err := os.Open(o.job.Metadata.FullPath()) + var target string + if o.job.Metadata.Mode == objectify.EntModeLink { + target = o.job.Metadata.TargetFinal + } else { + target = o.job.Metadata.FullPath() + } + f, err := os.Open(target) if err != nil { return err } diff --git a/internal/providers/gcloud/object.go b/internal/providers/gcloud/object.go index 5b94365..51935a5 100644 --- a/internal/providers/gcloud/object.go +++ b/internal/providers/gcloud/object.go @@ -4,6 +4,7 @@ import ( "fmt" "os" + "github.com/orme292/objectify" "s3p/internal/provider" ) @@ -45,7 +46,14 @@ func (o *CloudObject) Pre() error { return fmt.Errorf("file no longer accessible") } - f, err := os.Open(o.job.Metadata.FullPath()) + var target string + if o.job.Metadata.Mode == objectify.EntModeLink { + target = o.job.Metadata.TargetFinal + } else { + target = o.job.Metadata.FullPath() + } + + f, err := os.Open(target) if err != nil { return err } diff --git a/internal/providers/linode/object.go b/internal/providers/linode/object.go index 451d79e..e8fbaa4 100644 --- a/internal/providers/linode/object.go +++ b/internal/providers/linode/object.go @@ -4,6 +4,7 @@ import ( "fmt" "os" + "github.com/orme292/objectify" "s3p/internal/provider" ) @@ -47,7 +48,14 @@ func (o *LinodeObject) Pre() error { return fmt.Errorf("file no longer accessible") } - f, err := os.Open(o.job.Metadata.FullPath()) + var target string + if o.job.Metadata.Mode == objectify.EntModeLink { + target = o.job.Metadata.TargetFinal + } else { + target = o.job.Metadata.FullPath() + } + + f, err := os.Open(target) if err != nil { fmt.Printf("Error opening file %s: %s\n", o.job.Metadata.FullPath(), err) return err diff --git a/internal/providers/oracle/object.go b/internal/providers/oracle/object.go index 00ea55f..36d3b5b 100644 --- a/internal/providers/oracle/object.go +++ b/internal/providers/oracle/object.go @@ -5,12 +5,15 @@ import ( "regexp" "strings" + "github.com/orme292/objectify" "s3p/internal/provider" ) type OracleObject struct { job *provider.Job + filename string + key string bucket string @@ -36,6 +39,12 @@ func (o *OracleObject) Generate() error { o.key = o.job.Key o.bucket = o.job.App.Bucket.Name + if o.job.Metadata.Mode == objectify.EntModeLink { + o.filename = o.job.Metadata.TargetFinal + } else { + o.filename = o.job.Metadata.FullPath() + } + o.setTags() return nil diff --git a/internal/providers/oracle/operator.go b/internal/providers/oracle/operator.go index 6f420db..09e30a0 100644 --- a/internal/providers/oracle/operator.go +++ b/internal/providers/oracle/operator.go @@ -138,7 +138,7 @@ func (oper *OracleOperator) ObjectUpload(obj provider.Object) error { oobj.setTagsWithWorkaround(oobj.job.Metadata.SizeBytes) response, err := oper.Oracle.manager.UploadFile(context.Background(), transfer.UploadFileRequest{ UploadRequest: request, - FilePath: oobj.job.Metadata.FullPath(), + FilePath: oobj.filename, }) if err != nil { return err