Skip to content

Commit aa584e3

Browse files
committed
Fix MongoDB job restart failure after abrupt shutdown
Fixes bug where job restart fails after abrupt shutdown because getLastStepExecution() only checks the empty stepExecutions array in BATCH_JOB_EXECUTION instead of the BATCH_STEP_EXECUTION collection. Changes: - getLastStepExecution(): Query BATCH_STEP_EXECUTION collection directly - countStepExecutions(): Query collection instead of embedded array - Align with JDBC implementation behavior Signed-off-by: baezzys <wlsdn3578@gmail.com>
1 parent ae5767a commit aa584e3

File tree

3 files changed

+126
-43
lines changed

3 files changed

+126
-43
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java

Lines changed: 49 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,16 @@
1515
*/
1616
package org.springframework.batch.core.repository.dao.mongodb;
1717

18-
import java.util.ArrayList;
1918
import java.util.Collection;
20-
import java.util.Comparator;
2119
import java.util.List;
22-
import java.util.Optional;
2320

2421
import org.springframework.batch.core.job.JobExecution;
2522
import org.springframework.batch.core.job.JobInstance;
2623
import org.springframework.batch.core.step.StepExecution;
2724
import org.springframework.batch.core.repository.dao.StepExecutionDao;
2825
import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter;
2926
import org.springframework.batch.core.repository.persistence.converter.StepExecutionConverter;
27+
import org.springframework.data.domain.Sort;
3028
import org.springframework.data.mongodb.core.MongoOperations;
3129
import org.springframework.data.mongodb.core.query.Query;
3230
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
@@ -36,6 +34,7 @@
3634

3735
/**
3836
* @author Mahmoud Ben Hassine
37+
* @author Jinwoo Bae
3938
* @since 5.2.0
4039
*/
4140
public class MongoStepExecutionDao implements StepExecutionDao {
@@ -100,34 +99,42 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut
10099
@Override
101100
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
102101
// TODO optimize the query
103-
// get all step executions
104-
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = new ArrayList<>();
105-
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
102+
Query jobExecutionQuery = query(where("jobInstanceId").is(jobInstance.getId()));
106103
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
107-
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
104+
.find(jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class,
108105
JOB_EXECUTIONS_COLLECTION_NAME);
109-
for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) {
110-
stepExecutions.addAll(jobExecution.getStepExecutions());
111-
}
112-
// sort step executions by creation date then id (see contract) and return the
113-
// first one
114-
Optional<org.springframework.batch.core.repository.persistence.StepExecution> lastStepExecution = stepExecutions
115-
.stream()
116-
.filter(stepExecution -> stepExecution.getName().equals(stepName))
117-
.min(Comparator
118-
.comparing(org.springframework.batch.core.repository.persistence.StepExecution::getCreateTime)
119-
.thenComparing(org.springframework.batch.core.repository.persistence.StepExecution::getId));
120-
if (lastStepExecution.isPresent()) {
121-
org.springframework.batch.core.repository.persistence.StepExecution stepExecution = lastStepExecution.get();
122-
JobExecution jobExecution = this.jobExecutionConverter.toJobExecution(jobExecutions.stream()
123-
.filter(execution -> execution.getJobExecutionId().equals(stepExecution.getJobExecutionId()))
124-
.findFirst()
125-
.get(), jobInstance);
126-
return this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution);
106+
107+
if (jobExecutions.isEmpty()) {
108+
return null;
127109
}
128-
else {
110+
111+
List<Long> jobExecutionIds = jobExecutions.stream()
112+
.map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId)
113+
.toList();
114+
115+
Query stepExecutionQuery = query(where("name").is(stepName).and("jobExecutionId").in(jobExecutionIds))
116+
.with(Sort.by(Sort.Direction.DESC, "createTime", "stepExecutionId"))
117+
.limit(1);
118+
119+
org.springframework.batch.core.repository.persistence.StepExecution stepExecution = this.mongoOperations
120+
.findOne(stepExecutionQuery, org.springframework.batch.core.repository.persistence.StepExecution.class,
121+
STEP_EXECUTIONS_COLLECTION_NAME);
122+
123+
if (stepExecution == null) {
129124
return null;
130125
}
126+
127+
org.springframework.batch.core.repository.persistence.JobExecution jobExecution = jobExecutions.stream()
128+
.filter(execution -> execution.getJobExecutionId().equals(stepExecution.getJobExecutionId()))
129+
.findFirst()
130+
.orElse(null);
131+
132+
if (jobExecution != null) {
133+
JobExecution jobExecutionDomain = this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance);
134+
return this.stepExecutionConverter.toStepExecution(stepExecution, jobExecutionDomain);
135+
}
136+
137+
return null;
131138
}
132139

133140
@Override
@@ -144,22 +151,23 @@ public void addStepExecutions(JobExecution jobExecution) {
144151

145152
@Override
146153
public long countStepExecutions(JobInstance jobInstance, String stepName) {
147-
long count = 0;
148-
// TODO optimize the count query
149-
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
150-
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
151-
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
152-
JOB_EXECUTIONS_COLLECTION_NAME);
153-
for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) {
154-
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = jobExecution
155-
.getStepExecutions();
156-
for (org.springframework.batch.core.repository.persistence.StepExecution stepExecution : stepExecutions) {
157-
if (stepExecution.getName().equals(stepName)) {
158-
count++;
159-
}
160-
}
154+
Query jobExecutionQuery = query(where("jobInstanceId").is(jobInstance.getId()));
155+
List<Long> jobExecutionIds = this.mongoOperations
156+
.find(jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class,
157+
JOB_EXECUTIONS_COLLECTION_NAME)
158+
.stream()
159+
.map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId)
160+
.toList();
161+
162+
if (jobExecutionIds.isEmpty()) {
163+
return 0;
161164
}
162-
return count;
165+
166+
// Count step executions directly from BATCH_STEP_EXECUTION collection
167+
Query stepQuery = query(where("name").is(stepName).and("jobExecutionId").in(jobExecutionIds));
168+
return this.mongoOperations.count(stepQuery,
169+
org.springframework.batch.core.repository.persistence.StepExecution.class,
170+
STEP_EXECUTIONS_COLLECTION_NAME);
163171
}
164172

165173
}

spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBIntegrationTestConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
2020
import org.springframework.batch.core.job.builder.JobBuilder;
2121
import org.springframework.batch.core.repository.JobRepository;
22+
import org.springframework.batch.core.repository.dao.StepExecutionDao;
2223
import org.springframework.batch.core.step.builder.StepBuilder;
2324
import org.springframework.batch.repeat.RepeatStatus;
2425
import org.springframework.context.annotation.Bean;
@@ -56,6 +57,11 @@ public JobRepository jobRepository(MongoTemplate mongoTemplate, MongoTransaction
5657
return jobRepositoryFactoryBean.getObject();
5758
}
5859

60+
@Bean
61+
public StepExecutionDao stepExecutionDao(MongoTemplate mongoTemplate) {
62+
return new org.springframework.batch.core.repository.dao.mongodb.MongoStepExecutionDao(mongoTemplate);
63+
}
64+
5965
@Bean
6066
public MongoDatabaseFactory mongoDatabaseFactory(MongoDBContainer mongoDBContainer) {
6167
return new SimpleMongoClientDatabaseFactory(mongoDBContainer.getConnectionString() + "/test");

spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoDBJobRepositoryIntegrationTests.java

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.batch.core.repository.support;
1717

1818
import java.time.LocalDateTime;
19+
import java.util.Collections;
1920
import java.util.Map;
2021

2122
import com.mongodb.client.MongoCollection;
@@ -29,17 +30,26 @@
2930
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
3031
import org.testcontainers.junit.jupiter.Testcontainers;
3132

33+
import org.springframework.batch.core.BatchStatus;
3234
import org.springframework.batch.core.ExitStatus;
3335
import org.springframework.batch.core.job.Job;
3436
import org.springframework.batch.core.job.JobExecution;
37+
import org.springframework.batch.core.job.JobInstance;
3538
import org.springframework.batch.core.job.parameters.JobParameters;
3639
import org.springframework.batch.core.job.parameters.JobParametersBuilder;
3740
import org.springframework.batch.core.launch.JobOperator;
41+
import org.springframework.batch.core.repository.JobRepository;
42+
import org.springframework.batch.core.repository.dao.StepExecutionDao;
43+
import org.springframework.batch.core.step.StepExecution;
3844
import org.springframework.beans.factory.annotation.Autowired;
3945
import org.springframework.data.mongodb.core.MongoTemplate;
46+
import org.springframework.data.mongodb.core.query.Criteria;
47+
import org.springframework.data.mongodb.core.query.Query;
48+
import org.springframework.data.mongodb.core.query.Update;
4049

4150
/**
4251
* @author Mahmoud Ben Hassine
52+
* @author Jinwoo Bae
4353
* @author Yanming Zhou
4454
*/
4555
@DirtiesContext
@@ -53,18 +63,25 @@ public class MongoDBJobRepositoryIntegrationTests {
5363
@SuppressWarnings("removal")
5464
@BeforeEach
5565
public void setUp() {
56-
// collections
66+
// Clear existing collections to ensure clean state
67+
mongoTemplate.getCollection("BATCH_JOB_INSTANCE").drop();
68+
mongoTemplate.getCollection("BATCH_JOB_EXECUTION").drop();
69+
mongoTemplate.getCollection("BATCH_STEP_EXECUTION").drop();
70+
mongoTemplate.getCollection("BATCH_SEQUENCES").drop();
71+
72+
// sequences
5773
mongoTemplate.createCollection("BATCH_JOB_INSTANCE");
5874
mongoTemplate.createCollection("BATCH_JOB_EXECUTION");
5975
mongoTemplate.createCollection("BATCH_STEP_EXECUTION");
60-
// sequences
6176
mongoTemplate.createCollection("BATCH_SEQUENCES");
77+
6278
mongoTemplate.getCollection("BATCH_SEQUENCES")
6379
.insertOne(new Document(Map.of("_id", "BATCH_JOB_INSTANCE_SEQ", "count", 0L)));
6480
mongoTemplate.getCollection("BATCH_SEQUENCES")
6581
.insertOne(new Document(Map.of("_id", "BATCH_JOB_EXECUTION_SEQ", "count", 0L)));
6682
mongoTemplate.getCollection("BATCH_SEQUENCES")
6783
.insertOne(new Document(Map.of("_id", "BATCH_STEP_EXECUTION_SEQ", "count", 0L)));
84+
6885
// indices
6986
mongoTemplate.indexOps("BATCH_JOB_INSTANCE")
7087
.ensureIndex(new Index().on("jobName", Sort.Direction.ASC).named("job_name_idx"));
@@ -112,6 +129,58 @@ void testJobExecution(@Autowired JobOperator jobOperator, @Autowired Job job) th
112129
dump(stepExecutionsCollection, "step execution = ");
113130
}
114131

132+
/**
133+
* Test for GitHub issue #4943: getLastStepExecution should work when JobExecution's
134+
* embedded stepExecutions array is empty.
135+
*
136+
* <p>
137+
* This can happen after abrupt shutdown when the embedded stepExecutions array is not
138+
* synchronized, but BATCH_STEP_EXECUTION collection still contains the data.
139+
*
140+
*/
141+
@Test
142+
void testGetLastStepExecutionWithEmptyEmbeddedArray(@Autowired JobOperator jobOperator, @Autowired Job job,
143+
@Autowired StepExecutionDao stepExecutionDao) throws Exception {
144+
// Step 1: Run job normally
145+
JobParameters jobParameters = new JobParametersBuilder().addString("name", "emptyArrayTest")
146+
.addLocalDateTime("runtime", LocalDateTime.now())
147+
.toJobParameters();
148+
149+
JobExecution jobExecution = jobOperator.start(job, jobParameters);
150+
JobInstance jobInstance = jobExecution.getJobInstance();
151+
152+
// Verify job completed successfully
153+
Assertions.assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
154+
155+
// Step 2: Simulate the core issue - clear embedded stepExecutions array
156+
// while keeping BATCH_STEP_EXECUTION collection intact
157+
Query jobQuery = new Query(Criteria.where("jobExecutionId").is(jobExecution.getId()));
158+
Update jobUpdate = new Update().set("stepExecutions", Collections.emptyList());
159+
mongoTemplate.updateFirst(jobQuery, jobUpdate, "BATCH_JOB_EXECUTION");
160+
161+
// Step 3: Verify embedded array is empty but collection still has data
162+
MongoCollection<Document> jobExecutionsCollection = mongoTemplate.getCollection("BATCH_JOB_EXECUTION");
163+
MongoCollection<Document> stepExecutionsCollection = mongoTemplate.getCollection("BATCH_STEP_EXECUTION");
164+
165+
Document jobDoc = jobExecutionsCollection.find(new Document("jobExecutionId", jobExecution.getId())).first();
166+
Assertions.assertTrue(jobDoc.getList("stepExecutions", Document.class).isEmpty(),
167+
"Embedded stepExecutions array should be empty");
168+
Assertions.assertEquals(2, stepExecutionsCollection.countDocuments(),
169+
"BATCH_STEP_EXECUTION collection should still contain data");
170+
171+
// Step 4: Test the fix - getLastStepExecution should work despite empty embedded
172+
// array
173+
StepExecution lastStepExecution = stepExecutionDao.getLastStepExecution(jobInstance, "step1");
174+
Assertions.assertNotNull(lastStepExecution,
175+
"getLastStepExecution should find step execution even with empty embedded array");
176+
Assertions.assertEquals("step1", lastStepExecution.getStepName());
177+
Assertions.assertEquals(BatchStatus.COMPLETED, lastStepExecution.getStatus());
178+
179+
// Step 5: Test countStepExecutions also works
180+
long stepCount = stepExecutionDao.countStepExecutions(jobInstance, "step1");
181+
Assertions.assertEquals(1L, stepCount, "countStepExecutions should work despite empty embedded array");
182+
}
183+
115184
private static void dump(MongoCollection<Document> collection, String prefix) {
116185
for (Document document : collection.find()) {
117186
System.out.println(prefix + document.toJson());

0 commit comments

Comments
 (0)