2 changes: 2 additions & 0 deletions backend/src/main/scala/cromwell/backend/io/JobPaths.scala
@@ -80,6 +80,8 @@ trait JobPaths {
lazy val dockerCid = callExecutionRoot.resolve(dockerCidFilename)
lazy val returnCode = callExecutionRoot.resolve(returnCodeFilename)
lazy val memoryRetryRC = callExecutionRoot.resolve(memoryRetryRCFilename)
// Path to an existing file that contains the error text of the job if it failed due to memory constraints.
lazy val memoryRetryError = Option(standardPaths.error)

// This is a `def` because `standardPaths` is a `var` that may be reassigned during the calculation of
// standard output and error file names.
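
For orientation (not part of the diff above): the pattern this two-line change enables is that `memoryRetryError` defaults to the job's standard error path but can be redirected by a backend-specific `JobPaths` implementation, as `GcpBatchJobPaths` does further down in this PR. A minimal sketch, with illustrative names and `java.nio.file.Path` standing in for `cromwell.core.path.Path`:

```scala
import java.nio.file.{Path, Paths}

// Simplified stand-ins for Cromwell's JobPaths types (illustrative names only).
trait SketchJobPaths {
  def standardError: Path
  // Mirrors the new JobPaths.memoryRetryError: defaults to the job's stderr.
  lazy val memoryRetryError: Option[Path] = Option(standardError)
}

// A backend whose OOM diagnostics land in its own log file rather than stderr
// can override the default, as GcpBatchJobPaths does later in this diff.
final class SketchBatchJobPaths(val standardError: Path, batchLog: Path) extends SketchJobPaths {
  override lazy val memoryRetryError: Option[Path] = Option(batchLog)
}

object MemoryRetryErrorPathsDemo extends App {
  val default = new SketchJobPaths { val standardError: Path = Paths.get("stderr") }
  val batch = new SketchBatchJobPaths(Paths.get("stderr"), Paths.get("task.batch.log"))
  println(default.memoryRetryError) // Some(stderr)
  println(batch.memoryRetryError)   // Some(task.batch.log)
}
```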
@@ -1416,7 +1416,7 @@ trait StandardAsyncExecutionActor
): Future[ExecutionHandle] = {

// Returns whether evidence of an OOM kill was found (in the RC file or the error file), along with the path that contained it
def memoryRetryRC: Future[Boolean] = {
def memoryRetryRC: Future[(Boolean, Option[Path])] = {

def readFile(path: Path, maxBytes: Option[Int]): Future[String] =
asyncIo.contentAsStringAsync(path, maxBytes, failOnOverflow = false)
@@ -1438,23 +1438,37 @@ trait StandardAsyncExecutionActor
}
}

def checkMemoryRetryStderr(errorKeys: List[String], maxBytes: Int): Future[Boolean] =
readFile(jobPaths.standardPaths.error, Option(maxBytes)) map { errorContent =>
def checkMemoryRetryStderr(memoryRetryError: Path, errorKeys: List[String], maxBytes: Int): Future[Boolean] =
readFile(memoryRetryError, Option(maxBytes)) map { errorContent =>
errorKeys.exists(errorContent.contains)
}

asyncIo.existsAsync(jobPaths.memoryRetryRC) flatMap {
case true => checkMemoryRetryRC()
case false =>
(memoryRetryErrorKeys, memoryRetryStderrLimit) match {
case (Some(keys), Some(limit)) =>
asyncIo.existsAsync(jobPaths.standardPaths.error) flatMap {
case true => checkMemoryRetryStderr(keys, limit)
case false => Future.successful(false)
}
case _ => Future.successful(false)
}
}
def checkMemoryRetryError(): Future[Boolean] =
(memoryRetryErrorKeys, memoryRetryStderrLimit, jobPaths.memoryRetryError) match {
case (Some(keys), Some(limit), Some(memoryRetryError)) =>
for {
memoryRetryErrorExists <- asyncIo.existsAsync(memoryRetryError)
memoryRetryErrorFound <-
if (memoryRetryErrorExists)
checkMemoryRetryStderr(memoryRetryError, keys, limit)
else
Future.successful(false)
} yield memoryRetryErrorFound
case _ => Future.successful(false)
}

// For backwards behavioral compatibility, check the old memory retry RC file first. That file used to capture
// errors from the standard error stream, but the error is now sometimes written to a separate log file.
// If the RC file exists, check its contents; if no OOM indicator is found there, fall back to the new memory retry error file.
for {
memoryRetryRCExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
memoryRetryRCErrorFound <- if (memoryRetryRCExists) checkMemoryRetryRC() else Future.successful(false)
memoryRetryErrorFound <- if (memoryRetryRCErrorFound) Future.successful(true) else checkMemoryRetryError()
memoryErrorPathOption =
if (memoryRetryRCErrorFound) Option(jobPaths.standardPaths.error)
else if (memoryRetryErrorFound) jobPaths.memoryRetryError
else None
} yield (memoryRetryErrorFound, memoryErrorPathOption)
}

val stderr = jobPaths.standardPaths.error
@@ -1465,74 +1479,76 @@ trait StandardAsyncExecutionActor
// Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
// may fail due to race conditions on quickly-executing jobs.
stderrSize <- if (failOnStdErr) asyncIo.sizeAsync(stderr) else Future.successful(0L)
outOfMemoryDetected <- memoryRetryRC
} yield (stderrSize, returnCodeAsString, outOfMemoryDetected)

stderrSizeAndReturnCodeAndMemoryRetry flatMap { case (stderrSize, returnCodeAsString, outOfMemoryDetected) =>
val tryReturnCodeAsInt = Try(returnCodeAsString.trim.toInt)

if (isDone(status)) {
tryReturnCodeAsInt match {
case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption),
Option(returnCodeAsInt),
None
(outOfMemoryDetected, outOfMemoryPathOption) <- memoryRetryRC
} yield (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption)

stderrSizeAndReturnCodeAndMemoryRetry flatMap {
case (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption) =>
val tryReturnCodeAsInt = Try(returnCodeAsString.trim.toInt)

if (isDone(status)) {
tryReturnCodeAsInt match {
case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption),
Option(returnCodeAsInt),
None
)
)
)
retryElseFail(executionHandle)
case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
// It's important that we check the retryWithMoreMemory case before isAbort. RC could be 137 in either case;
// if it was caused by the OOM killer, we want to handle it as OOM rather than as a job abort.
case Success(returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(
RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log),
Option(returnCodeAsInt),
None
retryElseFail(executionHandle)
case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
// It's important that we check the retryWithMoreMemory case before isAbort. RC could be 137 in either case;
// if it was caused by the OOM killer, we want to handle it as OOM rather than as a job abort.
case Success(returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(
RetryWithMoreMemory(jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
Option(returnCodeAsInt),
None
)
)
)
retryElseFail(executionHandle,
MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
)
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
Future.successful(AbortedExecutionHandle)
case Success(returnCodeAsInt) =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
Option(returnCodeAsInt),
None
retryElseFail(executionHandle,
MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
)
)
retryElseFail(executionHandle)
case Failure(_) =>
Future.successful(
FailedNonRetryableExecutionHandle(
ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
kvPairsToSave = None
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
Future.successful(AbortedExecutionHandle)
case Success(returnCodeAsInt) =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(
WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
Option(returnCodeAsInt),
None
)
)
)
}
} else {
tryReturnCodeAsInt match {
case Success(returnCodeAsInt)
if outOfMemoryDetected && memoryRetryRequested && !continueOnReturnCode.continueFor(returnCodeAsInt) =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(
RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log),
Option(returnCodeAsInt),
None
retryElseFail(executionHandle)
case Failure(_) =>
Future.successful(
FailedNonRetryableExecutionHandle(
ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
kvPairsToSave = None
)
)
)
retryElseFail(executionHandle,
MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
)
case _ =>
val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
retryElseFail(failureStatus)
}
} else {
tryReturnCodeAsInt match {
case Success(returnCodeAsInt)
if outOfMemoryDetected && memoryRetryRequested && !continueOnReturnCode.continueFor(returnCodeAsInt) =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(
RetryWithMoreMemory(jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
Option(returnCodeAsInt),
None
)
)
retryElseFail(executionHandle,
MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
)
case _ =>
val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
retryElseFail(failureStatus)
}
}
}
} recoverWith { case exception =>
if (isDone(status)) Future.successful(FailedNonRetryableExecutionHandle(exception, kvPairsToSave = None))
else {
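
Taken together, the changes to `memoryRetryRC` above implement the following precedence: consult the legacy RC file first, and only if it does not flag OOM scan the configured error file for the retry keys, returning the path that produced the evidence alongside the flag. A standalone sketch of that logic (not the actor code itself; the function-valued parameters stand in for `checkMemoryRetryRC` and `checkMemoryRetryError`, and the object name is illustrative):

```scala
import java.nio.file.Path
import scala.concurrent.{ExecutionContext, Future}

object OomDetectionSketch {

  /**
   * @param rcFileFlagsOom    checks the legacy memory-retry RC file (historically derived from stderr)
   * @param errorFileFlagsOom scans the configured error file for the retry keys
   * @param stderrPath        the job's standard error path
   * @param memoryRetryError  the configured error file, if any (defaults to stderr)
   * @return whether OOM was detected, plus the file that evidenced it
   */
  def detect(rcFileFlagsOom: () => Future[Boolean],
             errorFileFlagsOom: () => Future[Boolean],
             stderrPath: Path,
             memoryRetryError: Option[Path]
  )(implicit ec: ExecutionContext): Future[(Boolean, Option[Path])] =
    for {
      // Backwards compatibility: the old RC file wins if it already flags OOM.
      fromRcFile <- rcFileFlagsOom()
      detected <- if (fromRcFile) Future.successful(true) else errorFileFlagsOom()
      evidence =
        if (fromRcFile) Option(stderrPath)  // the RC file was produced from stderr
        else if (detected) memoryRetryError // the hit came from the configured error file
        else None
    } yield (detected, evidence)
}
```

Checking the RC file first preserves existing behavior for backends that still write it, while the error-file scan covers backends, such as GCP Batch in Standard mode below, whose OOM message lands in a separate log.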
@@ -1,6 +1,9 @@
version 1.0

task imitate_oom_error {
meta {
volatile: true
}
command {
echo "$MEM_SIZE $MEM_UNIT"

@@ -27,7 +27,12 @@ import cromwell.backend.google.batch.models.RunStatus.TerminalRunStatus
import cromwell.backend.google.batch.models._
import cromwell.backend.google.batch.monitoring.{BatchInstrumentation, CheckpointingConfiguration, MonitoringImage}
import cromwell.backend.google.batch.runnable.WorkflowOptionKeys
import cromwell.backend.google.batch.util.{GcpBatchReferenceFilesMappingOperations, RuntimeOutputMapping}
import cromwell.backend.google.batch.util.{
GcpBatchReferenceFilesMappingOperations,
MemoryRetryRunnable,
MemoryRetryStandard,
RuntimeOutputMapping
}
import cromwell.backend.standard._
import cromwell.core._
import cromwell.core.io.IoCommandBuilder
@@ -633,7 +638,14 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar

// if the `memory_retry_multiplier` is not present in the workflow options there is no need to check whether or
// not the `stderr` file contained memory retry error keys
val retryWithMoreMemoryKeys: Option[List[String]] = memoryRetryFactor.flatMap(_ => memoryRetryErrorKeys)
// If retry detection is handled via stderr (Standard mode), do not set up retry-with-more-memory detection in the
// backend runnable. This keeps the backend jobs from echoing the memory retry error keys into the very verbose
// Google Cloud Batch logs, which would trip up the standard memory retry detection.
val retryWithMoreMemoryKeys: Option[List[String]] =
batchConfiguration.batchAttributes.memoryRetryCheckMode match {
case MemoryRetryRunnable => memoryRetryFactor.flatMap(_ => memoryRetryErrorKeys)
case MemoryRetryStandard => None
}

val targetLogFile = batchAttributes.logsPolicy match {
case GcpBatchLogsPolicy.CloudLogging => None
@@ -22,7 +22,7 @@ import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.{
GcsTransferConfiguration,
VirtualPrivateCloudConfiguration
}
import cromwell.backend.google.batch.util.GcpBatchReferenceFilesMappingOperations
import cromwell.backend.google.batch.util.{GcpBatchReferenceFilesMappingOperations, MemoryRetryCheckMode}
import cromwell.cloudsupport.gcp.GoogleConfiguration
import cromwell.cloudsupport.gcp.auth.GoogleAuthMode
import cromwell.docker.DockerMirroring
@@ -58,7 +58,8 @@ case class GcpBatchConfigurationAttributes(
referenceFileToDiskImageMappingOpt: Option[Map[String, GcpBatchReferenceFilesDisk]],
checkpointingInterval: FiniteDuration,
logsPolicy: GcpBatchLogsPolicy,
maxTransientErrorRetries: Int
maxTransientErrorRetries: Int,
memoryRetryCheckMode: MemoryRetryCheckMode
)

object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOperations with StrictLogging {
@@ -89,6 +90,7 @@ object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOper
val DefaultGcsTransferAttempts: Refined[Int, Positive] = refineMV[Positive](3)

val checkpointingIntervalKey = "checkpointing-interval"
val memoryRetryCheckModeKey = "memory-retry-check-mode"

private val batchKeys = CommonBackendConfigurationAttributes.commonValidConfigurationAttributeKeys ++ Set(
"project",
@@ -128,7 +130,8 @@ object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOper
"virtual-private-cloud.subnetwork-label-key",
"virtual-private-cloud.auth",
"reference-disk-localization-manifests",
checkpointingIntervalKey
checkpointingIntervalKey,
memoryRetryCheckModeKey
)

private val deprecatedBatchKeys: Map[String, String] = Map(
@@ -311,6 +314,13 @@ object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOper
val maxTransientErrorRetries: Int =
backendConfig.as[Option[Int]]("max-transient-error-retries").getOrElse(10)

val memoryRetryCheckMode: ErrorOr[MemoryRetryCheckMode] =
MemoryRetryCheckMode
.tryParse(
backendConfig.getOrElse(memoryRetryCheckModeKey, MemoryRetryCheckMode.DefaultMode.name)
)
.toErrorOr

def authGoogleConfigForBatchConfigurationAttributes(
project: String,
bucket: String,
@@ -327,7 +337,8 @@ object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOper
virtualPrivateCloudConfiguration: VirtualPrivateCloudConfiguration,
batchRequestTimeoutConfiguration: BatchRequestTimeoutConfiguration,
referenceDiskLocalizationManifestFilesOpt: Option[List[ManifestFile]],
logsPolicy: GcpBatchLogsPolicy
logsPolicy: GcpBatchLogsPolicy,
memoryRetryCheckMode: MemoryRetryCheckMode
): ErrorOr[GcpBatchConfigurationAttributes] =
(googleConfig.auth(batchName), googleConfig.auth(gcsName)) mapN { (batchAuth, gcsAuth) =>
val generatedReferenceFilesMappingOpt = referenceDiskLocalizationManifestFilesOpt map {
@@ -354,7 +365,8 @@ object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOper
referenceFileToDiskImageMappingOpt = generatedReferenceFilesMappingOpt,
checkpointingInterval = checkpointingInterval,
logsPolicy = logsPolicy,
maxTransientErrorRetries = maxTransientErrorRetries
maxTransientErrorRetries = maxTransientErrorRetries,
memoryRetryCheckMode = memoryRetryCheckMode
)
}

@@ -373,7 +385,8 @@ object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOper
virtualPrivateCloudConfiguration,
batchRequestTimeoutConfigurationValidation,
referenceDiskLocalizationManifestFiles,
logsPolicy
logsPolicy,
memoryRetryCheckMode
) flatMapN authGoogleConfigForBatchConfigurationAttributes match {
case Valid(r) => r
case Invalid(f) =>
@@ -2,6 +2,7 @@ package cromwell.backend.google.batch.models

import cromwell.backend.BackendJobDescriptorKey
import cromwell.backend.google.batch.runnable.GcpBatchMetadataKeys
import cromwell.backend.google.batch.util.{MemoryRetryRunnable, MemoryRetryStandard}
import cromwell.backend.io.JobPaths
import cromwell.core.path.Path
import cromwell.services.metadata.CallMetadataKeys
@@ -48,6 +49,12 @@ case class GcpBatchJobPaths(override val workflowPaths: GcpBatchWorkflowPaths,
case _ => None
}

override lazy val memoryRetryError: Option[Path] =
workflowPaths.gcpBatchConfiguration.batchAttributes.memoryRetryCheckMode match {
case MemoryRetryRunnable => None
case MemoryRetryStandard => maybeBatchLogPath
}

override lazy val customMetadataPaths = {
val backendLogsMetadata = maybeBatchLogPath map { p: Path =>
Map(CallMetadataKeys.BackendLogsPrefix + ":log" -> p)
@@ -0,0 +1,34 @@
package cromwell.backend.google.batch.util

import scala.util.{Failure, Success, Try}

sealed trait MemoryRetryCheckMode {
val name: String

override def toString: String = name
}

case object MemoryRetryRunnable extends MemoryRetryCheckMode {
override val name: String = "Runnable"
}

case object MemoryRetryStandard extends MemoryRetryCheckMode {
override val name: String = "Standard"
}

object MemoryRetryCheckMode {
val DefaultMode = MemoryRetryRunnable
private val AllModes: Seq[MemoryRetryCheckMode] = Seq(MemoryRetryRunnable, MemoryRetryStandard)

def tryParse(mode: String): Try[MemoryRetryCheckMode] =
AllModes
.find(_.name.equalsIgnoreCase(mode))
.map(Success(_))
.getOrElse(
Failure(
new Exception(
s"Invalid memory retry check mode: '$mode', supported modes are: ${AllModes.map(_.name).mkString("'", "', '", "'")}"
)
)
)
}
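
A brief usage sketch for the new mode (not part of the diff; it assumes the object above is on the classpath, and the demo object name is illustrative). The config key mirrors `memoryRetryCheckModeKey` in `GcpBatchConfigurationAttributes`: it is optional, defaults to `Runnable`, is parsed case-insensitively, and unsupported values surface the error message defined above.

```scala
import com.typesafe.config.ConfigFactory
import cromwell.backend.google.batch.util.MemoryRetryCheckMode
import scala.util.{Failure, Success}

object MemoryRetryCheckModeDemo extends App {
  // A backend stanza a user might write; "memory-retry-check-mode" matches memoryRetryCheckModeKey.
  val backendConfig = ConfigFactory.parseString("memory-retry-check-mode = standard")

  // The key is optional; fall back to MemoryRetryCheckMode.DefaultMode (Runnable) when absent.
  val configured =
    if (backendConfig.hasPath("memory-retry-check-mode")) backendConfig.getString("memory-retry-check-mode")
    else MemoryRetryCheckMode.DefaultMode.name

  MemoryRetryCheckMode.tryParse(configured) match {
    case Success(mode) => println(s"memory retry check mode: $mode") // Standard -- the match is case-insensitive
    case Failure(e)    => println(e.getMessage)
  }

  // Unsupported values fail with the message defined above.
  println(MemoryRetryCheckMode.tryParse("stderr").isFailure) // true
}
```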