From 232f777393144eada0c85539c219c77227c86ba9 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Thu, 15 Sep 2022 03:42:58 +0200 Subject: [PATCH 01/25] vod: added vodloadtester draft --- cmd/vodloadtester/vodloadtester.go | 259 +++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 +- 3 files changed, 262 insertions(+), 3 deletions(-) create mode 100644 cmd/vodloadtester/vodloadtester.go diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go new file mode 100644 index 00000000..3074a2bf --- /dev/null +++ b/cmd/vodloadtester/vodloadtester.go @@ -0,0 +1,259 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "math/rand" + "os" + "runtime" + "strings" + "sync" + "time" + + "github.com/golang/glog" + api "github.com/livepeer/go-api-client" + "github.com/livepeer/stream-tester/internal/utils" + "github.com/peterbourgon/ff/v2" +) + +type cliArguments struct { + Verbosity int + Simultaneous uint + Version bool + TaskCheck bool + APIServer string + APIToken string + Filename string + VideoAmount uint + OutputPath string + + TestDuration time.Duration + StartDelayDuration time.Duration +} + +type vodLoadTester struct { + ctx context.Context + cancel context.CancelFunc + lapi *api.Client +} + +type uploadTest struct { + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + RunnerInfo string `json:"runnerInfo,omitempty"` + Kind string `json:"kind,omitempty"` + AssetID string `json:"assetId,omitempty"` + TaskID string `json:"taskId,omitempty"` + RequestUploadSuccess uint `json:"requestUploadSuccess,omitempty"` + UploadSuccess uint `json:"uploadSuccess,omitempty"` + TaskCheckSuccess uint `json:"taskCheckSuccess,omitempty"` + ErrorMessage string `json:"errorMessage,omitempty"` +} + +func main() { + var cliFlags = &cliArguments{} + + flag.Set("logtostderr", "true") + vFlag := flag.Lookup("v") + + fs := flag.NewFlagSet("loadtester", flag.ExitOnError) + + fs.IntVar(&cliFlags.Verbosity, "v", 3, "Log verbosity. {4|5|6}") + fs.BoolVar(&cliFlags.Version, "version", false, "Print out the version") + fs.BoolVar(&cliFlags.TaskCheck, "task-check", false, "Check task processing") + + fs.UintVar(&cliFlags.VideoAmount, "video-amt", 1, "How many video to upload") + fs.DurationVar(&cliFlags.StartDelayDuration, "delay-between-groups", 10*time.Second, "Delay between starting group of uploads") + + fs.UintVar(&cliFlags.Simultaneous, "sim", 1, "Number of simulteneous videos to upload") + fs.StringVar(&cliFlags.Filename, "file", "bbb_sunflower_1080p_30fps_normal_2min.mp4", "File to upload") + fs.StringVar(&cliFlags.APIToken, "api-token", "", "Token of the Livepeer API to be used") + fs.StringVar(&cliFlags.APIServer, "api-server", "origin.livepeer.com", "Server of the Livepeer API to be used") + fs.StringVar(&cliFlags.OutputPath, "output-path", "/tmp/results.ndjson", "Path to output result .ndjson file") + + _ = fs.String("config", "", "config file (optional)") + + ff.Parse(fs, os.Args[1:], + ff.WithConfigFileFlag("config"), + ff.WithConfigFileParser(ff.PlainParser), + ff.WithEnvVarPrefix("VODLOADTESTER"), + ) + flag.CommandLine.Parse(nil) + vFlag.Value.Set(fmt.Sprintf("%d", cliFlags.Verbosity)) + + hostName, _ := os.Hostname() + runnerInfo := fmt.Sprintf("Hostname %s OS %s IPs %v\n", hostName, runtime.GOOS, utils.GetIPs()) + fmt.Printf("Compiler version: %s %s\n", runtime.Compiler, runtime.Version()) + fmt.Print(runnerInfo) + + if cliFlags.Version { + return + } + + if cliFlags.Filename == "" { + glog.Fatal("missing --file parameter") + } + var err error + var fileName string + + if fileName, err = utils.GetFile(cliFlags.Filename, strings.ReplaceAll(hostName, ".", "_")); err != nil { + if err == utils.ErrNotFound { + glog.Fatalf("file %s not found\n", cliFlags.Filename) + } else { + glog.Fatalf("error getting file %s: %v\n", cliFlags.Filename, err) + } + } + glog.Infof("uploading video file %q", fileName) + + ctx, cancel := context.WithCancel(context.Background()) + + if cliFlags.APIToken == "" { + glog.Fatalf("No API token provided") + } + + apiToken := cliFlags.APIToken + apiServer := cliFlags.APIServer + outputNdjson := cliFlags.OutputPath + + lApiOpts := api.ClientOptions{ + Server: apiServer, + AccessToken: apiToken, + Timeout: 240 * time.Second, + } + lapi, _ := api.NewAPIClientGeolocated(lApiOpts) + vt := &vodLoadTester{ + lapi: lapi, + ctx: ctx, + cancel: cancel, + } + + wg := &sync.WaitGroup{} + + for i := 0; i < int(cliFlags.VideoAmount); i += int(cliFlags.Simultaneous) { + for j := 0; j < int(cliFlags.Simultaneous); j++ { + fmt.Printf("Uploading video %d/%d\n", i+j+1, cliFlags.VideoAmount) + wg.Add(1) + go func() { + uploadTest := uploadTest{ + StartTime: time.Now(), + RunnerInfo: runnerInfo, + Kind: "directUpload", + } + assetId, taskId, err := vt.uploadAsset(fileName) + if err != nil { + glog.Errorf("Error uploading asset: %v", err) + if assetId == "" { + uploadTest.RequestUploadSuccess = 0 + uploadTest.ErrorMessage = err.Error() + } else { + uploadTest.AssetID = assetId + uploadTest.RequestUploadSuccess = 1 + uploadTest.UploadSuccess = 0 + } + } else { + uploadTest.AssetID = assetId + uploadTest.TaskID = taskId + uploadTest.RequestUploadSuccess = 1 + uploadTest.UploadSuccess = 1 + if cliFlags.TaskCheck { + err := vt.checkTaskProcessing(5*time.Second, api.TaskOnlyId{ID: uploadTest.TaskID}) + if err != nil { + uploadTest.TaskCheckSuccess = 0 + uploadTest.ErrorMessage = err.Error() + } else { + uploadTest.TaskCheckSuccess = 1 + } + } + + uploadTest.EndTime = time.Now() + jsonString, err := json.Marshal(uploadTest) + if err != nil { + glog.Errorf("Error converting runTests to json: %v", err) + } + f, err := os.OpenFile(outputNdjson, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + glog.Errorf("Error opening %s: %v", outputNdjson, err) + } + defer f.Close() + if _, err := f.WriteString(string(jsonString) + "\n"); err != nil { + glog.Errorf("Error writing to %s: %v", outputNdjson, err) + } + + } + wg.Done() + }() + } + time.Sleep(cliFlags.StartDelayDuration) + } + + wg.Wait() + +} + +func (vt *vodLoadTester) uploadAsset(fileName string) (string, string, error) { + rndAssetName := fmt.Sprintf("load_test_%s", randName()) + + res, err := vt.lapi.RequestUpload(rndAssetName) + if err != nil { + fmt.Printf("error requesting upload: %v\n", err) + return "", "", err + } + uploadUrl := res.Url + assetId := res.Asset.ID + taskId := res.Task.ID + + file, err := os.Open(fileName) + + if err != nil { + fmt.Printf("error opening file %s: %v\n", fileName, err) + return assetId, taskId, err + } + + err = vt.lapi.UploadAsset(vt.ctx, uploadUrl, file) + if err != nil { + fmt.Printf("error uploading asset: %v\n", err) + return assetId, taskId, err + } + + fmt.Printf("Video uploaded for asset id %s\n", assetId) + + return assetId, taskId, err +} + +func (vt *vodLoadTester) checkTaskProcessing(taskPollDuration time.Duration, processingTask api.TaskOnlyId) error { + startTime := time.Now() + timeout := 3 * time.Minute + for { + glog.Infof("Waiting %s for task id=%s to be processed, elapsed=%s", taskPollDuration, processingTask.ID, time.Since(startTime)) + time.Sleep(taskPollDuration) + + task, err := vt.lapi.GetTask(processingTask.ID) + if err != nil { + glog.Errorf("Error retrieving task id=%s err=%v", processingTask.ID, err) + return fmt.Errorf("error retrieving task id=%s: %w", processingTask.ID, err) + } + if task.Status.Phase == "completed" { + glog.Infof("Task success, taskId=%s", task.ID) + return nil + } + if task.Status.Phase != "pending" && task.Status.Phase != "running" && task.Status.Phase != "waiting" { + glog.Errorf("Error processing task, taskId=%s status=%s error=%v", task.ID, task.Status.Phase, task.Status.ErrorMessage) + return fmt.Errorf("error processing task, taskId=%s status=%s error=%v", task.ID, task.Status.Phase, task.Status.ErrorMessage) + } + + if time.Since(startTime) > timeout { + glog.Errorf("Timeout processing task, taskId=%s", task.ID) + return fmt.Errorf("timeout processing task, taskId=%s", task.ID) + } + } +} + +func randName() string { + x := make([]byte, 10) + for i := 0; i < len(x); i++ { + x[i] = byte(rand.Uint32()) + } + return fmt.Sprintf("%x", x) +} diff --git a/go.mod b/go.mod index 2030f690..46960463 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/golang/glog v1.0.0 github.com/gosuri/uilive v0.0.3 // indirect github.com/gosuri/uiprogress v0.0.1 - github.com/livepeer/go-api-client v0.2.9-0.20220908143323-8ff49d9fddce + github.com/livepeer/go-api-client v0.2.9-0.20220913191130-1f2f838f95d6 github.com/livepeer/go-livepeer v0.5.31 github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07 github.com/livepeer/leaderboard-serverless v1.0.0 diff --git a/go.sum b/go.sum index 759089bc..6837f2eb 100644 --- a/go.sum +++ b/go.sum @@ -710,8 +710,8 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/livepeer/go-api-client v0.2.9-0.20220908143323-8ff49d9fddce h1:YbZcqp276/5Dp/ys680emXveyt2UTHAirkhg1LUF5MQ= -github.com/livepeer/go-api-client v0.2.9-0.20220908143323-8ff49d9fddce/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= +github.com/livepeer/go-api-client v0.2.9-0.20220913191130-1f2f838f95d6 h1:vv0P8DHGKXBi8uwXQvDe4w5vIgNqE0WkKbuEkpIRHOA= +github.com/livepeer/go-api-client v0.2.9-0.20220913191130-1f2f838f95d6/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= github.com/livepeer/go-livepeer v0.5.31 h1:LcN+qDnqWRws7fdVYc4ucZPVcLQRs2tehUYCQVnlnRw= github.com/livepeer/go-livepeer v0.5.31/go.mod h1:cpBikcGWApkx0cyR0Ht+uAym7j3uAwXGpPbvaOA8XUU= github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw= From 69eb6f5bea56b29e7964bc5a24063c7c4fb5ac38 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Thu, 15 Sep 2022 03:46:44 +0200 Subject: [PATCH 02/25] vod: sneaked in fix for vodtester --- internal/app/vodtester/vodtester_app.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/app/vodtester/vodtester_app.go b/internal/app/vodtester/vodtester_app.go index 521768b8..c2e797ae 100644 --- a/internal/app/vodtester/vodtester_app.go +++ b/internal/app/vodtester/vodtester_app.go @@ -164,7 +164,7 @@ func (vt *vodTester) directUploadTester(fileName string, taskPollDuration time.D return fmt.Errorf("error opening file=%s: %w", fileName, err) } - err = vt.lapi.UploadAsset(uploadEndpoint, file) + err = vt.lapi.UploadAsset(vt.ctx, uploadEndpoint, file) if err != nil { glog.Errorf("Error uploading file filePath=%s err=%v", fileName, err) return fmt.Errorf("error uploading for assetId=%s taskId=%s: %w", uploadAsset.ID, uploadTask.ID, err) From 9d4d7c0ea1717431479449773ca70531c9c6ae98 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Fri, 16 Sep 2022 18:57:55 +0200 Subject: [PATCH 03/25] vodloadtester: added tus resumable uploads load test --- cmd/vodloadtester/vodloadtester.go | 255 +++++++++++++++++++++-------- 1 file changed, 191 insertions(+), 64 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 3074a2bf..5428b963 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/eventials/go-tus" "github.com/golang/glog" api "github.com/livepeer/go-api-client" "github.com/livepeer/stream-tester/internal/utils" @@ -22,21 +23,27 @@ type cliArguments struct { Verbosity int Simultaneous uint Version bool - TaskCheck bool - APIServer string - APIToken string - Filename string - VideoAmount uint - OutputPath string + + DirectUpload bool + ResumableUpload bool + Import bool + + TaskCheck bool + APIServer string + APIToken string + Filename string + VideoAmount uint + OutputPath string TestDuration time.Duration StartDelayDuration time.Duration } type vodLoadTester struct { - ctx context.Context - cancel context.CancelFunc - lapi *api.Client + ctx context.Context + cancel context.CancelFunc + lapi *api.Client + cliFlags cliArguments } type uploadTest struct { @@ -64,11 +71,15 @@ func main() { fs.BoolVar(&cliFlags.Version, "version", false, "Print out the version") fs.BoolVar(&cliFlags.TaskCheck, "task-check", false, "Check task processing") + fs.BoolVar(&cliFlags.DirectUpload, "direct", false, "Launch direct upload test") + fs.BoolVar(&cliFlags.ResumableUpload, "resumable", false, "Launch tus upload test") + fs.BoolVar(&cliFlags.Import, "import", false, "Launch import from url test") + fs.UintVar(&cliFlags.VideoAmount, "video-amt", 1, "How many video to upload") fs.DurationVar(&cliFlags.StartDelayDuration, "delay-between-groups", 10*time.Second, "Delay between starting group of uploads") fs.UintVar(&cliFlags.Simultaneous, "sim", 1, "Number of simulteneous videos to upload") - fs.StringVar(&cliFlags.Filename, "file", "bbb_sunflower_1080p_30fps_normal_2min.mp4", "File to upload") + fs.StringVar(&cliFlags.Filename, "file", "bbb_sunflower_1080p_30fps_normal_2min.mp4", "File to upload or url to import") fs.StringVar(&cliFlags.APIToken, "api-token", "", "Token of the Livepeer API to be used") fs.StringVar(&cliFlags.APIServer, "api-server", "origin.livepeer.com", "Server of the Livepeer API to be used") fs.StringVar(&cliFlags.OutputPath, "output-path", "/tmp/results.ndjson", "Path to output result .ndjson file") @@ -95,18 +106,10 @@ func main() { if cliFlags.Filename == "" { glog.Fatal("missing --file parameter") } + var err error var fileName string - if fileName, err = utils.GetFile(cliFlags.Filename, strings.ReplaceAll(hostName, ".", "_")); err != nil { - if err == utils.ErrNotFound { - glog.Fatalf("file %s not found\n", cliFlags.Filename) - } else { - glog.Fatalf("error getting file %s: %v\n", cliFlags.Filename, err) - } - } - glog.Infof("uploading video file %q", fileName) - ctx, cancel := context.WithCancel(context.Background()) if cliFlags.APIToken == "" { @@ -115,7 +118,6 @@ func main() { apiToken := cliFlags.APIToken apiServer := cliFlags.APIServer - outputNdjson := cliFlags.OutputPath lApiOpts := api.ClientOptions{ Server: apiServer, @@ -124,16 +126,47 @@ func main() { } lapi, _ := api.NewAPIClientGeolocated(lApiOpts) vt := &vodLoadTester{ - lapi: lapi, - ctx: ctx, - cancel: cancel, + lapi: lapi, + ctx: ctx, + cancel: cancel, + cliFlags: *cliFlags, + } + + if cliFlags.DirectUpload || cliFlags.ResumableUpload { + if fileName, err = utils.GetFile(cliFlags.Filename, strings.ReplaceAll(hostName, ".", "_")); err != nil { + if err == utils.ErrNotFound { + glog.Fatalf("file %s not found\n", cliFlags.Filename) + } else { + glog.Fatalf("error getting file %s: %v\n", cliFlags.Filename, err) + } + } + + if cliFlags.DirectUpload { + glog.Infof("Launching direct upload load test for %q", fileName) + vt.directUploadLoadTest(fileName, runnerInfo) + } + if cliFlags.ResumableUpload { + glog.Infof("Launching resumable upload load test for %q", fileName) + vt.resumableUploadLoadTest(fileName, runnerInfo) + } + } + +} + +func (vt *vodLoadTester) requestUploadUrls(assetName string) (*api.UploadUrls, error) { + uploadUrls, err := vt.lapi.RequestUpload(assetName) + if err != nil { + return nil, err } + return uploadUrls, nil +} +func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string) { wg := &sync.WaitGroup{} - for i := 0; i < int(cliFlags.VideoAmount); i += int(cliFlags.Simultaneous) { - for j := 0; j < int(cliFlags.Simultaneous); j++ { - fmt.Printf("Uploading video %d/%d\n", i+j+1, cliFlags.VideoAmount) + for i := 0; i < int(vt.cliFlags.VideoAmount); i += int(vt.cliFlags.Simultaneous) { + for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { + fmt.Printf("Uploading video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) wg.Add(1) go func() { uploadTest := uploadTest{ @@ -141,23 +174,28 @@ func main() { RunnerInfo: runnerInfo, Kind: "directUpload", } - assetId, taskId, err := vt.uploadAsset(fileName) + rndAssetName := fmt.Sprintf("load_test_%s", randName()) + requestedUpload, err := vt.requestUploadUrls(rndAssetName) + if err != nil { - glog.Errorf("Error uploading asset: %v", err) - if assetId == "" { - uploadTest.RequestUploadSuccess = 0 - uploadTest.ErrorMessage = err.Error() - } else { - uploadTest.AssetID = assetId - uploadTest.RequestUploadSuccess = 1 - uploadTest.UploadSuccess = 0 - } + glog.Errorf("Error requesting upload urls: %v", err) + uploadTest.RequestUploadSuccess = 0 + uploadTest.ErrorMessage = err.Error() } else { - uploadTest.AssetID = assetId - uploadTest.TaskID = taskId uploadTest.RequestUploadSuccess = 1 + uploadTest.AssetID = requestedUpload.Asset.ID + uploadTest.TaskID = requestedUpload.Task.ID + } + + err = vt.uploadAsset(fileName, requestedUpload.Url) + + if err != nil { + glog.Errorf("Error uploading asset: %v", err) + uploadTest.UploadSuccess = 0 + uploadTest.EndTime = time.Now() + } else { uploadTest.UploadSuccess = 1 - if cliFlags.TaskCheck { + if vt.cliFlags.TaskCheck { err := vt.checkTaskProcessing(5*time.Second, api.TaskOnlyId{ID: uploadTest.TaskID}) if err != nil { uploadTest.TaskCheckSuccess = 0 @@ -168,58 +206,147 @@ func main() { } uploadTest.EndTime = time.Now() - jsonString, err := json.Marshal(uploadTest) - if err != nil { - glog.Errorf("Error converting runTests to json: %v", err) - } - f, err := os.OpenFile(outputNdjson, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - glog.Errorf("Error opening %s: %v", outputNdjson, err) - } - defer f.Close() - if _, err := f.WriteString(string(jsonString) + "\n"); err != nil { - glog.Errorf("Error writing to %s: %v", outputNdjson, err) + } + vt.writeResultNdjson(uploadTest) + wg.Done() + }() + } + time.Sleep(vt.cliFlags.StartDelayDuration) + } + + wg.Wait() +} + +func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { + wg := &sync.WaitGroup{} + + for i := 0; i < int(vt.cliFlags.VideoAmount); i += int(vt.cliFlags.Simultaneous) { + for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { + fmt.Printf("rUpload video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) + wg.Add(1) + go func() { + uploadTest := uploadTest{ + StartTime: time.Now(), + RunnerInfo: runnerInfo, + Kind: "resumable", + } + rndAssetName := fmt.Sprintf("load_test_%s", randName()) + requestedUpload, err := vt.requestUploadUrls(rndAssetName) + + if err != nil { + glog.Errorf("Error requesting upload urls: %v", err) + uploadTest.RequestUploadSuccess = 0 + uploadTest.ErrorMessage = err.Error() + } else { + uploadTest.RequestUploadSuccess = 1 + uploadTest.AssetID = requestedUpload.Asset.ID + uploadTest.TaskID = requestedUpload.Task.ID + } + + uploadUrl := requestedUpload.TusEndpoint + + err = vt.uploadAssetResumable(uploadUrl, fileName) + + if err != nil { + glog.Errorf("Error on resumable upload: %v", err) + uploadTest.UploadSuccess = 0 + uploadTest.EndTime = time.Now() + } else { + uploadTest.UploadSuccess = 1 + if vt.cliFlags.TaskCheck { + err := vt.checkTaskProcessing(5*time.Second, api.TaskOnlyId{ID: uploadTest.TaskID}) + if err != nil { + uploadTest.TaskCheckSuccess = 0 + uploadTest.ErrorMessage = err.Error() + } else { + uploadTest.TaskCheckSuccess = 1 + } } + uploadTest.EndTime = time.Now() } + vt.writeResultNdjson(uploadTest) wg.Done() }() } - time.Sleep(cliFlags.StartDelayDuration) + time.Sleep(vt.cliFlags.StartDelayDuration) } wg.Wait() } -func (vt *vodLoadTester) uploadAsset(fileName string) (string, string, error) { - rndAssetName := fmt.Sprintf("load_test_%s", randName()) - - res, err := vt.lapi.RequestUpload(rndAssetName) +func (vt *vodLoadTester) writeResultNdjson(uploadTest uploadTest) { + jsonString, err := json.Marshal(uploadTest) if err != nil { - fmt.Printf("error requesting upload: %v\n", err) - return "", "", err + glog.Errorf("Error converting runTests to json: %v", err) + } + f, err := os.OpenFile(vt.cliFlags.OutputPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + glog.Errorf("Error opening %s: %v", vt.cliFlags.OutputPath, err) + } + defer f.Close() + if _, err := f.WriteString(string(jsonString) + "\n"); err != nil { + glog.Errorf("Error writing to %s: %v", vt.cliFlags.OutputPath, err) } - uploadUrl := res.Url - assetId := res.Asset.ID - taskId := res.Task.ID +} + +func (vt *vodLoadTester) uploadAsset(fileName string, uploadUrl string) error { file, err := os.Open(fileName) if err != nil { fmt.Printf("error opening file %s: %v\n", fileName, err) - return assetId, taskId, err + return err } err = vt.lapi.UploadAsset(vt.ctx, uploadUrl, file) if err != nil { fmt.Printf("error uploading asset: %v\n", err) - return assetId, taskId, err + return err + } + + return err +} + +// Temporary function while waiting for go-api-client to get fixed +func (vt *vodLoadTester) uploadAssetResumable(url string, fileName string) error { + + file, err := os.Open(fileName) + + if err != nil { + fmt.Printf("error opening file %s: %v\n", fileName, err) + return err } - fmt.Printf("Video uploaded for asset id %s\n", assetId) + defer file.Close() + + config := tus.DefaultConfig() + config.ChunkSize = 5 * 1024 * 1024 // S3 complains if less than 5MB + + client, err := tus.NewClient(url, config) + if err != nil { + fmt.Printf("error creating tus client: %v\n", err) + return err + } + upload, err := tus.NewUploadFromFile(file) + if err != nil { + fmt.Printf("error creating new upload from file: %v\n", err) + return err + } + uploader, err := client.CreateUpload(upload) + if err != nil { + fmt.Printf("error creating upload: %v\n", err) + return err + } + + err = uploader.Upload() + + if err != nil { + fmt.Printf("error resumable uploading asset: %v\n", err) + } - return assetId, taskId, err + return err } func (vt *vodLoadTester) checkTaskProcessing(taskPollDuration time.Duration, processingTask api.TaskOnlyId) error { From c23e867ba8b92f3c0839b9d4e001e0577fa01a81 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Fri, 16 Sep 2022 19:18:59 +0200 Subject: [PATCH 04/25] vodloadtester: removed tus temp func & go mod to fixed api client --- cmd/vodloadtester/vodloadtester.go | 27 ++------------------------- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 5 insertions(+), 28 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 5428b963..8d4efd0c 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -12,7 +12,6 @@ import ( "sync" "time" - "github.com/eventials/go-tus" "github.com/golang/glog" api "github.com/livepeer/go-api-client" "github.com/livepeer/stream-tester/internal/utils" @@ -309,7 +308,6 @@ func (vt *vodLoadTester) uploadAsset(fileName string, uploadUrl string) error { return err } -// Temporary function while waiting for go-api-client to get fixed func (vt *vodLoadTester) uploadAssetResumable(url string, fileName string) error { file, err := os.Open(fileName) @@ -319,31 +317,10 @@ func (vt *vodLoadTester) uploadAssetResumable(url string, fileName string) error return err } - defer file.Close() - - config := tus.DefaultConfig() - config.ChunkSize = 5 * 1024 * 1024 // S3 complains if less than 5MB - - client, err := tus.NewClient(url, config) - if err != nil { - fmt.Printf("error creating tus client: %v\n", err) - return err - } - upload, err := tus.NewUploadFromFile(file) - if err != nil { - fmt.Printf("error creating new upload from file: %v\n", err) - return err - } - uploader, err := client.CreateUpload(upload) - if err != nil { - fmt.Printf("error creating upload: %v\n", err) - return err - } - - err = uploader.Upload() + err = vt.lapi.ResumableUpload(url, file) if err != nil { - fmt.Printf("error resumable uploading asset: %v\n", err) + fmt.Printf("error on resumable upload asset: %v", err) } return err diff --git a/go.mod b/go.mod index 46960463..bd3f519f 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/golang/glog v1.0.0 github.com/gosuri/uilive v0.0.3 // indirect github.com/gosuri/uiprogress v0.0.1 - github.com/livepeer/go-api-client v0.2.9-0.20220913191130-1f2f838f95d6 + github.com/livepeer/go-api-client v0.2.9-0.20220916171125-c13c05817515 github.com/livepeer/go-livepeer v0.5.31 github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07 github.com/livepeer/leaderboard-serverless v1.0.0 diff --git a/go.sum b/go.sum index 6837f2eb..f2db92d2 100644 --- a/go.sum +++ b/go.sum @@ -710,8 +710,8 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/livepeer/go-api-client v0.2.9-0.20220913191130-1f2f838f95d6 h1:vv0P8DHGKXBi8uwXQvDe4w5vIgNqE0WkKbuEkpIRHOA= -github.com/livepeer/go-api-client v0.2.9-0.20220913191130-1f2f838f95d6/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= +github.com/livepeer/go-api-client v0.2.9-0.20220916171125-c13c05817515 h1:3UvLoSvntPi0Z/yW6zskPmZZwA+lnm0pQVIvG/uBnrE= +github.com/livepeer/go-api-client v0.2.9-0.20220916171125-c13c05817515/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= github.com/livepeer/go-livepeer v0.5.31 h1:LcN+qDnqWRws7fdVYc4ucZPVcLQRs2tehUYCQVnlnRw= github.com/livepeer/go-livepeer v0.5.31/go.mod h1:cpBikcGWApkx0cyR0Ht+uAym7j3uAwXGpPbvaOA8XUU= github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw= From 85f66011a4e19764e669ec35287181c3097a1ad9 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Fri, 16 Sep 2022 19:33:34 +0200 Subject: [PATCH 05/25] vodloadtester: added import --- cmd/vodloadtester/vodloadtester.go | 68 +++++++++++++++++++++++++++--- 1 file changed, 62 insertions(+), 6 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 8d4efd0c..6d72ea44 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -54,6 +54,7 @@ type uploadTest struct { TaskID string `json:"taskId,omitempty"` RequestUploadSuccess uint `json:"requestUploadSuccess,omitempty"` UploadSuccess uint `json:"uploadSuccess,omitempty"` + ImportSuccess uint `json:"importSuccess,omitempty"` TaskCheckSuccess uint `json:"taskCheckSuccess,omitempty"` ErrorMessage string `json:"errorMessage,omitempty"` } @@ -131,6 +132,14 @@ func main() { cliFlags: *cliFlags, } + if cliFlags.Import { + if cliFlags.DirectUpload || cliFlags.ResumableUpload { + glog.Fatal("Cannot use -import with either -direct or -resumable") + } + fileName = cliFlags.Filename + vt.importFromUrlTest(fileName) + } + if cliFlags.DirectUpload || cliFlags.ResumableUpload { if fileName, err = utils.GetFile(cliFlags.Filename, strings.ReplaceAll(hostName, ".", "_")); err != nil { if err == utils.ErrNotFound { @@ -173,13 +182,15 @@ func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string RunnerInfo: runnerInfo, Kind: "directUpload", } - rndAssetName := fmt.Sprintf("load_test_%s", randName()) + rndAssetName := fmt.Sprintf("load_test_direct_%s", randName()) requestedUpload, err := vt.requestUploadUrls(rndAssetName) if err != nil { glog.Errorf("Error requesting upload urls: %v", err) uploadTest.RequestUploadSuccess = 0 uploadTest.ErrorMessage = err.Error() + vt.writeResultNdjson(uploadTest) + return } else { uploadTest.RequestUploadSuccess = 1 uploadTest.AssetID = requestedUpload.Asset.ID @@ -191,7 +202,6 @@ func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string if err != nil { glog.Errorf("Error uploading asset: %v", err) uploadTest.UploadSuccess = 0 - uploadTest.EndTime = time.Now() } else { uploadTest.UploadSuccess = 1 if vt.cliFlags.TaskCheck { @@ -204,7 +214,6 @@ func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string } } - uploadTest.EndTime = time.Now() } vt.writeResultNdjson(uploadTest) wg.Done() @@ -229,13 +238,15 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { RunnerInfo: runnerInfo, Kind: "resumable", } - rndAssetName := fmt.Sprintf("load_test_%s", randName()) + rndAssetName := fmt.Sprintf("load_test_resumable_%s", randName()) requestedUpload, err := vt.requestUploadUrls(rndAssetName) if err != nil { glog.Errorf("Error requesting upload urls: %v", err) uploadTest.RequestUploadSuccess = 0 uploadTest.ErrorMessage = err.Error() + vt.writeResultNdjson(uploadTest) + return } else { uploadTest.RequestUploadSuccess = 1 uploadTest.AssetID = requestedUpload.Asset.ID @@ -249,7 +260,6 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { if err != nil { glog.Errorf("Error on resumable upload: %v", err) uploadTest.UploadSuccess = 0 - uploadTest.EndTime = time.Now() } else { uploadTest.UploadSuccess = 1 if vt.cliFlags.TaskCheck { @@ -262,7 +272,6 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { } } - uploadTest.EndTime = time.Now() } vt.writeResultNdjson(uploadTest) wg.Done() @@ -275,7 +284,54 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { } +func (vt *vodLoadTester) importFromUrlTest(url string) { + wg := &sync.WaitGroup{} + + for i := 0; i < int(vt.cliFlags.VideoAmount); i += int(vt.cliFlags.Simultaneous) { + for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { + fmt.Printf("Importing video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) + wg.Add(1) + go func() { + uploadTest := uploadTest{ + StartTime: time.Now(), + Kind: "import", + } + + rndAssetName := fmt.Sprintf("load_test_import_%s", randName()) + asset, task, err := vt.lapi.ImportAsset(url, rndAssetName) + + if err != nil { + glog.Errorf("Error importing asset: %v", err) + uploadTest.ImportSuccess = 0 + uploadTest.ErrorMessage = err.Error() + uploadTest.EndTime = time.Now() + } else { + uploadTest.AssetID = asset.ID + uploadTest.TaskID = task.ID + uploadTest.ImportSuccess = 1 + if vt.cliFlags.TaskCheck { + err := vt.checkTaskProcessing(5*time.Second, api.TaskOnlyId{ID: uploadTest.TaskID}) + if err != nil { + uploadTest.TaskCheckSuccess = 0 + uploadTest.ErrorMessage = err.Error() + } else { + uploadTest.TaskCheckSuccess = 1 + } + } + + uploadTest.EndTime = time.Now() + } + vt.writeResultNdjson(uploadTest) + wg.Done() + }() + } + time.Sleep(vt.cliFlags.StartDelayDuration) + } + wg.Wait() +} + func (vt *vodLoadTester) writeResultNdjson(uploadTest uploadTest) { + uploadTest.EndTime = time.Now() jsonString, err := json.Marshal(uploadTest) if err != nil { glog.Errorf("Error converting runTests to json: %v", err) From 98bc4b16d65aadd93e1df87bbfdbfd5a16c815c6 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Fri, 16 Sep 2022 19:35:09 +0200 Subject: [PATCH 06/25] vodloadtester: changed flag name --- cmd/vodloadtester/vodloadtester.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 6d72ea44..726ecf3f 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -65,7 +65,7 @@ func main() { flag.Set("logtostderr", "true") vFlag := flag.Lookup("v") - fs := flag.NewFlagSet("loadtester", flag.ExitOnError) + fs := flag.NewFlagSet("vodloadtester", flag.ExitOnError) fs.IntVar(&cliFlags.Verbosity, "v", 3, "Log verbosity. {4|5|6}") fs.BoolVar(&cliFlags.Version, "version", false, "Print out the version") From 541fe29d27caeea7ca88e6d76b67f9430a6a1580 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Fri, 16 Sep 2022 20:26:22 +0200 Subject: [PATCH 07/25] vodloadtester: added runner to import & defer deleteAsset on each task --- cmd/vodloadtester/vodloadtester.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 726ecf3f..8e432bf3 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -137,7 +137,7 @@ func main() { glog.Fatal("Cannot use -import with either -direct or -resumable") } fileName = cliFlags.Filename - vt.importFromUrlTest(fileName) + vt.importFromUrlTest(fileName, runnerInfo) } if cliFlags.DirectUpload || cliFlags.ResumableUpload { @@ -184,6 +184,7 @@ func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string } rndAssetName := fmt.Sprintf("load_test_direct_%s", randName()) requestedUpload, err := vt.requestUploadUrls(rndAssetName) + defer vt.lapi.DeleteAsset(requestedUpload.Asset.ID) if err != nil { glog.Errorf("Error requesting upload urls: %v", err) @@ -240,6 +241,7 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { } rndAssetName := fmt.Sprintf("load_test_resumable_%s", randName()) requestedUpload, err := vt.requestUploadUrls(rndAssetName) + defer vt.lapi.DeleteAsset(requestedUpload.Asset.ID) if err != nil { glog.Errorf("Error requesting upload urls: %v", err) @@ -284,7 +286,7 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { } -func (vt *vodLoadTester) importFromUrlTest(url string) { +func (vt *vodLoadTester) importFromUrlTest(url string, runnerInfo string) { wg := &sync.WaitGroup{} for i := 0; i < int(vt.cliFlags.VideoAmount); i += int(vt.cliFlags.Simultaneous) { @@ -293,12 +295,14 @@ func (vt *vodLoadTester) importFromUrlTest(url string) { wg.Add(1) go func() { uploadTest := uploadTest{ - StartTime: time.Now(), - Kind: "import", + StartTime: time.Now(), + RunnerInfo: runnerInfo, + Kind: "import", } rndAssetName := fmt.Sprintf("load_test_import_%s", randName()) asset, task, err := vt.lapi.ImportAsset(url, rndAssetName) + defer vt.lapi.DeleteAsset(asset.ID) if err != nil { glog.Errorf("Error importing asset: %v", err) From 6f6690b268c8fb333e515e9b310105d2fdd4faf6 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Fri, 16 Sep 2022 20:32:10 +0200 Subject: [PATCH 08/25] vodloadtester: remove ip from runnerInfo --- cmd/vodloadtester/vodloadtester.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 8e432bf3..968299f1 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -95,7 +95,7 @@ func main() { vFlag.Value.Set(fmt.Sprintf("%d", cliFlags.Verbosity)) hostName, _ := os.Hostname() - runnerInfo := fmt.Sprintf("Hostname %s OS %s IPs %v\n", hostName, runtime.GOOS, utils.GetIPs()) + runnerInfo := fmt.Sprintf("Hostname %s OS %s", hostName, runtime.GOOS) fmt.Printf("Compiler version: %s %s\n", runtime.Compiler, runtime.Version()) fmt.Print(runnerInfo) From 71d6c9c8674c04f0a11ed61823c3cda485d1adf8 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Mon, 14 Nov 2022 13:41:54 +0100 Subject: [PATCH 09/25] vodloadtester: address comments --- cmd/vodloadtester/vodloadtester.go | 38 ++++++++++-------------------- 1 file changed, 13 insertions(+), 25 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 968299f1..2c559934 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -134,10 +134,11 @@ func main() { if cliFlags.Import { if cliFlags.DirectUpload || cliFlags.ResumableUpload { - glog.Fatal("Cannot use -import with either -direct or -resumable") + glog.Infof("Cannot use -import with either -direct or -resumable, doing only the import task") } fileName = cliFlags.Filename vt.importFromUrlTest(fileName, runnerInfo) + return } if cliFlags.DirectUpload || cliFlags.ResumableUpload { @@ -185,6 +186,7 @@ func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string rndAssetName := fmt.Sprintf("load_test_direct_%s", randName()) requestedUpload, err := vt.requestUploadUrls(rndAssetName) defer vt.lapi.DeleteAsset(requestedUpload.Asset.ID) + defer wg.Done() if err != nil { glog.Errorf("Error requesting upload urls: %v", err) @@ -198,7 +200,7 @@ func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string uploadTest.TaskID = requestedUpload.Task.ID } - err = vt.uploadAsset(fileName, requestedUpload.Url) + err = vt.uploadAsset(fileName, requestedUpload.Url, false) if err != nil { glog.Errorf("Error uploading asset: %v", err) @@ -217,7 +219,6 @@ func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string } vt.writeResultNdjson(uploadTest) - wg.Done() }() } time.Sleep(vt.cliFlags.StartDelayDuration) @@ -242,6 +243,7 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { rndAssetName := fmt.Sprintf("load_test_resumable_%s", randName()) requestedUpload, err := vt.requestUploadUrls(rndAssetName) defer vt.lapi.DeleteAsset(requestedUpload.Asset.ID) + defer wg.Done() if err != nil { glog.Errorf("Error requesting upload urls: %v", err) @@ -257,7 +259,7 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { uploadUrl := requestedUpload.TusEndpoint - err = vt.uploadAssetResumable(uploadUrl, fileName) + err = vt.uploadAsset(fileName, uploadUrl, true) if err != nil { glog.Errorf("Error on resumable upload: %v", err) @@ -276,7 +278,6 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { } vt.writeResultNdjson(uploadTest) - wg.Done() }() } time.Sleep(vt.cliFlags.StartDelayDuration) @@ -303,6 +304,7 @@ func (vt *vodLoadTester) importFromUrlTest(url string, runnerInfo string) { rndAssetName := fmt.Sprintf("load_test_import_%s", randName()) asset, task, err := vt.lapi.ImportAsset(url, rndAssetName) defer vt.lapi.DeleteAsset(asset.ID) + defer wg.Done() if err != nil { glog.Errorf("Error importing asset: %v", err) @@ -326,7 +328,6 @@ func (vt *vodLoadTester) importFromUrlTest(url string, runnerInfo string) { uploadTest.EndTime = time.Now() } vt.writeResultNdjson(uploadTest) - wg.Done() }() } time.Sleep(vt.cliFlags.StartDelayDuration) @@ -350,7 +351,7 @@ func (vt *vodLoadTester) writeResultNdjson(uploadTest uploadTest) { } } -func (vt *vodLoadTester) uploadAsset(fileName string, uploadUrl string) error { +func (vt *vodLoadTester) uploadAsset(fileName string, uploadUrl string, resumable bool) error { file, err := os.Open(fileName) @@ -359,30 +360,17 @@ func (vt *vodLoadTester) uploadAsset(fileName string, uploadUrl string) error { return err } - err = vt.lapi.UploadAsset(vt.ctx, uploadUrl, file) - if err != nil { - fmt.Printf("error uploading asset: %v\n", err) - return err + if resumable { + err = vt.lapi.ResumableUpload(uploadUrl, file) + } else { + err = vt.lapi.UploadAsset(vt.ctx, uploadUrl, file) } - return err -} - -func (vt *vodLoadTester) uploadAssetResumable(url string, fileName string) error { - - file, err := os.Open(fileName) - if err != nil { - fmt.Printf("error opening file %s: %v\n", fileName, err) + fmt.Printf("error uploading asset: %v\n", err) return err } - err = vt.lapi.ResumableUpload(url, file) - - if err != nil { - fmt.Printf("error on resumable upload asset: %v", err) - } - return err } From 476ca5b06e0f5883731dfeb01a1a113b1352f729 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Mon, 14 Nov 2022 14:03:51 +0100 Subject: [PATCH 10/25] vodloadtester: added internal timeout on task check --- cmd/vodloadtester/vodloadtester.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 2c559934..89d8f61e 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -27,12 +27,13 @@ type cliArguments struct { ResumableUpload bool Import bool - TaskCheck bool - APIServer string - APIToken string - Filename string - VideoAmount uint - OutputPath string + TaskCheck bool + TaskCheckTimeout uint + APIServer string + APIToken string + Filename string + VideoAmount uint + OutputPath string TestDuration time.Duration StartDelayDuration time.Duration @@ -71,6 +72,8 @@ func main() { fs.BoolVar(&cliFlags.Version, "version", false, "Print out the version") fs.BoolVar(&cliFlags.TaskCheck, "task-check", false, "Check task processing") + fs.UintVar(&cliFlags.TaskCheckTimeout, "task-timeout", 500, "Task check timeout in seconds") + fs.BoolVar(&cliFlags.DirectUpload, "direct", false, "Launch direct upload test") fs.BoolVar(&cliFlags.ResumableUpload, "resumable", false, "Launch tus upload test") fs.BoolVar(&cliFlags.Import, "import", false, "Launch import from url test") @@ -376,7 +379,7 @@ func (vt *vodLoadTester) uploadAsset(fileName string, uploadUrl string, resumabl func (vt *vodLoadTester) checkTaskProcessing(taskPollDuration time.Duration, processingTask api.TaskOnlyId) error { startTime := time.Now() - timeout := 3 * time.Minute + timeout := time.Duration(vt.cliFlags.TaskCheckTimeout) * time.Second for { glog.Infof("Waiting %s for task id=%s to be processed, elapsed=%s", taskPollDuration, processingTask.ID, time.Since(startTime)) time.Sleep(taskPollDuration) From a0ac6a6a36aaef80f5cfb69d87de760a2382ccd1 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Mon, 14 Nov 2022 14:04:12 +0100 Subject: [PATCH 11/25] vodloadtester: added internal timeout on task check --- cmd/vodloadtester/vodloadtester.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 89d8f61e..6e000dde 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -399,7 +399,7 @@ func (vt *vodLoadTester) checkTaskProcessing(taskPollDuration time.Duration, pro } if time.Since(startTime) > timeout { - glog.Errorf("Timeout processing task, taskId=%s", task.ID) + glog.Errorf("Internal timeout processing task, taskId=%s", task.ID) return fmt.Errorf("timeout processing task, taskId=%s", task.ID) } } From b78333137b664dd24c967f05d944bd1bc3307e9b Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Sun, 4 Dec 2022 02:28:39 +0100 Subject: [PATCH 12/25] vodloadtester: added ability load JSON files with a list of files to import --- cmd/vodloadtester/vodloadtester.go | 119 +++++++++++++++++++++++++++-- 1 file changed, 114 insertions(+), 5 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 6e000dde..ab209b65 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -5,6 +5,7 @@ import ( "encoding/json" "flag" "fmt" + "io/ioutil" "math/rand" "os" "runtime" @@ -34,6 +35,7 @@ type cliArguments struct { Filename string VideoAmount uint OutputPath string + KeepAssets bool TestDuration time.Duration StartDelayDuration time.Duration @@ -58,6 +60,11 @@ type uploadTest struct { ImportSuccess uint `json:"importSuccess,omitempty"` TaskCheckSuccess uint `json:"taskCheckSuccess,omitempty"` ErrorMessage string `json:"errorMessage,omitempty"` + UrlSource string `json:"urlSource,omitempty"` +} + +type jsonImport struct { + Url string `json:"url"` } func main() { @@ -77,12 +84,13 @@ func main() { fs.BoolVar(&cliFlags.DirectUpload, "direct", false, "Launch direct upload test") fs.BoolVar(&cliFlags.ResumableUpload, "resumable", false, "Launch tus upload test") fs.BoolVar(&cliFlags.Import, "import", false, "Launch import from url test") + fs.BoolVar(&cliFlags.KeepAssets, "keep-assets", false, "Keep assets after test") fs.UintVar(&cliFlags.VideoAmount, "video-amt", 1, "How many video to upload") fs.DurationVar(&cliFlags.StartDelayDuration, "delay-between-groups", 10*time.Second, "Delay between starting group of uploads") fs.UintVar(&cliFlags.Simultaneous, "sim", 1, "Number of simulteneous videos to upload") - fs.StringVar(&cliFlags.Filename, "file", "bbb_sunflower_1080p_30fps_normal_2min.mp4", "File to upload or url to import") + fs.StringVar(&cliFlags.Filename, "file", "bbb_sunflower_1080p_30fps_normal_2min.mp4", "File to upload or url to import. Can be either a video or a .json array of objects with a url key") fs.StringVar(&cliFlags.APIToken, "api-token", "", "Token of the Livepeer API to be used") fs.StringVar(&cliFlags.APIServer, "api-server", "origin.livepeer.com", "Server of the Livepeer API to be used") fs.StringVar(&cliFlags.OutputPath, "output-path", "/tmp/results.ndjson", "Path to output result .ndjson file") @@ -140,7 +148,17 @@ func main() { glog.Infof("Cannot use -import with either -direct or -resumable, doing only the import task") } fileName = cliFlags.Filename - vt.importFromUrlTest(fileName, runnerInfo) + + if fileName == "" { + glog.Fatalf("No file name provided") + } + + if strings.HasSuffix(fileName, ".json") { + glog.Infof("Importing from json file %s. Ignoring any -video-amount parameter provided.", fileName) + vt.importFromJSON(fileName, runnerInfo) + } else { + vt.importFromUrlTest(fileName, runnerInfo) + } return } @@ -188,7 +206,10 @@ func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string } rndAssetName := fmt.Sprintf("load_test_direct_%s", randName()) requestedUpload, err := vt.requestUploadUrls(rndAssetName) - defer vt.lapi.DeleteAsset(requestedUpload.Asset.ID) + if !vt.cliFlags.KeepAssets { + defer vt.lapi.DeleteAsset(requestedUpload.Asset.ID) + } + defer wg.Done() if err != nil { @@ -245,7 +266,10 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { } rndAssetName := fmt.Sprintf("load_test_resumable_%s", randName()) requestedUpload, err := vt.requestUploadUrls(rndAssetName) - defer vt.lapi.DeleteAsset(requestedUpload.Asset.ID) + if !vt.cliFlags.KeepAssets { + defer vt.lapi.DeleteAsset(requestedUpload.Asset.ID) + } + defer wg.Done() if err != nil { @@ -290,6 +314,83 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { } +func (vt *vodLoadTester) importFromJSON(jsonFile string, runnerInfo string) { + data, err := ioutil.ReadFile(jsonFile) + + if err != nil { + glog.Errorf("Error reading json file: %v", err) + return + } + + var jsonData []jsonImport + err = json.Unmarshal(data, &jsonData) + + if err != nil { + glog.Errorf("Error parsing json file: %v", err) + return + } + + wg := &sync.WaitGroup{} + + for i := 0; i < len(jsonData); i += int(vt.cliFlags.Simultaneous) { + for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { + if i+j >= len(jsonData) { + break + } + fmt.Printf("Import video %d/%d\n", i+j+1, len(jsonData)) + wg.Add(1) + index := i + j + go func(i int) { + + uploadTest := uploadTest{ + StartTime: time.Now(), + RunnerInfo: runnerInfo, + Kind: "import", + UrlSource: jsonData[index].Url, + } + + rndAssetName := fmt.Sprintf("load_test_import_%s", randName()) + fmt.Printf("Importing %s from %s", rndAssetName, jsonData[index].Url) + asset, task, err := vt.lapi.ImportAsset(jsonData[index].Url, rndAssetName) + if !vt.cliFlags.KeepAssets { + defer vt.lapi.DeleteAsset(asset.ID) + } + + defer wg.Done() + + if err != nil { + glog.Errorf("Error importing asset: %v", err) + uploadTest.ImportSuccess = 0 + uploadTest.ErrorMessage = err.Error() + vt.writeResultNdjson(uploadTest) + return + } else { + uploadTest.ImportSuccess = 1 + uploadTest.AssetID = asset.ID + uploadTest.TaskID = task.ID + } + + if vt.cliFlags.TaskCheck { + err := vt.checkTaskProcessing(5*time.Second, api.TaskOnlyId{ID: uploadTest.TaskID}) + if err != nil { + uploadTest.TaskCheckSuccess = 0 + uploadTest.ErrorMessage = err.Error() + } else { + uploadTest.TaskCheckSuccess = 1 + } + } + + uploadTest.EndTime = time.Now() + + vt.writeResultNdjson(uploadTest) + + }(i + j) + } + time.Sleep(vt.cliFlags.StartDelayDuration) + } + wg.Wait() +} + func (vt *vodLoadTester) importFromUrlTest(url string, runnerInfo string) { wg := &sync.WaitGroup{} @@ -302,11 +403,15 @@ func (vt *vodLoadTester) importFromUrlTest(url string, runnerInfo string) { StartTime: time.Now(), RunnerInfo: runnerInfo, Kind: "import", + UrlSource: url, } rndAssetName := fmt.Sprintf("load_test_import_%s", randName()) asset, task, err := vt.lapi.ImportAsset(url, rndAssetName) - defer vt.lapi.DeleteAsset(asset.ID) + if !vt.cliFlags.KeepAssets { + defer vt.lapi.DeleteAsset(asset.ID) + } + defer wg.Done() if err != nil { @@ -387,6 +492,10 @@ func (vt *vodLoadTester) checkTaskProcessing(taskPollDuration time.Duration, pro task, err := vt.lapi.GetTask(processingTask.ID) if err != nil { glog.Errorf("Error retrieving task id=%s err=%v", processingTask.ID, err) + if strings.Contains(err.Error(), "connection reset by peer") { + // Retry + continue + } return fmt.Errorf("error retrieving task id=%s: %w", processingTask.ID, err) } if task.Status.Phase == "completed" { From a58bd91e6acf866e7e99cdddeaeb88be3b09de21 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Sun, 4 Dec 2022 03:15:45 +0100 Subject: [PATCH 13/25] vodloadtester: consistency in casing of keys in results & retry on 520 error --- cmd/vodloadtester/vodloadtester.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index ab209b65..ec046815 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -49,8 +49,8 @@ type vodLoadTester struct { } type uploadTest struct { - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` + StartTime time.Time `json:"startTime"` + EndTime time.Time `json:"endTime"` RunnerInfo string `json:"runnerInfo,omitempty"` Kind string `json:"kind,omitempty"` AssetID string `json:"assetId,omitempty"` @@ -92,7 +92,7 @@ func main() { fs.UintVar(&cliFlags.Simultaneous, "sim", 1, "Number of simulteneous videos to upload") fs.StringVar(&cliFlags.Filename, "file", "bbb_sunflower_1080p_30fps_normal_2min.mp4", "File to upload or url to import. Can be either a video or a .json array of objects with a url key") fs.StringVar(&cliFlags.APIToken, "api-token", "", "Token of the Livepeer API to be used") - fs.StringVar(&cliFlags.APIServer, "api-server", "origin.livepeer.com", "Server of the Livepeer API to be used") + fs.StringVar(&cliFlags.APIServer, "api-server", "origin.livepeer.monster", "Server of the Livepeer API to be used") fs.StringVar(&cliFlags.OutputPath, "output-path", "/tmp/results.ndjson", "Path to output result .ndjson file") _ = fs.String("config", "", "config file (optional)") @@ -492,7 +492,7 @@ func (vt *vodLoadTester) checkTaskProcessing(taskPollDuration time.Duration, pro task, err := vt.lapi.GetTask(processingTask.ID) if err != nil { glog.Errorf("Error retrieving task id=%s err=%v", processingTask.ID, err) - if strings.Contains(err.Error(), "connection reset by peer") { + if strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "520") { // Retry continue } From 25951ef792e60aeb03932abc4610e0c34e546730 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Sun, 4 Dec 2022 05:23:19 +0100 Subject: [PATCH 14/25] vodloadtester: 2 import functions into one! --- cmd/vodloadtester/vodloadtester.go | 134 +++++++++++------------------ 1 file changed, 51 insertions(+), 83 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index ec046815..0b00647b 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -90,7 +90,7 @@ func main() { fs.DurationVar(&cliFlags.StartDelayDuration, "delay-between-groups", 10*time.Second, "Delay between starting group of uploads") fs.UintVar(&cliFlags.Simultaneous, "sim", 1, "Number of simulteneous videos to upload") - fs.StringVar(&cliFlags.Filename, "file", "bbb_sunflower_1080p_30fps_normal_2min.mp4", "File to upload or url to import. Can be either a video or a .json array of objects with a url key") + fs.StringVar(&cliFlags.Filename, "file", "", "File to upload or url to import. Can be either a video or a .json array of objects with a url key") fs.StringVar(&cliFlags.APIToken, "api-token", "", "Token of the Livepeer API to be used") fs.StringVar(&cliFlags.APIServer, "api-server", "origin.livepeer.monster", "Server of the Livepeer API to be used") fs.StringVar(&cliFlags.OutputPath, "output-path", "/tmp/results.ndjson", "Path to output result .ndjson file") @@ -145,17 +145,19 @@ func main() { if cliFlags.Import { if cliFlags.DirectUpload || cliFlags.ResumableUpload { - glog.Infof("Cannot use -import with either -direct or -resumable, doing only the import task") + glog.Fatalf("Cannot use -import with either -direct or -resumable") + return } fileName = cliFlags.Filename if fileName == "" { glog.Fatalf("No file name provided") + return } if strings.HasSuffix(fileName, ".json") { glog.Infof("Importing from json file %s. Ignoring any -video-amount parameter provided.", fileName) - vt.importFromJSON(fileName, runnerInfo) + vt.importFromJSONTest(fileName, runnerInfo) } else { vt.importFromUrlTest(fileName, runnerInfo) } @@ -314,7 +316,7 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { } -func (vt *vodLoadTester) importFromJSON(jsonFile string, runnerInfo string) { +func (vt *vodLoadTester) importFromJSONTest(jsonFile string, runnerInfo string) { data, err := ioutil.ReadFile(jsonFile) if err != nil { @@ -341,49 +343,7 @@ func (vt *vodLoadTester) importFromJSON(jsonFile string, runnerInfo string) { wg.Add(1) index := i + j go func(i int) { - - uploadTest := uploadTest{ - StartTime: time.Now(), - RunnerInfo: runnerInfo, - Kind: "import", - UrlSource: jsonData[index].Url, - } - - rndAssetName := fmt.Sprintf("load_test_import_%s", randName()) - fmt.Printf("Importing %s from %s", rndAssetName, jsonData[index].Url) - asset, task, err := vt.lapi.ImportAsset(jsonData[index].Url, rndAssetName) - if !vt.cliFlags.KeepAssets { - defer vt.lapi.DeleteAsset(asset.ID) - } - - defer wg.Done() - - if err != nil { - glog.Errorf("Error importing asset: %v", err) - uploadTest.ImportSuccess = 0 - uploadTest.ErrorMessage = err.Error() - vt.writeResultNdjson(uploadTest) - return - } else { - uploadTest.ImportSuccess = 1 - uploadTest.AssetID = asset.ID - uploadTest.TaskID = task.ID - } - - if vt.cliFlags.TaskCheck { - err := vt.checkTaskProcessing(5*time.Second, api.TaskOnlyId{ID: uploadTest.TaskID}) - if err != nil { - uploadTest.TaskCheckSuccess = 0 - uploadTest.ErrorMessage = err.Error() - } else { - uploadTest.TaskCheckSuccess = 1 - } - } - - uploadTest.EndTime = time.Now() - - vt.writeResultNdjson(uploadTest) - + vt.importFromUrl(jsonData[index].Url, runnerInfo, wg) }(i + j) } time.Sleep(vt.cliFlags.StartDelayDuration) @@ -399,48 +359,56 @@ func (vt *vodLoadTester) importFromUrlTest(url string, runnerInfo string) { fmt.Printf("Importing video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) wg.Add(1) go func() { - uploadTest := uploadTest{ - StartTime: time.Now(), - RunnerInfo: runnerInfo, - Kind: "import", - UrlSource: url, - } + vt.importFromUrl(url, runnerInfo, wg) + }() + } + time.Sleep(vt.cliFlags.StartDelayDuration) + } + wg.Wait() +} - rndAssetName := fmt.Sprintf("load_test_import_%s", randName()) - asset, task, err := vt.lapi.ImportAsset(url, rndAssetName) - if !vt.cliFlags.KeepAssets { - defer vt.lapi.DeleteAsset(asset.ID) - } +func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.WaitGroup) { + uploadTest := uploadTest{ + StartTime: time.Now(), + RunnerInfo: runnerInfo, + Kind: "import", + UrlSource: url, + } - defer wg.Done() + rndAssetName := fmt.Sprintf("load_test_import_%s", randName()) + fmt.Printf("Importing %s from %s", rndAssetName, url) + asset, task, err := vt.lapi.ImportAsset(url, rndAssetName) + if !vt.cliFlags.KeepAssets { + defer vt.lapi.DeleteAsset(asset.ID) + } - if err != nil { - glog.Errorf("Error importing asset: %v", err) - uploadTest.ImportSuccess = 0 - uploadTest.ErrorMessage = err.Error() - uploadTest.EndTime = time.Now() - } else { - uploadTest.AssetID = asset.ID - uploadTest.TaskID = task.ID - uploadTest.ImportSuccess = 1 - if vt.cliFlags.TaskCheck { - err := vt.checkTaskProcessing(5*time.Second, api.TaskOnlyId{ID: uploadTest.TaskID}) - if err != nil { - uploadTest.TaskCheckSuccess = 0 - uploadTest.ErrorMessage = err.Error() - } else { - uploadTest.TaskCheckSuccess = 1 - } - } + defer wg.Done() - uploadTest.EndTime = time.Now() - } - vt.writeResultNdjson(uploadTest) - }() + if err != nil { + glog.Errorf("Error importing asset: %v", err) + uploadTest.ImportSuccess = 0 + uploadTest.ErrorMessage = err.Error() + vt.writeResultNdjson(uploadTest) + return + } else { + uploadTest.ImportSuccess = 1 + uploadTest.AssetID = asset.ID + uploadTest.TaskID = task.ID + } + + if vt.cliFlags.TaskCheck { + err := vt.checkTaskProcessing(5*time.Second, api.TaskOnlyId{ID: uploadTest.TaskID}) + if err != nil { + uploadTest.TaskCheckSuccess = 0 + uploadTest.ErrorMessage = err.Error() + } else { + uploadTest.TaskCheckSuccess = 1 } - time.Sleep(vt.cliFlags.StartDelayDuration) } - wg.Wait() + + uploadTest.EndTime = time.Now() + + vt.writeResultNdjson(uploadTest) } func (vt *vodLoadTester) writeResultNdjson(uploadTest uploadTest) { From 102de00e5276c9829615ce67b617449b20b6b9bc Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Sun, 4 Dec 2022 05:34:06 +0100 Subject: [PATCH 15/25] vodloadtester: 2 upload functions into one! --- cmd/vodloadtester/vodloadtester.go | 154 ++++++++++++----------------- 1 file changed, 63 insertions(+), 91 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 0b00647b..dead7d13 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -201,50 +201,7 @@ func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string fmt.Printf("Uploading video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) wg.Add(1) go func() { - uploadTest := uploadTest{ - StartTime: time.Now(), - RunnerInfo: runnerInfo, - Kind: "directUpload", - } - rndAssetName := fmt.Sprintf("load_test_direct_%s", randName()) - requestedUpload, err := vt.requestUploadUrls(rndAssetName) - if !vt.cliFlags.KeepAssets { - defer vt.lapi.DeleteAsset(requestedUpload.Asset.ID) - } - - defer wg.Done() - - if err != nil { - glog.Errorf("Error requesting upload urls: %v", err) - uploadTest.RequestUploadSuccess = 0 - uploadTest.ErrorMessage = err.Error() - vt.writeResultNdjson(uploadTest) - return - } else { - uploadTest.RequestUploadSuccess = 1 - uploadTest.AssetID = requestedUpload.Asset.ID - uploadTest.TaskID = requestedUpload.Task.ID - } - - err = vt.uploadAsset(fileName, requestedUpload.Url, false) - - if err != nil { - glog.Errorf("Error uploading asset: %v", err) - uploadTest.UploadSuccess = 0 - } else { - uploadTest.UploadSuccess = 1 - if vt.cliFlags.TaskCheck { - err := vt.checkTaskProcessing(5*time.Second, api.TaskOnlyId{ID: uploadTest.TaskID}) - if err != nil { - uploadTest.TaskCheckSuccess = 0 - uploadTest.ErrorMessage = err.Error() - } else { - uploadTest.TaskCheckSuccess = 1 - } - } - - } - vt.writeResultNdjson(uploadTest) + vt.doUpload(fileName, runnerInfo, wg, false) }() } time.Sleep(vt.cliFlags.StartDelayDuration) @@ -258,55 +215,10 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { for i := 0; i < int(vt.cliFlags.VideoAmount); i += int(vt.cliFlags.Simultaneous) { for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { - fmt.Printf("rUpload video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) + fmt.Printf("Uploading resumable video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) wg.Add(1) go func() { - uploadTest := uploadTest{ - StartTime: time.Now(), - RunnerInfo: runnerInfo, - Kind: "resumable", - } - rndAssetName := fmt.Sprintf("load_test_resumable_%s", randName()) - requestedUpload, err := vt.requestUploadUrls(rndAssetName) - if !vt.cliFlags.KeepAssets { - defer vt.lapi.DeleteAsset(requestedUpload.Asset.ID) - } - - defer wg.Done() - - if err != nil { - glog.Errorf("Error requesting upload urls: %v", err) - uploadTest.RequestUploadSuccess = 0 - uploadTest.ErrorMessage = err.Error() - vt.writeResultNdjson(uploadTest) - return - } else { - uploadTest.RequestUploadSuccess = 1 - uploadTest.AssetID = requestedUpload.Asset.ID - uploadTest.TaskID = requestedUpload.Task.ID - } - - uploadUrl := requestedUpload.TusEndpoint - - err = vt.uploadAsset(fileName, uploadUrl, true) - - if err != nil { - glog.Errorf("Error on resumable upload: %v", err) - uploadTest.UploadSuccess = 0 - } else { - uploadTest.UploadSuccess = 1 - if vt.cliFlags.TaskCheck { - err := vt.checkTaskProcessing(5*time.Second, api.TaskOnlyId{ID: uploadTest.TaskID}) - if err != nil { - uploadTest.TaskCheckSuccess = 0 - uploadTest.ErrorMessage = err.Error() - } else { - uploadTest.TaskCheckSuccess = 1 - } - } - - } - vt.writeResultNdjson(uploadTest) + vt.doUpload(fileName, runnerInfo, wg, true) }() } time.Sleep(vt.cliFlags.StartDelayDuration) @@ -316,6 +228,66 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { } +func (vt *vodLoadTester) doUpload(fileName, runnerInfo string, wg *sync.WaitGroup, resumable bool) { + + uploadKind := "directUpload" + + if resumable { + uploadKind = "resumableUpload" + } + + uploadTest := uploadTest{ + StartTime: time.Now(), + RunnerInfo: runnerInfo, + Kind: uploadKind, + } + rndAssetName := fmt.Sprintf("load_test_%s_%s", uploadKind, randName()) + requestedUpload, err := vt.requestUploadUrls(rndAssetName) + if !vt.cliFlags.KeepAssets { + defer vt.lapi.DeleteAsset(requestedUpload.Asset.ID) + } + + defer wg.Done() + + if err != nil { + glog.Errorf("Error requesting upload urls: %v", err) + uploadTest.RequestUploadSuccess = 0 + uploadTest.ErrorMessage = err.Error() + vt.writeResultNdjson(uploadTest) + return + } else { + uploadTest.RequestUploadSuccess = 1 + uploadTest.AssetID = requestedUpload.Asset.ID + uploadTest.TaskID = requestedUpload.Task.ID + } + + uploadUrl := requestedUpload.Url + + if resumable { + uploadUrl = requestedUpload.TusEndpoint + } + + err = vt.uploadAsset(fileName, uploadUrl, resumable) + + if err != nil { + glog.Errorf("Error on %s: %v", uploadKind, err) + uploadTest.UploadSuccess = 0 + } else { + uploadTest.UploadSuccess = 1 + if vt.cliFlags.TaskCheck { + err := vt.checkTaskProcessing(5*time.Second, api.TaskOnlyId{ID: uploadTest.TaskID}) + if err != nil { + uploadTest.TaskCheckSuccess = 0 + uploadTest.ErrorMessage = err.Error() + } else { + uploadTest.TaskCheckSuccess = 1 + } + } + + } + vt.writeResultNdjson(uploadTest) +} + func (vt *vodLoadTester) importFromJSONTest(jsonFile string, runnerInfo string) { data, err := ioutil.ReadFile(jsonFile) From 687096ecddf16f70690fa7cf5163c9dc869ca7b3 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Sun, 4 Dec 2022 05:37:41 +0100 Subject: [PATCH 16/25] vodloadtester: added task poll time --- cmd/vodloadtester/vodloadtester.go | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index dead7d13..17075aba 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -28,14 +28,15 @@ type cliArguments struct { ResumableUpload bool Import bool - TaskCheck bool - TaskCheckTimeout uint - APIServer string - APIToken string - Filename string - VideoAmount uint - OutputPath string - KeepAssets bool + TaskCheck bool + TaskCheckTimeout uint + TaskCheckPollTime uint + APIServer string + APIToken string + Filename string + VideoAmount uint + OutputPath string + KeepAssets bool TestDuration time.Duration StartDelayDuration time.Duration @@ -80,6 +81,7 @@ func main() { fs.BoolVar(&cliFlags.TaskCheck, "task-check", false, "Check task processing") fs.UintVar(&cliFlags.TaskCheckTimeout, "task-timeout", 500, "Task check timeout in seconds") + fs.UintVar(&cliFlags.TaskCheckPollTime, "task-poll-time", 5, "Task check poll time in seconds") fs.BoolVar(&cliFlags.DirectUpload, "direct", false, "Launch direct upload test") fs.BoolVar(&cliFlags.ResumableUpload, "resumable", false, "Launch tus upload test") @@ -275,7 +277,7 @@ func (vt *vodLoadTester) doUpload(fileName, runnerInfo string, wg *sync.WaitGrou } else { uploadTest.UploadSuccess = 1 if vt.cliFlags.TaskCheck { - err := vt.checkTaskProcessing(5*time.Second, api.TaskOnlyId{ID: uploadTest.TaskID}) + err := vt.checkTaskProcessing(api.TaskOnlyId{ID: uploadTest.TaskID}) if err != nil { uploadTest.TaskCheckSuccess = 0 uploadTest.ErrorMessage = err.Error() @@ -369,7 +371,7 @@ func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.W } if vt.cliFlags.TaskCheck { - err := vt.checkTaskProcessing(5*time.Second, api.TaskOnlyId{ID: uploadTest.TaskID}) + err := vt.checkTaskProcessing(api.TaskOnlyId{ID: uploadTest.TaskID}) if err != nil { uploadTest.TaskCheckSuccess = 0 uploadTest.ErrorMessage = err.Error() @@ -422,7 +424,8 @@ func (vt *vodLoadTester) uploadAsset(fileName string, uploadUrl string, resumabl return err } -func (vt *vodLoadTester) checkTaskProcessing(taskPollDuration time.Duration, processingTask api.TaskOnlyId) error { +func (vt *vodLoadTester) checkTaskProcessing(processingTask api.TaskOnlyId) error { + taskPollDuration := time.Duration(vt.cliFlags.TaskCheckPollTime) * time.Second startTime := time.Now() timeout := time.Duration(vt.cliFlags.TaskCheckTimeout) * time.Second for { From ad1181544640c18539a2d7f974cd853e88f5d5bd Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Sun, 4 Dec 2022 05:45:19 +0100 Subject: [PATCH 17/25] vodloadtester: cleanup & move around --- cmd/vodloadtester/vodloadtester.go | 42 ++++++++++++++++-------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 17075aba..ff67ec6e 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -78,25 +78,32 @@ func main() { fs.IntVar(&cliFlags.Verbosity, "v", 3, "Log verbosity. {4|5|6}") fs.BoolVar(&cliFlags.Version, "version", false, "Print out the version") - fs.BoolVar(&cliFlags.TaskCheck, "task-check", false, "Check task processing") - fs.UintVar(&cliFlags.TaskCheckTimeout, "task-timeout", 500, "Task check timeout in seconds") - fs.UintVar(&cliFlags.TaskCheckPollTime, "task-poll-time", 5, "Task check poll time in seconds") + // Credentials + fs.StringVar(&cliFlags.APIToken, "api-token", "", "Token of the Livepeer API to be used") + fs.StringVar(&cliFlags.APIServer, "api-server", "origin.livepeer.monster", "Server of the Livepeer API to be used") + // Tests fs.BoolVar(&cliFlags.DirectUpload, "direct", false, "Launch direct upload test") fs.BoolVar(&cliFlags.ResumableUpload, "resumable", false, "Launch tus upload test") fs.BoolVar(&cliFlags.Import, "import", false, "Launch import from url test") - fs.BoolVar(&cliFlags.KeepAssets, "keep-assets", false, "Keep assets after test") - fs.UintVar(&cliFlags.VideoAmount, "video-amt", 1, "How many video to upload") - fs.DurationVar(&cliFlags.StartDelayDuration, "delay-between-groups", 10*time.Second, "Delay between starting group of uploads") - - fs.UintVar(&cliFlags.Simultaneous, "sim", 1, "Number of simulteneous videos to upload") + // Input files and results fs.StringVar(&cliFlags.Filename, "file", "", "File to upload or url to import. Can be either a video or a .json array of objects with a url key") - fs.StringVar(&cliFlags.APIToken, "api-token", "", "Token of the Livepeer API to be used") - fs.StringVar(&cliFlags.APIServer, "api-server", "origin.livepeer.monster", "Server of the Livepeer API to be used") fs.StringVar(&cliFlags.OutputPath, "output-path", "/tmp/results.ndjson", "Path to output result .ndjson file") + // Test parameters + fs.UintVar(&cliFlags.VideoAmount, "video-amt", 1, "How many video to upload or import") + fs.UintVar(&cliFlags.Simultaneous, "sim", 1, "Number of simulteneous videos to upload or import (batch size)") + fs.DurationVar(&cliFlags.StartDelayDuration, "delay-between-groups", 10*time.Second, "Delay between starting group of uploads") + + // Task check parameters + fs.BoolVar(&cliFlags.TaskCheck, "task-check", false, "Check task processing") + fs.UintVar(&cliFlags.TaskCheckTimeout, "task-timeout", 500, "Task check timeout in seconds") + fs.UintVar(&cliFlags.TaskCheckPollTime, "task-poll-time", 5, "Task check poll time in seconds") + + fs.BoolVar(&cliFlags.KeepAssets, "keep-assets", false, "Keep assets after test") + _ = fs.String("config", "", "config file (optional)") ff.Parse(fs, os.Args[1:], @@ -152,11 +159,6 @@ func main() { } fileName = cliFlags.Filename - if fileName == "" { - glog.Fatalf("No file name provided") - return - } - if strings.HasSuffix(fileName, ".json") { glog.Infof("Importing from json file %s. Ignoring any -video-amount parameter provided.", fileName) vt.importFromJSONTest(fileName, runnerInfo) @@ -203,7 +205,7 @@ func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string fmt.Printf("Uploading video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) wg.Add(1) go func() { - vt.doUpload(fileName, runnerInfo, wg, false) + vt.uploadFile(fileName, runnerInfo, wg, false) }() } time.Sleep(vt.cliFlags.StartDelayDuration) @@ -220,7 +222,7 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { fmt.Printf("Uploading resumable video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) wg.Add(1) go func() { - vt.doUpload(fileName, runnerInfo, wg, true) + vt.uploadFile(fileName, runnerInfo, wg, true) }() } time.Sleep(vt.cliFlags.StartDelayDuration) @@ -230,7 +232,7 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { } -func (vt *vodLoadTester) doUpload(fileName, runnerInfo string, wg *sync.WaitGroup, resumable bool) { +func (vt *vodLoadTester) uploadFile(fileName, runnerInfo string, wg *sync.WaitGroup, resumable bool) { uploadKind := "directUpload" @@ -269,7 +271,7 @@ func (vt *vodLoadTester) doUpload(fileName, runnerInfo string, wg *sync.WaitGrou uploadUrl = requestedUpload.TusEndpoint } - err = vt.uploadAsset(fileName, uploadUrl, resumable) + err = vt.doUpload(fileName, uploadUrl, resumable) if err != nil { glog.Errorf("Error on %s: %v", uploadKind, err) @@ -401,7 +403,7 @@ func (vt *vodLoadTester) writeResultNdjson(uploadTest uploadTest) { } } -func (vt *vodLoadTester) uploadAsset(fileName string, uploadUrl string, resumable bool) error { +func (vt *vodLoadTester) doUpload(fileName string, uploadUrl string, resumable bool) error { file, err := os.Open(fileName) From 8ed1891fa1aaf029f23c296f04214195a97f8589 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Sun, 4 Dec 2022 05:54:50 +0100 Subject: [PATCH 18/25] vodloadtester: added percentage tracking --- cmd/vodloadtester/vodloadtester.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index ff67ec6e..0ad780c8 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -9,6 +9,7 @@ import ( "math/rand" "os" "runtime" + "strconv" "strings" "sync" "time" @@ -431,10 +432,19 @@ func (vt *vodLoadTester) checkTaskProcessing(processingTask api.TaskOnlyId) erro startTime := time.Now() timeout := time.Duration(vt.cliFlags.TaskCheckTimeout) * time.Second for { - glog.Infof("Waiting %s for task id=%s to be processed, elapsed=%s", taskPollDuration, processingTask.ID, time.Since(startTime)) + time.Sleep(taskPollDuration) task, err := vt.lapi.GetTask(processingTask.ID) + progress := "" + if task.Status.Phase == "running" { + percentage := task.Status.Progress * 100 + stringPercentage := strconv.FormatFloat(percentage, 'f', 2, 64) + progress = fmt.Sprintf("progress=%s%%", stringPercentage) + } + + glog.Infof("Waiting %s for task id=%s to be processed, elapsed=%s %s", taskPollDuration, processingTask.ID, time.Since(startTime), progress) + if err != nil { glog.Errorf("Error retrieving task id=%s err=%v", processingTask.ID, err) if strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "520") { @@ -456,6 +466,7 @@ func (vt *vodLoadTester) checkTaskProcessing(processingTask api.TaskOnlyId) erro glog.Errorf("Internal timeout processing task, taskId=%s", task.ID) return fmt.Errorf("timeout processing task, taskId=%s", task.ID) } + } } From 3e3068144e05f02deb996cfae0bc2332684ad603 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Sun, 4 Dec 2022 06:05:54 +0100 Subject: [PATCH 19/25] vodloadtester: added glog verbosity --- cmd/vodloadtester/vodloadtester.go | 53 +++++++++++++++--------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 0ad780c8..e8e57612 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -16,6 +16,7 @@ import ( "github.com/golang/glog" api "github.com/livepeer/go-api-client" + "github.com/livepeer/go-api-client/logs" "github.com/livepeer/stream-tester/internal/utils" "github.com/peterbourgon/ff/v2" ) @@ -77,7 +78,7 @@ func main() { fs := flag.NewFlagSet("vodloadtester", flag.ExitOnError) - fs.IntVar(&cliFlags.Verbosity, "v", 3, "Log verbosity. {4|5|6}") + fs.IntVar(&cliFlags.Verbosity, "v", 5, "Log verbosity. {4|5|6}") fs.BoolVar(&cliFlags.Version, "version", false, "Print out the version") // Credentials @@ -117,8 +118,8 @@ func main() { hostName, _ := os.Hostname() runnerInfo := fmt.Sprintf("Hostname %s OS %s", hostName, runtime.GOOS) - fmt.Printf("Compiler version: %s %s\n", runtime.Compiler, runtime.Version()) - fmt.Print(runnerInfo) + glog.V(logs.DEBUG).Infof("Compiler version: %s %s\n", runtime.Compiler, runtime.Version()) + glog.V(logs.DEBUG).Infof(runnerInfo) if cliFlags.Version { return @@ -134,7 +135,7 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) if cliFlags.APIToken == "" { - glog.Fatalf("No API token provided") + glog.V(logs.DEBUG).Infof("No API token provided") } apiToken := cliFlags.APIToken @@ -155,13 +156,13 @@ func main() { if cliFlags.Import { if cliFlags.DirectUpload || cliFlags.ResumableUpload { - glog.Fatalf("Cannot use -import with either -direct or -resumable") + glog.V(logs.DEBUG).Infof("Cannot use -import with either -direct or -resumable") return } fileName = cliFlags.Filename if strings.HasSuffix(fileName, ".json") { - glog.Infof("Importing from json file %s. Ignoring any -video-amount parameter provided.", fileName) + glog.V(logs.DEBUG).Infof("Importing from json file %s. Ignoring any -video-amount parameter provided.", fileName) vt.importFromJSONTest(fileName, runnerInfo) } else { vt.importFromUrlTest(fileName, runnerInfo) @@ -172,18 +173,18 @@ func main() { if cliFlags.DirectUpload || cliFlags.ResumableUpload { if fileName, err = utils.GetFile(cliFlags.Filename, strings.ReplaceAll(hostName, ".", "_")); err != nil { if err == utils.ErrNotFound { - glog.Fatalf("file %s not found\n", cliFlags.Filename) + glog.V(logs.DEBUG).Infof("file %s not found\n", cliFlags.Filename) } else { - glog.Fatalf("error getting file %s: %v\n", cliFlags.Filename, err) + glog.V(logs.DEBUG).Infof("error getting file %s: %v\n", cliFlags.Filename, err) } } if cliFlags.DirectUpload { - glog.Infof("Launching direct upload load test for %q", fileName) + glog.V(logs.DEBUG).Infof("Launching direct upload load test for %q", fileName) vt.directUploadLoadTest(fileName, runnerInfo) } if cliFlags.ResumableUpload { - glog.Infof("Launching resumable upload load test for %q", fileName) + glog.V(logs.DEBUG).Infof("Launching resumable upload load test for %q", fileName) vt.resumableUploadLoadTest(fileName, runnerInfo) } } @@ -203,7 +204,7 @@ func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string for i := 0; i < int(vt.cliFlags.VideoAmount); i += int(vt.cliFlags.Simultaneous) { for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { - fmt.Printf("Uploading video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) + glog.V(logs.DEBUG).Infof("Uploading video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) wg.Add(1) go func() { vt.uploadFile(fileName, runnerInfo, wg, false) @@ -220,7 +221,7 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { for i := 0; i < int(vt.cliFlags.VideoAmount); i += int(vt.cliFlags.Simultaneous) { for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { - fmt.Printf("Uploading resumable video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) + glog.V(logs.DEBUG).Infof("Uploading resumable video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) wg.Add(1) go func() { vt.uploadFile(fileName, runnerInfo, wg, true) @@ -255,7 +256,7 @@ func (vt *vodLoadTester) uploadFile(fileName, runnerInfo string, wg *sync.WaitGr defer wg.Done() if err != nil { - glog.Errorf("Error requesting upload urls: %v", err) + glog.V(logs.DEBUG).Infof("Error requesting upload urls: %v", err) uploadTest.RequestUploadSuccess = 0 uploadTest.ErrorMessage = err.Error() vt.writeResultNdjson(uploadTest) @@ -275,7 +276,7 @@ func (vt *vodLoadTester) uploadFile(fileName, runnerInfo string, wg *sync.WaitGr err = vt.doUpload(fileName, uploadUrl, resumable) if err != nil { - glog.Errorf("Error on %s: %v", uploadKind, err) + glog.V(logs.DEBUG).Infof("Error on %s: %v", uploadKind, err) uploadTest.UploadSuccess = 0 } else { uploadTest.UploadSuccess = 1 @@ -297,7 +298,7 @@ func (vt *vodLoadTester) importFromJSONTest(jsonFile string, runnerInfo string) data, err := ioutil.ReadFile(jsonFile) if err != nil { - glog.Errorf("Error reading json file: %v", err) + glog.V(logs.DEBUG).Infof("Error reading json file: %v", err) return } @@ -305,7 +306,7 @@ func (vt *vodLoadTester) importFromJSONTest(jsonFile string, runnerInfo string) err = json.Unmarshal(data, &jsonData) if err != nil { - glog.Errorf("Error parsing json file: %v", err) + glog.V(logs.DEBUG).Infof("Error parsing json file: %v", err) return } @@ -316,7 +317,7 @@ func (vt *vodLoadTester) importFromJSONTest(jsonFile string, runnerInfo string) if i+j >= len(jsonData) { break } - fmt.Printf("Import video %d/%d\n", i+j+1, len(jsonData)) + glog.V(logs.DEBUG).Infof("Import video %d/%d\n", i+j+1, len(jsonData)) wg.Add(1) index := i + j go func(i int) { @@ -333,7 +334,7 @@ func (vt *vodLoadTester) importFromUrlTest(url string, runnerInfo string) { for i := 0; i < int(vt.cliFlags.VideoAmount); i += int(vt.cliFlags.Simultaneous) { for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { - fmt.Printf("Importing video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) + glog.V(logs.DEBUG).Infof("Importing video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) wg.Add(1) go func() { vt.importFromUrl(url, runnerInfo, wg) @@ -353,7 +354,7 @@ func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.W } rndAssetName := fmt.Sprintf("load_test_import_%s", randName()) - fmt.Printf("Importing %s from %s", rndAssetName, url) + glog.V(logs.DEBUG).Infof("Importing %s from %s", rndAssetName, url) asset, task, err := vt.lapi.ImportAsset(url, rndAssetName) if !vt.cliFlags.KeepAssets { defer vt.lapi.DeleteAsset(asset.ID) @@ -362,7 +363,7 @@ func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.W defer wg.Done() if err != nil { - glog.Errorf("Error importing asset: %v", err) + glog.V(logs.DEBUG).Infof("Error importing asset: %v", err) uploadTest.ImportSuccess = 0 uploadTest.ErrorMessage = err.Error() vt.writeResultNdjson(uploadTest) @@ -392,15 +393,15 @@ func (vt *vodLoadTester) writeResultNdjson(uploadTest uploadTest) { uploadTest.EndTime = time.Now() jsonString, err := json.Marshal(uploadTest) if err != nil { - glog.Errorf("Error converting runTests to json: %v", err) + glog.V(logs.DEBUG).Infof("Error converting runTests to json: %v", err) } f, err := os.OpenFile(vt.cliFlags.OutputPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - glog.Errorf("Error opening %s: %v", vt.cliFlags.OutputPath, err) + glog.V(logs.DEBUG).Infof("Error opening %s: %v", vt.cliFlags.OutputPath, err) } defer f.Close() if _, err := f.WriteString(string(jsonString) + "\n"); err != nil { - glog.Errorf("Error writing to %s: %v", vt.cliFlags.OutputPath, err) + glog.V(logs.DEBUG).Infof("Error writing to %s: %v", vt.cliFlags.OutputPath, err) } } @@ -409,7 +410,7 @@ func (vt *vodLoadTester) doUpload(fileName string, uploadUrl string, resumable b file, err := os.Open(fileName) if err != nil { - fmt.Printf("error opening file %s: %v\n", fileName, err) + glog.V(logs.DEBUG).Infof("error opening file %s: %v\n", fileName, err) return err } @@ -420,7 +421,7 @@ func (vt *vodLoadTester) doUpload(fileName string, uploadUrl string, resumable b } if err != nil { - fmt.Printf("error uploading asset: %v\n", err) + glog.V(logs.DEBUG).Infof("error uploading asset: %v\n", err) return err } @@ -443,7 +444,7 @@ func (vt *vodLoadTester) checkTaskProcessing(processingTask api.TaskOnlyId) erro progress = fmt.Sprintf("progress=%s%%", stringPercentage) } - glog.Infof("Waiting %s for task id=%s to be processed, elapsed=%s %s", taskPollDuration, processingTask.ID, time.Since(startTime), progress) + glog.V(logs.DEBUG).Infof("Waiting %s for task id=%s to be processed, elapsed=%s %s", taskPollDuration, processingTask.ID, time.Since(startTime), progress) if err != nil { glog.Errorf("Error retrieving task id=%s err=%v", processingTask.ID, err) From f8304a246b387ade4a8471f278ad53003c107592 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Sun, 4 Dec 2022 06:36:05 +0100 Subject: [PATCH 20/25] vodloadtester: verbosity --- cmd/vodloadtester/vodloadtester.go | 44 ++++++++++++++++-------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index e8e57612..2d3083ef 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -118,7 +118,7 @@ func main() { hostName, _ := os.Hostname() runnerInfo := fmt.Sprintf("Hostname %s OS %s", hostName, runtime.GOOS) - glog.V(logs.DEBUG).Infof("Compiler version: %s %s\n", runtime.Compiler, runtime.Version()) + glog.V(logs.SHORT).Infof("Compiler version: %s %s\n", runtime.Compiler, runtime.Version()) glog.V(logs.DEBUG).Infof(runnerInfo) if cliFlags.Version { @@ -126,7 +126,8 @@ func main() { } if cliFlags.Filename == "" { - glog.Fatal("missing --file parameter") + glog.V(logs.SHORT).Infof("missing --file parameter") + return } var err error @@ -135,7 +136,8 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) if cliFlags.APIToken == "" { - glog.V(logs.DEBUG).Infof("No API token provided") + glog.V(logs.SHORT).Infof("No API token provided") + return } apiToken := cliFlags.APIToken @@ -156,13 +158,13 @@ func main() { if cliFlags.Import { if cliFlags.DirectUpload || cliFlags.ResumableUpload { - glog.V(logs.DEBUG).Infof("Cannot use -import with either -direct or -resumable") + glog.V(logs.SHORT).Infof("Cannot use -import with either -direct or -resumable") return } fileName = cliFlags.Filename if strings.HasSuffix(fileName, ".json") { - glog.V(logs.DEBUG).Infof("Importing from json file %s. Ignoring any -video-amount parameter provided.", fileName) + glog.V(logs.SHORT).Infof("Importing from json file %s. Ignoring any -video-amount parameter provided.", fileName) vt.importFromJSONTest(fileName, runnerInfo) } else { vt.importFromUrlTest(fileName, runnerInfo) @@ -173,9 +175,9 @@ func main() { if cliFlags.DirectUpload || cliFlags.ResumableUpload { if fileName, err = utils.GetFile(cliFlags.Filename, strings.ReplaceAll(hostName, ".", "_")); err != nil { if err == utils.ErrNotFound { - glog.V(logs.DEBUG).Infof("file %s not found\n", cliFlags.Filename) + glog.V(logs.SHORT).Infof("file %s not found\n", cliFlags.Filename) } else { - glog.V(logs.DEBUG).Infof("error getting file %s: %v\n", cliFlags.Filename, err) + glog.V(logs.SHORT).Infof("error getting file %s: %v\n", cliFlags.Filename, err) } } @@ -204,7 +206,7 @@ func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string for i := 0; i < int(vt.cliFlags.VideoAmount); i += int(vt.cliFlags.Simultaneous) { for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { - glog.V(logs.DEBUG).Infof("Uploading video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) + glog.V(logs.SHORT).Infof("Uploading video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) wg.Add(1) go func() { vt.uploadFile(fileName, runnerInfo, wg, false) @@ -221,7 +223,7 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { for i := 0; i < int(vt.cliFlags.VideoAmount); i += int(vt.cliFlags.Simultaneous) { for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { - glog.V(logs.DEBUG).Infof("Uploading resumable video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) + glog.V(logs.SHORT).Infof("Uploading resumable video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) wg.Add(1) go func() { vt.uploadFile(fileName, runnerInfo, wg, true) @@ -256,7 +258,7 @@ func (vt *vodLoadTester) uploadFile(fileName, runnerInfo string, wg *sync.WaitGr defer wg.Done() if err != nil { - glog.V(logs.DEBUG).Infof("Error requesting upload urls: %v", err) + glog.V(logs.SHORT).Infof("Error requesting upload urls: %v", err) uploadTest.RequestUploadSuccess = 0 uploadTest.ErrorMessage = err.Error() vt.writeResultNdjson(uploadTest) @@ -276,7 +278,7 @@ func (vt *vodLoadTester) uploadFile(fileName, runnerInfo string, wg *sync.WaitGr err = vt.doUpload(fileName, uploadUrl, resumable) if err != nil { - glog.V(logs.DEBUG).Infof("Error on %s: %v", uploadKind, err) + glog.V(logs.SHORT).Infof("Error on %s: %v", uploadKind, err) uploadTest.UploadSuccess = 0 } else { uploadTest.UploadSuccess = 1 @@ -298,7 +300,7 @@ func (vt *vodLoadTester) importFromJSONTest(jsonFile string, runnerInfo string) data, err := ioutil.ReadFile(jsonFile) if err != nil { - glog.V(logs.DEBUG).Infof("Error reading json file: %v", err) + glog.V(logs.SHORT).Infof("Error reading json file: %v", err) return } @@ -306,7 +308,7 @@ func (vt *vodLoadTester) importFromJSONTest(jsonFile string, runnerInfo string) err = json.Unmarshal(data, &jsonData) if err != nil { - glog.V(logs.DEBUG).Infof("Error parsing json file: %v", err) + glog.V(logs.SHORT).Infof("Error parsing json file: %v", err) return } @@ -317,7 +319,7 @@ func (vt *vodLoadTester) importFromJSONTest(jsonFile string, runnerInfo string) if i+j >= len(jsonData) { break } - glog.V(logs.DEBUG).Infof("Import video %d/%d\n", i+j+1, len(jsonData)) + glog.V(logs.SHORT).Infof("Importing video from JSON, %d/%d\n", i+j+1, len(jsonData)) wg.Add(1) index := i + j go func(i int) { @@ -334,7 +336,7 @@ func (vt *vodLoadTester) importFromUrlTest(url string, runnerInfo string) { for i := 0; i < int(vt.cliFlags.VideoAmount); i += int(vt.cliFlags.Simultaneous) { for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { - glog.V(logs.DEBUG).Infof("Importing video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) + glog.V(logs.SHORT).Infof("Importing video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) wg.Add(1) go func() { vt.importFromUrl(url, runnerInfo, wg) @@ -363,7 +365,7 @@ func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.W defer wg.Done() if err != nil { - glog.V(logs.DEBUG).Infof("Error importing asset: %v", err) + glog.V(logs.SHORT).Infof("Error importing asset: %v", err) uploadTest.ImportSuccess = 0 uploadTest.ErrorMessage = err.Error() vt.writeResultNdjson(uploadTest) @@ -393,15 +395,15 @@ func (vt *vodLoadTester) writeResultNdjson(uploadTest uploadTest) { uploadTest.EndTime = time.Now() jsonString, err := json.Marshal(uploadTest) if err != nil { - glog.V(logs.DEBUG).Infof("Error converting runTests to json: %v", err) + glog.V(logs.SHORT).Infof("Error converting runTests to json: %v", err) } f, err := os.OpenFile(vt.cliFlags.OutputPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - glog.V(logs.DEBUG).Infof("Error opening %s: %v", vt.cliFlags.OutputPath, err) + glog.V(logs.SHORT).Infof("Error opening %s: %v", vt.cliFlags.OutputPath, err) } defer f.Close() if _, err := f.WriteString(string(jsonString) + "\n"); err != nil { - glog.V(logs.DEBUG).Infof("Error writing to %s: %v", vt.cliFlags.OutputPath, err) + glog.V(logs.SHORT).Infof("Error writing to %s: %v", vt.cliFlags.OutputPath, err) } } @@ -410,7 +412,7 @@ func (vt *vodLoadTester) doUpload(fileName string, uploadUrl string, resumable b file, err := os.Open(fileName) if err != nil { - glog.V(logs.DEBUG).Infof("error opening file %s: %v\n", fileName, err) + glog.V(logs.SHORT).Infof("error opening file %s: %v\n", fileName, err) return err } @@ -421,7 +423,7 @@ func (vt *vodLoadTester) doUpload(fileName string, uploadUrl string, resumable b } if err != nil { - glog.V(logs.DEBUG).Infof("error uploading asset: %v\n", err) + glog.V(logs.SHORT).Infof("error uploading asset: %v\n", err) return err } From 513186c3ca1f915df43fb0624a92a2b8377f495c Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Sun, 4 Dec 2022 06:43:54 +0100 Subject: [PATCH 21/25] vodloadtester: logs --- cmd/vodloadtester/vodloadtester.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 2d3083ef..e0479420 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -405,6 +405,7 @@ func (vt *vodLoadTester) writeResultNdjson(uploadTest uploadTest) { if _, err := f.WriteString(string(jsonString) + "\n"); err != nil { glog.V(logs.SHORT).Infof("Error writing to %s: %v", vt.cliFlags.OutputPath, err) } + glog.V(logs.SHORT).Infof("test results updated with new entry in %s", vt.cliFlags.OutputPath) } func (vt *vodLoadTester) doUpload(fileName string, uploadUrl string, resumable bool) error { @@ -452,6 +453,7 @@ func (vt *vodLoadTester) checkTaskProcessing(processingTask api.TaskOnlyId) erro glog.Errorf("Error retrieving task id=%s err=%v", processingTask.ID, err) if strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "520") { // Retry + glog.V(logs.SHORT).Infof("Livepeer Studio API unreachable, retrying getting task information.... ") continue } return fmt.Errorf("error retrieving task id=%s: %w", processingTask.ID, err) From 2d25a995e8fd4b0ec79d9b6144e7cf678fe54a0d Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Sun, 4 Dec 2022 07:29:15 +0100 Subject: [PATCH 22/25] vodloadtester: added probe data in results & moved progress gathering at the end of task processing to avoid panics on api fail --- cmd/vodloadtester/vodloadtester.go | 84 ++++++++++++++++++++++-------- 1 file changed, 63 insertions(+), 21 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index e0479420..acd7a6d1 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -39,6 +39,7 @@ type cliArguments struct { VideoAmount uint OutputPath string KeepAssets bool + ProbeData bool TestDuration time.Duration StartDelayDuration time.Duration @@ -52,18 +53,39 @@ type vodLoadTester struct { } type uploadTest struct { - StartTime time.Time `json:"startTime"` - EndTime time.Time `json:"endTime"` - RunnerInfo string `json:"runnerInfo,omitempty"` - Kind string `json:"kind,omitempty"` - AssetID string `json:"assetId,omitempty"` - TaskID string `json:"taskId,omitempty"` - RequestUploadSuccess uint `json:"requestUploadSuccess,omitempty"` - UploadSuccess uint `json:"uploadSuccess,omitempty"` - ImportSuccess uint `json:"importSuccess,omitempty"` - TaskCheckSuccess uint `json:"taskCheckSuccess,omitempty"` - ErrorMessage string `json:"errorMessage,omitempty"` - UrlSource string `json:"urlSource,omitempty"` + StartTime time.Time `json:"startTime"` + EndTime time.Time `json:"endTime"` + RunnerInfo string `json:"runnerInfo,omitempty"` + Kind string `json:"kind,omitempty"` + AssetID string `json:"assetId,omitempty"` + TaskID string `json:"taskId,omitempty"` + RequestUploadSuccess uint `json:"requestUploadSuccess,omitempty"` + UploadSuccess uint `json:"uploadSuccess,omitempty"` + ImportSuccess uint `json:"importSuccess,omitempty"` + TaskCheckSuccess uint `json:"taskCheckSuccess,omitempty"` + ErrorMessage string `json:"errorMessage,omitempty"` + UrlSource string `json:"urlSource,omitempty"` + ProbeData []ProbeData `json:"data,omitempty"` +} + +// This is specifically for JSON imported files with these fields into the json objects +// Probably temporary for the new pipeline VOD test, may be removed later or may be done in a different way (like ffprobe standard output) +type ProbeData struct { + AssetName string `json:"asset_name,omitempty"` + Size string `json:"size,omitempty"` + ContainerFormat string `json:"container_format,omitempty"` + VideoTracks string `json:"video_tracks,omitempty"` + AudioTracks string `json:"audio_tracks,omitempty"` + SubtitleTracks string `json:"subtitle_tracks,omitempty"` + OtherTracks string `json:"other_tracks,omitempty"` + VideoCodec string `json:"video_codec,omitempty"` + AudioCodec string `json:"audio_codec,omitempty"` + PixelFormat string `json:"pixel_format,omitempty"` + Duration string `json:"duration,omitempty"` + Width string `json:"width,omitempty"` + Height string `json:"height,omitempty"` + Fps string `json:"fps,omitempty"` + URL string `json:"url,omitempty"` } type jsonImport struct { @@ -93,6 +115,7 @@ func main() { // Input files and results fs.StringVar(&cliFlags.Filename, "file", "", "File to upload or url to import. Can be either a video or a .json array of objects with a url key") fs.StringVar(&cliFlags.OutputPath, "output-path", "/tmp/results.ndjson", "Path to output result .ndjson file") + fs.BoolVar(&cliFlags.ProbeData, "probe-data", false, "Write object data in results when importing a JSON file") // Test parameters fs.UintVar(&cliFlags.VideoAmount, "video-amt", 1, "How many video to upload or import") @@ -305,6 +328,7 @@ func (vt *vodLoadTester) importFromJSONTest(jsonFile string, runnerInfo string) } var jsonData []jsonImport + var probeData []ProbeData err = json.Unmarshal(data, &jsonData) if err != nil { @@ -312,6 +336,13 @@ func (vt *vodLoadTester) importFromJSONTest(jsonFile string, runnerInfo string) return } + err = json.Unmarshal(data, &probeData) + + if err != nil { + glog.V(logs.SHORT).Infof("Error parsing json file: %v", err) + return + } + wg := &sync.WaitGroup{} for i := 0; i < len(jsonData); i += int(vt.cliFlags.Simultaneous) { @@ -323,7 +354,13 @@ func (vt *vodLoadTester) importFromJSONTest(jsonFile string, runnerInfo string) wg.Add(1) index := i + j go func(i int) { - vt.importFromUrl(jsonData[index].Url, runnerInfo, wg) + var probe *ProbeData + + if index < len(probeData) { + probe = &probeData[index] + } + + vt.importFromUrl(jsonData[index].Url, runnerInfo, wg, *probe) }(i + j) } time.Sleep(vt.cliFlags.StartDelayDuration) @@ -347,7 +384,7 @@ func (vt *vodLoadTester) importFromUrlTest(url string, runnerInfo string) { wg.Wait() } -func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.WaitGroup) { +func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.WaitGroup, probeData ...ProbeData) { uploadTest := uploadTest{ StartTime: time.Now(), RunnerInfo: runnerInfo, @@ -355,6 +392,10 @@ func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.W UrlSource: url, } + if probeData != nil && vt.cliFlags.ProbeData { + uploadTest.ProbeData = probeData + } + rndAssetName := fmt.Sprintf("load_test_import_%s", randName()) glog.V(logs.DEBUG).Infof("Importing %s from %s", rndAssetName, url) asset, task, err := vt.lapi.ImportAsset(url, rndAssetName) @@ -441,13 +482,6 @@ func (vt *vodLoadTester) checkTaskProcessing(processingTask api.TaskOnlyId) erro task, err := vt.lapi.GetTask(processingTask.ID) progress := "" - if task.Status.Phase == "running" { - percentage := task.Status.Progress * 100 - stringPercentage := strconv.FormatFloat(percentage, 'f', 2, 64) - progress = fmt.Sprintf("progress=%s%%", stringPercentage) - } - - glog.V(logs.DEBUG).Infof("Waiting %s for task id=%s to be processed, elapsed=%s %s", taskPollDuration, processingTask.ID, time.Since(startTime), progress) if err != nil { glog.Errorf("Error retrieving task id=%s err=%v", processingTask.ID, err) @@ -472,6 +506,14 @@ func (vt *vodLoadTester) checkTaskProcessing(processingTask api.TaskOnlyId) erro return fmt.Errorf("timeout processing task, taskId=%s", task.ID) } + if task.Status.Phase == "running" { + percentage := task.Status.Progress * 100 + stringPercentage := strconv.FormatFloat(percentage, 'f', 2, 64) + progress = fmt.Sprintf("progress=%s%%", stringPercentage) + } + + glog.V(logs.DEBUG).Infof("Waiting %s for task id=%s to be processed, elapsed=%s %s", taskPollDuration, processingTask.ID, time.Since(startTime), progress) + } } From dfb9c5743895c25aa89470e5ca1b863692e39aef Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Sun, 4 Dec 2022 23:23:19 +0100 Subject: [PATCH 23/25] vodloadtester: fixed some panic cases in requestUpload & added upload files from folder --- cmd/vodloadtester/vodloadtester.go | 166 ++++++++++++++++++++++++----- 1 file changed, 141 insertions(+), 25 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index acd7a6d1..efb64d04 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "math/rand" "os" + "path/filepath" "runtime" "strconv" "strings" @@ -36,6 +37,7 @@ type cliArguments struct { APIServer string APIToken string Filename string + Folder string VideoAmount uint OutputPath string KeepAssets bool @@ -53,6 +55,7 @@ type vodLoadTester struct { } type uploadTest struct { + ApiServer string `json:"apiServer"` StartTime time.Time `json:"startTime"` EndTime time.Time `json:"endTime"` RunnerInfo string `json:"runnerInfo,omitempty"` @@ -64,7 +67,7 @@ type uploadTest struct { ImportSuccess uint `json:"importSuccess,omitempty"` TaskCheckSuccess uint `json:"taskCheckSuccess,omitempty"` ErrorMessage string `json:"errorMessage,omitempty"` - UrlSource string `json:"urlSource,omitempty"` + Source string `json:"source,omitempty"` ProbeData []ProbeData `json:"data,omitempty"` } @@ -92,6 +95,10 @@ type jsonImport struct { Url string `json:"url"` } +var videoFormats = []string{ + "mp4", +} + func main() { var cliFlags = &cliArguments{} @@ -114,6 +121,7 @@ func main() { // Input files and results fs.StringVar(&cliFlags.Filename, "file", "", "File to upload or url to import. Can be either a video or a .json array of objects with a url key") + fs.StringVar(&cliFlags.Folder, "folder", "", "Folder with files to upload. The tester will search in the folder for files with the following extensions: "+strings.Join(videoFormats, ", ")) fs.StringVar(&cliFlags.OutputPath, "output-path", "/tmp/results.ndjson", "Path to output result .ndjson file") fs.BoolVar(&cliFlags.ProbeData, "probe-data", false, "Write object data in results when importing a JSON file") @@ -148,8 +156,8 @@ func main() { return } - if cliFlags.Filename == "" { - glog.V(logs.SHORT).Infof("missing --file parameter") + if cliFlags.Filename == "" && cliFlags.Folder == "" { + glog.V(logs.SHORT).Infof("missing --file or --folder parameter") return } @@ -179,15 +187,22 @@ func main() { cliFlags: *cliFlags, } + // Import from URL or JSON load test if cliFlags.Import { if cliFlags.DirectUpload || cliFlags.ResumableUpload { glog.V(logs.SHORT).Infof("Cannot use -import with either -direct or -resumable") return } + + if cliFlags.Filename == "" { + glog.V(logs.SHORT).Infof("Cannot use -import without specifying a -file $URL or -file $JSON_FILE") + return + } + fileName = cliFlags.Filename if strings.HasSuffix(fileName, ".json") { - glog.V(logs.SHORT).Infof("Importing from json file %s. Ignoring any -video-amount parameter provided.", fileName) + glog.V(logs.SHORT).Infof("Importing from json file %s. Ignoring any -video-amt parameter provided.", fileName) vt.importFromJSONTest(fileName, runnerInfo) } else { vt.importFromUrlTest(fileName, runnerInfo) @@ -195,22 +210,38 @@ func main() { return } + // Upload tests if cliFlags.DirectUpload || cliFlags.ResumableUpload { - if fileName, err = utils.GetFile(cliFlags.Filename, strings.ReplaceAll(hostName, ".", "_")); err != nil { - if err == utils.ErrNotFound { - glog.V(logs.SHORT).Infof("file %s not found\n", cliFlags.Filename) - } else { - glog.V(logs.SHORT).Infof("error getting file %s: %v\n", cliFlags.Filename, err) + + // From folder + if cliFlags.Folder != "" { + glog.V(logs.SHORT).Infof("Uploading from folder %s. Ignoring any -video-amt and -file parameter provided.", cliFlags.Folder) + if cliFlags.DirectUpload { + glog.V(logs.DEBUG).Infof("Launching direct upload test for folder %s", cliFlags.Folder) + vt.directUploadFromFolderLoadTest(cliFlags.Folder, runnerInfo) + } + if cliFlags.ResumableUpload { + glog.V(logs.DEBUG).Infof("Launching resumable upload test for folder %s", cliFlags.Folder) + vt.resumableUploadFromFolderLoadTest(cliFlags.Folder, runnerInfo) + } + } else { + // From file + if fileName, err = utils.GetFile(cliFlags.Filename, strings.ReplaceAll(hostName, ".", "_")); err != nil { + if err == utils.ErrNotFound { + glog.V(logs.SHORT).Infof("file %s not found\n", cliFlags.Filename) + } else { + glog.V(logs.SHORT).Infof("error getting file %s: %v\n", cliFlags.Filename, err) + } } - } - if cliFlags.DirectUpload { - glog.V(logs.DEBUG).Infof("Launching direct upload load test for %q", fileName) - vt.directUploadLoadTest(fileName, runnerInfo) - } - if cliFlags.ResumableUpload { - glog.V(logs.DEBUG).Infof("Launching resumable upload load test for %q", fileName) - vt.resumableUploadLoadTest(fileName, runnerInfo) + if cliFlags.DirectUpload { + glog.V(logs.DEBUG).Infof("Launching direct upload load test for %q", fileName) + vt.directUploadLoadTest(fileName, runnerInfo) + } + if cliFlags.ResumableUpload { + glog.V(logs.DEBUG).Infof("Launching resumable upload load test for %q", fileName) + vt.resumableUploadLoadTest(fileName, runnerInfo) + } } } @@ -241,6 +272,33 @@ func (vt *vodLoadTester) directUploadLoadTest(fileName string, runnerInfo string wg.Wait() } +func (vt *vodLoadTester) directUploadFromFolderLoadTest(folderName string, runnerInfo string) { + wg := &sync.WaitGroup{} + + filePaths := vt.getVideoFilesFromFolder(folderName) + + if len(filePaths) == 0 { + glog.V(logs.SHORT).Infof("No files with extension %v found in folder %s\n", videoFormats, folderName) + return + } + + for i := 0; i < len(filePaths); i += int(vt.cliFlags.Simultaneous) { + for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { + if i+j >= len(filePaths) { + break + } + glog.V(logs.DEBUG).Infof("Uploading video from folder, %d/%d\n", i+j+1, len(filePaths)) + wg.Add(1) + index := i + j + go func() { + vt.uploadFile(filePaths[index], runnerInfo, wg, false) + }() + } + time.Sleep(vt.cliFlags.StartDelayDuration) + } + wg.Wait() +} + func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { wg := &sync.WaitGroup{} @@ -259,6 +317,59 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { } +func (vt *vodLoadTester) resumableUploadFromFolderLoadTest(folderName string, runnerInfo string) { + wg := &sync.WaitGroup{} + + filePaths := vt.getVideoFilesFromFolder(folderName) + + if len(filePaths) == 0 { + glog.V(logs.SHORT).Infof("No files with extension %v found in folder %s\n", videoFormats, folderName) + return + } + + for i := 0; i < len(filePaths); i += int(vt.cliFlags.Simultaneous) { + for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { + if i+j >= len(filePaths) { + break + } + glog.V(logs.DEBUG).Infof("Uploading video from folder, %d/%d\n", i+j+1, len(filePaths)) + wg.Add(1) + index := i + j + go func() { + vt.uploadFile(filePaths[index], runnerInfo, wg, true) + }() + } + time.Sleep(vt.cliFlags.StartDelayDuration) + } + wg.Wait() +} + +func (vt *vodLoadTester) getVideoFilesFromFolder(folderName string) []string { + files, err := ioutil.ReadDir(folderName) + var filePaths []string + + if err != nil { + glog.V(logs.SHORT).Infof("Error reading folder %s: %v\n", folderName, err) + return filePaths + } + + glog.V(logs.SHORT).Infof("Uploading %d videos from folder %s\n", len(files), folderName) + + for _, file := range files { + if file.IsDir() { + continue + } + for _, format := range videoFormats { + if strings.HasSuffix(file.Name(), format) { + glog.V(logs.DEBUG).Infof("Uploading video %s\n", file.Name()) + filePaths = append(filePaths, filepath.Join(folderName, file.Name())) + } + } + } + + return filePaths +} + func (vt *vodLoadTester) uploadFile(fileName, runnerInfo string, wg *sync.WaitGroup, resumable bool) { uploadKind := "directUpload" @@ -268,15 +379,14 @@ func (vt *vodLoadTester) uploadFile(fileName, runnerInfo string, wg *sync.WaitGr } uploadTest := uploadTest{ + ApiServer: vt.cliFlags.APIServer, StartTime: time.Now(), RunnerInfo: runnerInfo, Kind: uploadKind, + Source: fileName, } rndAssetName := fmt.Sprintf("load_test_%s_%s", uploadKind, randName()) requestedUpload, err := vt.requestUploadUrls(rndAssetName) - if !vt.cliFlags.KeepAssets { - defer vt.lapi.DeleteAsset(requestedUpload.Asset.ID) - } defer wg.Done() @@ -292,6 +402,10 @@ func (vt *vodLoadTester) uploadFile(fileName, runnerInfo string, wg *sync.WaitGr uploadTest.TaskID = requestedUpload.Task.ID } + if !vt.cliFlags.KeepAssets { + defer vt.lapi.DeleteAsset(requestedUpload.Asset.ID) + } + uploadUrl := requestedUpload.Url if resumable { @@ -386,10 +500,11 @@ func (vt *vodLoadTester) importFromUrlTest(url string, runnerInfo string) { func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.WaitGroup, probeData ...ProbeData) { uploadTest := uploadTest{ + ApiServer: vt.cliFlags.APIServer, StartTime: time.Now(), RunnerInfo: runnerInfo, Kind: "import", - UrlSource: url, + Source: url, } if probeData != nil && vt.cliFlags.ProbeData { @@ -399,9 +514,6 @@ func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.W rndAssetName := fmt.Sprintf("load_test_import_%s", randName()) glog.V(logs.DEBUG).Infof("Importing %s from %s", rndAssetName, url) asset, task, err := vt.lapi.ImportAsset(url, rndAssetName) - if !vt.cliFlags.KeepAssets { - defer vt.lapi.DeleteAsset(asset.ID) - } defer wg.Done() @@ -417,6 +529,10 @@ func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.W uploadTest.TaskID = task.ID } + if !vt.cliFlags.KeepAssets { + defer vt.lapi.DeleteAsset(asset.ID) + } + if vt.cliFlags.TaskCheck { err := vt.checkTaskProcessing(api.TaskOnlyId{ID: uploadTest.TaskID}) if err != nil { @@ -485,7 +601,7 @@ func (vt *vodLoadTester) checkTaskProcessing(processingTask api.TaskOnlyId) erro if err != nil { glog.Errorf("Error retrieving task id=%s err=%v", processingTask.ID, err) - if strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "520") { + if strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "520") || strings.Contains(err.Error(), "no such host") { // Retry glog.V(logs.SHORT).Infof("Livepeer Studio API unreachable, retrying getting task information.... ") continue From 20eca1b422833512f4bf1590c546e0bcb5457f19 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Wed, 18 Oct 2023 17:50:24 +0200 Subject: [PATCH 24/25] add clip tests --- cmd/vodloadtester/vodloadtester.go | 187 ++++++++++++++++++++++++++--- go.mod | 2 +- go.sum | 4 +- 3 files changed, 171 insertions(+), 22 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index efb64d04..38757e08 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -30,6 +30,7 @@ type cliArguments struct { DirectUpload bool ResumableUpload bool Import bool + Clip bool TaskCheck bool TaskCheckTimeout uint @@ -56,8 +57,8 @@ type vodLoadTester struct { type uploadTest struct { ApiServer string `json:"apiServer"` - StartTime time.Time `json:"startTime"` - EndTime time.Time `json:"endTime"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` RunnerInfo string `json:"runnerInfo,omitempty"` Kind string `json:"kind,omitempty"` AssetID string `json:"assetId,omitempty"` @@ -68,6 +69,8 @@ type uploadTest struct { TaskCheckSuccess uint `json:"taskCheckSuccess,omitempty"` ErrorMessage string `json:"errorMessage,omitempty"` Source string `json:"source,omitempty"` + Elapsed uint `json:"elapsed,omitempty"` + TimeToSourceReady float64 `json:"timeToSourceReady,omitempty"` ProbeData []ProbeData `json:"data,omitempty"` } @@ -118,6 +121,7 @@ func main() { fs.BoolVar(&cliFlags.DirectUpload, "direct", false, "Launch direct upload test") fs.BoolVar(&cliFlags.ResumableUpload, "resumable", false, "Launch tus upload test") fs.BoolVar(&cliFlags.Import, "import", false, "Launch import from url test") + fs.BoolVar(&cliFlags.Clip, "clip", false, "Launch clipping load test") // Input files and results fs.StringVar(&cliFlags.Filename, "file", "", "File to upload or url to import. Can be either a video or a .json array of objects with a url key") @@ -156,7 +160,7 @@ func main() { return } - if cliFlags.Filename == "" && cliFlags.Folder == "" { + if cliFlags.Filename == "" && cliFlags.Folder == "" && !cliFlags.Clip { glog.V(logs.SHORT).Infof("missing --file or --folder parameter") return } @@ -245,10 +249,15 @@ func main() { } } + if cliFlags.Clip { + playbackId := "afb20wapuh9cubwx" + vt.clipTest(playbackId, runnerInfo) + } + } func (vt *vodLoadTester) requestUploadUrls(assetName string) (*api.UploadUrls, error) { - uploadUrls, err := vt.lapi.RequestUpload(assetName) + uploadUrls, err := vt.lapi.RequestUpload(assetName, "") if err != nil { return nil, err } @@ -317,6 +326,24 @@ func (vt *vodLoadTester) resumableUploadLoadTest(fileName, runnerInfo string) { } +func (vt *vodLoadTester) clipTest(playbackId string, runnerInfo string) { + + wg := &sync.WaitGroup{} + + for i := 0; i < int(vt.cliFlags.VideoAmount); i += int(vt.cliFlags.Simultaneous) { + for j := 0; j < int(vt.cliFlags.Simultaneous); j++ { + glog.V(logs.SHORT).Infof("Clipping video %d/%d\n", i+j+1, vt.cliFlags.VideoAmount) + wg.Add(1) + go func() { + vt.Clip(playbackId, runnerInfo, wg) + }() + } + time.Sleep(vt.cliFlags.StartDelayDuration) + } + + wg.Wait() +} + func (vt *vodLoadTester) resumableUploadFromFolderLoadTest(folderName string, runnerInfo string) { wg := &sync.WaitGroup{} @@ -377,10 +404,10 @@ func (vt *vodLoadTester) uploadFile(fileName, runnerInfo string, wg *sync.WaitGr if resumable { uploadKind = "resumableUpload" } - + startTime := time.Now() uploadTest := uploadTest{ ApiServer: vt.cliFlags.APIServer, - StartTime: time.Now(), + StartTime: startTime.Format("2006-01-02 15:04:05"), RunnerInfo: runnerInfo, Kind: uploadKind, Source: fileName, @@ -394,7 +421,7 @@ func (vt *vodLoadTester) uploadFile(fileName, runnerInfo string, wg *sync.WaitGr glog.V(logs.SHORT).Infof("Error requesting upload urls: %v", err) uploadTest.RequestUploadSuccess = 0 uploadTest.ErrorMessage = err.Error() - vt.writeResultNdjson(uploadTest) + vt.writeResultNdjson(uploadTest, startTime) return } else { uploadTest.RequestUploadSuccess = 1 @@ -420,7 +447,7 @@ func (vt *vodLoadTester) uploadFile(fileName, runnerInfo string, wg *sync.WaitGr } else { uploadTest.UploadSuccess = 1 if vt.cliFlags.TaskCheck { - err := vt.checkTaskProcessing(api.TaskOnlyId{ID: uploadTest.TaskID}) + err := vt.checkTaskProcessing(api.TaskOnlyId{ID: uploadTest.TaskID}, uploadTest.AssetID) if err != nil { uploadTest.TaskCheckSuccess = 0 uploadTest.ErrorMessage = err.Error() @@ -430,7 +457,88 @@ func (vt *vodLoadTester) uploadFile(fileName, runnerInfo string, wg *sync.WaitGr } } - vt.writeResultNdjson(uploadTest) + vt.writeResultNdjson(uploadTest, startTime) +} + +func (vt *vodLoadTester) Clip(playbackId string, runnerInfo string, wg *sync.WaitGroup) { + + rndAssetName := fmt.Sprintf("clip_test_%s", randName()) + startTime := time.Now() + uploadTest := uploadTest{ + ApiServer: vt.cliFlags.APIServer, + StartTime: startTime.Format("2006-01-02 15:04:05"), + RunnerInfo: runnerInfo, + Kind: "clip", + Source: "", + } + + asset, task, err := vt.lapi.Clip(api.Clip{ + PlaybackID: playbackId, + StartTime: 1697641672000, + EndTime: 1697641852000, + Name: rndAssetName, + }) + + uploadTest.TaskID = task.ID + uploadTest.AssetID = asset.ID + + if err != nil { + glog.V(logs.SHORT).Infof("Error clipping asset: %v", err) + uploadTest.RequestUploadSuccess = 0 + uploadTest.ErrorMessage = err.Error() + vt.writeResultNdjson(uploadTest, startTime) + return + } + + uploadTest.RequestUploadSuccess = 1 + + // Create a new WaitGroup to wait for parallel operations + var innerWg sync.WaitGroup + + // Channel to communicate errors between goroutines + errCh := make(chan error, 2) + + // Check asset source in a goroutine + innerWg.Add(1) + go func() { + defer innerWg.Done() + + err := vt.checkAssetSource(asset.ID) + if err != nil { + uploadTest.TaskCheckSuccess = 0 + uploadTest.ErrorMessage = err.Error() + errCh <- err + } else { + uploadTest.TimeToSourceReady = time.Since(startTime).Seconds() + } + }() + + // Check task processing in another goroutine + innerWg.Add(1) + go func() { + defer innerWg.Done() + + err := vt.checkTaskProcessing(api.TaskOnlyId{ID: task.ID}, uploadTest.AssetID) + if err != nil { + uploadTest.TaskCheckSuccess = 0 + uploadTest.ErrorMessage = err.Error() + errCh <- err + } + }() + + // Wait for the goroutines to finish + innerWg.Wait() + + // Check for errors from goroutines and write result + if len(errCh) > 0 { + // Here, you can handle the errors from the goroutines if you wish. + // For now, we're simply logging the first error. + glog.V(logs.SHORT).Infof("Error from goroutines: %v", <-errCh) + } + + vt.writeResultNdjson(uploadTest, startTime) + + defer wg.Done() } func (vt *vodLoadTester) importFromJSONTest(jsonFile string, runnerInfo string) { @@ -499,9 +607,10 @@ func (vt *vodLoadTester) importFromUrlTest(url string, runnerInfo string) { } func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.WaitGroup, probeData ...ProbeData) { + startTime := time.Now() uploadTest := uploadTest{ ApiServer: vt.cliFlags.APIServer, - StartTime: time.Now(), + StartTime: startTime.Format("2006-01-02 15:04:05"), RunnerInfo: runnerInfo, Kind: "import", Source: url, @@ -513,7 +622,7 @@ func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.W rndAssetName := fmt.Sprintf("load_test_import_%s", randName()) glog.V(logs.DEBUG).Infof("Importing %s from %s", rndAssetName, url) - asset, task, err := vt.lapi.ImportAsset(url, rndAssetName) + asset, task, err := vt.lapi.UploadViaURL(url, rndAssetName, "") defer wg.Done() @@ -521,7 +630,7 @@ func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.W glog.V(logs.SHORT).Infof("Error importing asset: %v", err) uploadTest.ImportSuccess = 0 uploadTest.ErrorMessage = err.Error() - vt.writeResultNdjson(uploadTest) + vt.writeResultNdjson(uploadTest, startTime) return } else { uploadTest.ImportSuccess = 1 @@ -534,7 +643,7 @@ func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.W } if vt.cliFlags.TaskCheck { - err := vt.checkTaskProcessing(api.TaskOnlyId{ID: uploadTest.TaskID}) + err := vt.checkTaskProcessing(api.TaskOnlyId{ID: uploadTest.TaskID}, uploadTest.AssetID) if err != nil { uploadTest.TaskCheckSuccess = 0 uploadTest.ErrorMessage = err.Error() @@ -543,13 +652,18 @@ func (vt *vodLoadTester) importFromUrl(url string, runnerInfo string, wg *sync.W } } - uploadTest.EndTime = time.Now() + uploadTest.EndTime = time.Now().Format("2006-01-02 15:04:05") - vt.writeResultNdjson(uploadTest) + vt.writeResultNdjson(uploadTest, startTime) } -func (vt *vodLoadTester) writeResultNdjson(uploadTest uploadTest) { - uploadTest.EndTime = time.Now() +func (vt *vodLoadTester) writeResultNdjson(uploadTest uploadTest, startTime time.Time) { + endTime := time.Now() + elapsed := endTime.Sub(startTime) + + uploadTest.EndTime = endTime.Format("2006-01-02 15:04:05") + uploadTest.Elapsed = uint(elapsed.Seconds()) + jsonString, err := json.Marshal(uploadTest) if err != nil { glog.V(logs.SHORT).Infof("Error converting runTests to json: %v", err) @@ -588,7 +702,7 @@ func (vt *vodLoadTester) doUpload(fileName string, uploadUrl string, resumable b return err } -func (vt *vodLoadTester) checkTaskProcessing(processingTask api.TaskOnlyId) error { +func (vt *vodLoadTester) checkTaskProcessing(processingTask api.TaskOnlyId, assetID string) error { taskPollDuration := time.Duration(vt.cliFlags.TaskCheckPollTime) * time.Second startTime := time.Now() timeout := time.Duration(vt.cliFlags.TaskCheckTimeout) * time.Second @@ -596,7 +710,7 @@ func (vt *vodLoadTester) checkTaskProcessing(processingTask api.TaskOnlyId) erro time.Sleep(taskPollDuration) - task, err := vt.lapi.GetTask(processingTask.ID) + task, err := vt.lapi.GetTask(processingTask.ID, false) progress := "" if err != nil { @@ -633,6 +747,41 @@ func (vt *vodLoadTester) checkTaskProcessing(processingTask api.TaskOnlyId) erro } } +func (vt *vodLoadTester) checkAssetSource(assetID string) error { + taskPollDuration := time.Duration(vt.cliFlags.TaskCheckPollTime) * time.Second + startTime := time.Now() + timeout := time.Duration(vt.cliFlags.TaskCheckTimeout) * time.Second + for { + + time.Sleep(taskPollDuration) + + asset, err := vt.lapi.GetAsset(assetID, false) + + if err != nil { + glog.Errorf("Error retrieving asset id=%s err=%v", assetID, err) + if strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "520") || strings.Contains(err.Error(), "no such host") { + // Retry + glog.V(logs.SHORT).Infof("Livepeer Studio API unreachable, retrying getting task information.... ") + continue + } + return fmt.Errorf("error retrieving asset id=%s: %w", assetID, err) + } + + if time.Since(startTime) > timeout { + glog.Errorf("Internal timeout processing asset, asset=%s", assetID) + return fmt.Errorf("timeout processing asset, asset=%s", assetID) + } + + if asset.PlaybackURL != "" { + glog.Infof("Source playback ready, assetId=%s", assetID) + return nil + } + + glog.V(logs.DEBUG).Infof("Waiting %s for asset source id=%s to be processed, elapsed=%s", taskPollDuration, assetID, time.Since(startTime)) + + } +} + func randName() string { x := make([]byte, 10) for i := 0; i < len(x); i++ { diff --git a/go.mod b/go.mod index bd3f519f..d5e537ab 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/golang/glog v1.0.0 github.com/gosuri/uilive v0.0.3 // indirect github.com/gosuri/uiprogress v0.0.1 - github.com/livepeer/go-api-client v0.2.9-0.20220916171125-c13c05817515 + github.com/livepeer/go-api-client v0.4.10-0.20231016190638-afea57499662 github.com/livepeer/go-livepeer v0.5.31 github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07 github.com/livepeer/leaderboard-serverless v1.0.0 diff --git a/go.sum b/go.sum index f2db92d2..088affce 100644 --- a/go.sum +++ b/go.sum @@ -710,8 +710,8 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/livepeer/go-api-client v0.2.9-0.20220916171125-c13c05817515 h1:3UvLoSvntPi0Z/yW6zskPmZZwA+lnm0pQVIvG/uBnrE= -github.com/livepeer/go-api-client v0.2.9-0.20220916171125-c13c05817515/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= +github.com/livepeer/go-api-client v0.4.10-0.20231016190638-afea57499662 h1:KJ/L/Tz/brn1zgKnLSVAL4g3h/hfyvd3FCqdG2gdnmw= +github.com/livepeer/go-api-client v0.4.10-0.20231016190638-afea57499662/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= github.com/livepeer/go-livepeer v0.5.31 h1:LcN+qDnqWRws7fdVYc4ucZPVcLQRs2tehUYCQVnlnRw= github.com/livepeer/go-livepeer v0.5.31/go.mod h1:cpBikcGWApkx0cyR0Ht+uAym7j3uAwXGpPbvaOA8XUU= github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw= From 9e6533aba10bbe23130ab188f2a9758bd3c850f8 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Mon, 23 Oct 2023 17:33:22 +0200 Subject: [PATCH 25/25] params for times and playbackId --- cmd/vodloadtester/vodloadtester.go | 63 +++++++++++++++++++++--------- 1 file changed, 44 insertions(+), 19 deletions(-) diff --git a/cmd/vodloadtester/vodloadtester.go b/cmd/vodloadtester/vodloadtester.go index 38757e08..13953566 100644 --- a/cmd/vodloadtester/vodloadtester.go +++ b/cmd/vodloadtester/vodloadtester.go @@ -39,6 +39,9 @@ type cliArguments struct { APIToken string Filename string Folder string + PlaybackId string + StartTime uint + EndTime uint VideoAmount uint OutputPath string KeepAssets bool @@ -56,22 +59,31 @@ type vodLoadTester struct { } type uploadTest struct { - ApiServer string `json:"apiServer"` - StartTime string `json:"startTime"` - EndTime string `json:"endTime"` - RunnerInfo string `json:"runnerInfo,omitempty"` - Kind string `json:"kind,omitempty"` - AssetID string `json:"assetId,omitempty"` - TaskID string `json:"taskId,omitempty"` - RequestUploadSuccess uint `json:"requestUploadSuccess,omitempty"` - UploadSuccess uint `json:"uploadSuccess,omitempty"` - ImportSuccess uint `json:"importSuccess,omitempty"` - TaskCheckSuccess uint `json:"taskCheckSuccess,omitempty"` - ErrorMessage string `json:"errorMessage,omitempty"` - Source string `json:"source,omitempty"` - Elapsed uint `json:"elapsed,omitempty"` - TimeToSourceReady float64 `json:"timeToSourceReady,omitempty"` - ProbeData []ProbeData `json:"data,omitempty"` + ApiServer string `json:"apiServer"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + RunnerInfo string `json:"runnerInfo,omitempty"` + Kind string `json:"kind,omitempty"` + AssetID string `json:"assetId,omitempty"` + TaskID string `json:"taskId,omitempty"` + RequestUploadSuccess uint `json:"requestUploadSuccess,omitempty"` + UploadSuccess uint `json:"uploadSuccess,omitempty"` + ImportSuccess uint `json:"importSuccess,omitempty"` + TaskCheckSuccess uint `json:"taskCheckSuccess,omitempty"` + ErrorMessage string `json:"errorMessage,omitempty"` + Source string `json:"source,omitempty"` + Elapsed uint `json:"elapsed,omitempty"` + TimeToSourceReady float64 `json:"timeToSourceReady,omitempty"` + ProbeData []ProbeData `json:"data,omitempty"` + PhasesFirstSeen PhasesFirstSeen `json:"phasesFirstSeen,omitempty"` +} + +type PhasesFirstSeen struct { + Waiting string `json:"waiting,omitempty"` + Running string `json:"running,omitempty"` + Completed string `json:"completed,omitempty"` + Failed string `json:"failed,omitempty"` + Pending string `json:"pending,omitempty"` } // This is specifically for JSON imported files with these fields into the json objects @@ -128,6 +140,9 @@ func main() { fs.StringVar(&cliFlags.Folder, "folder", "", "Folder with files to upload. The tester will search in the folder for files with the following extensions: "+strings.Join(videoFormats, ", ")) fs.StringVar(&cliFlags.OutputPath, "output-path", "/tmp/results.ndjson", "Path to output result .ndjson file") fs.BoolVar(&cliFlags.ProbeData, "probe-data", false, "Write object data in results when importing a JSON file") + fs.StringVar(&cliFlags.PlaybackId, "playback-id", "", "Playback ID to clip") + fs.UintVar(&cliFlags.StartTime, "start-time", 0, "Start time to clip") + fs.UintVar(&cliFlags.EndTime, "end-time", 0, "End time to clip") // Test parameters fs.UintVar(&cliFlags.VideoAmount, "video-amt", 1, "How many video to upload or import") @@ -250,7 +265,17 @@ func main() { } if cliFlags.Clip { - playbackId := "afb20wapuh9cubwx" + if cliFlags.PlaybackId == "" { + glog.V(logs.SHORT).Infof("Cannot use -clip without specifying a -playback-id") + return + } + + if cliFlags.StartTime == 0 || cliFlags.EndTime == 0 { + glog.V(logs.SHORT).Infof("Cannot use -clip without specifying -start-time and -end-time") + return + } + + playbackId := cliFlags.PlaybackId vt.clipTest(playbackId, runnerInfo) } @@ -474,8 +499,8 @@ func (vt *vodLoadTester) Clip(playbackId string, runnerInfo string, wg *sync.Wai asset, task, err := vt.lapi.Clip(api.Clip{ PlaybackID: playbackId, - StartTime: 1697641672000, - EndTime: 1697641852000, + StartTime: int64(vt.cliFlags.StartTime), + EndTime: int64(vt.cliFlags.EndTime), Name: rndAssetName, })