Skip to content

Commit 31469a3

Browse files
committed
Overhaul MongoStepExecutionDao
1. Implement method `deleteStepExecution()` 2. Fix NPE if stepExecutionId not exists when get step execution 3. Fix count step executions by step name 4. Introduce MongoStepExecutionDaoIntegrationTests Signed-off-by: Yanming Zhou <zhouyanming@gmail.com>
1 parent 7130e2f commit 31469a3

File tree

2 files changed

+222
-15
lines changed

2 files changed

+222
-15
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
/**
3838
* @author Mahmoud Ben Hassine
39+
* @author Yanming Zhou
3940
* @since 5.2.0
4041
*/
4142
public class MongoStepExecutionDao implements StepExecutionDao {
@@ -95,8 +96,8 @@ public StepExecution getStepExecution(long stepExecutionId) {
9596
org.springframework.batch.core.repository.persistence.StepExecution stepExecution = this.mongoOperations
9697
.findOne(query, org.springframework.batch.core.repository.persistence.StepExecution.class,
9798
STEP_EXECUTIONS_COLLECTION_NAME);
98-
JobExecution jobExecution = jobExecutionDao.getJobExecution(stepExecution.getJobExecutionId());
99-
return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution) : null;
99+
return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution,
100+
jobExecutionDao.getJobExecution(stepExecution.getJobExecutionId())) : null;
100101
}
101102

102103
@Deprecated(since = "6.0", forRemoval = true)
@@ -163,24 +164,22 @@ public List<StepExecution> getStepExecutions(JobExecution jobExecution) {
163164

164165
@Override
165166
public long countStepExecutions(JobInstance jobInstance, String stepName) {
166-
long count = 0;
167-
// TODO optimize the count query
168167
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
169168
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
170169
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
171170
JOB_EXECUTIONS_COLLECTION_NAME);
172-
for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) {
173-
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = jobExecution
174-
.getStepExecutions();
175-
for (org.springframework.batch.core.repository.persistence.StepExecution stepExecution : stepExecutions) {
176-
if (stepExecution.getName().equals(stepName)) {
177-
count++;
178-
}
179-
}
180-
}
181-
return count;
171+
return this.mongoOperations.count(
172+
query(where("jobExecutionId").in(jobExecutions.stream()
173+
.map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId)
174+
.toList())),
175+
org.springframework.batch.core.repository.persistence.StepExecution.class,
176+
STEP_EXECUTIONS_COLLECTION_NAME);
182177
}
183178

184-
// TODO implement deleteStepExecution(StepExecution stepExecution)
179+
@Override
180+
public void deleteStepExecution(StepExecution stepExecution) {
181+
this.mongoOperations.remove(query(where("stepExecutionId").is(stepExecution.getId())),
182+
STEP_EXECUTIONS_COLLECTION_NAME);
183+
}
185184

186185
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* Copyright 2008-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.core.repository.support;
17+
18+
import org.junit.jupiter.api.BeforeEach;
19+
import org.junit.jupiter.api.Disabled;
20+
import org.junit.jupiter.api.Test;
21+
import org.springframework.batch.core.BatchStatus;
22+
import org.springframework.batch.core.job.JobExecution;
23+
import org.springframework.batch.core.job.JobInstance;
24+
import org.springframework.batch.core.job.parameters.JobParameters;
25+
import org.springframework.batch.core.repository.dao.JobExecutionDao;
26+
import org.springframework.batch.core.repository.dao.JobInstanceDao;
27+
import org.springframework.batch.core.repository.dao.StepExecutionDao;
28+
import org.springframework.batch.core.step.StepExecution;
29+
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.dao.OptimisticLockingFailureException;
31+
32+
import java.time.LocalDateTime;
33+
import java.time.temporal.ChronoUnit;
34+
35+
import static org.junit.jupiter.api.Assertions.*;
36+
37+
/**
38+
* @author Yanming Zhou
39+
*/
40+
class MongoStepExecutionDaoIntegrationTests extends AbstractMongoDBDaoIntegrationTests {
41+
42+
@Autowired
43+
private StepExecutionDao dao;
44+
45+
private JobInstance jobInstance;
46+
47+
private JobExecution jobExecution;
48+
49+
@BeforeEach
50+
public void setUp(@Autowired JobInstanceDao jobInstanceDao, @Autowired JobExecutionDao jobExecutionDao)
51+
throws Exception {
52+
JobParameters jobParameters = new JobParameters();
53+
jobInstance = jobInstanceDao.createJobInstance("execTestJob", jobParameters);
54+
jobExecution = jobExecutionDao.createJobExecution(jobInstance, new JobParameters());
55+
}
56+
57+
@Test
58+
void testSaveAndGetExecution() {
59+
60+
StepExecution stepExecution = dao.createStepExecution("step", jobExecution);
61+
62+
stepExecution.setStatus(BatchStatus.STARTED);
63+
stepExecution.setReadSkipCount(7);
64+
stepExecution.setProcessSkipCount(2);
65+
stepExecution.setWriteSkipCount(5);
66+
stepExecution.setProcessSkipCount(11);
67+
stepExecution.setRollbackCount(3);
68+
stepExecution.setLastUpdated(LocalDateTime.now());
69+
stepExecution.setReadCount(17);
70+
stepExecution.setFilterCount(15);
71+
stepExecution.setWriteCount(13);
72+
dao.updateStepExecution(stepExecution);
73+
74+
StepExecution retrieved = dao.getStepExecution(stepExecution.getId());
75+
assertNotNull(retrieved);
76+
77+
assertStepExecutionsAreEqual(stepExecution, retrieved);
78+
assertNotNull(retrieved.getJobExecution());
79+
assertNotNull(retrieved.getJobExecution().getId());
80+
assertNotNull(retrieved.getJobExecution().getJobInstance());
81+
82+
}
83+
84+
@Test
85+
void testSaveAndGetLastExecution() {
86+
LocalDateTime now = LocalDateTime.now();
87+
StepExecution stepExecution1 = dao.createStepExecution("step1", jobExecution);
88+
stepExecution1.setStartTime(now);
89+
dao.updateStepExecution(stepExecution1);
90+
91+
StepExecution stepExecution2 = dao.createStepExecution("step1", jobExecution);
92+
stepExecution2.setStartTime(now.plus(500, ChronoUnit.MILLIS));
93+
dao.updateStepExecution(stepExecution2);
94+
95+
StepExecution lastStepExecution = dao.getLastStepExecution(jobInstance, "step1");
96+
assertNotNull(lastStepExecution);
97+
assertStepExecutionsAreEqual(stepExecution2, lastStepExecution);
98+
}
99+
100+
@Test
101+
void testSaveAndGetLastExecutionWhenSameStartTime() {
102+
LocalDateTime now = LocalDateTime.now();
103+
StepExecution stepExecution1 = dao.createStepExecution("step1", jobExecution);
104+
stepExecution1.setStartTime(now);
105+
dao.updateStepExecution(stepExecution1);
106+
107+
StepExecution stepExecution2 = dao.createStepExecution("step1", jobExecution);
108+
stepExecution2.setStartTime(now);
109+
dao.updateStepExecution(stepExecution2);
110+
111+
StepExecution lastStepExecution = stepExecution1.getId() > stepExecution2.getId() ? stepExecution1
112+
: stepExecution2;
113+
StepExecution retrieved = dao.getLastStepExecution(jobInstance, "step1");
114+
assertNotNull(retrieved);
115+
assertEquals(lastStepExecution.getId(), retrieved.getId());
116+
}
117+
118+
@Test
119+
void testGetForNotExistingJobExecution() {
120+
assertNull(dao.getStepExecution(45677L));
121+
}
122+
123+
/**
124+
* Update and retrieve updated StepExecution - make sure the update is reflected as
125+
* expected and version number has been incremented
126+
*/
127+
@Test
128+
void testUpdateExecution() {
129+
StepExecution stepExecution = dao.createStepExecution("step1", jobExecution);
130+
131+
stepExecution.setStatus(BatchStatus.ABANDONED);
132+
stepExecution.setLastUpdated(LocalDateTime.now());
133+
dao.updateStepExecution(stepExecution);
134+
135+
StepExecution retrieved = dao.getStepExecution(stepExecution.getId());
136+
assertNotNull(retrieved);
137+
assertEquals(stepExecution, retrieved);
138+
assertTemporalEquals(stepExecution.getLastUpdated(), retrieved.getLastUpdated());
139+
assertEquals(BatchStatus.ABANDONED, retrieved.getStatus());
140+
}
141+
142+
/**
143+
* Exception should be raised when the version of update argument doesn't match the
144+
* version of persisted entity.
145+
*/
146+
@Disabled("Not supported yet")
147+
@Test
148+
void testConcurrentModificationException() {
149+
150+
StepExecution exec1 = dao.createStepExecution("step", jobExecution);
151+
152+
StepExecution exec2 = dao.getStepExecution(exec1.getId());
153+
assertNotNull(exec2);
154+
155+
assertEquals(Integer.valueOf(0), exec1.getVersion());
156+
assertEquals(exec1.getVersion(), exec2.getVersion());
157+
158+
dao.updateStepExecution(exec1);
159+
assertEquals(Integer.valueOf(1), exec1.getVersion());
160+
161+
assertThrows(OptimisticLockingFailureException.class, () -> dao.updateStepExecution(exec2));
162+
}
163+
164+
private void assertStepExecutionsAreEqual(StepExecution expected, StepExecution actual) {
165+
assertEquals(expected.getId(), actual.getId());
166+
assertTemporalEquals(expected.getStartTime(), actual.getStartTime());
167+
assertTemporalEquals(expected.getEndTime(), actual.getEndTime());
168+
assertEquals(expected.getSkipCount(), actual.getSkipCount());
169+
assertEquals(expected.getCommitCount(), actual.getCommitCount());
170+
assertEquals(expected.getReadCount(), actual.getReadCount());
171+
assertEquals(expected.getWriteCount(), actual.getWriteCount());
172+
assertEquals(expected.getFilterCount(), actual.getFilterCount());
173+
assertEquals(expected.getWriteSkipCount(), actual.getWriteSkipCount());
174+
assertEquals(expected.getReadSkipCount(), actual.getReadSkipCount());
175+
assertEquals(expected.getProcessSkipCount(), actual.getProcessSkipCount());
176+
assertEquals(expected.getRollbackCount(), actual.getRollbackCount());
177+
assertEquals(expected.getExitStatus(), actual.getExitStatus());
178+
assertTemporalEquals(expected.getLastUpdated(), actual.getLastUpdated());
179+
assertEquals(expected.getExitStatus(), actual.getExitStatus());
180+
assertEquals(expected.getJobExecutionId(), actual.getJobExecutionId());
181+
assertTemporalEquals(expected.getCreateTime(), actual.getCreateTime());
182+
}
183+
184+
@Test
185+
void testCountStepExecutions() {
186+
// Given
187+
StepExecution stepExecution = dao.createStepExecution("step", jobExecution);
188+
189+
// When
190+
long result = dao.countStepExecutions(jobInstance, stepExecution.getStepName());
191+
192+
// Then
193+
assertEquals(1, result);
194+
}
195+
196+
@Test
197+
void testDeleteStepExecution() {
198+
// Given
199+
StepExecution stepExecution = dao.createStepExecution("step", jobExecution);
200+
201+
// When
202+
dao.deleteStepExecution(stepExecution);
203+
204+
// Then
205+
assertNull(dao.getStepExecution(stepExecution.getId()));
206+
}
207+
208+
}

0 commit comments

Comments
 (0)