Skip to content

Commit 4b9b921

Browse files
committed
Enhance RedisItemReader/Writer performance with pipelining
Signed-off-by: Hyunwoo Jung <hyunwoojung@kakao.com>
1 parent 85ba76a commit 4b9b921

File tree

8 files changed

+187
-44
lines changed

8 files changed

+187
-44
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/redis/RedisItemReader.java

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,21 +20,31 @@
2020
import org.springframework.batch.infrastructure.item.ExecutionContext;
2121
import org.springframework.batch.infrastructure.item.ItemStreamException;
2222
import org.springframework.batch.infrastructure.item.ItemStreamReader;
23+
import org.springframework.dao.DataAccessException;
2324
import org.springframework.data.redis.core.Cursor;
25+
import org.springframework.data.redis.core.RedisOperations;
2426
import org.springframework.data.redis.core.RedisTemplate;
2527
import org.springframework.data.redis.core.ScanOptions;
28+
import org.springframework.data.redis.core.SessionCallback;
2629
import org.springframework.util.Assert;
2730

31+
import java.util.ArrayDeque;
32+
import java.util.ArrayList;
33+
import java.util.Deque;
34+
import java.util.List;
35+
2836
/**
2937
* Item reader for Redis based on Spring Data Redis. Uses a {@link RedisTemplate} to query
3038
* data. The user should provide a {@link ScanOptions} to specify the set of keys to
31-
* query.
39+
* query. The {@code fetchSize} property controls how many items are fetched from Redis in
40+
* a single pipeline round-trip for efficiency.
3241
*
3342
* <p>
3443
* The implementation is not thread-safe and not restartable.
3544
* </p>
3645
*
3746
* @author Mahmoud Ben Hassine
47+
* @author Hyunwoo Jung
3848
* @since 5.1
3949
* @param <K> type of keys
4050
* @param <V> type of values
@@ -47,28 +57,32 @@ public class RedisItemReader<K, V> implements ItemStreamReader<V> {
4757

4858
private @Nullable Cursor<K> cursor;
4959

50-
public RedisItemReader(RedisTemplate<K, V> redisTemplate, ScanOptions scanOptions) {
60+
private final int fetchSize;
61+
62+
private final Deque<V> buffer;
63+
64+
public RedisItemReader(RedisTemplate<K, V> redisTemplate, ScanOptions scanOptions, int fetchSize) {
5165
Assert.notNull(redisTemplate, "redisTemplate must not be null");
5266
Assert.notNull(scanOptions, "scanOptions must no be null");
67+
Assert.isTrue(fetchSize > 0, "fetchSize must be greater than 0");
5368
this.redisTemplate = redisTemplate;
5469
this.scanOptions = scanOptions;
70+
this.fetchSize = fetchSize;
71+
this.buffer = new ArrayDeque<>();
5572
}
5673

5774
@Override
5875
public void open(ExecutionContext executionContext) throws ItemStreamException {
5976
this.cursor = this.redisTemplate.scan(this.scanOptions);
6077
}
6178

62-
@SuppressWarnings("DataFlowIssue")
6379
@Override
6480
public @Nullable V read() throws Exception {
65-
if (this.cursor.hasNext()) {
66-
K nextKey = this.cursor.next();
67-
return this.redisTemplate.opsForValue().get(nextKey);
68-
}
69-
else {
70-
return null;
81+
if (this.buffer.isEmpty()) {
82+
fetchNext();
7183
}
84+
85+
return this.buffer.pollFirst();
7286
}
7387

7488
@SuppressWarnings("DataFlowIssue")
@@ -77,4 +91,36 @@ public void close() throws ItemStreamException {
7791
this.cursor.close();
7892
}
7993

94+
@SuppressWarnings("DataFlowIssue")
95+
private void fetchNext() {
96+
List<K> keys = new ArrayList<>();
97+
while (this.cursor.hasNext() && keys.size() < this.fetchSize) {
98+
keys.add(this.cursor.next());
99+
}
100+
101+
if (keys.isEmpty()) {
102+
return;
103+
}
104+
105+
@SuppressWarnings("unchecked")
106+
List<V> items = (List<V>) this.redisTemplate.executePipelined(sessionCallback(keys));
107+
108+
this.buffer.addAll(items);
109+
}
110+
111+
private SessionCallback<Object> sessionCallback(List<K> keys) {
112+
return new SessionCallback<>() {
113+
114+
@SuppressWarnings("NullAway")
115+
@Override
116+
public @Nullable Object execute(RedisOperations operations) throws DataAccessException {
117+
for (K key : keys) {
118+
operations.opsForValue().get(key);
119+
}
120+
121+
return null;
122+
}
123+
};
124+
}
125+
80126
}

spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/redis/RedisItemWriter.java

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,20 @@
1616

1717
package org.springframework.batch.infrastructure.item.redis;
1818

19+
import org.jspecify.annotations.Nullable;
1920
import org.springframework.batch.infrastructure.item.ItemWriter;
2021
import org.springframework.batch.infrastructure.item.KeyValueItemWriter;
2122
import org.springframework.core.convert.converter.Converter;
23+
import org.springframework.dao.DataAccessException;
24+
import org.springframework.data.redis.core.RedisOperations;
2225
import org.springframework.data.redis.core.RedisTemplate;
26+
import org.springframework.data.redis.core.SessionCallback;
27+
import org.springframework.data.util.Pair;
2328
import org.springframework.util.Assert;
2429

30+
import java.util.ArrayList;
31+
import java.util.List;
32+
2533
/**
2634
* <p>
2735
* An {@link ItemWriter} implementation for Redis using a {@link RedisTemplate} .
@@ -30,12 +38,15 @@
3038
* @author Santiago Molano
3139
* @author Mahmoud Ben Hassine
3240
* @author Stefano Cordio
41+
* @author Hyunwoo Jung
3342
* @since 5.1
3443
*/
3544
public class RedisItemWriter<K, T> extends KeyValueItemWriter<K, T> {
3645

3746
private RedisTemplate<K, T> redisTemplate;
3847

48+
private final List<Pair<K, T>> buffer = new ArrayList<>();
49+
3950
/**
4051
* Create a new {@link RedisItemWriter}.
4152
* @param itemKeyMapper the {@link Converter} used to derive a key from an item.
@@ -50,19 +61,25 @@ public RedisItemWriter(Converter<T, K> itemKeyMapper, RedisTemplate<K, T> redisT
5061

5162
@Override
5263
protected void writeKeyValue(K key, T value) {
53-
if (this.delete) {
54-
this.redisTemplate.delete(key);
55-
}
56-
else {
57-
this.redisTemplate.opsForValue().set(key, value);
58-
}
64+
this.buffer.add(Pair.of(key, value));
5965
}
6066

6167
@Override
6268
protected void init() {
6369
Assert.notNull(this.redisTemplate, "RedisTemplate must not be null");
6470
}
6571

72+
@Override
73+
protected void flush() throws Exception {
74+
if (this.buffer.isEmpty()) {
75+
return;
76+
}
77+
78+
this.redisTemplate.executePipelined(sessionCallback());
79+
80+
this.buffer.clear();
81+
}
82+
6683
/**
6784
* Set the {@link RedisTemplate} to use.
6885
* @param redisTemplate the template to use
@@ -71,4 +88,33 @@ public void setRedisTemplate(RedisTemplate<K, T> redisTemplate) {
7188
this.redisTemplate = redisTemplate;
7289
}
7390

91+
private SessionCallback<Object> sessionCallback() {
92+
return new SessionCallback<>() {
93+
94+
@SuppressWarnings({ "unchecked", "NullAway" })
95+
@Override
96+
public @Nullable Object execute(RedisOperations operations) throws DataAccessException {
97+
if (RedisItemWriter.this.delete) {
98+
executeDeleteOperations(operations);
99+
}
100+
else {
101+
executeSetOperations(operations);
102+
}
103+
return null;
104+
}
105+
};
106+
}
107+
108+
private void executeDeleteOperations(RedisOperations<K, T> operations) {
109+
for (Pair<K, T> item : this.buffer) {
110+
operations.delete(item.getFirst());
111+
}
112+
}
113+
114+
private void executeSetOperations(RedisOperations<K, T> operations) {
115+
for (Pair<K, T> item : this.buffer) {
116+
operations.opsForValue().set(item.getFirst(), item.getSecond());
117+
}
118+
}
119+
74120
}

spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/redis/builder/RedisItemReaderBuilder.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
* Builder for {@link RedisItemReader}.
2424
*
2525
* @author Mahmoud Ben Hassine
26+
* @author Hyunwoo Jung
2627
* @since 5.1
2728
* @param <K> type of keys
2829
* @param <V> type of values
@@ -33,6 +34,8 @@ public class RedisItemReaderBuilder<K, V> {
3334

3435
private ScanOptions scanOptions;
3536

37+
private int fetchSize;
38+
3639
/**
3740
* Set the {@link RedisTemplate} to use in the reader.
3841
* @param redisTemplate the template to use
@@ -53,12 +56,22 @@ public RedisItemReaderBuilder<K, V> scanOptions(ScanOptions scanOptions) {
5356
return this;
5457
}
5558

59+
/**
60+
* Set the fetchSize to how many items from Redis in a single round-trip.
61+
* @param fetchSize the number of items to fetch per pipeline execution
62+
* @return the current builder instance for fluent chaining
63+
*/
64+
public RedisItemReaderBuilder<K, V> fetchSize(int fetchSize) {
65+
this.fetchSize = fetchSize;
66+
return this;
67+
}
68+
5669
/**
5770
* Build a new {@link RedisItemReader}.
5871
* @return a new item reader
5972
*/
6073
public RedisItemReader<K, V> build() {
61-
return new RedisItemReader<>(this.redisTemplate, this.scanOptions);
74+
return new RedisItemReader<>(this.redisTemplate, this.scanOptions, this.fetchSize);
6275
}
6376

6477
}

spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/redis/RedisItemReaderIntegrationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
@ExtendWith(SpringExtension.class)
5353
class RedisItemReaderIntegrationTests {
5454

55-
private static final DockerImageName REDIS_IMAGE = DockerImageName.parse("redis:8.0.3");
55+
private static final DockerImageName REDIS_IMAGE = DockerImageName.parse("redis:8.2.2");
5656

5757
@Container
5858
public static RedisContainer redis = new RedisContainer(REDIS_IMAGE);
@@ -82,7 +82,7 @@ void testRead(RedisConnectionFactory connectionFactory) throws Exception {
8282

8383
RedisTemplate<String, Person> redisTemplate = setUpRedisTemplate(connectionFactory);
8484
ScanOptions scanOptions = ScanOptions.scanOptions().match("person:*").count(10).build();
85-
this.reader = new RedisItemReader<>(redisTemplate, scanOptions);
85+
this.reader = new RedisItemReader<>(redisTemplate, scanOptions, 10);
8686

8787
this.reader.open(new ExecutionContext());
8888

spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/redis/RedisItemReaderTests.java

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,21 +16,26 @@
1616
package org.springframework.batch.infrastructure.item.redis;
1717

1818
import org.junit.jupiter.api.Assertions;
19+
import org.junit.jupiter.api.BeforeEach;
1920
import org.junit.jupiter.api.Test;
2021
import org.junit.jupiter.api.extension.ExtendWith;
2122
import org.mockito.Answers;
2223
import org.mockito.Mock;
23-
import org.mockito.Mockito;
2424
import org.mockito.junit.jupiter.MockitoExtension;
25-
2625
import org.springframework.batch.infrastructure.item.ExecutionContext;
27-
import org.springframework.batch.infrastructure.item.redis.RedisItemReader;
2826
import org.springframework.data.redis.core.Cursor;
2927
import org.springframework.data.redis.core.RedisTemplate;
3028
import org.springframework.data.redis.core.ScanOptions;
29+
import org.springframework.data.redis.core.SessionCallback;
30+
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
34+
import static org.mockito.ArgumentMatchers.any;
35+
import static org.mockito.Mockito.when;
3136

3237
@ExtendWith(MockitoExtension.class)
33-
public class RedisItemReaderTests {
38+
class RedisItemReaderTests {
3439

3540
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
3641
private RedisTemplate<String, String> redisTemplate;
@@ -41,15 +46,35 @@ public class RedisItemReaderTests {
4146
@Mock
4247
private Cursor<String> cursor;
4348

49+
private List<String> results;
50+
51+
@BeforeEach
52+
void setUp() {
53+
this.results = new ArrayList<>();
54+
when(this.redisTemplate.executePipelined(any(SessionCallback.class))).thenAnswer(invocation -> {
55+
SessionCallback<?> sessionCallback = invocation.getArgument(0);
56+
sessionCallback.execute(this.redisTemplate);
57+
return this.results;
58+
});
59+
}
60+
4461
@Test
4562
void testRead() throws Exception {
4663
// given
47-
Mockito.when(this.redisTemplate.scan(this.scanOptions)).thenReturn(this.cursor);
48-
Mockito.when(this.cursor.hasNext()).thenReturn(true, true, false);
49-
Mockito.when(this.cursor.next()).thenReturn("person:1", "person:2");
50-
Mockito.when(this.redisTemplate.opsForValue().get("person:1")).thenReturn("foo");
51-
Mockito.when(this.redisTemplate.opsForValue().get("person:2")).thenReturn("bar");
52-
RedisItemReader<String, String> redisItemReader = new RedisItemReader<>(this.redisTemplate, this.scanOptions);
64+
when(this.redisTemplate.scan(this.scanOptions)).thenReturn(this.cursor);
65+
when(this.cursor.hasNext()).thenReturn(true, true, false);
66+
when(this.cursor.next()).thenReturn("person:1", "person:2");
67+
when(this.redisTemplate.opsForValue().get("person:1")).thenAnswer(invocation -> {
68+
results.add("foo");
69+
return null;
70+
});
71+
when(this.redisTemplate.opsForValue().get("person:2")).thenAnswer(invocation -> {
72+
results.add("bar");
73+
return null;
74+
});
75+
76+
RedisItemReader<String, String> redisItemReader = new RedisItemReader<>(this.redisTemplate, this.scanOptions,
77+
10);
5378
redisItemReader.open(new ExecutionContext());
5479

5580
// when

spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/redis/RedisItemWriterIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
@ExtendWith(SpringExtension.class)
4949
class RedisItemWriterIntegrationTests {
5050

51-
private static final DockerImageName REDIS_IMAGE = DockerImageName.parse("redis:8.0.3");
51+
private static final DockerImageName REDIS_IMAGE = DockerImageName.parse("redis:8.2.2");
5252

5353
@Container
5454
public static RedisContainer redis = new RedisContainer(REDIS_IMAGE);

0 commit comments

Comments
 (0)