Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 92 Release Notes

### AWS Batch
* Fixed an issue where job failures before all outputs were written would cause delocalization to fail, preventing the upload of return code, stdout, and stderr files needed for debugging.
* Split the resource-tagging option into separate settings for AWS Batch jobs and for EC2 and EBS volume hardware.
* Moved the option to tag job resources from runtime attributes to backend config.
* Appended the custom labels to the list of resource tags to propagate.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,114 +465,121 @@ final case class AwsBatchJob(
val stdErr = dockerStderr.replace("/cromwell_root", workDir)

// generate a series of s3 commands to delocalize artifacts from the container to storage at the end of the task
val outputCopyCommand = outputs
.map {
// local is relative path, no mountpoint disk in front.
case output: AwsBatchFileOutput if output.local.pathAsString.contains("*") => "" // filter out globs
case output: AwsBatchFileOutput if output.s3key.endsWith(".list") && output.s3key.contains("glob-") =>
Log.debug("Globbing : check for EFS settings.")
val s3GlobOutDirectory = output.s3key.replace(".list", "")
// glob paths are not generated with 127 char limit, using generateGlobPaths(). name can be used safely
val globDirectory = output.name.replace(".list", "")
/*
* Need to process this list and de-localize each file if the list file actually exists
* if it doesn't exist then 'touch' it so that it can be copied otherwise later steps will get upset
* about the missing file
*/
if (efsMntPoint.isDefined && output.mount.mountPoint.pathAsString == efsMntPoint.get) {
Log.debug(
"EFS glob output file detected: " + output.s3key + s" / ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}"
)
val test_cmd = if (efsDelocalize.isDefined && efsDelocalize.getOrElse(false)) {
Log.debug("delocalization on EFS is enabled")
Log.debug(s"Delocalizing $globDirectory to $s3GlobOutDirectory\n")
val outputCopyCommand = {
val coreDelocalize =
s"""
|if [ -f "$workDir/${jobPaths.returnCodeFilename}" ]; then _s3_delocalize_with_retry "$workDir/${jobPaths.returnCodeFilename}" "${jobPaths.callRoot.pathAsString}/${jobPaths.returnCodeFilename}" ; fi
|if [ -f "$stdErr" ]; then _s3_delocalize_with_retry "$stdErr" "${jobPaths.standardPaths.error.pathAsString}"; fi
|if [ -f "$stdOut" ]; then _s3_delocalize_with_retry "$stdOut" "${jobPaths.standardPaths.output.pathAsString}"; fi
|""".stripMargin

val otherOutputs = outputs
.map {
// local is relative path, no mountpoint disk in front.
case output: AwsBatchFileOutput if output.local.pathAsString.contains("*") => "" // filter out globs
case output: AwsBatchFileOutput if output.s3key.endsWith(".list") && output.s3key.contains("glob-") =>
Log.debug("Globbing : check for EFS settings.")
val s3GlobOutDirectory = output.s3key.replace(".list", "")
// glob paths are not generated with 127 char limit, using generateGlobPaths(). name can be used safely
val globDirectory = output.name.replace(".list", "")
/*
* Need to process this list and de-localize each file if the list file actually exists
* if it doesn't exist then 'touch' it so that it can be copied otherwise later steps will get upset
* about the missing file
*/
if (efsMntPoint.isDefined && output.mount.mountPoint.pathAsString == efsMntPoint.get) {
Log.debug(
"EFS glob output file detected: " + output.s3key + s" / ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}"
)
val test_cmd = if (efsDelocalize.isDefined && efsDelocalize.getOrElse(false)) {
Log.debug("delocalization on EFS is enabled")
Log.debug(s"Delocalizing $globDirectory to $s3GlobOutDirectory\n")
s"""
|touch "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}"
|_s3_delocalize_with_retry "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${output.s3key}"
|if [ -e $globDirectory ]; then _s3_delocalize_with_retry "$globDirectory" "$s3GlobOutDirectory" ; fi
|""".stripMargin
} else {

// check file for existence
s"""
|# test the glob list
|test -e "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" || (echo 'output file: ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString} does not exist' && DELOCALIZATION_FAILED=1)
|# test individual files.
|SAVEIFS="$$IFS"
|IFS=$$'\n'
|for F in $$(cat "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}"); do
| test -e "${globDirectory}/$$F" || (echo 'globbed file: "${globDirectory}/$$F" does not exist' && DELOCALIZATION_FAILED=1 && break)
|done
|IFS="$$SAVEIFS"
|"""
}
// need to make md5sum?
val md5_cmd = if (efsMakeMD5.isDefined && efsMakeMD5.getOrElse(false)) {
Log.debug("Add cmd to create MD5 sibling.")
// generate MD5 if missing or if local file is newer than sibling md5
s"""
|if [[ ! -f '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' -nt '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' ]]; then
| # the glob list
| md5sum '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' > '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || (echo 'Could not generate ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' && DELOCALIZATION_FAILED=1 );
| # globbed files, using specified number of cpus for parallel processing.
| SAVEIFS="$$IFS"
| IFS=$$'\n'
| cat "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" | xargs -I% -P${runtimeAttributes.cpu.##.toString} bash -c "md5sum ${globDirectory}/% > ${globDirectory}/%.md5"
| IFS="$$SAVEIFS"
|fi
|""".stripMargin
}
// return combined result
s"""
|touch "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}"
|_s3_delocalize_with_retry "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${output.s3key}"
|if [ -e $globDirectory ]; then _s3_delocalize_with_retry "$globDirectory" "$s3GlobOutDirectory" ; fi
|""".stripMargin
|${test_cmd}
|${md5_cmd}
| """.stripMargin
} else {

// check file for existence
// default delocalization command.
Log.debug(s"Delocalize from ${output.name} to ${output.s3key}\n")
s"""
|# test the glob list
|test -e "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" || (echo 'output file: ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString} does not exist' && DELOCALIZATION_FAILED=1)
|# test individual files.
|SAVEIFS="$$IFS"
|IFS=$$'\n'
|for F in $$(cat "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}"); do
| test -e "${globDirectory}/$$F" || (echo 'globbed file: "${globDirectory}/$$F" does not exist' && DELOCALIZATION_FAILED=1 && break)
|done
|IFS="$$SAVEIFS"
|"""
|touch "${output.name}"
|_s3_delocalize_with_retry "${output.name}" "${output.s3key}"
|if [ -e "$globDirectory" ]; then _s3_delocalize_with_retry "$globDirectory" "$s3GlobOutDirectory" ; fi""".stripMargin
}
// need to make md5sum?
val md5_cmd = if (efsMakeMD5.isDefined && efsMakeMD5.getOrElse(false)) {
Log.debug("Add cmd to create MD5 sibling.")
// generate MD5 if missing or if local file is newer than sibling md5
s"""
|if [[ ! -f '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' -nt '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' ]]; then
| # the glob list
| md5sum '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' > '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || (echo 'Could not generate ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' && DELOCALIZATION_FAILED=1 );
| # globbed files, using specified number of cpus for parallel processing.
| SAVEIFS="$$IFS"
| IFS=$$'\n'
| cat "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" | xargs -I% -P${runtimeAttributes.cpu.##.toString} bash -c "md5sum ${globDirectory}/% > ${globDirectory}/%.md5"
| IFS="$$SAVEIFS"
|fi
|""".stripMargin

// files on /cromwell/ working dir must be delocalized
case output: AwsBatchFileOutput
if output.s3key.startsWith(
"s3://"
) && output.mount.mountPoint.pathAsString == AwsBatchWorkingDisk.MountPoint.pathAsString =>
// output is on working disk mount
s"""_s3_delocalize_with_retry "$workDir/${output.local.pathAsString}" "${output.s3key}" "${output.optional}" """.stripMargin

// files on EFS mounts are optionally delocalized.
case output: AwsBatchFileOutput
if efsMntPoint.isDefined && output.mount.mountPoint.pathAsString == efsMntPoint.get =>
Log.debug(
"EFS output file detected: " + output.s3key + s" / ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}"
)
// EFS located file : test existence or delocalize.
if (efsDelocalize.isDefined && efsDelocalize.getOrElse(false)) {
Log.debug("efs-delocalization enabled")
s"""_s3_delocalize_with_retry "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${output.s3key}" "${output.optional}" """.stripMargin
} else {
Log.debug("efs-delocalization disabled")
s"""_check_efs_outfile "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${efsMakeMD5
.getOrElse(false)}" "${output.optional}" """.stripMargin
}
// return combined result
s"""
|${test_cmd}
|${md5_cmd}
| """.stripMargin
} else {
// default delocalization command.
Log.debug(s"Delocalize from ${output.name} to ${output.s3key}\n")
s"""
|touch "${output.name}"
|_s3_delocalize_with_retry "${output.name}" "${output.s3key}"
|if [ -e "$globDirectory" ]; then _s3_delocalize_with_retry "$globDirectory" "$s3GlobOutDirectory" ; fi""".stripMargin
}

// files on /cromwell/ working dir must be delocalized
case output: AwsBatchFileOutput
if output.s3key.startsWith(
"s3://"
) && output.mount.mountPoint.pathAsString == AwsBatchWorkingDisk.MountPoint.pathAsString =>
// output is on working disk mount
s"""_s3_delocalize_with_retry "$workDir/${output.local.pathAsString}" "${output.s3key}" "${output.optional}" """.stripMargin

// files on EFS mounts are optionally delocalized.
case output: AwsBatchFileOutput
if efsMntPoint.isDefined && output.mount.mountPoint.pathAsString == efsMntPoint.get =>
Log.debug(
"EFS output file detected: " + output.s3key + s" / ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}"
)
// EFS located file : test existence or delocalize.
if (efsDelocalize.isDefined && efsDelocalize.getOrElse(false)) {
Log.debug("efs-delocalization enabled")
s"""_s3_delocalize_with_retry "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${output.s3key}" "${output.optional}" """.stripMargin
} else {
Log.debug("efs-delocalization disabled")
s"""_check_efs_outfile "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${efsMakeMD5
.getOrElse(false)}" "${output.optional}" """.stripMargin
}
case output: AwsBatchFileOutput =>
// output on a different mount
Log.debug("output data on other mount")
Log.debug(output.toString)
s"""_s3_delocalize_with_retry "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${output.s3key}" """.stripMargin
case _ => ""
}
.mkString("\n")

case output: AwsBatchFileOutput =>
// output on a different mount
Log.debug("output data on other mount")
Log.debug(output.toString)
s"""_s3_delocalize_with_retry "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${output.s3key}" """.stripMargin
case _ => ""
}
.mkString("\n") + "\n" +
s"""
|if [ -f "$workDir/${jobPaths.returnCodeFilename}" ]; then _s3_delocalize_with_retry "$workDir/${jobPaths.returnCodeFilename}" "${jobPaths.callRoot.pathAsString}/${jobPaths.returnCodeFilename}" ; fi
|if [ -f "$stdErr" ]; then _s3_delocalize_with_retry "$stdErr" "${jobPaths.standardPaths.error.pathAsString}"; fi
|if [ -f "$stdOut" ]; then _s3_delocalize_with_retry "$stdOut" "${jobPaths.standardPaths.output.pathAsString}"; fi
|""".stripMargin
// Prepend core delocalize lines so rc, stderr, stdout are always delocalized first.
coreDelocalize + "\n" + otherOutputs + "\n"
}

// insert the preamble at the insertion point and the postscript copy command at the end
replaced.patch(insertionPoint, preamble, 0) +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
val job = generateJobWithS3InOut
val postscript =
s"""
|{
|set -e
|# (re-)add tags to include added volumes:
|if [[ "false" == "true" ]]; then
Expand All @@ -484,12 +485,13 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
|
|echo '*** DELOCALIZING OUTPUTS ***'
|DELOCALIZATION_FAILED=0
|_s3_delocalize_with_retry "/tmp/scratch/baa" "s3://bucket/somewhere/baa" "false"
|
|if [ -f "/tmp/scratch/hello-rc.txt" ]; then _s3_delocalize_with_retry "/tmp/scratch/hello-rc.txt" "${job.jobPaths.returnCode}" ; fi
|if [ -f "/tmp/scratch/hello-stderr.log" ]; then _s3_delocalize_with_retry "/tmp/scratch/hello-stderr.log" "${job.jobPaths.standardPaths.error}"; fi
|if [ -f "/tmp/scratch/hello-stdout.log" ]; then _s3_delocalize_with_retry "/tmp/scratch/hello-stdout.log" "${job.jobPaths.standardPaths.output}"; fi
|
|_s3_delocalize_with_retry "/tmp/scratch/baa" "s3://bucket/somewhere/baa" "false"
|
|echo "DELOCALIZATION RESULT: $$DELOCALIZATION_FAILED"
|if [[ $$DELOCALIZATION_FAILED -eq 1 ]]; then
| echo '*** DELOCALIZATION FAILED ***'
Expand All @@ -504,7 +506,9 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
|exit $$rc
|}
|""".stripMargin
job.reconfiguredScript should include(postscript)

val normalize: String => String = s => s.replaceAll("\\s+", " ").trim
normalize(job.reconfiguredScript) should include(normalize(postscript))
}

it should "generate preamble with input copy command in reconfigured script" in {
Expand Down
Loading