Skip to content

Commit 7130e2f

Browse files
committed
Overhaul MongoJobExecutionDao
1. Implement method `deleteJobExecution()` 2. Fix find job executions ordering 3. Convert Date to LocalDate/LocalTime/LocalDateTime 4. Introduce MongoJobExecutionDaoIntegrationTests Signed-off-by: Yanming Zhou <zhouyanming@gmail.com>
1 parent 270867b commit 7130e2f

File tree

4 files changed

+445
-8
lines changed

4 files changed

+445
-8
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
*/
1616
package org.springframework.batch.core.repository.dao.mongodb;
1717

18+
import java.time.LocalDate;
19+
import java.time.LocalDateTime;
20+
import java.time.LocalTime;
21+
import java.time.ZoneId;
22+
import java.util.Date;
1823
import java.util.HashSet;
1924
import java.util.List;
2025
import java.util.Set;
@@ -23,17 +28,20 @@
2328
import org.springframework.batch.core.job.JobInstance;
2429
import org.springframework.batch.core.job.parameters.JobParameters;
2530
import org.springframework.batch.core.repository.dao.JobExecutionDao;
31+
import org.springframework.batch.core.repository.persistence.JobParameter;
2632
import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter;
2733
import org.springframework.data.domain.Sort;
2834
import org.springframework.data.mongodb.core.MongoOperations;
2935
import org.springframework.data.mongodb.core.query.Query;
3036
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
37+
import org.springframework.util.CollectionUtils;
3138

3239
import static org.springframework.data.mongodb.core.query.Criteria.where;
3340
import static org.springframework.data.mongodb.core.query.Query.query;
3441

3542
/**
3643
* @author Mahmoud Ben Hassine
44+
* @author Yanming Zhou
3745
* @since 5.2.0
3846
*/
3947
public class MongoJobExecutionDao implements JobExecutionDao {
@@ -84,13 +92,12 @@ public void updateJobExecution(JobExecution jobExecution) {
8492

8593
@Override
8694
public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
87-
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
95+
Query query = query(where("jobInstanceId").is(jobInstance.getId()))
96+
.with(Sort.by(Sort.Direction.DESC, "jobExecutionId"));
8897
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
8998
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
9099
JOB_EXECUTIONS_COLLECTION_NAME);
91-
return jobExecutions.stream()
92-
.map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance))
93-
.toList();
100+
return jobExecutions.stream().map(jobExecution -> convert(jobExecution, jobInstance)).toList();
94101
}
95102

96103
@Override
@@ -101,7 +108,7 @@ public JobExecution getLastJobExecution(JobInstance jobInstance) {
101108
query.with(Sort.by(sortOrder)),
102109
org.springframework.batch.core.repository.persistence.JobExecution.class,
103110
JOB_EXECUTIONS_COLLECTION_NAME);
104-
return jobExecution != null ? this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance) : null;
111+
return jobExecution != null ? convert(jobExecution, jobInstance) : null;
105112
}
106113

107114
@Override
@@ -115,7 +122,7 @@ public Set<JobExecution> findRunningJobExecutions(String jobName) {
115122
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
116123
JOB_EXECUTIONS_COLLECTION_NAME)
117124
.stream()
118-
.map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance))
125+
.map(jobExecution -> convert(jobExecution, jobInstance))
119126
.forEach(runningJobExecutions::add);
120127
}
121128
return runningJobExecutions;
@@ -132,7 +139,7 @@ public JobExecution getJobExecution(long executionId) {
132139
}
133140
org.springframework.batch.core.job.JobInstance jobInstance = this.jobInstanceDao
134141
.getJobInstance(jobExecution.getJobInstanceId());
135-
return this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance);
142+
return convert(jobExecution, jobInstance);
136143
}
137144

138145
@Override
@@ -146,4 +153,43 @@ public void synchronizeStatus(JobExecution jobExecution) {
146153
// synchronizeStatus
147154
}
148155

156+
@Override
157+
public void deleteJobExecution(JobExecution jobExecution) {
158+
this.mongoOperations.remove(query(where("jobExecutionId").is(jobExecution.getId())),
159+
JOB_EXECUTIONS_COLLECTION_NAME);
160+
161+
}
162+
163+
private JobExecution convert(org.springframework.batch.core.repository.persistence.JobExecution jobExecution,
164+
org.springframework.batch.core.job.JobInstance jobInstance) {
165+
Set<JobParameter<?>> parameters = jobExecution.getJobParameters();
166+
if (!CollectionUtils.isEmpty(parameters)) {
167+
// MongoDB restore temporal value as Date
168+
Set<JobParameter<?>> converted = new HashSet<>();
169+
for (JobParameter<?> parameter : parameters) {
170+
if (LocalDate.class.getName().equals(parameter.type()) && parameter.value() instanceof Date date) {
171+
converted.add(new JobParameter<>(parameter.name(),
172+
date.toInstant().atZone(ZoneId.systemDefault()).toLocalDate(), parameter.type(),
173+
parameter.identifying()));
174+
}
175+
else if (LocalTime.class.getName().equals(parameter.type()) && parameter.value() instanceof Date date) {
176+
converted.add(new JobParameter<>(parameter.name(),
177+
date.toInstant().atZone(ZoneId.systemDefault()).toLocalTime(), parameter.type(),
178+
parameter.identifying()));
179+
}
180+
else if (LocalDateTime.class.getName().equals(parameter.type())
181+
&& parameter.value() instanceof Date date) {
182+
converted.add(new JobParameter<>(parameter.name(),
183+
date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(), parameter.type(),
184+
parameter.identifying()));
185+
}
186+
else {
187+
converted.add(parameter);
188+
}
189+
}
190+
jobExecution.setJobParameters(converted);
191+
}
192+
return this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance);
193+
}
194+
149195
}

spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/AbstractMongoDBDaoIntegrationTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@
3333
import java.io.IOException;
3434
import java.nio.file.Files;
3535
import java.util.stream.Stream;
36+
import java.time.LocalDateTime;
37+
import java.time.LocalTime;
38+
import java.time.temporal.ChronoUnit;
39+
40+
import static org.junit.jupiter.api.Assertions.*;
3641

3742
/**
3843
* @author Yanming Zhou
@@ -59,6 +64,16 @@ void setUp(@Autowired MongoTemplate mongoTemplate) throws IOException {
5964
}
6065
}
6166

67+
protected void assertTemporalEquals(LocalDateTime lhs, LocalDateTime rhs) {
68+
assertEquals(lhs != null ? lhs.truncatedTo(ChronoUnit.MILLIS) : lhs,
69+
rhs != null ? rhs.truncatedTo(ChronoUnit.MILLIS) : null);
70+
}
71+
72+
protected void assertTemporalEquals(LocalTime lhs, LocalTime rhs) {
73+
assertEquals(lhs != null ? lhs.truncatedTo(ChronoUnit.MILLIS) : lhs,
74+
rhs != null ? rhs.truncatedTo(ChronoUnit.MILLIS) : null);
75+
}
76+
6277
@Configuration
6378
static class MongoDBDaoConfiguration {
6479

0 commit comments

Comments
 (0)