Skip to content

Commit 923cdf4

Browse files
committed
Fix the issue where new messages are not read when a consumer is specified.
Closes #3201 Signed-off-by: Edsuns <edsuns@qq.com>
1 parent 0471ce8 commit 923cdf4

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.springframework.data.redis.core.StreamOperations;
4040
import org.springframework.data.redis.serializer.RedisSerializer;
4141
import org.springframework.util.Assert;
42+
import org.springframework.util.CollectionUtils;
4243
import org.springframework.util.ErrorHandler;
4344
import org.springframework.util.ObjectUtils;
4445

@@ -50,6 +51,7 @@
5051
*
5152
* @author Mark Paluch
5253
* @author Christoph Strobl
54+
* @author Edsuns
5355
* @since 2.2
5456
*/
5557
class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implements StreamMessageListenerContainer<K, V> {
@@ -229,8 +231,18 @@ private Function<ReadOffset, List<ByteRecord>> getReadFunction(StreamReadRequest
229231
: this.readOptions;
230232
Consumer consumer = consumerStreamRequest.getConsumer();
231233

232-
return (offset) -> template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands()
233-
.xReadGroup(consumer, readOptions, StreamOffset.create(rawKey, offset)));
234+
return (offset) -> {
235+
List<ByteRecord> records = template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands()
236+
.xReadGroup(consumer, readOptions, StreamOffset.create(rawKey, offset)));
237+
if (CollectionUtils.isEmpty(records) && !ReadOffset.lastConsumed().equals(offset)) {
238+
// see https://redis.io/docs/latest/commands/xreadgroup/
239+
// if ID in XREADGROUP command is other than >, new messages won't be read
240+
// so reads new messages here
241+
return template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands()
242+
.xReadGroup(consumer, readOptions, StreamOffset.create(rawKey, ReadOffset.lastConsumed())));
243+
}
244+
return records;
245+
};
234246
}
235247

236248
return (offset) -> template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands()

0 commit comments

Comments
 (0)