Skip to content

Commit 213ba54

Browse files
thelightway24 authored and fmbenhassine committed
Add recover method to JobOperator
Resolves #4876 Signed-off-by: Yejeong, Ham <dev@thelightway.kr>
1 parent b27ee64 commit 213ba54

File tree

6 files changed

+151
-1
lines changed

6 files changed

+151
-1
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/launch/JobOperator.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
*
4040
* @author Dave Syer
4141
* @author Mahmoud Ben Hassine
42+
* @author Yejeong Ham
4243
* @since 2.0
4344
*/
4445
@SuppressWarnings("removal")
@@ -248,6 +249,18 @@ JobExecution startNextInstance(Job job) throws JobRestartException, JobExecution
248249
*/
249250
JobExecution abandon(JobExecution jobExecution) throws JobExecutionAlreadyRunningException;
250251

252+
/**
253+
* Marks the given {@link JobExecution} as {@code FAILED} when it is stuck in a
254+
* {@code STARTED} state due to an abrupt shutdown or failure, in order to make it
255+
* restartable. This operation makes a previously non-restartable execution eligible
256+
* for restart by updating its execution context with the flag {@code recovered=true}.
257+
* @param jobExecution the {@link JobExecution} to recover
258+
* @return the {@link JobExecution} after it has been marked as recovered
259+
* @throws UnexpectedJobExecutionException if the job execution is already complete,
260+
* abandoned, or has already been recovered
261+
*/
262+
JobExecution recover(JobExecution jobExecution);
263+
251264
/**
252265
* List the {@link JobExecution JobExecutions} associated with a particular
253266
* {@link JobInstance}, in reverse order of creation (and therefore usually of

spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/CommandLineJobOperator.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
* {@link #main(String[])} method explains the various operations and exit codes.
5050
*
5151
* @author Mahmoud Ben Hassine
52+
* @author Yejeong Ham
5253
* @since 6.0
5354
*/
5455
public class CommandLineJobOperator {
@@ -204,6 +205,29 @@ public int abandon(long jobExecutionId) {
204205
}
205206
}
206207

208+
/**
209+
* Recover the job execution with the given ID that is stuck in a {@code STARTED}
210+
* state due to an abrupt shutdown or failure, making it eligible for restart.
211+
* @param jobExecutionId the ID of the job execution to recover
212+
* @return the exit code of the recovered job execution, or JVM_EXITCODE_GENERIC_ERROR
213+
* if an error occurs
214+
*/
215+
public int recover(long jobExecutionId) {
216+
logger.info(() -> "Recovering job execution with ID: " + jobExecutionId);
217+
try {
218+
JobExecution jobExecution = this.jobRepository.getJobExecution(jobExecutionId);
219+
if (jobExecution == null) {
220+
logger.error(() -> "No job execution found with ID: " + jobExecutionId);
221+
return JVM_EXITCODE_GENERIC_ERROR;
222+
}
223+
JobExecution recoveredExecution = this.jobOperator.recover(jobExecution);
224+
return this.exitCodeMapper.intValue(recoveredExecution.getExitStatus().getExitCode());
225+
}
226+
catch (Exception e) {
227+
return JVM_EXITCODE_GENERIC_ERROR;
228+
}
229+
}
230+
207231
/*
208232
* Main method to operate jobs from the command line.
209233
*
@@ -287,6 +311,10 @@ public static void main(String[] args) {
287311
jobExecutionId = Long.parseLong(args[2]);
288312
exitCode = operator.abandon(jobExecutionId);
289313
break;
314+
case "recover":
315+
jobExecutionId = Long.parseLong(args[2]);
316+
exitCode = operator.recover(jobExecutionId);
317+
break;
290318
default:
291319
System.err.println("Unknown operation: " + operation);
292320
exitCode = JVM_EXITCODE_GENERIC_ERROR;

spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
* @author Will Schipp
8181
* @author Mahmoud Ben Hassine
8282
* @author Andrey Litvitski
83+
* @author Yejeong Ham
8384
* @since 2.0
8485
* @deprecated since 6.0 in favor of {@link TaskExecutorJobOperator}. Scheduled for
8586
* removal in 6.2 or later.
@@ -391,6 +392,45 @@ public JobExecution abandon(JobExecution jobExecution) throws JobExecutionAlread
391392
return jobExecution;
392393
}
393394

395+
@Override
396+
public JobExecution recover(JobExecution jobExecution) {
397+
Assert.notNull(jobExecution, "JobExecution must not be null");
398+
399+
if (jobExecution.getExecutionContext().containsKey("recovered")) {
400+
if (logger.isInfoEnabled()) {
401+
logger.info("already recovered job execution: " + jobExecution);
402+
}
403+
throw new UnexpectedJobExecutionException("JobExecution is already recovered");
404+
}
405+
406+
BatchStatus jobStatus = jobExecution.getStatus();
407+
if (jobStatus == BatchStatus.COMPLETED || jobStatus == BatchStatus.ABANDONED) {
408+
throw new UnexpectedJobExecutionException(
409+
"JobExecution is already complete or abandoned and therefore cannot be recovered: " + jobExecution);
410+
}
411+
412+
if (logger.isInfoEnabled()) {
413+
logger.info("Recovering job execution: " + jobExecution);
414+
}
415+
416+
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
417+
BatchStatus stepStatus = stepExecution.getStatus();
418+
if (stepStatus.isRunning() || stepStatus == BatchStatus.STOPPING) {
419+
stepExecution.setStatus(BatchStatus.FAILED);
420+
stepExecution.setEndTime(LocalDateTime.now());
421+
stepExecution.getExecutionContext().put("recovered", true);
422+
jobRepository.update(stepExecution);
423+
}
424+
}
425+
426+
jobExecution.setStatus(BatchStatus.FAILED);
427+
jobExecution.setEndTime(LocalDateTime.now());
428+
jobExecution.getExecutionContext().put("recovered", true);
429+
jobRepository.update(jobExecution);
430+
431+
return jobExecution;
432+
}
433+
394434
@Override
395435
@Deprecated(since = "6.0", forRemoval = true)
396436
public Set<String> getJobNames() {

spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/TaskExecutorJobOperator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
* @author Lucas Ward
5252
* @author Will Schipp
5353
* @author Mahmoud Ben Hassine
54+
* @author Yejeong Ham
5455
* @since 6.0
5556
*/
5657
@SuppressWarnings("removal")
@@ -119,4 +120,10 @@ public JobExecution abandon(JobExecution jobExecution) throws JobExecutionAlread
119120
return super.abandon(jobExecution);
120121
}
121122

123+
@Override
124+
public JobExecution recover(JobExecution jobExecution) {
125+
Assert.notNull(jobExecution, "JobExecution must not be null");
126+
return super.recover(jobExecution);
127+
}
128+
122129
}

spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/CommandLineJobOperatorTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* Tests for {@link CommandLineJobOperator}.
3636
*
3737
* @author Mahmoud Ben Hassine
38+
* @author Yejeong Ham
3839
*/
3940
class CommandLineJobOperatorTests {
4041

@@ -133,4 +134,18 @@ void abandon() throws Exception {
133134
Mockito.verify(jobOperator).abandon(jobExecution);
134135
}
135136

137+
@Test
138+
void recover() {
139+
// given
140+
long jobExecutionId = 1;
141+
JobExecution jobExecution = mock();
142+
143+
// when
144+
Mockito.when(jobRepository.getJobExecution(jobExecutionId)).thenReturn(jobExecution);
145+
this.commandLineJobOperator.recover(jobExecutionId);
146+
147+
// then
148+
Mockito.verify(jobOperator).recover(jobExecution);
149+
}
150+
136151
}

spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/TaskExecutorJobOperatorTests.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.springframework.batch.core.job.JobExecution;
3333
import org.springframework.batch.core.job.JobExecutionException;
3434
import org.springframework.batch.core.job.JobInstance;
35+
import org.springframework.batch.core.job.UnexpectedJobExecutionException;
3536
import org.springframework.batch.core.job.parameters.JobParameters;
3637
import org.springframework.batch.core.job.parameters.JobParametersIncrementer;
3738
import org.springframework.batch.core.step.Step;
@@ -69,7 +70,7 @@
6970
* @author Will Schipp
7071
* @author Mahmoud Ben Hassine
7172
* @author Jinwoo Bae
72-
*
73+
* @author Yejeong Ham
7374
*/
7475
@SuppressWarnings("removal")
7576
class TaskExecutorJobOperatorTests {
@@ -427,6 +428,52 @@ void testAbortNonStopping() {
427428
assertThrows(JobExecutionAlreadyRunningException.class, () -> jobOperator.abandon(123L));
428429
}
429430

431+
@Test
432+
void testRecover() {
433+
JobInstance jobInstance = new JobInstance(123L, job.getName());
434+
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters);
435+
jobExecution.setStatus(BatchStatus.STARTED);
436+
jobExecution.createStepExecution("step1").setStatus(BatchStatus.STARTED);
437+
when(jobRepository.getJobExecution(111L)).thenReturn(jobExecution);
438+
when(jobRepository.getLastJobExecution(jobInstance)).thenReturn(jobExecution);
439+
JobExecution recover = jobOperator.recover(jobExecution);
440+
assertEquals(BatchStatus.FAILED, recover.getStatus());
441+
assertTrue(recover.getExecutionContext().containsKey("recovered"));
442+
}
443+
444+
@Test
445+
void testRecoverStepStopping() {
446+
JobInstance jobInstance = new JobInstance(123L, job.getName());
447+
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters);
448+
jobExecution.setStatus(BatchStatus.STARTED);
449+
jobExecution.createStepExecution("step1").setStatus(BatchStatus.STOPPING);
450+
when(jobRepository.getJobExecution(111L)).thenReturn(jobExecution);
451+
when(jobRepository.getLastJobExecution(jobInstance)).thenReturn(jobExecution);
452+
JobExecution recover = jobOperator.recover(jobExecution);
453+
assertEquals(BatchStatus.FAILED, recover.getStatus());
454+
assertTrue(recover.getExecutionContext().containsKey("recovered"));
455+
}
456+
457+
@Test
458+
void testRecoverJobAbandon() {
459+
JobInstance jobInstance = new JobInstance(123L, job.getName());
460+
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters);
461+
jobExecution.setStatus(BatchStatus.ABANDONED);
462+
when(jobRepository.getJobExecution(111L)).thenReturn(jobExecution);
463+
when(jobRepository.getLastJobExecution(jobInstance)).thenReturn(jobExecution);
464+
assertThrows(UnexpectedJobExecutionException.class, () -> jobOperator.recover(jobExecution));
465+
}
466+
467+
@Test
468+
void testRecoverJobCompleted() {
469+
JobInstance jobInstance = new JobInstance(123L, job.getName());
470+
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters);
471+
jobExecution.setStatus(BatchStatus.COMPLETED);
472+
when(jobRepository.getJobExecution(111L)).thenReturn(jobExecution);
473+
when(jobRepository.getLastJobExecution(jobInstance)).thenReturn(jobExecution);
474+
assertThrows(UnexpectedJobExecutionException.class, () -> jobOperator.recover(jobExecution));
475+
}
476+
430477
static class MockJob extends AbstractJob {
431478

432479
private TaskletStep taskletStep;

0 commit comments

Comments
 (0)