|
39 | 39 | import org.springframework.data.redis.core.StreamOperations;
|
40 | 40 | import org.springframework.data.redis.serializer.RedisSerializer;
|
41 | 41 | import org.springframework.util.Assert;
|
| 42 | +import org.springframework.util.CollectionUtils; |
42 | 43 | import org.springframework.util.ErrorHandler;
|
43 | 44 | import org.springframework.util.ObjectUtils;
|
44 | 45 |
|
|
50 | 51 | *
|
51 | 52 | * @author Mark Paluch
|
52 | 53 | * @author Christoph Strobl
|
| 54 | + * @author Edsuns |
53 | 55 | * @since 2.2
|
54 | 56 | */
|
55 | 57 | class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implements StreamMessageListenerContainer<K, V> {
|
@@ -229,8 +231,18 @@ private Function<ReadOffset, List<ByteRecord>> getReadFunction(StreamReadRequest
|
229 | 231 | : this.readOptions;
|
230 | 232 | Consumer consumer = consumerStreamRequest.getConsumer();
|
231 | 233 |
|
232 |
| - return (offset) -> template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands() |
233 |
| - .xReadGroup(consumer, readOptions, StreamOffset.create(rawKey, offset))); |
| 234 | + return (offset) -> { |
| 235 | + List<ByteRecord> records = template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands() |
| 236 | + .xReadGroup(consumer, readOptions, StreamOffset.create(rawKey, offset))); |
| 237 | + if (CollectionUtils.isEmpty(records) && !ReadOffset.lastConsumed().equals(offset)) { |
| 238 | + // see https://redis.io/docs/latest/commands/xreadgroup/ |
| 239 | + // if ID in XREADGROUP command is other than >, new messages won't be read |
| 240 | + // so reads new messages here |
| 241 | + return template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands() |
| 242 | + .xReadGroup(consumer, readOptions, StreamOffset.create(rawKey, ReadOffset.lastConsumed()))); |
| 243 | + } |
| 244 | + return records; |
| 245 | + }; |
234 | 246 | }
|
235 | 247 |
|
236 | 248 | return (offset) -> template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands()
|
|
0 commit comments