Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
case _ => false
}

// Opt-in switch read from the `retry_reporting_timeouts` workflow option.
// Anything other than a readable `true` (e.g. the lookup not producing a boolean) leaves it off.
lazy val retryReportingTimeouts: Boolean = {
  val requested =
    jobDescriptor.workflowDescriptor.workflowOptions.getBoolean(WorkflowOptionKeys.RetryReportingTimeouts)
  requested.toOption.contains(true)
}

override def tryAbort(job: StandardAsyncJob): Unit =
abortJob(workflowId = workflowId,
jobName = JobName.parse(job.jobId),
Expand Down Expand Up @@ -1204,11 +1210,12 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
// Check whether this failure should be automatically resubmitted without counting against maxRetries.
// Guidance: Resubmit if the task has a known-transient failure type and has not yet cost the user money.
// The result is true only when all three hold: `transientErrorRetryable` (defined outside this excerpt),
// the failure's error code is in the transient list, and no "running" event was recorded for the task.
private def isTransientFailure(failed: RunStatus.Failed): Boolean = {
lazy val errorTypeIsTransient = List(
// Error codes always treated as transient; VMReportingTimeout is appended below only on opt-in.
lazy val transientErrors = List(
GcpBatchExitCode.VMPreemption,
GcpBatchExitCode.VMRecreatedDuringExecution,
GcpBatchExitCode.VMRebootedDuringExecution
).contains(failed.errorCode)
// NOTE(review): `.option` on the Boolean presumably yields Some(code) when the flag is true and None
// otherwise, so the list gains VMReportingTimeout only when `retryReportingTimeouts` is enabled — confirm.
) ++ retryReportingTimeouts.option(GcpBatchExitCode.VMReportingTimeout)
lazy val errorTypeIsTransient = transientErrors.contains(failed.errorCode)
// True when any event in the failure's event list has a name accepted by `executionEventRunningMatcher`.
lazy val taskStartedRunning = failed.eventList.exists(e => executionEventRunningMatcher.matches(e.name))
transientErrorRetryable && errorTypeIsTransient && !taskStartedRunning
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ object WorkflowOptionKeys {
val GoogleProject = "google_project"
val GoogleComputeServiceAccount = "google_compute_service_account"
val EnableFuse = "enable_fuse"
val RetryReportingTimeouts = "retry_reporting_timeouts"
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import cromwell.backend.google.batch.api.GcpBatchRequestFactory
import cromwell.backend.google.batch.io.{DiskType, GcpBatchWorkingDisk}
import cromwell.backend.google.batch.models._
import cromwell.backend.google.batch.runnable.RunnableUtils.MountPoint
import cromwell.backend.google.batch.runnable.WorkflowOptionKeys
import cromwell.backend.google.batch.util.BatchExpressionFunctions
import cromwell.backend.io.JobPathsSpecHelper._
import cromwell.backend.standard.{
Expand Down Expand Up @@ -115,6 +116,14 @@ class GcpBatchAsyncBackendJobExecutionActorSpec
val Inputs: Map[FullyQualifiedName, WomValue] = Map("wf_sup.sup.addressee" -> WomString("dog"))

private val NoOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue]))
// Workflow options with the reporting-timeout retry key set to true; used by specs that opt in to it.
private val RetryReportingTimeoutsOptions = {
  val optionFields = Map[String, JsValue](WorkflowOptionKeys.RetryReportingTimeouts -> JsBoolean(true))
  WorkflowOptions(JsObject(optionFields))
}

private lazy val TestableCallContext =
CallContext(mockPathBuilder.build("gs://root").get, DummyStandardPaths, isDocker = false)
Expand Down Expand Up @@ -230,7 +239,8 @@ class GcpBatchAsyncBackendJobExecutionActorSpec
previousPreemptions: Int,
previousUnexpectedRetries: Int,
previousTransientRetries: Int,
failedRetriesCountOpt: Option[Int] = None
failedRetriesCountOpt: Option[Int],
workflowOptions: WorkflowOptions
): BackendJobDescriptor = {
val attempt = previousPreemptions + previousUnexpectedRetries + 1
val wdlNamespace = WdlNamespaceWithWorkflow
Expand All @@ -252,7 +262,7 @@ class GcpBatchAsyncBackendJobExecutionActorSpec
WorkflowId.randomId(),
womDefinition,
inputs,
NoOptions,
workflowOptions,
Labels.empty,
HogGroup("foo"),
List.empty,
Expand Down Expand Up @@ -335,14 +345,17 @@ class GcpBatchAsyncBackendJobExecutionActorSpec
def buildPreemptibleTestActorRef(attempt: Int,
preemptible: Int,
previousTransientRetriesCount: Int = 0,
failedRetriesCountOpt: Option[Int] = None
failedRetriesCountOpt: Option[Int] = None,
workflowOptions: WorkflowOptions = NoOptions
): TestActorRef[TestableGcpBatchJobExecutionActor] = {
// For this test we say that all previous attempts were preempted:
val jobDescriptor = buildPreemptibleJobDescriptor(preemptible,
attempt - 1,
previousUnexpectedRetries = 0,
previousTransientRetries = previousTransientRetriesCount,
failedRetriesCountOpt = failedRetriesCountOpt
val jobDescriptor = buildPreemptibleJobDescriptor(
preemptible,
attempt - 1,
previousUnexpectedRetries = 0,
previousTransientRetries = previousTransientRetriesCount,
failedRetriesCountOpt = failedRetriesCountOpt,
workflowOptions = workflowOptions
)
val props = Props(
new TestableGcpBatchJobExecutionActor(jobDescriptor,
Expand Down Expand Up @@ -467,7 +480,7 @@ class GcpBatchAsyncBackendJobExecutionActorSpec
)
)

val jobDescriptor = buildPreemptibleJobDescriptor(0, 0, 0, 0)
val jobDescriptor = buildPreemptibleJobDescriptor(0, 0, 0, 0, None, NoOptions)
val serviceRegistryProbe = TestProbe()

val backend1 = executionActor(
Expand Down Expand Up @@ -511,6 +524,48 @@ class GcpBatchAsyncBackendJobExecutionActorSpec
failedHandle.returnCode shouldBe None
}

it should "not retry transient reporting timeouts by default" in {
  // NoOptions leaves the retry_reporting_timeouts key unset, so a reporting timeout must not be retried.
  val batchBackend =
    buildPreemptibleTestActorRef(attempt = 1, preemptible = 1, workflowOptions = NoOptions).underlyingActor
  val pendingHandle = new GcpBatchPendingExecutionHandle(null, generateStandardAsyncJob, None, None)
  val reportingTimeout = RunStatus.Failed(GcpBatchExitCode.VMReportingTimeout, Seq.empty)

  // Match on the handle type instead of isInstanceOf/asInstanceOf so that a wrong handle is named in
  // the failure message rather than reported as "false was not true".
  Await.result(batchBackend.handleExecutionResult(reportingTimeout, pendingHandle), timeout) match {
    case nonRetryable: FailedNonRetryableExecutionHandle => nonRetryable.returnCode shouldBe None
    case unexpected => fail(s"Expected a FailedNonRetryableExecutionHandle but got: $unexpected")
  }
}

it should "retry transient reporting timeouts when requested" in {
  // RetryReportingTimeoutsOptions sets retry_reporting_timeouts to true, so a reporting timeout is retryable.
  val batchBackend =
    buildPreemptibleTestActorRef(
      attempt = 1,
      preemptible = 1,
      workflowOptions = RetryReportingTimeoutsOptions
    ).underlyingActor
  val pendingHandle = new GcpBatchPendingExecutionHandle(null, generateStandardAsyncJob, None, None)
  val reportingTimeout = RunStatus.Failed(GcpBatchExitCode.VMReportingTimeout, Seq.empty)

  // Match on the handle type instead of isInstanceOf/asInstanceOf so that a wrong handle is named in
  // the failure message rather than reported as "false was not true".
  Await.result(batchBackend.handleExecutionResult(reportingTimeout, pendingHandle), timeout) match {
    case retryable: FailedRetryableExecutionHandle => retryable.returnCode shouldBe None
    case unexpected => fail(s"Expected a FailedRetryableExecutionHandle but got: $unexpected")
  }
}

it should "retry transient failures when appropriate" in {
val actorRef = buildPreemptibleTestActorRef(attempt = 2, preemptible = 0, previousTransientRetriesCount = 1)
val batchBackend = actorRef.underlyingActor
Expand Down
Loading