diff --git a/redis/_parsers/helpers.py b/redis/_parsers/helpers.py index bf3b2ee2f2..b6c3feb877 100644 --- a/redis/_parsers/helpers.py +++ b/redis/_parsers/helpers.py @@ -268,13 +268,16 @@ def sort_return_tuples(response, **options): return list(zip(*[response[i::n] for i in range(n)])) -def parse_stream_list(response): +def parse_stream_list(response, **options): if response is None: return None data = [] for r in response: if r is not None: - data.append((r[0], pairs_to_dict(r[1]))) + if "claim_min_idle_time" in options: + data.append((r[0], pairs_to_dict(r[1]), *r[2:])) + else: + data.append((r[0], pairs_to_dict(r[1]))) else: data.append((None, None)) return data @@ -332,16 +335,18 @@ def parse_xinfo_stream(response, **options): return data -def parse_xread(response): +def parse_xread(response, **options): if response is None: return [] - return [[r[0], parse_stream_list(r[1])] for r in response] + return [[r[0], parse_stream_list(r[1], **options)] for r in response] -def parse_xread_resp3(response): +def parse_xread_resp3(response, **options): if response is None: return {} - return {key: [parse_stream_list(value)] for key, value in response.items()} + return { + key: [parse_stream_list(value, **options)] for key, value in response.items() + } def parse_xpending(response, **options): diff --git a/redis/commands/core.py b/redis/commands/core.py index 8f3be68f32..7b9ba7295c 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -4101,6 +4101,7 @@ def xreadgroup( count: Optional[int] = None, block: Optional[int] = None, noack: bool = False, + claim_min_idle_time: Optional[int] = None, ) -> ResponseT: """ Read from a stream via a consumer group. @@ -4118,8 +4119,12 @@ def xreadgroup( block: number of milliseconds to wait, if nothing already present. noack: do not add messages to the PEL + claim_min_idle_time: accepts an integer type and represents a + time interval in milliseconds + For more information, see https://redis.io/commands/xreadgroup """ + options = {} pieces: list[EncodableT] = [b"GROUP", groupname, consumername] if count is not None: if not isinstance(count, int) or count < 1: @@ -4133,12 +4138,20 @@ def xreadgroup( pieces.append(str(block)) if noack: pieces.append(b"NOACK") + if claim_min_idle_time is not None: + if not isinstance(claim_min_idle_time, int) or claim_min_idle_time < 0: + raise DataError( + "XREADGROUP claim_min_idle_time must be a non-negative integer" + ) + pieces.append(b"CLAIM") + pieces.append(claim_min_idle_time) + options["claim_min_idle_time"] = claim_min_idle_time if not isinstance(streams, dict) or len(streams) == 0: raise DataError("XREADGROUP streams must be a non empty dict") pieces.append(b"STREAMS") pieces.extend(streams.keys()) pieces.extend(streams.values()) - return self.execute_command("XREADGROUP", *pieces) + return self.execute_command("XREADGROUP", *pieces, **options) def xrevrange( self, diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index cf25118022..1072779192 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -11,7 +11,7 @@ import pytest import pytest_asyncio -from redis import RedisClusterException +from redis import RedisClusterException, ResponseError import redis from redis import exceptions from redis._parsers.helpers import ( @@ -3821,6 +3821,360 @@ async def test_xreadgroup(self, r: redis.Redis): r, res, [[strem_name, expected_entries]], {strem_name: [expected_entries]} ) + def _validate_xreadgroup_with_claim_min_idle_time_response( + self, r, response, expected_entries + ): + # validate the number of streams + assert len(response) == len(expected_entries) + + expected_streams = expected_entries.keys() + for str_index, expected_stream in enumerate(expected_streams): + expected_entries_per_stream = expected_entries[expected_stream] + + if is_resp2_connection(r): + actual_entries_per_stream = response[str_index][1] + actual_stream = response[str_index][0] + assert actual_stream == expected_stream + else: + actual_entries_per_stream = response[expected_stream][0] + + # validate the number of entries + assert len(actual_entries_per_stream) == len(expected_entries_per_stream) + + for i, entry in enumerate(actual_entries_per_stream): + message = (entry[0], entry[1]) + message_idle = int(entry[2]) + message_delivered_count = int(entry[3]) + + expected_idle = expected_entries_per_stream[i]["min_idle_time"] + + assert message == expected_entries_per_stream[i]["msg"] + if expected_idle == 0: + assert message_idle == 0 + assert message_delivered_count == 0 + else: + assert message_idle >= expected_idle + assert message_delivered_count > 0 + + @skip_if_server_version_lt("8.3.224") + async def test_xreadgroup_with_claim_min_idle_time(self, r): + stream = "stream" + group = "group" + consumer_1 = "c1" + stream_name = stream.encode() + + try: + # before test cleanup + await r.xgroup_destroy(stream, group) + except ResponseError: + pass + + m1 = await r.xadd(stream, {"key_m1": "val_m1"}) + m2 = await r.xadd(stream, {"key_m2": "val_m2"}) + + await r.xgroup_create(stream, group, 0) + + # read all the messages - this will save the msgs in PEL + await r.xreadgroup(group, consumer_1, streams={stream: ">"}) + + # wait for 100ms - so that the messages would have been in the PEL for long enough + await asyncio.sleep(0.1) + + m3 = await r.xadd(stream, {"key_m3": "val_m3"}) + m4 = await r.xadd(stream, {"key_m4": "val_m4"}) + + expected_entries = { + stream_name: [ + {"msg": await get_stream_message(r, stream, m1), "min_idle_time": 100}, + {"msg": await get_stream_message(r, stream, m2), "min_idle_time": 100}, + {"msg": await get_stream_message(r, stream, m3), "min_idle_time": 0}, + {"msg": await get_stream_message(r, stream, m4), "min_idle_time": 0}, + ] + } + + res = await r.xreadgroup( + group, consumer_1, streams={stream: ">"}, claim_min_idle_time=100 + ) + + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + # validate PEL still holds the messages until they are acknowledged + response = await r.xpending_range(stream, group, min="-", max="+", count=5) + assert len(response) == 4 + + # add 2 more messages + m5 = await r.xadd(stream, {"key_m5": "val_m5"}) + m6 = await r.xadd(stream, {"key_m6": "val_m6"}) + + expected_entries = [ + await get_stream_message(r, stream, m5), + await get_stream_message(r, stream, m6), + ] + # read all the messages - this will save the msgs in PEL + res = await r.xreadgroup(group, consumer_1, streams={stream: ">"}) + assert_resp_response( + r, + res, + [[stream_name, expected_entries]], + {stream_name: [expected_entries]}, + ) + + # add 2 more messages + m7 = await r.xadd(stream, {"key_m7": "val_m7"}) + m8 = await r.xadd(stream, {"key_m8": "val_m8"}) + # read the messages with claim_min_idle_time=1000 + # only m7 and m8 should be returned + # because the other messages have not been in the PEL for long enough + expected_entries = { + stream_name: [ + {"msg": await get_stream_message(r, stream, m7), "min_idle_time": 0}, + {"msg": await get_stream_message(r, stream, m8), "min_idle_time": 0}, + ] + } + res = await r.xreadgroup( + group, consumer_1, streams={stream: ">"}, claim_min_idle_time=100 + ) + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + @skip_if_server_version_lt("8.3.224") + async def test_xreadgroup_with_claim_min_idle_time_multiple_streams_same_slots( + self, r + ): + stream_1 = "stream1:{same_slot:42}" + stream_2 = "stream2:{same_slot:42}" + group = "group" + consumer_1 = "c1" + + stream_1_name = stream_1.encode() + stream_2_name = stream_2.encode() + + # before test cleanup + try: + await r.xgroup_destroy(stream_1, group) + except ResponseError: + pass + try: + await r.xgroup_destroy(stream_2, group) + except ResponseError: + pass + + m1_s1 = await r.xadd(stream_1, {"key_m1_s1": "val_m1"}) + m2_s1 = await r.xadd(stream_1, {"key_m2_s1": "val_m2"}) + await r.xgroup_create(stream_1, group, 0) + + m1_s2 = await r.xadd(stream_2, {"key_m1_s2": "val_m1"}) + m2_s2 = await r.xadd(stream_2, {"key_m2_s2": "val_m2"}) + await r.xgroup_create(stream_2, group, 0) + + # read all the messages - this will save the msgs in PEL + await r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + + # wait for 100ms - so that the messages would have been in the PEL for long enough + await asyncio.sleep(0.1) + + m3_s1 = await r.xadd(stream_1, {"key_m3_s1": "val_m3"}) + m4_s2 = await r.xadd(stream_2, {"key_m4_s2": "val_m4"}) + + expected_entries = { + stream_1_name: [ + { + "msg": await get_stream_message(r, stream_1, m1_s1), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_1, m2_s1), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_1, m3_s1), + "min_idle_time": 0, + }, + ], + stream_2_name: [ + { + "msg": await get_stream_message(r, stream_2, m1_s2), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_2, m2_s2), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_2, m4_s2), + "min_idle_time": 0, + }, + ], + } + + res = await r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + # validate PEL still holds the messages until they are acknowledged + response = await r.xpending_range(stream_1, group, min="-", max="+", count=5) + assert len(response) == 3 + response = await r.xpending_range(stream_2, group, min="-", max="+", count=5) + assert len(response) == 3 + + # add 2 more messages + await r.xadd(stream_1, {"key_m5": "val_m5"}) + await r.xadd(stream_2, {"key_m6": "val_m6"}) + + # read all the messages - this will save the msgs in PEL + await r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + + # add 2 more messages + m7 = await r.xadd(stream_1, {"key_m7": "val_m7"}) + m8 = await r.xadd(stream_2, {"key_m8": "val_m8"}) + # read the messages with claim_min_idle_time=1000 + # only m7 and m8 should be returned + # because the other messages have not been in the PEL for long enough + expected_entries = { + stream_1_name: [ + {"msg": await get_stream_message(r, stream_1, m7), "min_idle_time": 0} + ], + stream_2_name: [ + {"msg": await get_stream_message(r, stream_2, m8), "min_idle_time": 0} + ], + } + res = await r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + @pytest.mark.onlynoncluster + @skip_if_server_version_lt("8.3.224") + async def test_xreadgroup_with_claim_min_idle_time_multiple_streams(self, r): + stream_1 = "stream1" + stream_2 = "stream2" + group = "group" + consumer_1 = "c1" + + stream_1_name = stream_1.encode() + stream_2_name = stream_2.encode() + + # before test cleanup + try: + await r.xgroup_destroy(stream_1, group) + except ResponseError: + pass + try: + await r.xgroup_destroy(stream_2, group) + except ResponseError: + pass + + m1_s1 = await r.xadd(stream_1, {"key_m1_s1": "val_m1"}) + m2_s1 = await r.xadd(stream_1, {"key_m2_s1": "val_m2"}) + await r.xgroup_create(stream_1, group, 0) + + m1_s2 = await r.xadd(stream_2, {"key_m1_s2": "val_m1"}) + m2_s2 = await r.xadd(stream_2, {"key_m2_s2": "val_m2"}) + await r.xgroup_create(stream_2, group, 0) + + # read all the messages - this will save the msgs in PEL + await r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + + # wait for 100ms - so that the messages would have been in the PEL for long enough + await asyncio.sleep(0.1) + + m3_s1 = await r.xadd(stream_1, {"key_m3_s1": "val_m3"}) + m4_s2 = await r.xadd(stream_2, {"key_m4_s2": "val_m4"}) + + expected_entries = { + stream_1_name: [ + { + "msg": await get_stream_message(r, stream_1, m1_s1), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_1, m2_s1), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_1, m3_s1), + "min_idle_time": 0, + }, + ], + stream_2_name: [ + { + "msg": await get_stream_message(r, stream_2, m1_s2), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_2, m2_s2), + "min_idle_time": 100, + }, + { + "msg": await get_stream_message(r, stream_2, m4_s2), + "min_idle_time": 0, + }, + ], + } + + res = await r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + # validate PEL still holds the messages until they are acknowledged + response = await r.xpending_range(stream_1, group, min="-", max="+", count=5) + assert len(response) == 3 + response = await r.xpending_range(stream_2, group, min="-", max="+", count=5) + assert len(response) == 3 + + # add 2 more messages + await r.xadd(stream_1, {"key_m5": "val_m5"}) + await r.xadd(stream_2, {"key_m6": "val_m6"}) + + # read all the messages - this will save the msgs in PEL + await r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + + # add 2 more messages + m7 = await r.xadd(stream_1, {"key_m7": "val_m7"}) + m8 = await r.xadd(stream_2, {"key_m8": "val_m8"}) + # read the messages with claim_min_idle_time=1000 + # only m7 and m8 should be returned + # because the other messages have not been in the PEL for long enough + expected_entries = { + stream_1_name: [ + {"msg": await get_stream_message(r, stream_1, m7), "min_idle_time": 0} + ], + stream_2_name: [ + {"msg": await get_stream_message(r, stream_2, m8), "min_idle_time": 0} + ], + } + res = await r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + @skip_if_server_version_lt("5.0.0") async def test_xrevrange(self, r: redis.Redis): stream = "stream" diff --git a/tests/test_commands.py b/tests/test_commands.py index 68017236ee..cd427ef62f 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -9,7 +9,7 @@ from unittest.mock import patch import pytest -from redis import RedisClusterException +from redis import RedisClusterException, ResponseError import redis from redis import exceptions from redis._parsers.helpers import ( @@ -5440,6 +5440,322 @@ def test_xreadgroup(self, r): {stream_name: [expected_entries]}, ) + def _validate_xreadgroup_with_claim_min_idle_time_response( + self, r, response, expected_entries + ): + # validate the number of streams + assert len(response) == len(expected_entries) + + expected_streams = expected_entries.keys() + for str_index, expected_stream in enumerate(expected_streams): + expected_entries_per_stream = expected_entries[expected_stream] + + if is_resp2_connection(r): + actual_entries_per_stream = response[str_index][1] + actual_stream = response[str_index][0] + assert actual_stream == expected_stream + else: + actual_entries_per_stream = response[expected_stream][0] + + # validate the number of entries + assert len(actual_entries_per_stream) == len(expected_entries_per_stream) + + for i, entry in enumerate(actual_entries_per_stream): + message = (entry[0], entry[1]) + message_idle = int(entry[2]) + message_delivered_count = int(entry[3]) + + expected_idle = expected_entries_per_stream[i]["min_idle_time"] + + assert message == expected_entries_per_stream[i]["msg"] + if expected_idle == 0: + assert message_idle == 0 + assert message_delivered_count == 0 + else: + assert message_idle >= expected_idle + assert message_delivered_count > 0 + + @skip_if_server_version_lt("8.3.224") + def test_xreadgroup_with_claim_min_idle_time(self, r): + stream = "stream" + group = "group" + consumer_1 = "c1" + stream_name = stream.encode() + + try: + # before test cleanup + r.xgroup_destroy(stream, group) + except ResponseError: + pass + + m1 = r.xadd(stream, {"key_m1": "val_m1"}) + m2 = r.xadd(stream, {"key_m2": "val_m2"}) + + r.xgroup_create(stream, group, 0) + + # read all the messages - this will save the msgs in PEL + r.xreadgroup(group, consumer_1, streams={stream: ">"}) + + # wait for 100ms - so that the messages would have been in the PEL for long enough + time.sleep(0.1) + + m3 = r.xadd(stream, {"key_m3": "val_m3"}) + m4 = r.xadd(stream, {"key_m4": "val_m4"}) + + expected_entries = { + stream_name: [ + {"msg": get_stream_message(r, stream, m1), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream, m2), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream, m3), "min_idle_time": 0}, + {"msg": get_stream_message(r, stream, m4), "min_idle_time": 0}, + ] + } + + res = r.xreadgroup( + group, consumer_1, streams={stream: ">"}, claim_min_idle_time=100 + ) + + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + # validate PEL still holds the messages until they are acknowledged + response = r.xpending_range(stream, group, min="-", max="+", count=5) + assert len(response) == 4 + + # add 2 more messages + m5 = r.xadd(stream, {"key_m5": "val_m5"}) + m6 = r.xadd(stream, {"key_m6": "val_m6"}) + + expected_entries = [ + get_stream_message(r, stream, m5), + get_stream_message(r, stream, m6), + ] + # read all the messages - this will save the msgs in PEL + res = r.xreadgroup(group, consumer_1, streams={stream: ">"}) + assert_resp_response( + r, + res, + [[stream_name, expected_entries]], + {stream_name: [expected_entries]}, + ) + + # add 2 more messages + m7 = r.xadd(stream, {"key_m7": "val_m7"}) + m8 = r.xadd(stream, {"key_m8": "val_m8"}) + # read the messages with claim_min_idle_time=1000 + # only m7 and m8 should be returned + # because the other messages have not been in the PEL for long enough + expected_entries = { + stream_name: [ + {"msg": get_stream_message(r, stream, m7), "min_idle_time": 0}, + {"msg": get_stream_message(r, stream, m8), "min_idle_time": 0}, + ] + } + res = r.xreadgroup( + group, consumer_1, streams={stream: ">"}, claim_min_idle_time=100 + ) + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + @skip_if_server_version_lt("8.3.224") + def test_xreadgroup_with_claim_min_idle_time_multiple_streams_same_slots(self, r): + stream_1 = "stream1:{same_slot:42}" + stream_2 = "stream2:{same_slot:42}" + group = "group" + consumer_1 = "c1" + + stream_1_name = stream_1.encode() + stream_2_name = stream_2.encode() + + # before test cleanup + try: + r.xgroup_destroy(stream_1, group) + except ResponseError: + pass + try: + r.xgroup_destroy(stream_2, group) + except ResponseError: + pass + + m1_s1 = r.xadd(stream_1, {"key_m1_s1": "val_m1"}) + m2_s1 = r.xadd(stream_1, {"key_m2_s1": "val_m2"}) + r.xgroup_create(stream_1, group, 0) + + m1_s2 = r.xadd(stream_2, {"key_m1_s2": "val_m1"}) + m2_s2 = r.xadd(stream_2, {"key_m2_s2": "val_m2"}) + r.xgroup_create(stream_2, group, 0) + + # read all the messages - this will save the msgs in PEL + r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + + # wait for 100ms - so that the messages would have been in the PEL for long enough + time.sleep(0.1) + + m3_s1 = r.xadd(stream_1, {"key_m3_s1": "val_m3"}) + m4_s2 = r.xadd(stream_2, {"key_m4_s2": "val_m4"}) + + expected_entries = { + stream_1_name: [ + {"msg": get_stream_message(r, stream_1, m1_s1), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_1, m2_s1), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_1, m3_s1), "min_idle_time": 0}, + ], + stream_2_name: [ + {"msg": get_stream_message(r, stream_2, m1_s2), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_2, m2_s2), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_2, m4_s2), "min_idle_time": 0}, + ], + } + + res = r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + # validate PEL still holds the messages until they are acknowledged + response = r.xpending_range(stream_1, group, min="-", max="+", count=5) + assert len(response) == 3 + response = r.xpending_range(stream_2, group, min="-", max="+", count=5) + assert len(response) == 3 + + # add 2 more messages + r.xadd(stream_1, {"key_m5": "val_m5"}) + r.xadd(stream_2, {"key_m6": "val_m6"}) + + # read all the messages - this will save the msgs in PEL + r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + + # add 2 more messages + m7 = r.xadd(stream_1, {"key_m7": "val_m7"}) + m8 = r.xadd(stream_2, {"key_m8": "val_m8"}) + # read the messages with claim_min_idle_time=1000 + # only m7 and m8 should be returned + # because the other messages have not been in the PEL for long enough + expected_entries = { + stream_1_name: [ + {"msg": get_stream_message(r, stream_1, m7), "min_idle_time": 0} + ], + stream_2_name: [ + {"msg": get_stream_message(r, stream_2, m8), "min_idle_time": 0} + ], + } + res = r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + @pytest.mark.onlynoncluster + @skip_if_server_version_lt("8.3.224") + def test_xreadgroup_with_claim_min_idle_time_multiple_streams(self, r): + stream_1 = "stream1" + stream_2 = "stream2" + group = "group" + consumer_1 = "c1" + + stream_1_name = stream_1.encode() + stream_2_name = stream_2.encode() + + # before test cleanup + try: + r.xgroup_destroy(stream_1, group) + except ResponseError: + pass + try: + r.xgroup_destroy(stream_2, group) + except ResponseError: + pass + + m1_s1 = r.xadd(stream_1, {"key_m1_s1": "val_m1"}) + m2_s1 = r.xadd(stream_1, {"key_m2_s1": "val_m2"}) + r.xgroup_create(stream_1, group, 0) + + m1_s2 = r.xadd(stream_2, {"key_m1_s2": "val_m1"}) + m2_s2 = r.xadd(stream_2, {"key_m2_s2": "val_m2"}) + r.xgroup_create(stream_2, group, 0) + + # read all the messages - this will save the msgs in PEL + r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + + # wait for 100ms - so that the messages would have been in the PEL for long enough + time.sleep(0.1) + + m3_s1 = r.xadd(stream_1, {"key_m3_s1": "val_m3"}) + m4_s2 = r.xadd(stream_2, {"key_m4_s2": "val_m4"}) + + expected_entries = { + stream_1_name: [ + {"msg": get_stream_message(r, stream_1, m1_s1), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_1, m2_s1), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_1, m3_s1), "min_idle_time": 0}, + ], + stream_2_name: [ + {"msg": get_stream_message(r, stream_2, m1_s2), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_2, m2_s2), "min_idle_time": 100}, + {"msg": get_stream_message(r, stream_2, m4_s2), "min_idle_time": 0}, + ], + } + + res = r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + + # validate PEL still holds the messages until they are acknowledged + response = r.xpending_range(stream_1, group, min="-", max="+", count=5) + assert len(response) == 3 + response = r.xpending_range(stream_2, group, min="-", max="+", count=5) + assert len(response) == 3 + + # add 2 more messages + r.xadd(stream_1, {"key_m5": "val_m5"}) + r.xadd(stream_2, {"key_m6": "val_m6"}) + + # read all the messages - this will save the msgs in PEL + r.xreadgroup(group, consumer_1, streams={stream_1: ">", stream_2: ">"}) + + # add 2 more messages + m7 = r.xadd(stream_1, {"key_m7": "val_m7"}) + m8 = r.xadd(stream_2, {"key_m8": "val_m8"}) + # read the messages with claim_min_idle_time=1000 + # only m7 and m8 should be returned + # because the other messages have not been in the PEL for long enough + expected_entries = { + stream_1_name: [ + {"msg": get_stream_message(r, stream_1, m7), "min_idle_time": 0} + ], + stream_2_name: [ + {"msg": get_stream_message(r, stream_2, m8), "min_idle_time": 0} + ], + } + res = r.xreadgroup( + group, + consumer_1, + streams={stream_1: ">", stream_2: ">"}, + claim_min_idle_time=100, + ) + self._validate_xreadgroup_with_claim_min_idle_time_response( + r, res, expected_entries + ) + @skip_if_server_version_lt("5.0.0") def test_xrevrange(self, r): stream = "stream"