Skip to content

Commit 629497d

Browse files
parikshitduttafmbenhassine
authored andcommitted
Implemented BulkOperations API in MongoItemWriter
1 parent 43c40b1 commit 629497d

File tree

2 files changed

+83
-27
lines changed

2 files changed

+83
-27
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2017 the original author or authors.
2+
* Copyright 2012-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,12 +19,22 @@
1919
import java.util.ArrayList;
2020
import java.util.List;
2121

22+
import org.bson.Document;
23+
import org.bson.types.ObjectId;
24+
2225
import org.springframework.batch.item.ItemWriter;
2326
import org.springframework.beans.factory.InitializingBean;
27+
import org.springframework.data.mongodb.core.BulkOperations;
28+
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
29+
import org.springframework.data.mongodb.core.FindAndReplaceOptions;
2430
import org.springframework.data.mongodb.core.MongoOperations;
31+
import org.springframework.data.mongodb.core.convert.MongoConverter;
32+
import org.springframework.data.mongodb.core.query.Criteria;
33+
import org.springframework.data.mongodb.core.query.Query;
2534
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
2635
import org.springframework.transaction.support.TransactionSynchronizationManager;
2736
import org.springframework.util.Assert;
37+
import org.springframework.util.ClassUtils;
2838
import org.springframework.util.CollectionUtils;
2939
import org.springframework.util.StringUtils;
3040

@@ -42,6 +52,7 @@
4252
* </p>
4353
*
4454
* @author Michael Minella
55+
* @author Parikshit Dutta
4556
*
4657
*/
4758
public class MongoItemWriter<T> implements ItemWriter<T>, InitializingBean {
@@ -133,16 +144,28 @@ protected void doWrite(List<? extends T> items) {
133144
}
134145
}
135146
else {
147+
BulkOperations bulkOperations = null;
148+
136149
if(StringUtils.hasText(collection)) {
137-
for (Object object : items) {
138-
template.save(object, collection);
139-
}
150+
bulkOperations = template.bulkOps(BulkMode.ORDERED, collection);
140151
}
141152
else {
142-
for (Object object : items) {
143-
template.save(object);
144-
}
153+
bulkOperations = template.bulkOps(BulkMode.ORDERED, ClassUtils.getUserClass(items.get(0)));
154+
}
155+
156+
for (Object object : items) {
157+
Document document = new Document();
158+
159+
MongoConverter mongoConverter = template.getConverter();
160+
mongoConverter.write(object, document);
161+
162+
Query query = new Query();
163+
query.addCriteria(Criteria.where("_id").is((document.get("_id") != null)
164+
? document.get("_id") : new ObjectId()));
165+
166+
bulkOperations.replaceOne(query, document, new FindAndReplaceOptions().upsert());
145167
}
168+
bulkOperations.execute();
146169
}
147170
}
148171
}

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoItemWriterTests.java

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2017 the original author or authors.
2+
* Copyright 2013-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,36 +19,55 @@
1919
import java.util.Collections;
2020
import java.util.List;
2121

22+
import org.bson.Document;
2223
import org.junit.Before;
2324
import org.junit.Test;
2425
import org.mockito.Mock;
2526
import org.mockito.MockitoAnnotations;
2627

2728
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
29+
import org.springframework.data.mongodb.core.BulkOperations;
2830
import org.springframework.data.mongodb.core.MongoOperations;
31+
import org.springframework.data.mongodb.core.convert.MongoConverter;
32+
import org.springframework.data.mongodb.core.query.Query;
2933
import org.springframework.transaction.PlatformTransactionManager;
3034
import org.springframework.transaction.support.TransactionCallback;
3135
import org.springframework.transaction.support.TransactionTemplate;
3236

3337
import static org.junit.Assert.assertEquals;
3438
import static org.junit.Assert.fail;
3539
import static org.mockito.ArgumentMatchers.any;
40+
import static org.mockito.ArgumentMatchers.anyString;
41+
import static org.mockito.Mockito.when;
3642
import static org.mockito.Mockito.doAnswer;
3743
import static org.mockito.Mockito.mock;
3844
import static org.mockito.Mockito.verify;
45+
import static org.mockito.Mockito.times;
3946
import static org.mockito.Mockito.verifyZeroInteractions;
4047

48+
/**
49+
* @author Michael Minella
50+
* @author Parikshit Dutta
51+
*/
4152
@SuppressWarnings("serial")
4253
public class MongoItemWriterTests {
4354

4455
private MongoItemWriter<Object> writer;
4556
@Mock
4657
private MongoOperations template;
58+
@Mock
59+
private BulkOperations bulkOperations;
60+
@Mock
61+
private MongoConverter mongoConverter;
4762
private PlatformTransactionManager transactionManager = new ResourcelessTransactionManager();
4863

4964
@Before
5065
public void setUp() throws Exception {
5166
MockitoAnnotations.initMocks(this);
67+
when(template.bulkOps(any(), anyString())).thenReturn(bulkOperations);
68+
when(template.bulkOps(any(), any(Class.class))).thenReturn(bulkOperations);
69+
when(template.getConverter()).thenReturn(mongoConverter);
70+
5271
writer = new MongoItemWriter<>();
5372
writer.setTemplate(template);
5473
writer.afterPropertiesSet();
@@ -77,8 +96,8 @@ public void testWriteNoTransactionNoCollection() throws Exception {
7796

7897
writer.write(items);
7998

80-
verify(template).save(items.get(0));
81-
verify(template).save(items.get(1));
99+
verify(template).bulkOps(any(), any(Class.class));
100+
verify(bulkOperations, times(2)).replaceOne(any(Query.class), any(Object.class), any());
82101
}
83102

84103
@Test
@@ -92,15 +111,16 @@ public void testWriteNoTransactionWithCollection() throws Exception {
92111

93112
writer.write(items);
94113

95-
verify(template).save(items.get(0), "collection");
96-
verify(template).save(items.get(1), "collection");
114+
verify(template).bulkOps(any(), anyString());
115+
verify(bulkOperations, times(2)).replaceOne(any(Query.class), any(Object.class), any());
97116
}
98117

99118
@Test
100119
public void testWriteNoTransactionNoItems() throws Exception {
101120
writer.write(null);
102121

103122
verifyZeroInteractions(template);
123+
verifyZeroInteractions(bulkOperations);
104124
}
105125

106126
@Test
@@ -120,8 +140,8 @@ public void testWriteTransactionNoCollection() throws Exception {
120140
return null;
121141
});
122142

123-
verify(template).save(items.get(0));
124-
verify(template).save(items.get(1));
143+
verify(template).bulkOps(any(), any(Class.class));
144+
verify(bulkOperations, times(2)).replaceOne(any(Query.class), any(Object.class), any());
125145
}
126146

127147
@Test
@@ -143,8 +163,8 @@ public void testWriteTransactionWithCollection() throws Exception {
143163
return null;
144164
});
145165

146-
verify(template).save(items.get(0), "collection");
147-
verify(template).save(items.get(1), "collection");
166+
verify(template).bulkOps(any(), anyString());
167+
verify(bulkOperations, times(2)).replaceOne(any(Query.class), any(Object.class), any());
148168
}
149169

150170
@Test
@@ -172,6 +192,7 @@ public void testWriteTransactionFails() throws Exception {
172192
}
173193

174194
verifyZeroInteractions(template);
195+
verifyZeroInteractions(bulkOperations);
175196
}
176197

177198
/**
@@ -203,6 +224,7 @@ public void testWriteTransactionReadOnly() throws Exception {
203224
}
204225

205226
verifyZeroInteractions(template);
227+
verifyZeroInteractions(bulkOperations);
206228
}
207229

208230
@Test
@@ -234,31 +256,43 @@ public void testRemoveNoTransactionWithCollection() throws Exception {
234256
verify(template).remove(items.get(0), "collection");
235257
verify(template).remove(items.get(1), "collection");
236258
}
237-
238-
// BATCH-2018
259+
260+
// BATCH-2018, test code updated to pass BATCH-3713
239261
@Test
240262
public void testResourceKeyCollision() throws Exception {
241263
final int limit = 5000;
242264
@SuppressWarnings("unchecked")
243265
List<MongoItemWriter<String>> writers = new ArrayList<>(limit);
266+
final String[] documents = new String[limit];
244267
final String[] results = new String[limit];
245268
for(int i = 0; i< limit; i++) {
246269
final int index = i;
247270
MongoOperations mongoOperations = mock(MongoOperations.class);
248-
271+
BulkOperations bulkOperations = mock(BulkOperations.class);
272+
MongoConverter mongoConverter = mock(MongoConverter.class);
273+
274+
when(mongoOperations.bulkOps(any(), any(Class.class))).thenReturn(bulkOperations);
275+
when(mongoOperations.getConverter()).thenReturn(mongoConverter);
276+
277+
// mocking the object to document conversion which is used in forming bulk operation
278+
doAnswer(invocation -> {
279+
documents[index] = (String) invocation.getArguments()[0];
280+
return null;
281+
}).when(mongoConverter).write(any(String.class), any(Document.class));
282+
249283
doAnswer(invocation -> {
250-
String val = (String) invocation.getArguments()[0];
251284
if(results[index] == null) {
252-
results[index] = val;
285+
results[index] = documents[index];
253286
} else {
254-
results[index] += val;
287+
results[index] += documents[index];
255288
}
256289
return null;
257-
}).when(mongoOperations).save(any(String.class));
290+
}).when(bulkOperations).replaceOne(any(Query.class), any(Document.class), any());
291+
258292
writers.add(i, new MongoItemWriter<>());
259293
writers.get(i).setTemplate(mongoOperations);
260294
}
261-
295+
262296
new TransactionTemplate(transactionManager).execute((TransactionCallback<Void>) status -> {
263297
try {
264298
for(int i=0; i< limit; i++) {
@@ -270,10 +304,9 @@ public void testResourceKeyCollision() throws Exception {
270304
}
271305
return null;
272306
});
273-
307+
274308
for(int i=0; i< limit; i++) {
275309
assertEquals(String.valueOf(i), results[i]);
276-
}
310+
}
277311
}
278-
279312
}

0 commit comments

Comments
 (0)