Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 92 additions & 6 deletions executor/linux/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,76 @@ func (o *outputSvc) pollFiles(ctx context.Context, ctn *pipeline.Container, _ste
// https://pkg.go.dev/github.com/sirupsen/logrus#Entry.WithField
logger := o.client.Logger.WithField("artifact-outputs", ctn.Name)

// load the step log so artifact messages are streamed to the UI.
// Wait briefly to allow the StreamStep goroutine to finish its final
// log upload so that we do not race with it. Then re-fetch the log
// from the server to get the authoritative copy that includes all
// container output — this prevents artifact messages from overwriting
// the step's own logs.
time.Sleep(2 * time.Second)

_log, _, err := o.client.Vela.Log.GetStep(ctx,
b.GetRepo().GetOrg(), b.GetRepo().GetName(),
b.GetNumber(), _step.Number)
if err != nil {
logger.Warnf("unable to fetch step log for artifact streaming: %v", err)
}

// store back into the map so future references are consistent
if _log != nil {
o.client.stepLogs.Store(_step.ID, _log)
}

// streamLog appends a message to the step log and pushes it to the server
// so that artifact progress is visible in the UI on the attached step.
streamLog := func(msg string) {
logger.Info(msg)

if _log == nil {
return
}

_log.AppendData([]byte(fmt.Sprintf("[artifact] %s\n", msg)))

_, err := o.client.Vela.Log.UpdateStep(ctx,
b.GetRepo().GetOrg(), b.GetRepo().GetName(),
b.GetNumber(), _step.Number, _log)
if err != nil {
logger.Errorf("unable to update step log: %v", err)
}
}

streamLog(fmt.Sprintf("starting artifact upload for step %s (build: %d, repo: %s/%s)",
_step.Name, b.GetNumber(), b.GetRepo().GetOrg(), b.GetRepo().GetName()))
streamLog(fmt.Sprintf("configured artifact paths: %v", _step.Artifacts.Paths))

// grab file paths from the container
filesPath, err := o.client.Runtime.PollFileNames(ctx, ctn, _step)
if err != nil {
streamLog(fmt.Sprintf("failed to discover artifact files: %v", err))
return fmt.Errorf("unable to poll file names: %w", err)
}

streamLog(fmt.Sprintf("discovered %d artifact file(s) matching configured paths", len(filesPath)))

if len(filesPath) == 0 {
streamLog(fmt.Sprintf("no files found matching artifact paths: %v — ensure your step produces files at the expected locations", _step.Artifacts.Paths))
return fmt.Errorf("no files found for file list: %v", _step.Artifacts.Paths)
}

logger.Debugf("matched files: %v", filesPath)

// create http client for uploading files to storage
putClient := http.DefaultClient
putClient.Timeout = time.Second * 30

// track upload statistics
var (
uploaded int
skipped int
failed int
)

// process each file found
for _, filePath := range filesPath {
fileName := filepath.Base(filePath)
Expand All @@ -247,31 +303,52 @@ func (o *outputSvc) pollFiles(ctx context.Context, ctn *pipeline.Container, _ste
// skip hidden files and files within hidden directories
if isHidden(filePath) {
logger.Debugf("skipping hidden file or directory: %s", filePath)

skipped++

continue
}

url, _, err := o.client.Vela.Build.GetPresignedPutURL(ctx, fileName, b.GetRepo().GetOrg(), b.GetRepo().GetName(),
b.GetNumber())
if err != nil {
logger.Errorf("unable to get presigned put url: %v", err)
streamLog(fmt.Sprintf("artifact %q could not be uploaded — the server did not provide an upload URL. "+
"This may indicate that artifact storage is not configured or the server encountered an error. "+
"Please contact your Vela administrator if this persists. (error: %v)", fileName, err))

failed++

continue
}

// get file content from container
reader, size, err := o.client.Runtime.PollFileContent(ctx, ctn, filePath)
if err != nil {
logger.Errorf("unable to poll file content for %s: %v", filePath, err)
streamLog(fmt.Sprintf("unable to read artifact file %q from container: %v", filePath, err))

failed++

continue
}

logger.Infof("artifact file %q size: %d bytes", fileName, size)

// TODO: surface this skip to the user
if o.client.fileSizeLimit > 0 && size > o.client.fileSizeLimit {
logger.Infof("skipping file %s due to file size limit", filePath)
streamLog(fmt.Sprintf("skipping artifact %q — file size (%d bytes) exceeds the per-file limit (%d bytes)",
fileName, size, o.client.fileSizeLimit))

skipped++

continue
}

if o.client.buildFileSizeLimit > 0 && size+o.client.Uploaded > o.client.buildFileSizeLimit {
logger.Infof("skipping file %s due to build file size limit", filePath)
streamLog(fmt.Sprintf("skipping artifact %q — uploading this file would exceed the per-build size limit (%d bytes). "+
"Total uploaded so far: %d bytes", fileName, o.client.buildFileSizeLimit, o.client.Uploaded))

skipped++

continue
}

Expand All @@ -282,17 +359,26 @@ func (o *outputSvc) pollFiles(ctx context.Context, ctn *pipeline.Container, _ste
strconv.FormatInt(b.GetNumber(), 10),
fileName)

logger.Debugf("uploading file %s to storage with object name %s", filePath, objectName)
streamLog(fmt.Sprintf("uploading artifact %q to storage (object: %s, size: %d bytes)", fileName, objectName, size))

err = uploadObject(ctx, putClient, reader, size, fileName, url.URL)
if err != nil {
logger.Errorf("unable to upload object %s: %v", fileName, err)
streamLog(fmt.Sprintf("failed to upload artifact %q: %v", fileName, err))

failed++

continue
}

o.client.Uploaded += size
uploaded++

streamLog(fmt.Sprintf("successfully uploaded artifact %q (%d bytes)", fileName, size))
}

streamLog(fmt.Sprintf("artifact upload complete — uploaded: %d, skipped: %d, failed: %d (total files: %d)",
uploaded, skipped, failed, len(filesPath)))

return nil
}

Expand Down
Loading