From 6eeca7aa7f09ee9356652c639cdb3eabe55f531d Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Fri, 31 Oct 2025 09:33:57 +0200 Subject: [PATCH 01/17] Adding CLAIM option to XREADGROUP command + unit tests --- redis/_parsers/helpers.py | 17 +- redis/commands/core.py | 15 +- tests/test_asyncio/test_commands.py | 363 ++++++++++++++++++++++++++++ tests/test_commands.py | 317 ++++++++++++++++++++++++ 4 files changed, 705 insertions(+), 7 deletions(-) diff --git a/redis/_parsers/helpers.py b/redis/_parsers/helpers.py index bf3b2ee2f2..df4c200b05 100644 --- a/redis/_parsers/helpers.py +++ b/redis/_parsers/helpers.py @@ -268,13 +268,16 @@ def sort_return_tuples(response, **options): return list(zip(*[response[i::n] for i in range(n)])) -def parse_stream_list(response): +def parse_stream_list(response, **options): if response is None: return None data = [] for r in response: if r is not None: - data.append((r[0], pairs_to_dict(r[1]))) + if "claim_min_idle_time" in options: + data.append((r[0], pairs_to_dict(r[1]), r[2], r[3])) + else: + data.append((r[0], pairs_to_dict(r[1]))) else: data.append((None, None)) return data @@ -332,16 +335,18 @@ def parse_xinfo_stream(response, **options): return data -def parse_xread(response): +def parse_xread(response, **options): if response is None: return [] - return [[r[0], parse_stream_list(r[1])] for r in response] + return [[r[0], parse_stream_list(r[1], **options)] for r in response] -def parse_xread_resp3(response): +def parse_xread_resp3(response, **options): if response is None: return {} - return {key: [parse_stream_list(value)] for key, value in response.items()} + return { + key: [parse_stream_list(value, **options)] for key, value in response.items() + } def parse_xpending(response, **options): diff --git a/redis/commands/core.py b/redis/commands/core.py index 34086e3c09..81d0e35a21 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -4013,6 +4013,7 @@ def xreadgroup( count: Optional[int] = None, block: Optional[int] = None, noack: bool = False, + claim_min_idle_time: Optional[int] = None, ) -> ResponseT: """ Read from a stream via a consumer group. @@ -4030,8 +4031,12 @@ def xreadgroup( block: number of milliseconds to wait, if nothing already present. 
noack: do not add messages to the PEL + claim_min_idle_time: accepts an integer type and represents a + time interval in milliseconds + For more information, see https://redis.io/commands/xreadgroup """ + options = {} pieces: list[EncodableT] = [b"GROUP", groupname, consumername] if count is not None: if not isinstance(count, int) or count < 1: @@ -4045,12 +4050,20 @@ def xreadgroup( pieces.append(str(block)) if noack: pieces.append(b"NOACK") + if claim_min_idle_time is not None: + if not isinstance(claim_min_idle_time, int) or claim_min_idle_time < 0: + raise DataError( + "XREADGROUP claim_min_idle_time must be a non-negative integer" + ) + pieces.append(b"CLAIM") + pieces.append(claim_min_idle_time) + options["claim_min_idle_time"] = claim_min_idle_time if not isinstance(streams, dict) or len(streams) == 0: raise DataError("XREADGROUP streams must be a non empty dict") pieces.append(b"STREAMS") pieces.extend(streams.keys()) pieces.extend(streams.values()) - return self.execute_command("XREADGROUP", *pieces) + return self.execute_command("XREADGROUP", *pieces, **options) def xrevrange( self, diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 14690ef6e6..05d438906c 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -11,6 +11,7 @@ import pytest import pytest_asyncio +from redis import ResponseError import redis from redis import exceptions from redis._parsers.helpers import ( @@ -3476,6 +3477,368 @@ async def test_xreadgroup(self, r: redis.Redis): r, res, [[strem_name, expected_entries]], {strem_name: [expected_entries]} ) + def _validate_xreadgroup_with_claim_min_idle_time_response( + self, r, response, expected_entries + ): + # validate the number of streams + assert len(response) == len(expected_entries) + + expected_streams = expected_entries.keys() + for str_index, expected_stream in enumerate(expected_streams): + expected_entries_per_stream = expected_entries[expected_stream] + + if is_resp2_connection(r): + actual_entries_per_stream = response[str_index][1] + actual_stream = response[str_index][0] + assert actual_stream == expected_stream + else: + actual_entries_per_stream = response[expected_stream][0] + + # validate the number of entries + assert len(actual_entries_per_stream) == len(expected_entries_per_stream) + + for i, entry in enumerate(actual_entries_per_stream): + message = (entry[0], entry[1]) + message_idle = int(entry[2]) + message_delivered_count = int(entry[3]) + + expected_idle = expected_entries_per_stream[i]["min_idle_time"] + + assert message == expected_entries_per_stream[i]["msg"] + if expected_idle == 0: + assert message_idle == 0 + assert message_delivered_count == 0 + else: + assert message_idle >= expected_idle + assert message_delivered_count > 0 + + @skip_if_server_version_lt("8.3.224") + async def test_xreadgroup_with_claim_min_idle_time(self, r): + stream = "stream" + group = "group" + consumer_1 = "c1" + stream_name = stream.encode() + + try: + # before test cleanup + await r.xgroup_destroy(stream, group) + except ResponseError: + pass + + m1 = await r.xadd(stream, {"key_m1": "val_m1"}) + m2 = await r.xadd(stream, {"key_m2": "val_m2"}) + + await r.xgroup_create(stream, group, 0) + + # read all the messages - this will save the msgs in PEL + res = await r.xreadgroup(group, consumer_1, streams={stream: ">"}) + + # wait for 100ms - so that the messages would have been in the PEL for long enough + await asyncio.sleep(0.1) + + m3 = await r.xadd(stream, {"key_m3": "val_m3"}) + 
m4 = await r.xadd(stream, {"key_m4": "val_m4"}) + + expected_entries = { + stream_name: [ + {"msg": await get_stream_message(r, stream, m1), "min_idle_time": 100}, + {"msg": await get_stream_message(r, stream, m2), "min_idle_time": 100}, + {"msg": await get_stream_message(r, stream, m3), "min_idle_time": 0}, + {"msg": await get_stream_message(r, stream, m4), "min_idle_time": 0}, + ] + } + + res = await r.xreadgroup( + group, consumer_1, streams={stream: ">"}, claim_min_idle_time=100 + ) + + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + # validate PEL still holds the messages until they are acknoledged + response = await r.xpending_range(stream, group, min="-", max="+", count=5) + assert len(response) == 4 + + # add 2 more messages + m5 = await r.xadd(stream, {"key_m5": "val_m5"}) + m6 = await r.xadd(stream, {"key_m6": "val_m6"}) + + expected_entries = [ + await get_stream_message(r, stream, m5), + await get_stream_message(r, stream, m6), + ] + # read all the messages - this will save the msgs in PEL + res = await r.xreadgroup(group, consumer_1, streams={stream: ">"}) + assert_resp_response( + r, + res, + [[stream_name, expected_entries]], + {stream_name: [expected_entries]}, + ) + + # add 2 more messages + m7 = await r.xadd(stream, {"key_m7": "val_m7"}) + m8 = await r.xadd(stream, {"key_m8": "val_m8"}) + # read the messages with claim_min_idle_time=1000 + # only m7 and m8 should be returned + # because the other messages have not been in the PEL for long enough + expected_entries = { + stream_name: [ + {"msg": await get_stream_message(r, stream, m7), "min_idle_time": 0}, + {"msg": await get_stream_message(r, stream, m8), "min_idle_time": 0}, + ] + } + res = await r.xreadgroup( + group, consumer_1, streams={stream: ">"}, claim_min_idle_time=100 + ) + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + @skip_if_server_version_lt("8.3.224") + async def test_xreadgroup_with_claim_min_idle_time_multiple_streams_same_slots( + self, r + ): + stream_1 = "stream1:{same_slot:42}" + stream_2 = "stream2:{same_slot:42}" + group = "group" + consumer_1 = "c1" + + stream_1_name = stream_1.encode() + stream_2_name = stream_2.encode() + + # before test cleanup + try: + await r.xgroup_destroy(stream_1, group) + except ResponseError: + pass + try: + await r.xgroup_destroy(stream_2, group) + except ResponseError: + pass + + m1_s1 = await r.xadd(stream_1, {"key_m1_s1": "val_m1"}) + m2_s1 = await r.xadd(stream_1, {"key_m2_s1": "val_m2"}) + await r.xgroup_create(stream_1, group, 0) + + m1_s2 = await r.xadd(stream_2, {"key_m1_s2": "val_m1"}) + m2_s2 = await r.xadd(stream_2, {"key_m2_s2": "val_m2"}) + await r.xgroup_create(stream_2, group, 0) + + # read all the messages - this will save the msgs in PEL + res = await r.xreadgroup( + group, consumer_1, streams={stream_1: ">", stream_2: ">"} + ) + + # wait for 100ms - so that the messages would have been in the PEL for long enough + await asyncio.sleep(0.1) + + m3_s1 = await r.xadd(stream_1, {"key_m3_s1": "val_m3"}) + m4_s2 = await r.xadd(stream_2, {"key_m4_s2": "val_m4"}) + + expected_entries = { + stream_1_name: [ + { + "msg": await get_stream_message(r, stream_1, m1_s1), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_1, m2_s1), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_1, m3_s1), + "min_idle_time": 0, + }, + ], + stream_2_name: [ + { + "msg": await get_stream_message(r, stream_2, m1_s2), + "min_idle_time": 
100, + }, + { + "msg": await get_stream_message(r, stream_2, m2_s2), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_2, m4_s2), + "min_idle_time": 0, + }, + ], + } + + res = await r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + # validate PEL still holds the messages until they are acknoledged + response = await r.xpending_range(stream_1, group, min="-", max="+", count=5) + assert len(response) == 3 + response = await r.xpending_range(stream_2, group, min="-", max="+", count=5) + assert len(response) == 3 + + # add 2 more messages + await r.xadd(stream_1, {"key_m5": "val_m5"}) + await r.xadd(stream_2, {"key_m6": "val_m6"}) + + # read all the messages - this will save the msgs in PEL + res = await r.xreadgroup( + group, consumer_1, streams={stream_1: ">", stream_2: ">"} + ) + + # add 2 more messages + m7 = await r.xadd(stream_1, {"key_m7": "val_m7"}) + m8 = await r.xadd(stream_2, {"key_m8": "val_m8"}) + # read the messages with claim_min_idle_time=1000 + # only m7 and m8 should be returned + # because the other messages have not been in the PEL for long enough + expected_entries = { + stream_1_name: [ + {"msg": await get_stream_message(r, stream_1, m7), "min_idle_time": 0} + ], + stream_2_name: [ + {"msg": await get_stream_message(r, stream_2, m8), "min_idle_time": 0} + ], + } + res = await r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + @pytest.mark.onlynoncluster + @skip_if_server_version_lt("8.3.224") + async def test_xreadgroup_with_claim_min_idle_time_multiple_streams(self, r): + stream_1 = "stream1" + stream_2 = "stream2" + group = "group" + consumer_1 = "c1" + + stream_1_name = stream_1.encode() + stream_2_name = stream_2.encode() + + # before test cleanup + try: + await r.xgroup_destroy(stream_1, group) + except ResponseError: + pass + try: + await r.xgroup_destroy(stream_2, group) + except ResponseError: + pass + + m1_s1 = await r.xadd(stream_1, {"key_m1_s1": "val_m1"}) + m2_s1 = await r.xadd(stream_1, {"key_m2_s1": "val_m2"}) + await r.xgroup_create(stream_1, group, 0) + + m1_s2 = await r.xadd(stream_2, {"key_m1_s2": "val_m1"}) + m2_s2 = await r.xadd(stream_2, {"key_m2_s2": "val_m2"}) + await r.xgroup_create(stream_2, group, 0) + + # read all the messages - this will save the msgs in PEL + res = await r.xreadgroup( + group, consumer_1, streams={stream_1: ">", stream_2: ">"} + ) + + # wait for 100ms - so that the messages would have been in the PEL for long enough + await asyncio.sleep(0.1) + + m3_s1 = await r.xadd(stream_1, {"key_m3_s1": "val_m3"}) + m4_s2 = await r.xadd(stream_2, {"key_m4_s2": "val_m4"}) + + expected_entries = { + stream_1_name: [ + { + "msg": await get_stream_message(r, stream_1, m1_s1), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_1, m2_s1), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_1, m3_s1), + "min_idle_time": 0, + }, + ], + stream_2_name: [ + { + "msg": await get_stream_message(r, stream_2, m1_s2), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_2, m2_s2), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_2, m4_s2), + "min_idle_time": 0, + }, + ], + } + + res = await r.xreadgroup( + group, + 
consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + # validate PEL still holds the messages until they are acknoledged + response = await r.xpending_range(stream_1, group, min="-", max="+", count=5) + assert len(response) == 3 + response = await r.xpending_range(stream_2, group, min="-", max="+", count=5) + assert len(response) == 3 + + # add 2 more messages + await r.xadd(stream_1, {"key_m5": "val_m5"}) + await r.xadd(stream_2, {"key_m6": "val_m6"}) + + # read all the messages - this will save the msgs in PEL + res = await r.xreadgroup( + group, consumer_1, streams={stream_1: ">", stream_2: ">"} + ) + + # add 2 more messages + m7 = await r.xadd(stream_1, {"key_m7": "val_m7"}) + m8 = await r.xadd(stream_2, {"key_m8": "val_m8"}) + # read the messages with claim_min_idle_time=1000 + # only m7 and m8 should be returned + # because the other messages have not been in the PEL for long enough + expected_entries = { + stream_1_name: [ + {"msg": await get_stream_message(r, stream_1, m7), "min_idle_time": 0} + ], + stream_2_name: [ + {"msg": await get_stream_message(r, stream_2, m8), "min_idle_time": 0} + ], + } + res = await r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + @skip_if_server_version_lt("5.0.0") async def test_xrevrange(self, r: redis.Redis): stream = "stream" diff --git a/tests/test_commands.py b/tests/test_commands.py index c5974ea0fc..37c9ab5113 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -9,6 +9,7 @@ from unittest.mock import patch import pytest +from redis import ResponseError import redis from redis import exceptions from redis._parsers.helpers import ( @@ -5108,6 +5109,322 @@ def test_xreadgroup(self, r): {stream_name: [expected_entries]}, ) + def _validate_xreadgroup_with_claim_min_idle_time_response( + self, r, response, expected_entries + ): + # validate the number of streams + assert len(response) == len(expected_entries) + + expected_streams = expected_entries.keys() + for str_index, expected_stream in enumerate(expected_streams): + expected_entries_per_stream = expected_entries[expected_stream] + + if is_resp2_connection(r): + actual_entries_per_stream = response[str_index][1] + actual_stream = response[str_index][0] + assert actual_stream == expected_stream + else: + actual_entries_per_stream = response[expected_stream][0] + + # validate the number of entries + assert len(actual_entries_per_stream) == len(expected_entries_per_stream) + + for i, entry in enumerate(actual_entries_per_stream): + message = (entry[0], entry[1]) + message_idle = int(entry[2]) + message_delivered_count = int(entry[3]) + + expected_idle = expected_entries_per_stream[i]["min_idle_time"] + + assert message == expected_entries_per_stream[i]["msg"] + if expected_idle == 0: + assert message_idle == 0 + assert message_delivered_count == 0 + else: + assert message_idle >= expected_idle + assert message_delivered_count > 0 + + @skip_if_server_version_lt("8.3.224") + def test_xreadgroup_with_claim_min_idle_time(self, r): + stream = "stream" + group = "group" + consumer_1 = "c1" + stream_name = stream.encode() + + try: + # before test cleanup + r.xgroup_destroy(stream, group) + except ResponseError: + pass + + m1 = r.xadd(stream, {"key_m1": "val_m1"}) + m2 = r.xadd(stream, {"key_m2": "val_m2"}) + 
+ r.xgroup_create(stream, group, 0) + + # read all the messages - this will save the msgs in PEL + res = r.xreadgroup(group, consumer_1, streams={stream: ">"}) + + # wait for 100ms - so that the messages would have been in the PEL for long enough + time.sleep(0.1) + + m3 = r.xadd(stream, {"key_m3": "val_m3"}) + m4 = r.xadd(stream, {"key_m4": "val_m4"}) + + expected_entries = { + stream_name: [ + {"msg": get_stream_message(r, stream, m1), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream, m2), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream, m3), "min_idle_time": 0}, + {"msg": get_stream_message(r, stream, m4), "min_idle_time": 0}, + ] + } + + res = r.xreadgroup( + group, consumer_1, streams={stream: ">"}, claim_min_idle_time=100 + ) + + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + # validate PEL still holds the messages until they are acknoledged + response = r.xpending_range(stream, group, min="-", max="+", count=5) + assert len(response) == 4 + + # add 2 more messages + m5 = r.xadd(stream, {"key_m5": "val_m5"}) + m6 = r.xadd(stream, {"key_m6": "val_m6"}) + + expected_entries = [ + get_stream_message(r, stream, m5), + get_stream_message(r, stream, m6), + ] + # read all the messages - this will save the msgs in PEL + res = r.xreadgroup(group, consumer_1, streams={stream: ">"}) + assert_resp_response( + r, + res, + [[stream_name, expected_entries]], + {stream_name: [expected_entries]}, + ) + + # add 2 more messages + m7 = r.xadd(stream, {"key_m7": "val_m7"}) + m8 = r.xadd(stream, {"key_m8": "val_m8"}) + # read the messages with claim_min_idle_time=1000 + # only m7 and m8 should be returned + # because the other messages have not been in the PEL for long enough + expected_entries = { + stream_name: [ + {"msg": get_stream_message(r, stream, m7), "min_idle_time": 0}, + {"msg": get_stream_message(r, stream, m8), "min_idle_time": 0}, + ] + } + res = r.xreadgroup( + group, consumer_1, streams={stream: ">"}, claim_min_idle_time=100 + ) + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + @skip_if_server_version_lt("8.3.224") + def test_xreadgroup_with_claim_min_idle_time_multiple_streams_same_slots(self, r): + stream_1 = "stream1:{same_slot:42}" + stream_2 = "stream2:{same_slot:42}" + group = "group" + consumer_1 = "c1" + + stream_1_name = stream_1.encode() + stream_2_name = stream_2.encode() + + # before test cleanup + try: + r.xgroup_destroy(stream_1, group) + except ResponseError: + pass + try: + r.xgroup_destroy(stream_2, group) + except ResponseError: + pass + + m1_s1 = r.xadd(stream_1, {"key_m1_s1": "val_m1"}) + m2_s1 = r.xadd(stream_1, {"key_m2_s1": "val_m2"}) + r.xgroup_create(stream_1, group, 0) + + m1_s2 = r.xadd(stream_2, {"key_m1_s2": "val_m1"}) + m2_s2 = r.xadd(stream_2, {"key_m2_s2": "val_m2"}) + r.xgroup_create(stream_2, group, 0) + + # read all the messages - this will save the msgs in PEL + res = r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + + # wait for 100ms - so that the messages would have been in the PEL for long enough + time.sleep(0.1) + + m3_s1 = r.xadd(stream_1, {"key_m3_s1": "val_m3"}) + m4_s2 = r.xadd(stream_2, {"key_m4_s2": "val_m4"}) + + expected_entries = { + stream_1_name: [ + {"msg": get_stream_message(r, stream_1, m1_s1), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_1, m2_s1), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_1, m3_s1), "min_idle_time": 0}, + ], + stream_2_name: [ + 
{"msg": get_stream_message(r, stream_2, m1_s2), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_2, m2_s2), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_2, m4_s2), "min_idle_time": 0}, + ], + } + + res = r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + # validate PEL still holds the messages until they are acknoledged + response = r.xpending_range(stream_1, group, min="-", max="+", count=5) + assert len(response) == 3 + response = r.xpending_range(stream_2, group, min="-", max="+", count=5) + assert len(response) == 3 + + # add 2 more messages + r.xadd(stream_1, {"key_m5": "val_m5"}) + r.xadd(stream_2, {"key_m6": "val_m6"}) + + # read all the messages - this will save the msgs in PEL + res = r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + + # add 2 more messages + m7 = r.xadd(stream_1, {"key_m7": "val_m7"}) + m8 = r.xadd(stream_2, {"key_m8": "val_m8"}) + # read the messages with claim_min_idle_time=1000 + # only m7 and m8 should be returned + # because the other messages have not been in the PEL for long enough + expected_entries = { + stream_1_name: [ + {"msg": get_stream_message(r, stream_1, m7), "min_idle_time": 0} + ], + stream_2_name: [ + {"msg": get_stream_message(r, stream_2, m8), "min_idle_time": 0} + ], + } + res = r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + @pytest.mark.onlynoncluster + @skip_if_server_version_lt("8.3.224") + def test_xreadgroup_with_claim_min_idle_time_multiple_streams(self, r): + stream_1 = "stream1" + stream_2 = "stream2" + group = "group" + consumer_1 = "c1" + + stream_1_name = stream_1.encode() + stream_2_name = stream_2.encode() + + # before test cleanup + try: + r.xgroup_destroy(stream_1, group) + except ResponseError: + pass + try: + r.xgroup_destroy(stream_2, group) + except ResponseError: + pass + + m1_s1 = r.xadd(stream_1, {"key_m1_s1": "val_m1"}) + m2_s1 = r.xadd(stream_1, {"key_m2_s1": "val_m2"}) + r.xgroup_create(stream_1, group, 0) + + m1_s2 = r.xadd(stream_2, {"key_m1_s2": "val_m1"}) + m2_s2 = r.xadd(stream_2, {"key_m2_s2": "val_m2"}) + r.xgroup_create(stream_2, group, 0) + + # read all the messages - this will save the msgs in PEL + res = r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + + # wait for 100ms - so that the messages would have been in the PEL for long enough + time.sleep(0.1) + + m3_s1 = r.xadd(stream_1, {"key_m3_s1": "val_m3"}) + m4_s2 = r.xadd(stream_2, {"key_m4_s2": "val_m4"}) + + expected_entries = { + stream_1_name: [ + {"msg": get_stream_message(r, stream_1, m1_s1), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_1, m2_s1), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_1, m3_s1), "min_idle_time": 0}, + ], + stream_2_name: [ + {"msg": get_stream_message(r, stream_2, m1_s2), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_2, m2_s2), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_2, m4_s2), "min_idle_time": 0}, + ], + } + + res = r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + # validate PEL still holds the messages 
until they are acknoledged + response = r.xpending_range(stream_1, group, min="-", max="+", count=5) + assert len(response) == 3 + response = r.xpending_range(stream_2, group, min="-", max="+", count=5) + assert len(response) == 3 + + # add 2 more messages + r.xadd(stream_1, {"key_m5": "val_m5"}) + r.xadd(stream_2, {"key_m6": "val_m6"}) + + # read all the messages - this will save the msgs in PEL + res = r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + + # add 2 more messages + m7 = r.xadd(stream_1, {"key_m7": "val_m7"}) + m8 = r.xadd(stream_2, {"key_m8": "val_m8"}) + # read the messages with claim_min_idle_time=1000 + # only m7 and m8 should be returned + # because the other messages have not been in the PEL for long enough + expected_entries = { + stream_1_name: [ + {"msg": get_stream_message(r, stream_1, m7), "min_idle_time": 0} + ], + stream_2_name: [ + {"msg": get_stream_message(r, stream_2, m8), "min_idle_time": 0} + ], + } + res = r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + @skip_if_server_version_lt("5.0.0") def test_xrevrange(self, r): stream = "stream" From eb5d49983dacaa34a9ec25743ca394484c1a6840 Mon Sep 17 00:00:00 2001 From: petyaslavova Date: Fri, 31 Oct 2025 10:56:49 +0200 Subject: [PATCH 02/17] Update tests/test_commands.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index 37c9ab5113..88321f2e83 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -5188,7 +5188,7 @@ def test_xreadgroup_with_claim_min_idle_time(self, r): r, res, expected_entries ) - # validate PEL still holds the messages until they are acknoledged + # validate PEL still holds the messages until they are acknowledged response = r.xpending_range(stream, group, min="-", max="+", count=5) assert len(response) == 4 From 99c542f2ef6e17255935f177b06d32d702e9cba4 Mon Sep 17 00:00:00 2001 From: petyaslavova Date: Fri, 31 Oct 2025 10:57:02 +0200 Subject: [PATCH 03/17] Update tests/test_commands.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index 88321f2e83..7d5917e63d 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -5289,7 +5289,7 @@ def test_xreadgroup_with_claim_min_idle_time_multiple_streams_same_slots(self, r r, res, expected_entries ) - # validate PEL still holds the messages until they are acknoledged + # validate PEL still holds the messages until they are acknowledged response = r.xpending_range(stream_1, group, min="-", max="+", count=5) assert len(response) == 3 response = r.xpending_range(stream_2, group, min="-", max="+", count=5) From c293b54d8efa35a3145a84d876cf252ec5891f72 Mon Sep 17 00:00:00 2001 From: petyaslavova Date: Fri, 31 Oct 2025 10:57:11 +0200 Subject: [PATCH 04/17] Update tests/test_commands.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index 7d5917e63d..a68cf126b6 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -5388,7 +5388,7 @@ def 
test_xreadgroup_with_claim_min_idle_time_multiple_streams(self, r): r, res, expected_entries ) - # validate PEL still holds the messages until they are acknoledged + # validate PEL still holds the messages until they are acknowledged response = r.xpending_range(stream_1, group, min="-", max="+", count=5) assert len(response) == 3 response = r.xpending_range(stream_2, group, min="-", max="+", count=5) From d1a1e368e689600b012b3fd2cc53c5c20645be28 Mon Sep 17 00:00:00 2001 From: petyaslavova Date: Fri, 31 Oct 2025 10:57:47 +0200 Subject: [PATCH 05/17] Update tests/test_asyncio/test_commands.py Fix spelling error. Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_asyncio/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 05d438906c..801df89c55 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -3556,7 +3556,7 @@ async def test_xreadgroup_with_claim_min_idle_time(self, r): r, res, expected_entries ) - # validate PEL still holds the messages until they are acknoledged + # validate PEL still holds the messages until they are acknowledged response = await r.xpending_range(stream, group, min="-", max="+", count=5) assert len(response) == 4 From 30f260edfec9ff55aab5de356e20fd9e5c2ab4d7 Mon Sep 17 00:00:00 2001 From: petyaslavova Date: Fri, 31 Oct 2025 10:58:02 +0200 Subject: [PATCH 06/17] Update tests/test_asyncio/test_commands.py Fix spelling error. Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_asyncio/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 801df89c55..0f6d5bed16 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -3679,7 +3679,7 @@ async def test_xreadgroup_with_claim_min_idle_time_multiple_streams_same_slots( r, res, expected_entries ) - # validate PEL still holds the messages until they are acknoledged + # validate PEL still holds the messages until they are acknowledged response = await r.xpending_range(stream_1, group, min="-", max="+", count=5) assert len(response) == 3 response = await r.xpending_range(stream_2, group, min="-", max="+", count=5) From 493d73c64a5acbe2e9b316fc693d7e61f6efabd5 Mon Sep 17 00:00:00 2001 From: petyaslavova Date: Fri, 31 Oct 2025 10:58:20 +0200 Subject: [PATCH 07/17] Update tests/test_asyncio/test_commands.py Fix spelling error. 
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_asyncio/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 0f6d5bed16..fffbbaf893 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -3800,7 +3800,7 @@ async def test_xreadgroup_with_claim_min_idle_time_multiple_streams(self, r): r, res, expected_entries ) - # validate PEL still holds the messages until they are acknoledged + # validate PEL still holds the messages until they are acknowledged response = await r.xpending_range(stream_1, group, min="-", max="+", count=5) assert len(response) == 3 response = await r.xpending_range(stream_2, group, min="-", max="+", count=5) From a63addaae622814fc63305dd47ebfc361132ac47 Mon Sep 17 00:00:00 2001 From: petyaslavova Date: Fri, 31 Oct 2025 10:59:25 +0200 Subject: [PATCH 08/17] Update tests/test_asyncio/test_commands.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_asyncio/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index fffbbaf893..6dd804acd7 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -3531,7 +3531,7 @@ async def test_xreadgroup_with_claim_min_idle_time(self, r): await r.xgroup_create(stream, group, 0) # read all the messages - this will save the msgs in PEL - res = await r.xreadgroup(group, consumer_1, streams={stream: ">"}) + await r.xreadgroup(group, consumer_1, streams={stream: ">"}) # wait for 100ms - so that the messages would have been in the PEL for long enough await asyncio.sleep(0.1) From 55bb2490268980354cdbca4f3289fb29b121b819 Mon Sep 17 00:00:00 2001 From: petyaslavova Date: Fri, 31 Oct 2025 10:59:49 +0200 Subject: [PATCH 09/17] Update tests/test_asyncio/test_commands.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_asyncio/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 6dd804acd7..2796c2f642 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -3627,7 +3627,7 @@ async def test_xreadgroup_with_claim_min_idle_time_multiple_streams_same_slots( await r.xgroup_create(stream_2, group, 0) # read all the messages - this will save the msgs in PEL - res = await r.xreadgroup( + await r.xreadgroup( group, consumer_1, streams={stream_1: ">", stream_2: ">"} ) From 4a35465f3c5baa3baccd25bd41834bec5cda7250 Mon Sep 17 00:00:00 2001 From: petyaslavova Date: Fri, 31 Oct 2025 11:01:35 +0200 Subject: [PATCH 10/17] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_asyncio/test_commands.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 2796c2f642..4771342bbf 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -3690,9 +3690,6 @@ async def test_xreadgroup_with_claim_min_idle_time_multiple_streams_same_slots( await r.xadd(stream_2, {"key_m6": "val_m6"}) # read all the messages - this will save the msgs in PEL - res = await r.xreadgroup( - group, consumer_1, streams={stream_1: ">", stream_2: ">"} - ) # add 2 more messages m7 = await 
r.xadd(stream_1, {"key_m7": "val_m7"}) From 35eedb49d971d9b5c6f52a2aacdae3adfc2db7af Mon Sep 17 00:00:00 2001 From: petyaslavova Date: Fri, 31 Oct 2025 11:05:39 +0200 Subject: [PATCH 11/17] Update tests/test_asyncio/test_commands.py This assignment to 'res' is unnecessary as it is redefined before this value is used. Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_asyncio/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 4771342bbf..40268aefbe 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -3745,7 +3745,7 @@ async def test_xreadgroup_with_claim_min_idle_time_multiple_streams(self, r): await r.xgroup_create(stream_2, group, 0) # read all the messages - this will save the msgs in PEL - res = await r.xreadgroup( + await r.xreadgroup( group, consumer_1, streams={stream_1: ">", stream_2: ">"} ) From 112b99231f1f1287f806d234cc5d282f2bad330f Mon Sep 17 00:00:00 2001 From: petyaslavova Date: Fri, 31 Oct 2025 11:06:23 +0200 Subject: [PATCH 12/17] Update tests/test_commands.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index a68cf126b6..6bea2af26d 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -5163,7 +5163,7 @@ def test_xreadgroup_with_claim_min_idle_time(self, r): r.xgroup_create(stream, group, 0) # read all the messages - this will save the msgs in PEL - res = r.xreadgroup(group, consumer_1, streams={stream: ">"}) + r.xreadgroup(group, consumer_1, streams={stream: ">"}) # wait for 100ms - so that the messages would have been in the PEL for long enough time.sleep(0.1) From 0f4c44554285b285a161069d5e0431c11542da85 Mon Sep 17 00:00:00 2001 From: petyaslavova Date: Fri, 31 Oct 2025 11:07:23 +0200 Subject: [PATCH 13/17] Update tests/test_commands.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index 6bea2af26d..476e184579 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -5356,7 +5356,7 @@ def test_xreadgroup_with_claim_min_idle_time_multiple_streams(self, r): r.xgroup_create(stream_2, group, 0) # read all the messages - this will save the msgs in PEL - res = r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) # wait for 100ms - so that the messages would have been in the PEL for long enough time.sleep(0.1) From 1f9a3c8a3870a9c7f78550950d275bbcb586f7e2 Mon Sep 17 00:00:00 2001 From: petyaslavova Date: Fri, 31 Oct 2025 11:07:50 +0200 Subject: [PATCH 14/17] Update tests/test_commands.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index 476e184579..a8e826027c 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -5257,7 +5257,7 @@ def test_xreadgroup_with_claim_min_idle_time_multiple_streams_same_slots(self, r r.xgroup_create(stream_2, group, 0) # read all the messages - this will save the msgs in PEL - res = r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) 
+ r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) # wait for 100ms - so that the messages would have been in the PEL for long enough time.sleep(0.1) From 45346917dd7efaace6db64a8016ea738838437ca Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Fri, 31 Oct 2025 12:35:05 +0200 Subject: [PATCH 15/17] Fixing some test errors --- tests/test_asyncio/test_commands.py | 13 ++++--------- tests/test_commands.py | 4 ++-- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 40268aefbe..757dff44cd 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -3627,9 +3627,7 @@ async def test_xreadgroup_with_claim_min_idle_time_multiple_streams_same_slots( await r.xgroup_create(stream_2, group, 0) # read all the messages - this will save the msgs in PEL - await r.xreadgroup( - group, consumer_1, streams={stream_1: ">", stream_2: ">"} - ) + await r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) # wait for 100ms - so that the messages would have been in the PEL for long enough await asyncio.sleep(0.1) @@ -3690,6 +3688,7 @@ async def test_xreadgroup_with_claim_min_idle_time_multiple_streams_same_slots( await r.xadd(stream_2, {"key_m6": "val_m6"}) # read all the messages - this will save the msgs in PEL + r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) # add 2 more messages m7 = await r.xadd(stream_1, {"key_m7": "val_m7"}) @@ -3745,9 +3744,7 @@ async def test_xreadgroup_with_claim_min_idle_time_multiple_streams(self, r): await r.xgroup_create(stream_2, group, 0) # read all the messages - this will save the msgs in PEL - await r.xreadgroup( - group, consumer_1, streams={stream_1: ">", stream_2: ">"} - ) + await r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) # wait for 100ms - so that the messages would have been in the PEL for long enough await asyncio.sleep(0.1) @@ -3808,9 +3805,7 @@ async def test_xreadgroup_with_claim_min_idle_time_multiple_streams(self, r): await r.xadd(stream_2, {"key_m6": "val_m6"}) # read all the messages - this will save the msgs in PEL - res = await r.xreadgroup( - group, consumer_1, streams={stream_1: ">", stream_2: ">"} - ) + await r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) # add 2 more messages m7 = await r.xadd(stream_1, {"key_m7": "val_m7"}) diff --git a/tests/test_commands.py b/tests/test_commands.py index a8e826027c..87f8460a64 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -5300,7 +5300,7 @@ def test_xreadgroup_with_claim_min_idle_time_multiple_streams_same_slots(self, r r.xadd(stream_2, {"key_m6": "val_m6"}) # read all the messages - this will save the msgs in PEL - res = r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) # add 2 more messages m7 = r.xadd(stream_1, {"key_m7": "val_m7"}) @@ -5399,7 +5399,7 @@ def test_xreadgroup_with_claim_min_idle_time_multiple_streams(self, r): r.xadd(stream_2, {"key_m6": "val_m6"}) # read all the messages - this will save the msgs in PEL - res = r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) # add 2 more messages m7 = r.xadd(stream_1, {"key_m7": "val_m7"}) From 4050c91276b3202cd401628820f776975ee91fe5 Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Fri, 31 Oct 2025 19:45:48 +0200 
Subject: [PATCH 16/17] Fix async tests - await was missing --- tests/test_asyncio/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 757dff44cd..026082b485 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -3688,7 +3688,7 @@ async def test_xreadgroup_with_claim_min_idle_time_multiple_streams_same_slots( await r.xadd(stream_2, {"key_m6": "val_m6"}) # read all the messages - this will save the msgs in PEL - r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + await r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) # add 2 more messages m7 = await r.xadd(stream_1, {"key_m7": "val_m7"}) From 2d23865f854f70a0a65bed38ded0ba83e4e1ce6a Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Mon, 3 Nov 2025 19:08:22 +0200 Subject: [PATCH 17/17] Applying review comments --- redis/_parsers/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/_parsers/helpers.py b/redis/_parsers/helpers.py index df4c200b05..b6c3feb877 100644 --- a/redis/_parsers/helpers.py +++ b/redis/_parsers/helpers.py @@ -275,7 +275,7 @@ def parse_stream_list(response, **options): for r in response: if r is not None: if "claim_min_idle_time" in options: - data.append((r[0], pairs_to_dict(r[1]), r[2], r[3])) + data.append((r[0], pairs_to_dict(r[1]), *r[2:])) else: data.append((r[0], pairs_to_dict(r[1]))) else:
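Usage note (not part of the patch series): the change above adds a claim_min_idle_time keyword to xreadgroup, which appends the new CLAIM <min-idle-time> option to XREADGROUP and widens each parsed entry with its idle time and delivery count. The sketch below shows the intended call pattern under RESP2; it assumes a redis-py build that contains these patches and a Redis server new enough to accept CLAIM (the tests gate on 8.3.224), and the stream, group, and consumer names are arbitrary examples.

    import time

    import redis

    r = redis.Redis(decode_responses=True)
    stream, group, consumer = "orders", "workers", "worker-1"

    # Set up a stream with a couple of entries and a consumer group.
    r.xadd(stream, {"item": "a"})
    r.xadd(stream, {"item": "b"})
    try:
        r.xgroup_create(stream, group, id="0", mkstream=True)
    except redis.ResponseError:
        pass  # group already exists

    # First read: entries are delivered and recorded in the group's PEL.
    r.xreadgroup(group, consumer, streams={stream: ">"})

    # Pretend the consumer died before acknowledging, then more data arrives.
    time.sleep(0.2)
    r.xadd(stream, {"item": "c"})

    # Second read with CLAIM: new entries are returned as usual, and pending
    # entries idle for at least 100 ms are re-claimed and re-delivered. With
    # claim_min_idle_time set, parse_stream_list yields 4-tuples per entry:
    # (message_id, fields, idle, delivery_count); the tests coerce the last
    # two with int() before comparing.
    resp = r.xreadgroup(
        group, consumer, streams={stream: ">"}, claim_min_idle_time=100
    )

    for stream_name, entries in resp:  # RESP2: [[stream_name, [entries]], ...]
        for message_id, fields, idle, delivery_count in entries:
            print(stream_name, message_id, fields, int(idle), int(delivery_count))
            r.xack(stream, group, message_id)  # still needs an explicit ack

As the tests assert via xpending_range, entries re-delivered through CLAIM remain in the consumer group's PEL until they are acknowledged, so a consumer that picks them up this way still has to XACK them once processing succeeds.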