From b46f4093dfc3df0d6c0e698abb93fd57e9f8a23d Mon Sep 17 00:00:00 2001 From: Khalid Shakir Date: Sat, 28 Jun 2025 13:30:15 -0400 Subject: [PATCH] Allow users to request Cromwell to retry 50002 --- ...cpBatchAsyncBackendJobExecutionActor.scala | 11 ++- .../batch/runnable/WorkflowOptionKeys.scala | 1 + ...tchAsyncBackendJobExecutionActorSpec.scala | 73 ++++++++++++++++--- 3 files changed, 74 insertions(+), 11 deletions(-) diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala index fe46d15e0ad..963db9e9f92 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala @@ -203,6 +203,12 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar case _ => false } + lazy val retryReportingTimeouts: Boolean = + jobDescriptor.workflowDescriptor.workflowOptions + .getBoolean(WorkflowOptionKeys.RetryReportingTimeouts) + .toOption + .getOrElse(false) + override def tryAbort(job: StandardAsyncJob): Unit = abortJob(workflowId = workflowId, jobName = JobName.parse(job.jobId), @@ -1204,11 +1210,12 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar // Check whether this failure should be automatically resubmitted without counting against maxRetries. // Guidance: Resubmit if the task has a known-transient failure type and has not yet cost the user money. private def isTransientFailure(failed: RunStatus.Failed): Boolean = { - lazy val errorTypeIsTransient = List( + lazy val transientErrors = List( GcpBatchExitCode.VMPreemption, GcpBatchExitCode.VMRecreatedDuringExecution, GcpBatchExitCode.VMRebootedDuringExecution - ).contains(failed.errorCode) + ) ++ retryReportingTimeouts.option(GcpBatchExitCode.VMReportingTimeout) + lazy val errorTypeIsTransient = transientErrors.contains(failed.errorCode) lazy val taskStartedRunning = failed.eventList.exists(e => executionEventRunningMatcher.matches(e.name)) transientErrorRetryable && errorTypeIsTransient && !taskStartedRunning } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/WorkflowOptionKeys.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/WorkflowOptionKeys.scala index 5496246d05b..627f735181e 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/WorkflowOptionKeys.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/runnable/WorkflowOptionKeys.scala @@ -8,4 +8,5 @@ object WorkflowOptionKeys { val GoogleProject = "google_project" val GoogleComputeServiceAccount = "google_compute_service_account" val EnableFuse = "enable_fuse" + val RetryReportingTimeouts = "retry_reporting_timeouts" } diff --git a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala index 0295931b2ce..2f2dbaaab41 100644 --- a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala +++ b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala @@ -21,6 +21,7 @@ import cromwell.backend.google.batch.api.GcpBatchRequestFactory import cromwell.backend.google.batch.io.{DiskType, GcpBatchWorkingDisk} import cromwell.backend.google.batch.models._ import cromwell.backend.google.batch.runnable.RunnableUtils.MountPoint +import cromwell.backend.google.batch.runnable.WorkflowOptionKeys import cromwell.backend.google.batch.util.BatchExpressionFunctions import cromwell.backend.io.JobPathsSpecHelper._ import cromwell.backend.standard.{ @@ -115,6 +116,14 @@ class GcpBatchAsyncBackendJobExecutionActorSpec val Inputs: Map[FullyQualifiedName, WomValue] = Map("wf_sup.sup.addressee" -> WomString("dog")) private val NoOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue])) + private val RetryReportingTimeoutsOptions = + WorkflowOptions( + JsObject( + Map[String, JsValue]( + WorkflowOptionKeys.RetryReportingTimeouts -> JsBoolean(true) + ) + ) + ) private lazy val TestableCallContext = CallContext(mockPathBuilder.build("gs://root").get, DummyStandardPaths, isDocker = false) @@ -230,7 +239,8 @@ class GcpBatchAsyncBackendJobExecutionActorSpec previousPreemptions: Int, previousUnexpectedRetries: Int, previousTransientRetries: Int, - failedRetriesCountOpt: Option[Int] = None + failedRetriesCountOpt: Option[Int], + workflowOptions: WorkflowOptions ): BackendJobDescriptor = { val attempt = previousPreemptions + previousUnexpectedRetries + 1 val wdlNamespace = WdlNamespaceWithWorkflow @@ -252,7 +262,7 @@ class GcpBatchAsyncBackendJobExecutionActorSpec WorkflowId.randomId(), womDefinition, inputs, - NoOptions, + workflowOptions, Labels.empty, HogGroup("foo"), List.empty, @@ -335,14 +345,17 @@ class GcpBatchAsyncBackendJobExecutionActorSpec def buildPreemptibleTestActorRef(attempt: Int, preemptible: Int, previousTransientRetriesCount: Int = 0, - failedRetriesCountOpt: Option[Int] = None + failedRetriesCountOpt: Option[Int] = None, + workflowOptions: WorkflowOptions = NoOptions ): TestActorRef[TestableGcpBatchJobExecutionActor] = { // For this test we say that all previous attempts were preempted: - val jobDescriptor = buildPreemptibleJobDescriptor(preemptible, - attempt - 1, - previousUnexpectedRetries = 0, - previousTransientRetries = previousTransientRetriesCount, - failedRetriesCountOpt = failedRetriesCountOpt + val jobDescriptor = buildPreemptibleJobDescriptor( + preemptible, + attempt - 1, + previousUnexpectedRetries = 0, + previousTransientRetries = previousTransientRetriesCount, + failedRetriesCountOpt = failedRetriesCountOpt, + workflowOptions = workflowOptions ) val props = Props( new TestableGcpBatchJobExecutionActor(jobDescriptor, @@ -467,7 +480,7 @@ class GcpBatchAsyncBackendJobExecutionActorSpec ) ) - val jobDescriptor = buildPreemptibleJobDescriptor(0, 0, 0, 0) + val jobDescriptor = buildPreemptibleJobDescriptor(0, 0, 0, 0, None, NoOptions) val serviceRegistryProbe = TestProbe() val backend1 = executionActor( @@ -511,6 +524,48 @@ class GcpBatchAsyncBackendJobExecutionActorSpec failedHandle.returnCode shouldBe None } + it should "not retry transient reporting timeouts by default" in { + val actorRef = buildPreemptibleTestActorRef( + attempt = 1, + preemptible = 1, + workflowOptions = NoOptions + ) + val batchBackend = actorRef.underlyingActor + val runId = generateStandardAsyncJob + val handle = new GcpBatchPendingExecutionHandle(null, runId, None, None) + + val failedStatus = RunStatus.Failed( + GcpBatchExitCode.VMReportingTimeout, + Seq.empty + ) + val executionResult = batchBackend.handleExecutionResult(failedStatus, handle) + val result = Await.result(executionResult, timeout) + result.isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true + val failedHandle = result.asInstanceOf[FailedNonRetryableExecutionHandle] + failedHandle.returnCode shouldBe None + } + + it should "retry transient reporting timeouts when requested" in { + val actorRef = buildPreemptibleTestActorRef( + attempt = 1, + preemptible = 1, + workflowOptions = RetryReportingTimeoutsOptions + ) + val batchBackend = actorRef.underlyingActor + val runId = generateStandardAsyncJob + val handle = new GcpBatchPendingExecutionHandle(null, runId, None, None) + + val failedStatus = RunStatus.Failed( + GcpBatchExitCode.VMReportingTimeout, + Seq.empty + ) + val executionResult = batchBackend.handleExecutionResult(failedStatus, handle) + val result = Await.result(executionResult, timeout) + result.isInstanceOf[FailedRetryableExecutionHandle] shouldBe true + val failedHandle = result.asInstanceOf[FailedRetryableExecutionHandle] + failedHandle.returnCode shouldBe None + } + it should "retry transient failures when appropriate" in { val actorRef = buildPreemptibleTestActorRef(attempt = 2, preemptible = 0, previousTransientRetriesCount = 1) val batchBackend = actorRef.underlyingActor