Skip to content

Commit dcc0c23

Browse files
committed
Overhaul MongoDB Dao to fix various bugs
Now it's covered by tests copied from JDBC implementation. See GH-4896 Signed-off-by: Yanming Zhou <zhouyanming@gmail.com>
1 parent fc9cc6f commit dcc0c23

File tree

9 files changed

+1271
-43
lines changed

9 files changed

+1271
-43
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/job/parameters/JobParameters.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.time.LocalDate;
2121
import java.time.LocalDateTime;
2222
import java.time.LocalTime;
23+
import java.time.ZoneId;
2324
import java.util.ArrayList;
2425
import java.util.Collections;
2526
import java.util.Date;
@@ -45,6 +46,7 @@
4546
* @author Michael Minella
4647
* @author Mahmoud Ben Hassine
4748
* @author Taeik Lim
49+
* @author Yanming Zhou
4850
* @since 1.0
4951
*/
5052
public class JobParameters implements Serializable {
@@ -222,6 +224,10 @@ public LocalDate getLocalDate(String key) {
222224
if (!jobParameter.getType().equals(LocalDate.class)) {
223225
throw new IllegalArgumentException("Key " + key + " is not of type java.time.LocalDate");
224226
}
227+
if (jobParameter.getValue() instanceof Date date) {
228+
// MongoDB restore value as Date
229+
return date.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
230+
}
225231
return (LocalDate) jobParameter.getValue();
226232
}
227233

@@ -257,6 +263,10 @@ public LocalTime getLocalTime(String key) {
257263
if (!jobParameter.getType().equals(LocalTime.class)) {
258264
throw new IllegalArgumentException("Key " + key + " is not of type java.time.LocalTime");
259265
}
266+
if (jobParameter.getValue() instanceof Date date) {
267+
// MongoDB restore value as Date
268+
return date.toInstant().atZone(ZoneId.systemDefault()).toLocalTime();
269+
}
260270
return (LocalTime) jobParameter.getValue();
261271
}
262272

@@ -292,6 +302,10 @@ public LocalDateTime getLocalDateTime(String key) {
292302
if (!jobParameter.getType().equals(LocalDateTime.class)) {
293303
throw new IllegalArgumentException("Key " + key + " is not of type java.time.LocalDateTime");
294304
}
305+
if (jobParameter.getValue() instanceof Date date) {
306+
// MongoDB restore value as Date
307+
return date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
308+
}
295309
return (LocalDateTime) jobParameter.getValue();
296310
}
297311

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
/**
3636
* @author Mahmoud Ben Hassine
37+
* @author Yanming Zhou
3738
* @since 5.2.0
3839
*/
3940
public class MongoJobExecutionDao implements JobExecutionDao {
@@ -44,6 +45,8 @@ public class MongoJobExecutionDao implements JobExecutionDao {
4445

4546
private static final String JOB_INSTANCES_COLLECTION_NAME = "BATCH_JOB_INSTANCE";
4647

48+
private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION";
49+
4750
private final MongoOperations mongoOperations;
4851

4952
private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter();
@@ -81,7 +84,8 @@ public void updateJobExecution(JobExecution jobExecution) {
8184

8285
@Override
8386
public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
84-
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
87+
Query query = query(where("jobInstanceId").is(jobInstance.getId()))
88+
.with(Sort.by(Sort.Direction.DESC, "jobExecutionId"));
8589
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
8690
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
8791
JOB_EXECUTIONS_COLLECTION_NAME);
@@ -118,7 +122,15 @@ public Set<JobExecution> findRunningJobExecutions(String jobName) {
118122
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
119123
JOB_EXECUTIONS_COLLECTION_NAME)
120124
.stream()
121-
.map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance))
125+
.map(jobExecution -> {
126+
Query stepExecutionQuery = query(where("jobExecutionId").is(jobExecution.getJobExecutionId()));
127+
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = this.mongoOperations
128+
.find(stepExecutionQuery,
129+
org.springframework.batch.core.repository.persistence.StepExecution.class,
130+
STEP_EXECUTIONS_COLLECTION_NAME);
131+
jobExecution.setStepExecutions(stepExecutions);
132+
return this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance);
133+
})
122134
.forEach(runningJobExecutions::add);
123135
}
124136
return runningJobExecutions;
@@ -133,6 +145,13 @@ public JobExecution getJobExecution(Long executionId) {
133145
if (jobExecution == null) {
134146
return null;
135147
}
148+
149+
Query stepExecutionQuery = query(where("jobExecutionId").is(executionId));
150+
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = this.mongoOperations
151+
.find(stepExecutionQuery, org.springframework.batch.core.repository.persistence.StepExecution.class,
152+
STEP_EXECUTIONS_COLLECTION_NAME);
153+
jobExecution.setStepExecutions(stepExecutions);
154+
136155
Query jobInstanceQuery = query(where("jobInstanceId").is(jobExecution.getJobInstanceId()));
137156
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne(
138157
jobInstanceQuery, org.springframework.batch.core.repository.persistence.JobInstance.class,
@@ -152,4 +171,11 @@ public void synchronizeStatus(JobExecution jobExecution) {
152171
// synchronizeStatus
153172
}
154173

174+
@Override
175+
public void deleteJobExecution(JobExecution jobExecution) {
176+
this.mongoOperations.remove(query(where("jobExecutionId").is(jobExecution.getId())),
177+
JOB_EXECUTIONS_COLLECTION_NAME);
178+
179+
}
180+
155181
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.batch.core.repository.dao.mongodb;
1717

18+
import java.util.Collections;
1819
import java.util.List;
1920

2021
import org.springframework.batch.core.job.DefaultJobKeyGenerator;
@@ -36,6 +37,7 @@
3637

3738
/**
3839
* @author Mahmoud Ben Hassine
40+
* @author Yanming Zhou
3941
* @since 5.2.0
4042
*/
4143
public class MongoJobInstanceDao implements JobInstanceDao {
@@ -117,7 +119,10 @@ public List<JobInstance> getJobInstances(String jobName, int start, int count) {
117119
org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
118120
.stream()
119121
.toList();
120-
return jobInstances.subList(start, jobInstances.size())
122+
if (jobInstances.size() <= start) {
123+
return Collections.emptyList();
124+
}
125+
return jobInstances.subList(start, Math.min(jobInstances.size(), start + jobInstances.size()))
121126
.stream()
122127
.map(this.jobInstanceConverter::toJobInstance)
123128
.limit(count)
@@ -163,4 +168,9 @@ public long getJobInstanceCount(String jobName) throws NoSuchJobException {
163168
return this.mongoOperations.count(query, COLLECTION_NAME);
164169
}
165170

171+
@Override
172+
public void deleteJobInstance(JobInstance jobInstance) {
173+
this.mongoOperations.remove(query(where("jobInstanceId").is(jobInstance.getId())), COLLECTION_NAME);
174+
}
175+
166176
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package org.springframework.batch.core.repository.dao.mongodb;
1717

18-
import java.util.ArrayList;
1918
import java.util.Collection;
2019
import java.util.Comparator;
2120
import java.util.List;
@@ -30,12 +29,14 @@
3029
import org.springframework.data.mongodb.core.MongoOperations;
3130
import org.springframework.data.mongodb.core.query.Query;
3231
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
32+
import org.springframework.util.Assert;
3333

3434
import static org.springframework.data.mongodb.core.query.Criteria.where;
3535
import static org.springframework.data.mongodb.core.query.Query.query;
3636

3737
/**
3838
* @author Mahmoud Ben Hassine
39+
* @author Yanming Zhou
3940
* @since 5.2.0
4041
*/
4142
public class MongoStepExecutionDao implements StepExecutionDao {
@@ -65,6 +66,10 @@ public void setStepExecutionIncrementer(DataFieldMaxValueIncrementer stepExecuti
6566

6667
@Override
6768
public void saveStepExecution(StepExecution stepExecution) {
69+
Assert.isNull(stepExecution.getId(),
70+
"to-be-saved (not updated) StepExecution can't already have an id assigned");
71+
Assert.isNull(stepExecution.getVersion(),
72+
"to-be-saved (not updated) StepExecution can't already have a version assigned");
6873
org.springframework.batch.core.repository.persistence.StepExecution stepExecutionToSave = this.stepExecutionConverter
6974
.fromStepExecution(stepExecution);
7075
long stepExecutionId = this.stepExecutionIncrementer.nextLongValue();
@@ -75,6 +80,7 @@ public void saveStepExecution(StepExecution stepExecution) {
7580

7681
@Override
7782
public void saveStepExecutions(Collection<StepExecution> stepExecutions) {
83+
Assert.notNull(stepExecutions, "Attempt to save a null collection of step executions");
7884
for (StepExecution stepExecution : stepExecutions) {
7985
saveStepExecution(stepExecution);
8086
}
@@ -101,20 +107,21 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut
101107
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
102108
// TODO optimize the query
103109
// get all step executions
104-
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = new ArrayList<>();
105110
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
106111
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
107112
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
108113
JOB_EXECUTIONS_COLLECTION_NAME);
109-
for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) {
110-
stepExecutions.addAll(jobExecution.getStepExecutions());
111-
}
114+
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = this.mongoOperations
115+
.find(query(where("jobExecutionId").in(jobExecutions.stream()
116+
.map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId)
117+
.toList())), org.springframework.batch.core.repository.persistence.StepExecution.class,
118+
STEP_EXECUTIONS_COLLECTION_NAME);
112119
// sort step executions by creation date then id (see contract) and return the
113-
// first one
120+
// last one
114121
Optional<org.springframework.batch.core.repository.persistence.StepExecution> lastStepExecution = stepExecutions
115122
.stream()
116123
.filter(stepExecution -> stepExecution.getName().equals(stepName))
117-
.min(Comparator
124+
.max(Comparator
118125
.comparing(org.springframework.batch.core.repository.persistence.StepExecution::getCreateTime)
119126
.thenComparing(org.springframework.batch.core.repository.persistence.StepExecution::getId));
120127
if (lastStepExecution.isPresent()) {
@@ -150,16 +157,18 @@ public long countStepExecutions(JobInstance jobInstance, String stepName) {
150157
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
151158
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
152159
JOB_EXECUTIONS_COLLECTION_NAME);
153-
for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) {
154-
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = jobExecution
155-
.getStepExecutions();
156-
for (org.springframework.batch.core.repository.persistence.StepExecution stepExecution : stepExecutions) {
157-
if (stepExecution.getName().equals(stepName)) {
158-
count++;
159-
}
160-
}
161-
}
162-
return count;
160+
return this.mongoOperations.count(
161+
query(where("jobExecutionId").in(jobExecutions.stream()
162+
.map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId)
163+
.toList())),
164+
org.springframework.batch.core.repository.persistence.StepExecution.class,
165+
STEP_EXECUTIONS_COLLECTION_NAME);
166+
}
167+
168+
@Override
169+
public void deleteStepExecution(StepExecution stepExecution) {
170+
this.mongoOperations.remove(query(where("stepExecutionId").is(stepExecution.getId())),
171+
STEP_EXECUTIONS_COLLECTION_NAME);
163172
}
164173

165174
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2008-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.core.repository.support;
17+
18+
import org.bson.Document;
19+
import org.junit.jupiter.api.BeforeEach;
20+
import org.springframework.batch.core.repository.JobRepository;
21+
import org.springframework.beans.factory.annotation.Autowired;
22+
import org.springframework.data.mongodb.core.MongoTemplate;
23+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
24+
import org.testcontainers.junit.jupiter.Testcontainers;
25+
26+
import java.time.LocalDateTime;
27+
import java.time.LocalTime;
28+
import java.time.temporal.ChronoUnit;
29+
import java.util.Map;
30+
31+
import static org.junit.jupiter.api.Assertions.assertEquals;
32+
33+
/**
34+
* @author Yanming Zhou
35+
*/
36+
@Testcontainers(disabledWithoutDocker = true)
37+
@SpringJUnitConfig(MongoDBIntegrationTestConfiguration.class)
38+
abstract class AbstractMongoDBDaoIntegrationTests {
39+
40+
@Autowired
41+
protected JobRepository repository;
42+
43+
@Autowired
44+
private MongoTemplate mongoTemplate;
45+
46+
@BeforeEach
47+
public void initializeSchema() throws Exception {
48+
// collections
49+
mongoTemplate.dropCollection("BATCH_JOB_INSTANCE");
50+
mongoTemplate.dropCollection("BATCH_JOB_EXECUTION");
51+
mongoTemplate.dropCollection("BATCH_STEP_EXECUTION");
52+
mongoTemplate.createCollection("BATCH_JOB_INSTANCE");
53+
mongoTemplate.createCollection("BATCH_JOB_EXECUTION");
54+
mongoTemplate.createCollection("BATCH_STEP_EXECUTION");
55+
// sequences
56+
mongoTemplate.dropCollection("BATCH_SEQUENCES");
57+
mongoTemplate.createCollection("BATCH_SEQUENCES");
58+
mongoTemplate.getCollection("BATCH_SEQUENCES")
59+
.insertOne(new Document(Map.of("_id", "BATCH_JOB_INSTANCE_SEQ", "count", 0L)));
60+
mongoTemplate.getCollection("BATCH_SEQUENCES")
61+
.insertOne(new Document(Map.of("_id", "BATCH_JOB_EXECUTION_SEQ", "count", 0L)));
62+
mongoTemplate.getCollection("BATCH_SEQUENCES")
63+
.insertOne(new Document(Map.of("_id", "BATCH_STEP_EXECUTION_SEQ", "count", 0L)));
64+
}
65+
66+
protected void assertTemporalEquals(LocalDateTime lhs, LocalDateTime rhs) {
67+
assertEquals(lhs != null ? lhs.truncatedTo(ChronoUnit.MILLIS) : lhs,
68+
rhs != null ? rhs.truncatedTo(ChronoUnit.MILLIS) : null);
69+
}
70+
71+
protected void assertTemporalEquals(LocalTime lhs, LocalTime rhs) {
72+
assertEquals(lhs != null ? lhs.truncatedTo(ChronoUnit.MILLIS) : lhs,
73+
rhs != null ? rhs.truncatedTo(ChronoUnit.MILLIS) : null);
74+
}
75+
76+
}

spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoExecutionContextDaoIntegrationTests.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
package org.springframework.batch.core.repository.support;
1717

1818
import java.time.LocalDateTime;
19-
import java.util.Map;
2019

21-
import org.bson.Document;
22-
import org.junit.jupiter.api.BeforeAll;
2320
import org.junit.jupiter.api.Test;
2421
import org.springframework.batch.core.job.Job;
2522
import org.springframework.batch.core.job.JobExecution;
@@ -35,10 +32,8 @@
3532
import org.springframework.context.annotation.Bean;
3633
import org.springframework.context.annotation.Configuration;
3734
import org.springframework.data.mongodb.core.MongoOperations;
38-
import org.springframework.data.mongodb.core.MongoTemplate;
3935
import org.springframework.test.annotation.DirtiesContext;
40-
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
41-
import org.testcontainers.junit.jupiter.Testcontainers;
36+
import org.springframework.test.context.ContextConfiguration;
4237

4338
import static org.junit.jupiter.api.Assertions.assertEquals;
4439
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -49,23 +44,8 @@
4944
* @author Yanming Zhou
5045
*/
5146
@DirtiesContext
52-
@Testcontainers(disabledWithoutDocker = true)
53-
@SpringJUnitConfig({ MongoDBIntegrationTestConfiguration.class, ExecutionContextDaoConfiguration.class })
54-
public class MongoExecutionContextDaoIntegrationTests {
55-
56-
@BeforeAll
57-
static void setUp(@Autowired MongoTemplate mongoTemplate) {
58-
mongoTemplate.createCollection("BATCH_JOB_INSTANCE");
59-
mongoTemplate.createCollection("BATCH_JOB_EXECUTION");
60-
mongoTemplate.createCollection("BATCH_STEP_EXECUTION");
61-
mongoTemplate.createCollection("BATCH_SEQUENCES");
62-
mongoTemplate.getCollection("BATCH_SEQUENCES")
63-
.insertOne(new Document(Map.of("_id", "BATCH_JOB_INSTANCE_SEQ", "count", 0L)));
64-
mongoTemplate.getCollection("BATCH_SEQUENCES")
65-
.insertOne(new Document(Map.of("_id", "BATCH_JOB_EXECUTION_SEQ", "count", 0L)));
66-
mongoTemplate.getCollection("BATCH_SEQUENCES")
67-
.insertOne(new Document(Map.of("_id", "BATCH_STEP_EXECUTION_SEQ", "count", 0L)));
68-
}
47+
@ContextConfiguration(classes = ExecutionContextDaoConfiguration.class)
48+
public class MongoExecutionContextDaoIntegrationTests extends AbstractMongoDBDaoIntegrationTests {
6949

7050
@Test
7151
void testGetJobExecutionWithEmptyResult(@Autowired ExecutionContextDao executionContextDao) {

0 commit comments

Comments
 (0)